# help-connector-development
r
Hi, I'm developing a destination connector in Python, and I found that it behaves weirdly with a CDC source (I've tested Postgres and MySQL). Airbyte seems to fail to remember the WAL position no matter how many syncs have completed successfully:
2023-06-09 03:10:46 source > INFO i.d.c.p.PostgresStreamingChangeEventSource(searchWalPosition):348 WAL resume position 'null' discovered
As I continued debugging, I found something suspicious in the log: the state message that tells the framework to remember the WAL position does not seem to be processed correctly:
2023-06-09 02:40:26 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromDestination):97 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@5cccb520[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@520b40c9[type=GLOBAL,stream=<null>,global=<null>,data={"cdc":true,"cdc_state":{"state":{"[\"robert\",{\"server\":\"robert\"}]":"{\"transaction_id\":null,\"lsn\":29930608,\"txId\":748,\"ts_usec\":1686278411722406}"}},"streams":[{"stream_name":"person","stream_namespace":"public","cursor_field":[]}]},additionalProperties={global_={shared_state={state={["robert",{"server":"robert"}]={"transaction_id":null,"lsn":29930608,"txId":748,"ts_usec":1686278411722406}}}, stream_states=[{stream_descriptor={name=person, namespace=public}, stream_state={stream_name=person, stream_namespace=public, cursor_field=[]}}]}}],trace=<null>,control=<null>,additionalProperties={}]
2023-06-09 02:40:26 WARN i.a.w.i.b.DefaultSyncStatsTracker(updateDestinationStateStats):195 - The message tracker encountered an issue that prevents committed record counts from being reliably computed. This only impacts metadata and does not indicate a problem with actual sync data.
io.airbyte.workers.internal.book_keeping.StateDeltaTracker$StateDeltaTrackerException: Delta was not stored for state hash 1727035794
2023-06-09 02:40:26 WARN i.a.w.i.b.DefaultSyncStatsTracker(updateDestinationStateStats):205 - The state message tracker was unable to match the destination state message to a corresponding source state message.This only impacts metrics and does not indicate a problem with actual sync data.
io.airbyte.workers.internal.book_keeping.StateMetricsTracker$StateMetricsTrackerNoStateMatchException: Destination state message cannot be matched to corresponding Source state message.
The code that processes state messages in my connector is quite simple:
def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        ...
        for message in input_messages:
            if message.type == Type.STATE:
                logger.info(message)
                yield message
            elif message.type == Type.RECORD:
                data = message.record.data
                stream = message.record.stream
                ...
I built a Postgres-to-Postgres CDC + dedup connection and found that the state message emitted by the Postgres destination differs from the one my connector emits:
# pg dest connector's state message
io.airbyte.protocol.models.AirbyteMessage@523f0336[
  type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,
  state=io.airbyte.protocol.models.AirbyteStateMessage@5cf8f701[
    type=GLOBAL,stream=<null>,global=io.airbyte.protocol.models.AirbyteGlobalState@58c76846[sharedState={"state":{"[\"robert\",{\"server\":\"robert\"}]":"{\"transaction_id\":null,\"lsn\":30093368,\"txId\":752,\"ts_usec\":1686279863738929}"}},streamStates=[io.airbyte.protocol.models.AirbyteStreamState@1dc5fd3a[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@37497b07[name=person,namespace=public,additionalProperties={}],streamState={"stream_name":"person","stream_namespace":"public","cursor_field":[]},additionalProperties={}]],additionalProperties={}],
    data={"cdc":true,"cdc_state":{"state":{"[\"robert\",{\"server\":\"robert\"}]":"{\"transaction_id\":null,\"lsn\":30093368,\"txId\":752,\"ts_usec\":1686279863738929}"}},"streams":[{"stream_name":"person","stream_namespace":"public","cursor_field":[]}]},
    additionalProperties={}],trace=<null>,control=<null>,additionalProperties={}]

# my connector's state message
io.airbyte.protocol.models.AirbyteMessage@5cccb520[
  type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,
  state=io.airbyte.protocol.models.AirbyteStateMessage@520b40c9[
    type=GLOBAL,stream=<null>,global=<null>,
    data={"cdc":true,"cdc_state":{"state":{"[\"robert\",{\"server\":\"robert\"}]":"{\"transaction_id\":null,\"lsn\":29930608,\"txId\":748,\"ts_usec\":1686278411722406}"}},"streams":[{"stream_name":"person","stream_namespace":"public","cursor_field":[]}]},
    additionalProperties={global_={shared_state={state={["robert",{"server":"robert"}]={"transaction_id":null,"lsn":29930608,"txId":748,"ts_usec":1686278411722406}}}, stream_states=[{stream_descriptor={name=person, namespace=public}, stream_state={stream_name=person, stream_namespace=public, cursor_field=[]}}]}}],trace=<null>,control=<null>,additionalProperties={}]
However, I have no clue how to go further. Any ideas?
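One way to narrow this down is to check the JSON lines the destination writes to stdout before the worker parses them. The sketch below is only illustrative: `find_state_key_problems` and `REQUIRED_KEY_BY_TYPE` are hypothetical names, and the rule that a GLOBAL state needs a populated `global` key (and a STREAM state a `stream` key) is an assumption drawn from the two dumps above.

```python
import json
from typing import Any, Dict, List

# Assumption: the payload key expected for each state type, inferred from the
# dumps above (type=GLOBAL goes together with a populated `global` field).
REQUIRED_KEY_BY_TYPE = {"GLOBAL": "global", "STREAM": "stream"}


def find_state_key_problems(line: str) -> List[str]:
    """Return human-readable problems for one serialized message line."""
    message: Dict[str, Any] = json.loads(line)
    if message.get("type") != "STATE":
        return []  # only STATE messages are checked
    state = message.get("state") or {}
    problems = []
    required = REQUIRED_KEY_BY_TYPE.get(state.get("type"))
    if required and state.get(required) is None:
        problems.append(f"state.type={state.get('type')} but '{required}' is missing")
    # Flag keys such as `global_` that look like a misspelled payload key.
    for key in state:
        if key.endswith("_") and key[:-1] in REQUIRED_KEY_BY_TYPE.values():
            problems.append(f"unexpected key '{key}' (did you mean '{key[:-1]}'?)")
    return problems
```

Running this over the connector's captured output would flag a message shaped like the second dump, where `global` is null and a `global_` entry shows up instead.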
k
A message from kapa.ai
@Robert That's a great question, but unfortunately it is too long. Can you ask me again but make it shorter? I'm here to help!
r
I took a look at the worker's log, and I'm almost sure the root cause is an NPE when parsing the state message.
2023-06-09 14:03:18 2023-06-09 06:03:18 WARN i.a.w.i.s.SyncPersistenceImpl(flush):311 - Failed to persist state for connectionId 446ba8ee-412b-4e09-b126-a3ee39e178a3, it will be retried as part of the next flush
2023-06-09 14:03:18 io.airbyte.api.client.invoker.generated.ApiException: createOrUpdateState call failed with: 500 - {"message":"Internal Server Error: Cannot invoke \"io.airbyte.protocol.models.AirbyteStateMessage.getGlobal()\" because the return value of \"io.airbyte.config.StateWrapper.getGlobal()\" is null","exceptionClassName":"java.lang.NullPointerException","exceptionStack":["java.lang.NullPointerException: Cannot invoke \"io.airbyte.protocol.models.AirbyteStateMessage.getGlobal()\" because the return value of \"io.airbyte.config.StateWrapper.getGlobal()\" is null","\tat io.airbyte.config.persistence.StatePersistence.lambda$updateOrCreateState$1(StatePersistence.java:103)","\tat io.airbyte.db.Database.lambda$transaction$0(Database.java:27)","\tat org.jooq.impl.DefaultDSLContext.lambda$transactionResult0$3(DefaultDSLContext.java:552)","\tat org.jooq.impl.Tools$3$1.block(Tools.java:5854)","\tat java.base/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3745)","\tat java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3690)","\tat org.jooq.impl.Tools$3.get(Tools.java:5851)","\tat org.jooq.impl.DefaultDSLContext.transactionResult0(DefaultDSLContext.java:600)","\tat org.jooq.impl.DefaultDSLContext.transactionResult(DefaultDSLContext.java:524)","\tat io.airbyte.db.Database.transaction(Database.java:27)","\tat io.airbyte.db.ExceptionWrappingDatabase.transaction(ExceptionWrappingDatabase.java:47)","\tat io.airbyte.config.persistence.StatePersistence.updateOrCreateState(StatePersistence.java:98)","\tat io.airbyte.commons.server.handlers.StateHandler.createOrUpdateState(StateHandler.java:41)","\tat io.airbyte.server.apis.StateApiController.lambda$createOrUpdateState$0(StateApiController.java:39)","\tat io.airbyte.server.apis.ApiHelper.execute(ApiHelper.java:26)","\tat io.airbyte.server.apis.StateApiController.createOrUpdateState(StateApiController.java:39)","\tat 
io.airbyte.server.apis.$StateApiController$Definition$Exec.dispatch(Unknown Source)","\tat io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:371)","\tat io.micronaut.context.DefaultBeanContext$4.invoke(DefaultBeanContext.java:594)","\tat io.micronaut.web.router.AbstractRouteMatch.execute(AbstractRouteMatch.java:303)","\tat io.micronaut.web.router.RouteMatch.execute(RouteMatch.java:111)","\tat io.micronaut.http.context.ServerRequestContext.with(ServerRequestContext.java:103)","\tat io.micronaut.http.server.RouteExecutor.lambda$executeRoute$14(RouteExecutor.java:659)","\tat reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:49)","\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)","\tat reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)","\tat io.micronaut.reactive.reactor.instrument.ReactorInstrumentation.lambda$init$0(ReactorInstrumentation.java:62)","\tat reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)","\tat reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)","\tat io.micronaut.scheduling.instrument.InvocationInstrumenterWrappedCallable.call(InvocationInstrumenterWrappedCallable.java:53)","\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)","\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)","\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)","\tat java.base/java.lang.Thread.run(Thread.java:1589)"]}
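The NPE is consistent with the earlier dump, where the Python connector's message has `global=<null>` and a stray `global_` entry under `additionalProperties`. One possible explanation, not confirmed against the CDK source, is that the Python model exposes the field as `global_` (because `global` is a reserved word in Python) and the message is serialized under that attribute name. A minimal workaround sketch under that assumption follows; `ALIASES`, `fix_state_keys`, and `emit` are hypothetical helpers, not part of Airbyte.

```python
import json
from typing import Any, Dict

# Hypothetical mapping from Python-side attribute names to protocol key names.
ALIASES = {"global_": "global"}


def fix_state_keys(message: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of a STATE message dict with aliased keys renamed."""
    if message.get("type") != "STATE" or not isinstance(message.get("state"), dict):
        return message  # non-STATE messages pass through unchanged
    state = {ALIASES.get(key, key): value for key, value in message["state"].items()}
    return {**message, "state": state}


def emit(message: Dict[str, Any]) -> str:
    """Serialize one message as a single JSON line, as a destination would print it."""
    return json.dumps(fix_state_keys(message))
```

If the message objects are pydantic models, serializing them with `by_alias=True` may achieve the same result without a manual rename; which serialization call the CDK actually uses would need to be checked.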