Samkit Shah

02/17/2022, 5:34 AM
Hi, I'm wondering how often the state gets written to the `state` table in the `airbyte` database? My understanding was that this happens every `state_checkpoint_interval`; however, this does not appear to be the case.
• I can see in the logs that the state gets set every `state_checkpoint_interval`;
• If I check the state in the database after this log appears, no update has been made;
• Several `STATE` logs appear, yet the database is still not updated.
It seems to only happen on job completion, or after some very large number of records have been processed. Can you please provide some further information on how exactly this works, and whether it can be configured?
Jordan Velich

02/17/2022, 5:49 AM
Hey, as far as I understand, state is emitted as a `{type: 'STATE'}` message every `state_checkpoint_interval`, but the worker probably processes it only after the source is completely done. @Sherif Nada, correct me if I am wrong here?
State is saved only if the destination ACKs it by emitting that state message. It doesn't matter if the source outputs it if the destination didn't.
Thank you, what is the correct way to ACK the message with Python? Currently, we yield it.
It depends on the destination. If the destination outputs a STATE message, it means the data was successfully written, and the state is persisted.
Christopher Wu

02/17/2022, 10:44 PM
Okay, can you be any clearer on "output" please -- does this mean to log the state, or return the state message? Yielding does not appear to do the trick. It is a custom Postgres destination.
does this link and this help clarify?
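For reference, here is a minimal sketch of what "output" means in this context. It assumes the Airbyte protocol convention that a connector talks to the worker through JSON lines on stdout, so acknowledging a state means serializing the same STATE message to stdout after the records that precede it have been committed, rather than logging it. The names `write`, `run`, and `flush` are hypothetical stand-ins built on plain dicts, not the Airbyte CDK API; a CDK-based destination may already print whatever its write method yields.

```python
import json
from typing import Callable, Iterable, Iterator, List, TextIO


def write(input_messages: Iterable[dict],
          flush: Callable[[List[dict]], None]) -> Iterator[dict]:
    """Hypothetical destination write loop over Airbyte-style message dicts.

    `flush` is a stand-in for whatever commits a batch of records to the
    destination (for example a bulk INSERT followed by a COMMIT).
    """
    buffer: List[dict] = []
    for message in input_messages:
        if message["type"] == "RECORD":
            buffer.append(message["record"])
        elif message["type"] == "STATE":
            # Commit everything received before this checkpoint first...
            flush(buffer)
            buffer = []
            # ...and only then hand the STATE message back as the ACK.
            yield message
    # Commit trailing records that arrived after the last STATE message.
    flush(buffer)


def run(lines: Iterable[str],
        flush: Callable[[List[dict]], None],
        out: TextIO) -> None:
    """Read protocol messages as JSON lines and emit acknowledged states."""
    messages = (json.loads(line) for line in lines if line.strip())
    for acked in write(messages, flush):
        # One JSON object per line on the output stream is the actual ACK.
        out.write(json.dumps(acked) + "\n")
        out.flush()
```

In a real connector, `run` would be called with `sys.stdin` and `sys.stdout`. Echoing the state only after `flush` returns is the design point: a state that reaches the worker before its records are committed could cause those records to be skipped on the next incremental sync.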
Samir Bhattarai

02/20/2022, 11:11 PM
Also checking whether the STATE gets written on job failure? We have observed some instances where the job fails with no indication in the logs why, and the STATE does not get written.
Does this log indicate that the state should have been written to the database? There is no entry in the `airbyte.state` table for this connection.
2022-02-21 02:07:29 INFO i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):312 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@7f1c0a4f[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@234f8b91[data={"order_aud_jpy":{"latest_timestamp":"2017-08-12T10:00:00"},"order_aud_usd":{"latest_timestamp":"2017-07-24T11:40:00"},"order_eur_aud":{"latest_timestamp":"2017-07-26T21:20:00"},"order_eur_chf":{"latest_timestamp":"2017-08-08T13:00:00"},"order_eur_gbp":{"latest_timestamp":"2017-07-24T05:40:00"},"order_eur_jpy":{"latest_timestamp":"2017-07-25T05:40:00"},"order_eur_usd":{"latest_timestamp":"2017-07-24T19:20:00"},"order_gbp_chf":{"latest_timestamp":"2017-07-26T02:00:00"},"order_gbp_jpy":{"latest_timestamp":"2017-07-24T07:00:00"},"order_gbp_usd":{"latest_timestamp":"2017-08-19T05:20:00"},"order_nzd_usd":{"latest_timestamp":"2017-08-07T17:20:00"},"order_usd_cad":{"latest_timestamp":"2017-07-24T12:00:00"},"order_usd_chf":{"latest_timestamp":"2017-07-25T17:00:00"},"order_usd_jpy":{"latest_timestamp":"2017-08-23T20:00:00"},"order_xag_usd":{"latest_timestamp":"2017-07-24T14:00:00"},"order_xau_usd":{"latest_timestamp":"2017-05-31T12:00:00"},"position_aud_jpy":{"latest_timestamp":"2017-07-26T05:00:00"},"position_aud_usd":{"latest_timestamp":"2017-07-25T02:20:00"},"position_eur_aud":{"latest_timestamp":"2017-08-08T00:20:00"},"position_eur_chf":{"latest_timestamp":"2017-08-09T00:20:00"},"position_eur_gbp":{"latest_timestamp":"2017-08-01T08:20:00"},"position_eur_jpy":{"latest_timestamp":"2017-07-24T14:20:00"},"position_eur_usd":{"latest_timestamp":"2017-08-09T11:00:00"},"position_gbp_chf":{"latest_timestamp":"2017-08-04T16:20:00"},"position_gbp_jpy":{"latest_timestamp":"2017-07-28T07:40:00"},"position_gbp_usd":{"latest_timestamp":"2017-07-25T07:00:00"},"position_nzd_usd":{"latest_timestamp":"2017-07-25T20:00:00"},"position_usd_cad":{"latest_timestamp":"2017-07-27T05:40:00"},"position_usd_chf":{"latest_timestamp":"2017-07-24T22:20:00"},"position_usd_jpy":{"latest_timestamp":"2017-09-27T10:40:00"},"position_xag_usd":{"latest_timestamp":"2017-08-03T01:00:00"},"position_xau_usd":{"latest_timestamp":"2017-07-27T17:40:00"}},additionalProperties={}],additionalProperties={}]
Luca Crema

02/22/2022, 10:09 PM
@Jordan Velich can you check for a message like this?
2022-02-22 08:41:52 INFO i.a.w.DefaultReplicationWorker(run):253 - State capture: Updated state to: Optional[io.airbyte.config.State@6039a1b1[state={"Orders":{"LastUpdateDate":"2022-02-22T08:25:58Z"}}]]
I don't see anything like that, unfortunately. I see these instead:
2022-02-22 10:55:32 INFO i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):312 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@6752afd2[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@28df99ac[data={"aud_cad_m1":{"latest_timestamp":"2022-02-22T10:54:00"},...
How can I get those ones to appear?
What Airbyte version are you using?
0.35.18-alpha
Are you able to give me any insight into this final log please?
2022-02-23 00:08:43 destination > Processing 2 record messages...
2022-02-23 00:08:43 destination > Processing 4 record messages...
2022-02-23 00:08:51 destination > Processing 7 record messages...
2022-02-23 00:09:04 destination > Processing 7 record messages...
2022-02-23 00:09:10 destination > Processing 7 record messages...
2022-02-23 00:09:11 destination > Processing 7 record messages...
2022-02-23 00:09:16 destination > Processing 7 record messages...
2022-02-23 00:09:27 destination > Processing 1 record messages...
2022-02-23 00:09:27 destination > Processing 6 record messages...
2022-02-23 00:09:39 destination > Processing 7 record messages...
2022-02-23 00:09:49 destination > Processing 7 record messages...
2022-02-23 00:10:01 destination > Processing 7 record messages...
2022-02-23 00:10:18 destination > Processing 7 record messages...
2022-02-23 00:10:24 destination > Processing 2 record messages...
2022-02-23 00:10:24 destination > Processing 5 record messages...
2022-02-23 00:10:32 destination > Processing 7 record messages...
2022-02-23 00:10:38 destination > Processing 7 record messages...
2022-02-23 00:10:40 destination > Processing 7 record messages...
2022-02-23 00:10:49 destination > Processing 7 record messages...
2022-02-23 00:11:43 INFO i.a.a.LoggingTrackingClient(track):43 - track. version: 0.35.18-alpha, userId: f30e85f7-4056-4ec9-b536-6459e706edb4, action: Connector Jobs, metadata: {job_type=sync, config.source.auth_keys=set, namespace_definition=customformat, frequency=manual, connector_source_definition_id=32c9f112-c1a7-4903-932d-5e812b40a329, workspace_id=0cc0d9b0-efcd-4149-bd14-7f6b9bbc2289, attempt_stage=ENDED, attempt_id=0, connector_destination=Postgres Custom, catalog.sync_mode.incremental=set, connector_destination_docker_repository=xxx, table_prefix=false, workspace_name=0cc0d9b0-efcd-4149-bd14-7f6b9bbc2289, catalog.destination_sync_mode.append_dedup=set, connector_source=OANDA Books, config.destination.sync_raw=false, connector_source_docker_repository=xxx, config.destination.connection_uri=set, attempt_completion_status=FAILED, connection_id=398447b0-233a-4f95-8dde-472fcff87582, job_id=13083, connector_source_version=v1.0.3, config.destination.sync_structured=true, config.destination.bulk_size=set, connector_destination_version=v1.0.1, config.source.rate_limit=set, operation_count=0, config.source.workers=set, connector_destination_definition_id=39088b68-816b-46dd-af8d-ee1c0a33e372}
This coincided with the job failure, but no additional logs are present.
The Airbyte worker and scheduler pods also restarted at this time; however, there are no error logs in either.
I only ever see this message when the job completes successfully:
2022-02-22 08:41:52 INFO i.a.w.DefaultReplicationWorker(run):253 - State capture: Updated state to: Optional[io.airbyte.config.State@6039a1b1[state={"Orders":{"LastUpdateDate":"2022-02-22T08:25:58Z"}}]]
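The thread distinguishes two worker log messages: one showing that the worker received a STATE message from the destination, and one reporting "State capture: Updated state to", which was observed here only on successful completion. A small sketch for counting both in a saved job log may help compare runs. The matched substrings are copied from the log lines quoted above; the function name `count_state_events` is hypothetical, and whether the second message corresponds exactly to the database write is an assumption that this thread does not confirm.

```python
from typing import Dict, Iterable

# Substrings copied from the worker log lines quoted in this thread.
DESTINATION_STATE = "State in DefaultReplicationWorker from destination"
STATE_CAPTURED = "State capture: Updated state to"


def count_state_events(lines: Iterable[str]) -> Dict[str, int]:
    """Count how often each of the two state-related messages appears."""
    counts = {"from_destination": 0, "captured": 0}
    for line in lines:
        if DESTINATION_STATE in line:
            counts["from_destination"] += 1
        elif STATE_CAPTURED in line:
            counts["captured"] += 1
    return counts
```

A run that shows many `from_destination` events but no `captured` event would match the behaviour described above for the failed job.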