Octavia Squidington III
06/05/2023, 7:45 PM
Nohelia Merino
06/05/2023, 10:57 PM
Yauheni Anikhouski
06/06/2023, 5:06 AM
Get Spec job failed, but the previous build worked correctly. The only difference between the images is the installed libraries.
Failing build:
Successfully installed Deprecated-1.2.14 Jinja2-3.1.2 MarkupSafe-2.1.3 PyYAML-5.4.1 airbyte-cdk-0.39.2 airbyte-protocol-models-0.3.6 attrs-23.1.0 backoff-2.2.1 cachetools-5.3.1 cattrs-23.1.2 certifi-2023.5.7 charset-normalizer-3.1.0 dpath-2.0.8 exceptiongroup-1.1.1 genson-1.2.2 idna-3.4 isodate-0.6.1 jsonref-0.3.0 jsonschema-3.2.0 pendulum-2.1.2 platformdirs-3.5.1 pydantic-1.9.2 pyrsistent-0.19.3 python-dateutil-2.8.2 pytzdata-2020.1 requests-2.31.0 requests-cache-1.0.1 six-1.16.0 typing-extensions-4.6.3 url-normalize-1.4.3 urllib3-2.0.2 wrapt-1.15.0
Working build:
Successfully installed Deprecated-1.2.13 Jinja2-3.1.2 MarkupSafe-2.1.2 PyYAML-5.4.1 airbyte-cdk-0.37.0 airbyte-protocol-models-0.3.6 attrs-23.1.0 backoff-2.2.1 cachetools-5.3.0 cattrs-22.2.0 certifi-2023.5.7 charset-normalizer-3.1.0 dpath-2.0.8 exceptiongroup-1.1.1 genson-1.2.2 idna-3.4 isodate-0.6.1 jsonref-0.3.0 jsonschema-3.2.0 pendulum-2.1.2 platformdirs-3.5.1 pydantic-1.9.2 pyrsistent-0.19.3 python-dateutil-2.8.2 pytzdata-2020.1 requests-2.30.0 requests-cache-1.0.1 six-1.16.0 typing-extensions-4.5.0 url-normalize-1.4.3 urllib3-2.0.2 wrapt-1.15.0
2023-06-05 15:37:44 ERROR i.a.s.a.ApiHelper(execute):37 - Unexpected Exception
java.lang.IllegalStateException: Get Spec job failed.
at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.1-jre.jar:?]
at io.airbyte.commons.server.converters.SpecFetcher.getSpecFromJob(SpecFetcher.java:14) ~[io.airbyte-airbyte-commons-server-0.41.0.jar:?]
at io.airbyte.commons.server.handlers.SourceDefinitionsHandler.getSpecForImage(SourceDefinitionsHandler.java:300) ~[io.airbyte-airbyte-commons-server-0.41.0.jar:?]
at io.airbyte.commons.server.handlers.SourceDefinitionsHandler.updateSourceDefinition(SourceDefinitionsHandler.java:245) ~[io.airbyte-airbyte-commons-server-0.41.0.jar:?]
at io.airbyte.server.apis.SourceDefinitionApiController.lambda$updateSourceDefinition$8(SourceDefinitionApiController.java:141) ~[io.airbyte-airbyte-server-0.41.0.jar:?]
at io.airbyte.server.apis.ApiHelper.execute(ApiHelper.java:23) ~[io.airbyte-airbyte-server-0.41.0.jar:?]
at io.airbyte.server.apis.SourceDefinitionApiController.updateSourceDefinition(SourceDefinitionApiController.java:141) ~[io.airbyte-airbyte-server-0.41.0.jar:?]
at io.airbyte.server.apis.$SourceDefinitionApiController$Definition$Exec.dispatch(Unknown Source) ~[io.airbyte-airbyte-server-0.41.0.jar:?]
at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:371) ~[micronaut-inject-3.8.5.jar:3.8.5]
at io.micronaut.context.DefaultBeanContext$4.invoke(DefaultBeanContext.java:594) ~[micronaut-inject-3.8.5.jar:3.8.5]
at io.micronaut.web.router.AbstractRouteMatch.execute(AbstractRouteMatch.java:303) ~[micronaut-router-3.8.5.jar:3.8.5]
at io.micronaut.web.router.RouteMatch.execute(RouteMatch.java:111) ~[micronaut-router-3.8.5.jar:3.8.5]
at io.micronaut.http.context.ServerRequestContext.with(ServerRequestContext.java:103) ~[micronaut-http-3.8.5.jar:3.8.5]
at io.micronaut.http.server.RouteExecutor.lambda$executeRoute$14(RouteExecutor.java:659) ~[micronaut-http-server-3.8.5.jar:3.8.5]
at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:49) ~[reactor-core-3.5.0.jar:3.5.0]
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.5.0.jar:3.5.0]
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194) ~[reactor-core-3.5.0.jar:3.5.0]
at io.micronaut.reactive.reactor.instrument.ReactorInstrumentation.lambda$init$0(ReactorInstrumentation.java:62) ~[micronaut-runtime-3.8.5.jar:3.8.5]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.5.0.jar:3.5.0]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.5.0.jar:3.5.0]
at io.micronaut.scheduling.instrument.InvocationInstrumenterWrappedCallable.call(InvocationInstrumenterWrappedCallable.java:53) ~[micronaut-context-3.8.5.jar:3.8.5]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.lang.Thread.run(Thread.java:1589) ~[?:?]
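To pin down exactly which libraries changed between the two images, the two "Successfully installed ..." lines can be diffed mechanically. A minimal sketch, assuming every token has the form name-version with the version after the last hyphen (true for each token in both lines); the helper names parse_installed and diff_builds are illustrative only:

```python
from typing import Dict, Optional, Tuple

PREFIX = "Successfully installed "


def parse_installed(line: str) -> Dict[str, str]:
    """Map package name -> version from a pip 'Successfully installed ...' line."""
    if not line.startswith(PREFIX):
        raise ValueError("expected a pip 'Successfully installed' line")
    packages: Dict[str, str] = {}
    for token in line[len(PREFIX):].split():
        # Assumption: the version follows the LAST hyphen, e.g. airbyte-cdk-0.39.2.
        name, version = token.rsplit("-", 1)
        packages[name] = version
    return packages


def diff_builds(working: str, failing: str) -> Dict[str, Tuple[Optional[str], Optional[str]]]:
    """Return {package: (working_version, failing_version)} for packages that differ."""
    good, bad = parse_installed(working), parse_installed(failing)
    return {
        name: (good.get(name), bad.get(name))
        for name in sorted(set(good) | set(bad))
        if good.get(name) != bad.get(name)
    }


if __name__ == "__main__":
    # Shortened excerpts; paste the full lines from both build logs instead.
    working = "Successfully installed airbyte-cdk-0.37.0 requests-2.30.0 six-1.16.0"
    failing = "Successfully installed airbyte-cdk-0.39.2 requests-2.31.0 six-1.16.0"
    for name, (old, new) in diff_builds(working, failing).items():
        print(f"{name}: {old} -> {new}")
```

Run against the full lines, it reports seven changed packages (Deprecated, MarkupSafe, airbyte-cdk, cachetools, cattrs, requests, typing-extensions). The airbyte-cdk jump from 0.37.0 to 0.39.2 is the largest change and a natural first suspect, although nothing in the log confirms it.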
Jan Pavel
06/06/2023, 7:19 AM
Rishav Sinha
06/06/2023, 9:05 AM
FelixJ
06/06/2023, 9:18 AM
Soshi Nakachi仲地早司
06/06/2023, 10:10 AM
Abdul Hameed
06/06/2023, 10:32 AM
Octavia Squidington III
06/06/2023, 1:45 PM
Cesar Santos
06/06/2023, 2:50 PM
John Olinger
06/06/2023, 7:11 PM
Prateek Mukhedkar (Airbyte)
06/06/2023, 8:24 PM
maddu kiran
06/07/2023, 5:56 AM
Steven Murphy
06/07/2023, 9:18 AM
Briac Belin
06/07/2023, 12:46 PM
<base_url>/conversations/<page_number>?<request_parameters>
I'm looking for a way to build an incremental paginator for page_number, which is part of the path rather than a request parameter, and which keeps incrementing as long as the JSON response still contains results.
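A minimal, library-free sketch of that loop, assuming the endpoint returns a JSON object whose items sit under a "results" key (hypothetical; the real key is not stated) and that pages start at 1. fetch_json is a stand-in for whatever HTTP call the connector makes with its request parameters. In the Python CDK the same logic would roughly map onto an HttpStream whose next_page_token() returns the next page number while results remain and whose path() formats that number into the URL; treat that mapping as a suggestion, not a verified recipe.

```python
from typing import Any, Callable, Iterator, Mapping

# Hypothetical: the key under which the API returns its items.
RESULTS_KEY = "results"


def read_all_pages(
    fetch_json: Callable[[str], Mapping[str, Any]],
    start_page: int = 1,
) -> Iterator[Any]:
    """Yield every item, putting the page number in the path (not the query string)."""
    page = start_page
    while True:
        # The page number is a path segment: conversations/1, conversations/2, ...
        body = fetch_json(f"conversations/{page}")
        results = body.get(RESULTS_KEY) or []
        if not results:
            return  # An empty page means there is nothing left to read.
        yield from results
        page += 1


if __name__ == "__main__":
    # Fake API for illustration: two non-empty pages, then an empty one.
    fake_pages = {
        "conversations/1": {"results": ["a", "b"]},
        "conversations/2": {"results": ["c"]},
        "conversations/3": {"results": []},
    }
    print(list(read_all_pages(fake_pages.__getitem__)))  # ['a', 'b', 'c']
```

Stopping on the first empty page avoids needing a total-count field, at the cost of one extra request at the end.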
Thank you!
Lindsay S
06/07/2023, 6:55 PM
Jake Kagan
06/07/2023, 7:37 PM
pip install in a venv, or do I need to add it to setup.py? And how do I add any dependencies to setup.py?
Octavia Squidington III
06/07/2023, 7:45 PM
Aazam Thakur
06/07/2023, 10:32 PM
"merge_fields": {
  "property1": null,
  "property2": null
},
Victor Babichev
06/08/2023, 4:59 AM
Vincent Bessiere
06/08/2023, 9:14 AM
Octavia Squidington III
06/08/2023, 1:45 PM
David Anderson
06/08/2023, 3:07 PM
Jake Kagan
06/08/2023, 4:23 PM
Slackbot
06/08/2023, 8:04 PM
Robert
06/09/2023, 5:37 AM
2023-06-09 03:10:46 source > INFO i.d.c.p.PostgresStreamingChangeEventSource(searchWalPosition):348 WAL resume position 'null' discovered
As I continued debugging, I found something suspicious in the log: it seems the state message that tells the framework to remember the WAL position is not being processed correctly:
2023-06-09 02:40:26 INFO i.a.w.g.ReplicationWorkerHelper(processMessageFromDestination):97 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@5cccb520[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@520b40c9[type=GLOBAL,stream=<null>,global=<null>,data={"cdc":true,"cdc_state":{"state":{"[\"robert\",{\"server\":\"robert\"}]":"{\"transaction_id\":null,\"lsn\":29930608,\"txId\":748,\"ts_usec\":1686278411722406}"}},"streams":[{"stream_name":"person","stream_namespace":"public","cursor_field":[]}]},additionalProperties={global_={shared_state={state={["robert",{"server":"robert"}]={"transaction_id":null,"lsn":29930608,"txId":748,"ts_usec":1686278411722406}}}, stream_states=[{stream_descriptor={name=person, namespace=public}, stream_state={stream_name=person, stream_namespace=public, cursor_field=[]}}]}}],trace=<null>,control=<null>,additionalProperties={}]
2023-06-09 02:40:26 WARN i.a.w.i.b.DefaultSyncStatsTracker(updateDestinationStateStats):195 - The message tracker encountered an issue that prevents committed record counts from being reliably computed. This only impacts metadata and does not indicate a problem with actual sync data.
io.airbyte.workers.internal.book_keeping.StateDeltaTracker$StateDeltaTrackerException: Delta was not stored for state hash 1727035794
2023-06-09 02:40:26 WARN i.a.w.i.b.DefaultSyncStatsTracker(updateDestinationStateStats):205 - The state message tracker was unable to match the destination state message to a corresponding source state message.This only impacts metrics and does not indicate a problem with actual sync data.
io.airbyte.workers.internal.book_keeping.StateMetricsTracker$StateMetricsTrackerNoStateMatchException: Destination state message cannot be matched to corresponding Source state message.
The code that processes state messages in my connector is quite simple:
def write(
    self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
    ...
    for message in input_messages:
        if message.type == Type.STATE:
            logger.info(message)
            yield message
        elif message.type == Type.RECORD:
            data = message.record.data
            stream = message.record.stream
            ...
I built a Postgres-to-Postgres CDC + dedup connection and found that its state message differs from the one my connector processed:
# pg dest connector's state message
io.airbyte.protocol.models.AirbyteMessage@523f0336[
type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,
state=io.airbyte.protocol.models.AirbyteStateMessage@5cf8f701[
type=GLOBAL,stream=<null>,global=io.airbyte.protocol.models.AirbyteGlobalState@58c76846[sharedState={"state":{"[\"robert\",{\"server\":\"robert\"}]":"{\"transaction_id\":null,\"lsn\":30093368,\"txId\":752,\"ts_usec\":1686279863738929}"}},streamStates=[io.airbyte.protocol.models.AirbyteStreamState@1dc5fd3a[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@37497b07[name=person,namespace=public,additionalProperties={}],streamState={"stream_name":"person","stream_namespace":"public","cursor_field":[]},additionalProperties={}]],additionalProperties={}],
data={"cdc":true,"cdc_state":{"state":{"[\"robert\",{\"server\":\"robert\"}]":"{\"transaction_id\":null,\"lsn\":30093368,\"txId\":752,\"ts_usec\":1686279863738929}"}},"streams":[{"stream_name":"person","stream_namespace":"public","cursor_field":[]}]},
additionalProperties={}],trace=<null>,control=<null>,additionalProperties={}]
# my connector's state message
io.airbyte.protocol.models.AirbyteMessage@5cccb520[
type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,
state=io.airbyte.protocol.models.AirbyteStateMessage@520b40c9[
type=GLOBAL,stream=<null>,global=<null>,
data={"cdc":true,"cdc_state":{"state":{"[\"robert\",{\"server\":\"robert\"}]":"{\"transaction_id\":null,\"lsn\":29930608,\"txId\":748,\"ts_usec\":1686278411722406}"}},"streams":[{"stream_name":"person","stream_namespace":"public","cursor_field":[]}]},
additionalProperties={global_={shared_state={state={["robert",{"server":"robert"}]={"transaction_id":null,"lsn":29930608,"txId":748,"ts_usec":1686278411722406}}}, stream_states=[{stream_descriptor={name=person, namespace=public}, stream_state={stream_name=person, stream_namespace=public, cursor_field=[]}}]}}],trace=<null>,control=<null>,additionalProperties={}]
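Comparing the two dumps, the main structural difference appears to be that the connector's message has global=<null> and carries the global state under an unrecognized global_ key in additionalProperties. One plausible, unconfirmed explanation: the Python protocol model names the field global_ (global is a Python keyword) with global as its alias, and pydantic v1's .json()/.dict() emit field names unless by_alias=True is passed. If that is the cause, the real fix belongs wherever the message is serialized. As a stopgap, a hypothetical helper like the one below renames the key on the plain-dict form of the message before it is printed:

```python
import json
from typing import Any, Dict


def restore_global_alias(message: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of an AirbyteMessage dict whose state uses the wire key 'global'.

    Hypothetical workaround: assumes the state was serialized by Python field
    name ('global_') instead of by protocol alias ('global').
    """
    state = message.get("state")
    if not isinstance(state, dict) or "global_" not in state:
        return message  # Nothing to fix.
    fixed_state = {k: v for k, v in state.items() if k != "global_"}
    fixed_state["global"] = state["global_"]
    return {**message, "state": fixed_state}


if __name__ == "__main__":
    # Shape mirrors the connector's message in the log (values trimmed).
    bad = {
        "type": "STATE",
        "state": {
            "type": "GLOBAL",
            "global_": {"shared_state": {"state": {}}, "stream_states": []},
        },
    }
    print(json.dumps(restore_global_alias(bad)))
```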
However, I have no idea how to go further. Any ideas?
Lee Danilek
06/09/2023, 4:43 PM
state property, stream_state argument, and next_page_token? We only have one cursor, which I was updating in read_records. But then check_availability started calling read_records and advancing the cursor but discarding the results, which makes us skip records.
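One way around this conflict, sketched under the assumption that the cursor can be derived from a field on each record (updated_at here is a hypothetical name): read_records only reads stream_state and never mutates it, and the new state comes from a pure get_updated_state(current_stream_state, latest_record) applied to records the caller actually consumed. An availability-style check that pulls one record and discards it then leaves the state untouched. This only mirrors the shape of the CDK method names used in this thread; it is not CDK code:

```python
from typing import Any, Callable, Iterable, Iterator, Mapping, Optional

# Hypothetical cursor field; assumes each record carries a sortable value.
CURSOR_FIELD = "updated_at"


def read_records(
    stream_state: Mapping[str, Any],
    fetch_since: Callable[[Optional[str]], Iterable[Mapping[str, Any]]],
) -> Iterator[Mapping[str, Any]]:
    """Side-effect free: uses stream_state as a lower bound, never advances it."""
    yield from fetch_since(stream_state.get(CURSOR_FIELD))


def get_updated_state(
    current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]
) -> Mapping[str, Any]:
    """Pure: new state = max(old cursor, cursor of a record that was actually emitted)."""
    current = current_stream_state.get(CURSOR_FIELD)
    latest = latest_record[CURSOR_FIELD]
    return {CURSOR_FIELD: latest if current is None else max(current, latest)}


if __name__ == "__main__":
    rows = [{"id": 1, "updated_at": "2023-06-01"}, {"id": 2, "updated_at": "2023-06-02"}]

    def fake_fetch(since: Optional[str]) -> Iterable[Mapping[str, Any]]:
        # Stand-in for the API call: rows strictly newer than `since`.
        return [r for r in rows if since is None or r[CURSOR_FIELD] > since]

    state: Mapping[str, Any] = {}

    # Availability-style check: read one record, then discard it.
    next(read_records(state, fake_fetch), None)
    assert state == {}  # Untouched, because nothing was emitted.

    # Real sync: state advances only over records the caller consumes.
    for record in read_records(state, fake_fetch):
        state = get_updated_state(state, record)
    print(state)  # {'updated_at': '2023-06-02'}
```

If the cursor cannot be derived from records (for example, an opaque token returned by the API), this approach does not apply as-is.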
From the code, it looks like checkpoint state is determined by the (deprecated) get_updated_state after calling read_records, which means read_records must advance the cursor. But the availability check calls read_records and discards the result, which means read_records must not advance the cursor. This looks impossible to implement correctly.
Octavia Squidington III
06/09/2023, 7:45 PM
Slackbot
06/10/2023, 3:11 AM
Chính Bùi Quang
06/10/2023, 3:16 AM