I am using postgres source but when...
# ask-ai
a
@kapa.ai I am using a Postgres source, but when I sync the data the WAL logs suddenly increase and the Postgres connector starts a full refresh sync, even though the connected source is CDC. Any reason for it?
k
When using the Postgres source with CDC (Change Data Capture) mode, the connector is designed to perform an initial full refresh sync to capture a snapshot of all existing data. Subsequent syncs should be incremental, reading from the write-ahead log (WAL). However, there are scenarios where you might observe full refresh syncs even after the initial run. This can happen for the following reasons:

1. **WAL files removed**
   - *Scenario*: The WAL files needed for the incremental sync have been removed by Postgres.
   - *Cause*: A high volume of database updates can produce more WAL files than allowed in the `pg_wal` directory, so Postgres purges or archives them.
   - *Solutions*:
     - Sync the data source more frequently to avoid WAL file accumulation.
     - Increase the `wal_keep_size` parameter to retain more WAL files (see the sketch below this message). Note that this requires more disk space. You can find more details on configuring `wal_keep_size` [here](https://www.postgresql.org/docs/current/runtime-config-replication.html#GUC-WAL-KEEP-SIZE).
2. **Destination connector failure**
   - *Scenario*: The Postgres connector successfully reads the WAL and acknowledges it to Postgres, but the destination connector fails to consume the data.
   - *Cause*: When the Postgres connector tries to read the same WAL again, it may already have been removed by Postgres, since the WAL record was acknowledged.
   - *Solution*: This scenario is rare, and there is currently no way to prevent it. The correct behavior in this case is to perform a full refresh.

For more detailed troubleshooting steps, refer to the [Postgres Troubleshooting documentation](https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs). If you continue to face issues, also check whether the initial CDC waiting time is sufficient. The default initial wait time is 5 minutes (300 seconds), but it can be adjusted based on your needs. More information on setting up the initial CDC waiting time can be found [here](https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#advanced-setting-up-initial-cdc-waiting-time). By addressing these potential issues, you should be able to mitigate unexpected full refresh syncs in your Postgres source with CDC mode.
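A minimal sketch of the `wal_keep_size` change mentioned above, assuming PostgreSQL 13+ and the `psycopg2` driver (the connection string and the `4GB` value are placeholders you would size for your own instance):

```python
# Hypothetical sketch: check and raise wal_keep_size so the WAL needed by the
# Airbyte CDC replication slot stays available longer. PostgreSQL 13+ only;
# on older versions the equivalent knob is wal_keep_segments.
import psycopg2

conn = psycopg2.connect("host=localhost dbname=airbyte_db user=postgres password=change-me")
conn.autocommit = True  # ALTER SYSTEM cannot run inside a transaction block

with conn.cursor() as cur:
    cur.execute("SHOW wal_keep_size;")
    print("current wal_keep_size:", cur.fetchone()[0])

    # Keep at least 4GB of past WAL segments in pg_wal.
    cur.execute("ALTER SYSTEM SET wal_keep_size = '4GB';")
    cur.execute("SELECT pg_reload_conf();")  # wal_keep_size is reloadable via SIGHUP

conn.close()
```

This only raises the retention floor; the logical replication slot created for CDC can still pin older WAL if syncs are infrequent, so pairing it with a more frequent sync schedule is usually the better first step.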
a
@kapa.ai I am receiving logs like this, but the WAL logs still have not decreased:
```
2024-06-03 13:17:36 [INFO ] - INFO i.d.c.c.BaseSourceTask(logStatistics):195 1 records sent during previous 00:02:26.368, last recorded offset of {server=airbyte_db} partition is {transaction_id=null, lsn_proc=8846192079064, messageType=INSERT, lsn_commit=88461920792480, lsn=88461920792480, txId=2869469553, ts_usec=1717420196436993}
2024-06-03 13:17:36 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:17:46 [INFO ] - INFO i.d.c.c.BaseSourceTask(logStatistics):195 1 records sent during previous 00:00:10.025, last recorded offset of {server=airbyte_db} partition is {transaction_id=null, lsn_proc=88461920779064, messageType=INSERT, lsn_commit=88461920792480, lsn=88461920792480, txId=2869469553, ts_usec=1717420196436993}
2024-06-03 13:17:46 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:17:56 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:18:06 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:18:16 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:18:26 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:18:37 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:18:47 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:18:57 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:19:07 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:19:17 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:19:27 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:19:37 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:19:47 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:19:57 [INFO ] - INFO i.d.c.p.PostgresStreamingChangeEventSource(commitOffset):412 Received offset commit request on 'LSN{5074/A55D6FA0}', but ignoring it. LSN flushing is not allowed yet
2024-06-03 13:20:04 [INFO ] - INFO i.d.c.p.c.AbstractMessageDecoder(shouldMessageBeSkipped):54 Streaming requested from LSN LSN{5074/A55D3B38}, received LSN LSN{5074/A55D3B38} identified as already processed
2024-06-03 13:20:04 [INFO ] - INFO i.d.c.p.c.WalPositionLocator(skipMessage):152 Message with LSN 'LSN{5074/A55D7DD0}' arrived, switching off the filtering
2024-06-03 13:20:25 [INFO ] - INFO i.d.c.c.BaseSourceTask(logStatistics):195 1047 records sent during previous 00:02:39.042, last recorded offset of {server=airbyte_db} partition is {transaction_id=null, lsn_proc=88463400925432, messageType=INSERT, lsn_commit=88463400490112, lsn=88463400925432, txId=2869675398, ts_usec=1717420433125971}
2024-06-03 13:20:37 [INFO ] - INFO i.a.i.d.i.p.PostgresCdcTargetPosition(reachedTargetPosition):67 Signalling close because record's LSN : PgLsn{lsn=88463821796832} is after target LSN : PgLsn{lsn=88463821569488}
2024-06-03 13:20:37 [INFO ] - INFO i.a.i.d.i.DebeziumRecordIterator(requestClose):208 Closing: Change event reached target position
2024-06-03 13:20:37 [INFO ] - INFO i.d.e.EmbeddedEngine(stop):1146 Stopping the embedded engine
2024-06-03 13:20:37 [INFO ] - INFO i.d.e.EmbeddedEngine(stop):1153 Waiting for PT5M for connector to stop
2024-06-03 13:20:37 [INFO ] - INFO i.d.e.EmbeddedEngine(run):944 Stopping the task and engine
2024-06-03 13:20:37 [INFO ] - INFO i.d.c.c.BaseSourceTask(stop):278 Stopping down connector
2024-06-03 13:20:37 [INFO ] - INFO i.d.j.JdbcConnection(lambda$doClose$4):947 Connection gracefully closed
2024-06-03 13:20:37 [INFO ] - INFO i.d.j.JdbcConnection(lambda$doClose$4):947 Connection gracefully closed
2024-06-03 13:20:37 [INFO ] - INFO i.d.p.ChangeEventSourceCoordinator(streamEvents):175 Finished streaming
2024-06-03 13:20:37 [INFO ] - INFO i.d.p.ChangeEventSourceCoordinator(streamingConnected):240 Connected metrics set to 'false'
2024-06-03 13:20:37 [INFO ] - INFO o.a.k.c.s.FileOffsetBackingStore(stop):66 Stopped FileOffsetBackingStore
2024-06-03 13:20:37 [INFO ] - INFO i.a.i.d.i.DebeziumRecordPublisher(lambda$start$1):73 Debezium engine shutdown.
2024-06-03 13:20:37 [INFO ] - INFO i.a.i.d.i.DebeziumRecordPublisher(lambda$start$1):74 Connector 'io.debezium.connector.postgresql.PostgresConnector' completed normally.
2024-06-03 13:20:47 [INFO ] - INFO i.a.i.s.p.PostgresCdcStateHandler(saveState):37 debezium state: {"[\"airbyte_db\",{\"server\":\"airbyte_db\"}]":"{\"transaction_id\":null,\"lsn_proc\":88463848996328,\"messageType\":\"INSERT\",\"lsn_commit\":88463848986200,\"lsn\":88463848996328,\"txId\":2869744801,\"ts_usec\":1717420516181808}"}
2024-06-03 13:20:47 [INFO ] - INFO i.a.i.s.r.AbstractDbSource(lambda$read$1):173 Closing database connection pool.
2024-06-03 13:20:47 [INFO ] - INFO c.z.h.HikariDataSource(close):350 HikariPool-1 - Shutdown initiated...
2024-06-03 13:20:47 [INFO ] - INFO c.z.h.HikariDataSource(close):352 HikariPool-1 - Shutdown completed.
2024-06-03 13:20:47 [INFO ] - INFO i.a.i.s.r.AbstractDbSource(lambda$read$1):175 Closed database connection pool.
2024-06-03 13:20:47 [INFO ] - INFO i.a.i.b.IntegrationRunner(runInternal):195 Completed integration: io.airbyte.integrations.base.ssh.SshWrappedSource
2024-06-03 13:20:47 [INFO ] - INFO i.a.i.s.p.PostgresSource(main):539 completed source: class io.airbyte.integrations.source.postgres.PostgresSource
2024-06-03 13:20:47 [INFO ] - Stream [conversation_labels] Flushing batch [0] - adding 1279 objects to [pg_cdc_conversation_labels]. Key fields=[id] Storage=[k5sb7vdhus4nj6qsgo03o9.096623a0d401405e9c48da]
2024-06-03 13:20:47 [DEBUG] - Size of current batch is 957.69 KB
2024-06-03 13:20:47 [DEBUG] - Size of flattened batch is 953.87 KB
2024-06-03 13:20:47 [DEBUG] - Ingestion Memory Usage -> Alloc: 0.29 GB, TotalAlloc: 70563.97 GB, Sys: 2.24 GB, NumGC: 488919
2024-06-03 13:20:47 [INFO ] - Took 13ms for inserting into clickhouse table[pg_cdc_conversation_labels]; total objects [1279]
2024-06-03 13:20:48 [INFO ] - [k5sb7vdhus4nj6qsgo03o9.096623a0d401405e9c48da] Inserted [1279] rows in [0.03] seconds
2024-06-03 13:20:48 [INFO ] - Stream [conversation_labels] 1279 objects stored to [pg_cdc_conversation_labels]. Columns count: 10. Time: 117ms, Rows/sec: 10914.37. Storage=[k5sb7vdhus4nj6qsgo03o9.096623a0d401405e9c48da]
2024-06-03 13:20:48 [INFO ] - Saving state: {"cdc":false,"cdc_state":{"state":{"[\"airbyte_db\",{\"server\":\"airbyte_db\"}]":"{\"transaction_id\":null,\"lsn_proc\":88463848996328,\"messageType\":\"INSERT\",\"lsn_commit\":88463848986200,\"lsn\":88463848996328,\"txId\":2869744801,\"ts_usec\":1717420516181808}"}},"streams":[{"cursor_field":[],"stream_name":"conversation_labels","stream_namespace":"public"}]}
2024-06-03 13:20:48 [INFO ] - Sync finished in 5m43s (storage time: 0s), 1279 records processed, avg speed: 3.73 records per sec
2024-06-03 13:20:48 [INFO ] - Config saved.
2024-06-03 13:20:48 [INFO ] - FINISHED SUCCESSFULLY in [346.92] seconds (~ 5.78 minutes)
```
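The repeated `Received offset commit request ... but ignoring it. LSN flushing is not allowed yet` lines appear to be expected behaviour: the connector only lets Debezium flush (acknowledge) the LSN back to Postgres once the sync's state has been committed, so the replication slot keeps holding WAL until `confirmed_flush_lsn` advances after a successful sync. If `pg_wal` keeps growing between syncs, it is worth checking what the Airbyte replication slot is pinning. A hedged diagnostic sketch, assuming `psycopg2` and a role that can read `pg_replication_slots`:

```python
# Hypothetical diagnostic: show how much WAL each replication slot is still pinning.
# confirmed_flush_lsn should move forward after a sync completes; if it stays put,
# Postgres cannot recycle the older WAL segments and pg_wal keeps growing.
import psycopg2

conn = psycopg2.connect("host=localhost dbname=airbyte_db user=postgres password=change-me")
with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT slot_name,
               active,
               restart_lsn,
               confirmed_flush_lsn,
               pg_size_pretty(
                   pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
               ) AS retained_wal
        FROM pg_replication_slots;
    """)
    for slot_name, active, restart_lsn, flush_lsn, retained in cur.fetchall():
        print(f"{slot_name}: active={active} restart_lsn={restart_lsn} "
              f"confirmed_flush_lsn={flush_lsn} retained_wal={retained}")
conn.close()
```

If `retained_wal` keeps growing even though syncs finish successfully (as the log above shows they do), the usual levers are the ones from the earlier answer: sync more often, raise `wal_keep_size`, and make sure no inactive or orphaned slot from an old connection is left holding WAL.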
k
A message from kapa.ai
Sorry @Ankit Kumar, I seem to be having a bit of trouble. Don't worry though, I've pinged my support folks and they will try to fix me up! 🧑‍💻 Perhaps try again in 5mins?
a
@kapa.ai?
k
A message from kapa.ai
*(kapa.ai repeated the same answer as above.)*
a
@[DEPRECATED] Marcos Marx can you help me here?