Christopher Greene
10/28/2024, 8:20 PMMounika Naga
10/28/2024, 8:46 PMFrancisco García
10/28/2024, 9:54 PMSlackbot
10/28/2024, 10:01 PMVasil Boshnakov
10/28/2024, 10:26 PMMarco Rodriguez
10/28/2024, 10:59 PMERROR i.a.c.ConnectorWatcher(handleException):175 - Error performing operation: io.airbyte.workers.workload.exception.DocStoreAccessException
io.airbyte.workers.workload.exception.DocStoreAccessException: Unable to write output for decd338e-5647-4c0b-adf4-da0e75f5a750_4821_4_check
at io.airbyte.workers.workload.JobOutputDocStore.writeOutput(JobOutputDocStore.kt:72) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.workload.JobOutputDocStore.write(JobOutputDocStore.kt:38) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.connectorSidecar.ConnectorWatcher.saveConnectorOutput(ConnectorWatcher.kt:163) [io.airbyte-airbyte-connector-sidecar-1.1.0.jar:?]
at io.airbyte.connectorSidecar.ConnectorWatcher.run(ConnectorWatcher.kt:72) [io.airbyte-airbyte-connector-sidecar-1.1.0.jar:?]
at io.airbyte.connectorSidecar.ApplicationKt.main(Application.kt:32) [io.airbyte-airbyte-connector-sidecar-1.1.0.jar:?]
at io.airbyte.connectorSidecar.ApplicationKt.main(Application.kt) [io.airbyte-airbyte-connector-sidecar-1.1.0.jar:?]
Caused by: software.amazon.awssdk.services.s3.model.S3Exception: The authorization header is malformed; the region 'us-east-1' is wrong; expecting 'eu-west-1' (Service: S3, Status Code: 400, Request ID: CES6NK545QS4ZJAR, Extended Request ID: 4cGYQArQH+hhUsH+pDYRPQzgDYuFZpOl+RmGy/fDqG95w2qYnI3PzY6UctmCiCPSuc72+E4poQE=)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156) ~[aws-xml-protocol-2.27.8.jar:?]
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108) ~[aws-xml-protocol-2.27.8.jar:?]
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85) ~[aws-xml-protocol-2.27.8.jar:?]
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43) ~[aws-xml-protocol-2.27.8.jar:?]
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:93) ~[aws-core-2.27.8.jar:?]
at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279) ~[sdk-core-2.27.8.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50) ~[sdk-core-2.27.8.jar:?]
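The root cause in the trace above is the S3 error: the request is signed for us-east-1 while the bucket Airbyte writes job output to lives in eu-west-1, so the region configured for Airbyte's S3 storage most likely needs to be changed to eu-west-1. A quick way to confirm the bucket's actual region, as a sketch using boto3 (the bucket name is a placeholder, not taken from this thread):
import boto3

# Placeholder name; substitute the bucket configured for Airbyte's log/state storage.
BUCKET = "my-airbyte-storage-bucket"

s3 = boto3.client("s3")
# get_bucket_location returns None for us-east-1 and the region name otherwise.
location = s3.get_bucket_location(Bucket=BUCKET)["LocationConstraint"] or "us-east-1"
print(f"Bucket {BUCKET} lives in region: {location}")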
Carlos Bernal Carvajal
10/29/2024, 12:01 AMSource process read attempt failed.
Taking a look at the logs, it seems like the job is stuck in an infinite loop.
The following three messages repeat very frequently until the job eventually fails:
2024-10-28 19:11:26 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=97, polls=0
2024-10-28 19:11:26 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.022736437S after its previous call which was also logged.
2024-10-28 19:11:26 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
I have refreshed the connection multiple times, but it always fails on the next sync.
Can someone help me figure out what the problem is?
Attached you'll find the attempt logs.
Airbyte version: v0.63.13
Source: Postgres - v3.6.22
Destination: Postgres - v2.4.0 (edited)Ben Rohald
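For the stuck Postgres CDC sync above, heartbeat-only polls ("returned a heartbeat event: no progress since last heartbeat") usually mean Debezium is connected but no new change events are arriving for the tracked tables, e.g. because the publication doesn't cover them or the replication slot isn't advancing. A sketch of how to inspect the slot and publication on the source database with psycopg2 (connection parameters are placeholders, not values from this thread):
import psycopg2

# Placeholder connection settings; point these at the CDC source database.
conn = psycopg2.connect(host="localhost", dbname="mydb", user="airbyte", password="***")
with conn, conn.cursor() as cur:
    # How far behind each replication slot is, in bytes of WAL.
    cur.execute("""
        SELECT slot_name, active,
               pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
        FROM pg_replication_slots
    """)
    for row in cur.fetchall():
        print(row)
    # Tables actually covered by the publication the connector reads from.
    cur.execute("SELECT pubname, schemaname, tablename FROM pg_publication_tables")
    for row in cur.fetchall():
        print(row)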
10/29/2024, 12:05 AM"Limit": "{{ next_page_token['next_page_token'] }}"
into my request. Even when I have pagination enabled, the variable is empty when sending the requestPhạm Mạnh Hùng
10/29/2024, 1:39 AMAndrew Nada
10/29/2024, 3:23 AMQuang Nguyen
10/29/2024, 3:45 AMat io.airbyte.api.client.auth.AirbyteAuthHeaderInterceptor.intercept(AirbyteAuthHeaderInterceptor.kt:28) ~[io.airbyte.airbyte-api-commons-0.63.14.jar:?]
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109) ~[okhttp-4.12.0.jar:?]
at io.airbyte.workload.api.client.auth.WorkloadApiAuthenticationInterceptor.intercept(WorkloadApiAuthenticationInterceptor.kt:36) ~[io.airbyte.airbyte-api-workload-api-0.63.14.jar:?]
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109) ~[okhttp-4.12.0.jar:?]
at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201) ~[okhttp-4.12.0.jar:?]
at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154) ~[okhttp-4.12.0.jar:?]
at io.airbyte.workload.api.client.generated.WorkloadApi.workloadHeartbeatWithHttpInfo(WorkloadApi.kt:2748) ~[io.airbyte.airbyte-api-workload-api-0.63.14.jar:?]
at io.airbyte.workload.api.client.generated.WorkloadApi.workloadHeartbeat(WorkloadApi.kt:429) ~[io.airbyte.airbyte-api-workload-api-0.63.14.jar:?]
at io.airbyte.workers.general.ReplicationWorkerHelper.getWorkloadStatusHeartbeat$lambda$1(ReplicationWorkerHelper.kt:153) ~[io.airbyte-airbyte-commons-worker-0.63.14.jar:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.io.EOFException: \n not found: limit=0 content=…
at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.kt:335) ~[okio-jvm-3.6.0.jar:?]
at okhttp3.internal.http1.HeadersReader.readLine(HeadersReader.kt:29) ~[okhttp-4.12.0.jar:?]
at okhttp3.internal.http1.Http1ExchangeCodec.readResponseHeaders(Http1ExchangeCodec.kt:180) ~[okhttp-4.12.0.jar:?]
... 26 more
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method) ~[?:?]
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:682) ~[?:?]
at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:549) ~[?:?]
at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:592) ~[?:?]
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327) ~[?:?]
at java.base/java.net.Socket.connect(Socket.java:751) ~[?:?]
at okhttp3.internal.platform.Platform.connectSocket(Platform.kt:128) ~[okhttp-4.12.0.jar:?]
at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.kt:295) ~[okhttp-4.12.0.jar:?]
... 28 more
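The failure above is the replication worker's heartbeat call to the workload API dying with Connection refused, which normally points at the workload API server being down, restarting, or unreachable from the worker pod rather than at the connectors themselves. A minimal reachability check, as a sketch (the service name and port are assumptions for a typical Kubernetes install, not values from this thread):
import socket

# Assumed service name/port for the workload API server; adjust to your deployment.
HOST, PORT = "airbyte-workload-api-server", 8007

try:
    with socket.create_connection((HOST, PORT), timeout=5):
        print(f"{HOST}:{PORT} is reachable")
except OSError as exc:
    print(f"{HOST}:{PORT} is NOT reachable: {exc}")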
james 98
10/29/2024, 8:01 AMLeo Salayog
10/29/2024, 8:07 AMClient error : 400 Bad Request {"exceptionStack":"Traceback (most recent call last):\n File \"/home/airbyte/.pyenv/versions/3.9.19/lib/python3.9/site-packages/airbyte_cdk/connector_builder/connector_builder_handler.py\", line 67, in read_stream\n stream_read = handler.get_message_groups(source, config, configured_catalog, state, limits.max_records)\n File \"/home/airbyte/.pyenv/versions/3.9.19/lib/python3.9/site-packages/airbyte_cdk/connector_builder/message_grouper.py\", line 125, in get_message_groups\n schema = schema_inferrer.get_stream_schema(configured_stream.stream.name)\n File \"/home/airbyte/.pyenv/versions/3.9.19/lib/python3.9/site-packages/airbyte_cdk/utils/schema_inferrer.py\", line 245, in get_stream_schema\n self._add_required_properties(self._clean(self.stream_to_builder[stream_name].to_schema()))\n File \"/home/airbyte/.pyenv/versions/3.9.19/lib/python3.9/site-packages/airbyte_cdk/utils/schema_inferrer.py\", line 144, in _clean\n
james 98
10/29/2024, 8:20 AMGerald
10/29/2024, 8:25 AMSlackbot
10/29/2024, 8:32 AMGeoffrey LOUASSE
10/29/2024, 8:33 AMQuentin CHURET
10/29/2024, 8:37 AMKaustav Ghosh
10/29/2024, 8:53 AM{
"id": "assignees::38b29dcc-e683-425d-84a5-db91a2de3999",
"type": "airbyte_record",
"stream": "assignees",
"data": {
"login": "teetangh",
"id": 44238657,
"node_id": "MDQ6VXNlcjQ0MjM4NjU3",
"avatar_url": "<https://avatars.githubusercontent.com/u/44238657?v=4>",
"gravatar_id": "",
"url": "<https://api.github.com/users/teetangh>",
"html_url": "<https://github.com/teetangh>",
"followers_url": "<https://api.github.com/users/teetangh/followers>",
"following_url": "<https://api.github.com/users/teetangh/following{/other_user}>",
"gists_url": "<https://api.github.com/users/teetangh/gists{/gist_id}>",
"starred_url": "<https://api.github.com/users/teetangh/starred{/owner}{/repo}>",
"subscriptions_url": "<https://api.github.com/users/teetangh/subscriptions>",
"organizations_url": "<https://api.github.com/users/teetangh/orgs>",
"repos_url": "<https://api.github.com/users/teetangh/repos>",
"events_url": "<https://api.github.com/users/teetangh/events{/privacy}>",
"received_events_url": "<https://api.github.com/users/teetangh/received_events>",
"type": "User",
"user_view_type": "public",
"site_admin": false,
"repository": "Kaggle-Workspace/Gradient-Descent-Algorithms"
}
}
Is this the correct format in which the destination must receive the data? Tell me some improvements.Kaustav Ghosh
10/29/2024, 8:57 AMclass DestinationCouchbase(Destination):
def write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
cluster = self._get_cluster(config)
bucket_name = config["bucket"]
scope_name = config.get("scope", "_default")
batch_size = config.get("batch_size", 1000)
# Log connection config structure
logger.info(f"Connection Config Structure: {{'bucket': '{bucket_name}', 'scope': '{scope_name}'}}")
# Get stream configurations including sync modes and primary keys
stream_configs = {}
for stream in configured_catalog.streams:
stream_configs[stream.stream.name] = {
'collection_name': self._sanitize_collection_name(stream.stream.name),
'sync_mode': stream.destination_sync_mode,
'primary_key': stream.stream.source_defined_primary_key
}
logger.info(f"Starting write to Couchbase with {len(stream_configs)} streams")
# Set up collections and clear data for overwrite mode
collections = {}
for stream_name, stream_config in stream_configs.items():
collection = self._setup_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
if stream_config['sync_mode'] == DestinationSyncMode.overwrite:
self._clear_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
collections[stream_name] = collection
buffer = {}
for message in input_messages:
if message.type == Type.STATE:
try:
logger.info("Received state message")
self._flush_buffer(collections, buffer, stream_configs)
yield message
except Exception as e:
logger.error(f"Error processing state message: {str(e)}")
raise
elif message.type == Type.RECORD:
logger.info(f"Processing record for stream: {message.record.stream}")
data = message.record.data
stream = message.record.stream
try:
if stream not in stream_configs:
logger.warning(f"Encountered new stream {stream}. Adding to streams and creating collection.")
collection_name = self._sanitize_collection_name(stream)
stream_configs[stream] = {
'collection_name': collection_name,
'sync_mode': DestinationSyncMode.append,
'primary_key': []
}
collections[stream] = self._setup_collection(cluster, bucket_name, scope_name, collection_name)
if stream not in buffer:
buffer[stream] = []
document = self._prepare_document(
stream,
data,
stream_configs[stream]['primary_key'],
stream_configs[stream]['sync_mode']
)
if len(buffer[stream]) == 0:
logger.info(f"Document Structure for stream '{stream}': {document}")
buffer[stream].append(document)
if len(buffer[stream]) >= batch_size:
self._flush_buffer(collections, {stream: buffer[stream]}, stream_configs)
buffer[stream] = []
except Exception as e:
logger.error(f"Error processing record for stream {stream}: {str(e)}")
raise
# Flush any remaining messages
if any(buffer.values()):
try:
self._flush_buffer(collections, buffer, stream_configs)
except Exception as e:
logger.error(f"Error in final buffer flush: {str(e)}")
raise
@staticmethod
def _get_cluster(config: Mapping[str, Any]) -> Cluster:
auth = PasswordAuthenticator(config["username"], config["password"])
cluster = Cluster(config["connection_string"], ClusterOptions(auth))
cluster.wait_until_ready(timedelta(seconds=5))
return cluster
@staticmethod
def _sanitize_collection_name(name: str) -> str:
# Replace invalid characters with underscores
sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', name)
# Ensure the name starts with a letter
if not sanitized[0].isalpha():
sanitized = 'c_' + sanitized
return sanitized
@classmethod
def _setup_collection(cls, cluster, bucket_name, scope_name, collection_name):
try:
bucket = cluster.bucket(bucket_name)
bucket_manager = bucket.collections()
# Check if collection exists, create if it doesn't
collections = bucket_manager.get_all_scopes()
collection_exists = any(
scope.name == scope_name and collection_name in [col.name for col in scope.collections]
for scope in collections
)
if not collection_exists:
logger.info(f"Collection '{collection_name}' does not exist. Creating it...")
bucket_manager.create_collection(scope_name, collection_name)
logger.info(f"Collection '{collection_name}' created successfully.")
else:
logger.info(f"Collection '{collection_name}' already exists. Skipping creation.")
collection = bucket.scope(scope_name).collection(collection_name)
# Ensure primary index exists with retry logic
max_retries = 5
retry_delay = 1 # seconds
for attempt in range(max_retries):
try:
query = f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{bucket_name}`.`{scope_name}`.`{collection_name}`"
cluster.query(query).execute()
logger.info("Primary index present or created successfully.")
break
except KeyspaceNotFoundException as e:
if attempt < max_retries - 1:
logger.warning(f"KeyspaceNotFoundException on attempt {attempt + 1}. Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
logger.error(f"Error creating primary index after {attempt + 1} attempts: {str(e)}")
raise
except Exception as e:
logger.error(f"Error creating primary index: {str(e)}")
raise
return collection
except Exception as e:
raise RuntimeError(f"Error setting up collection: {str(e)}")
@staticmethod
def _clear_collection(cluster, bucket_name, scope_name, collection_name):
try:
query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
cluster.query(query).execute()
logger.info("All documents cleared from the collection.")
except Exception as e:
logger.warning(f"Error while clearing documents: {str(e)}. The collection might be empty.")
@classmethod
def _prepare_document(
cls,
stream: str,
data: Mapping[str, Any],
primary_key: List[List[str]],
sync_mode: DestinationSyncMode = DestinationSyncMode.append
) -> Mapping[str, Any]:
logger.debug(f"Preparing document for stream '{stream}' with sync mode {sync_mode}")
# Only use primary key for append_dedup mode
if sync_mode == DestinationSyncMode.append_dedup and primary_key:
composite_key = cls._extract_composite_key(data, primary_key)
if composite_key:
doc_id = f"{stream}::{composite_key}"
logger.debug(f"Created document ID using composite key: {doc_id}")
else:
doc_id = f"{stream}::{str(uuid4())}"
logger.warning(f"Failed to extract composite key for append_dedup mode, falling back to UUID: {doc_id}")
else:
# For append and overwrite modes, just use UUID
doc_id = f"{stream}::{str(uuid4())}"
logger.debug(f"Using UUID for document ID (sync mode: {sync_mode})")
document = {
"id": doc_id,
"type": "airbyte_record",
"stream": stream,
"data": data
}
return document
@staticmethod
def _extract_composite_key(data: Mapping[str, Any], primary_key: List[List[str]]) -> Optional[str]:
try:
key_parts = []
for key_field in primary_key:
if not key_field: # Skip empty key paths
continue
value = data
# For each part of the key path (handling nested fields)
for key_part in key_field:
if not isinstance(value, Mapping) or key_part not in value:
return None
value = value[key_part]
if value is None: # Skip None values
return None
key_parts.append(str(value))
if not key_parts: # If no valid key parts were found
return None
return "::".join(key_parts)
except Exception as e:
logger.error(f"Error extracting composite key: {str(e)}")
return None
@staticmethod
def _flush_buffer(collections, buffer: Mapping[str, list], stream_configs: Mapping[str, dict]):
for stream, documents in buffer.items():
if documents:
collection = collections[stream]
batch = {doc["id"]: doc for doc in documents}
try:
# Set a longer timeout for the entire batch operation
timeout = timedelta(seconds=len(batch) * 2.5) # 2.5 seconds per document
options = UpsertMultiOptions(timeout=timeout)
result = collection.upsert_multi(batch, options)
if not result.all_ok:
for doc_id, ex in result.exceptions.items():
if isinstance(ex, DocumentExistsException):
if stream_configs[stream]['sync_mode'] == DestinationSyncMode.append_dedup:
# For append_dedup, document exists is expected and not an error
logger.info(f"Skipping duplicate document '{doc_id}' for stream {stream}")
else:
logger.warning(f"Document with ID '{doc_id}' already exists in the collection for stream {stream}")
else:
logger.error(f"Failed to upsert document '{doc_id}' for stream {stream}. Error: {ex}")
logger.info(f"Successfully loaded {len(batch)} documents for stream {stream}")
except CouchbaseException as e:
logger.error(f"Error occurred while loading documents for stream {stream}: {e}")
logger.error(f"Full exception details: {repr(e)}")
buffer.clear() # Clear the buffer after flushing
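Regarding the "is this the correct format / tell me some improvements" question: the {id, type, stream, data} envelope above is valid as far as Couchbase is concerned, since the destination controls the document shape. One improvement worth considering (a suggestion only, not something Airbyte requires) is to also carry sync metadata such as the record's emitted_at timestamp and a load timestamp, which makes auditing and later deduplication easier. A minimal sketch, assuming the caller passes message.record.emitted_at through (build_document and emitted_at_ms are illustrative names, not part of the posted code):
from datetime import datetime, timezone
from typing import Any, Mapping

def build_document(stream: str, data: Mapping[str, Any], doc_id: str, emitted_at_ms: int) -> Mapping[str, Any]:
    # Same envelope as the code above, plus sync metadata that is often useful for auditing.
    return {
        "id": doc_id,
        "type": "airbyte_record",
        "stream": stream,
        "data": data,
        "_airbyte_emitted_at": emitted_at_ms,  # from message.record.emitted_at (ms since epoch)
        "_airbyte_loaded_at": datetime.now(timezone.utc).isoformat(),
    }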
Kaustav Ghosh
10/29/2024, 9:40 AMdef write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
"""
Reads the input stream of messages, config, and catalog to write data to Couchbase.
"""
cluster = self._get_cluster(config)
bucket_name = config["bucket"]
scope_name = config.get("scope", "_default")
batch_size = config.get("batch_size", 1000)
# Log connection config structure
logger.info(f"Connection Config Structure: {{'bucket': '{bucket_name}', 'scope': '{scope_name}'}}")
# Get stream configurations including sync modes and primary keys
stream_configs = {}
for stream in configured_catalog.streams:
stream_configs[stream.stream.name] = {
'collection_name': self._sanitize_collection_name(stream.stream.name),
'sync_mode': stream.destination_sync_mode,
'primary_key': stream.stream.source_defined_primary_key
}
logger.info(f"Starting write to Couchbase with {len(stream_configs)} streams")
# Set up collections and clear data for overwrite mode
collections = {}
for stream_name, stream_config in stream_configs.items():
collection = self._setup_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
if stream_config['sync_mode'] == DestinationSyncMode.overwrite:
self._clear_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
collections[stream_name] = collection
buffer = {}
for message in input_messages:
# json dumps for logging purposes
logger.debug(f"Processing message: {json.dumps(message.to_dict())}")
if message.type == Type.STATE:
try:
self._flush_buffer(collections, buffer, stream_configs)
yield message
except Exception as e:
logger.error(f"Error processing state message: {str(e)}")
raise
elif message.type == Type.RECORD:
data = message.record.data
stream = message.record.stream
try:
if stream not in stream_configs:
logger.warning(f"Encountered new stream {stream}. Adding to streams and creating collection.")
collection_name = self._sanitize_collection_name(stream)
stream_configs[stream] = {
'collection_name': collection_name,
'sync_mode': DestinationSyncMode.append,
'primary_key': []
}
collections[stream] = self._setup_collection(cluster, bucket_name, scope_name, collection_name)
if stream not in buffer:
buffer[stream] = []
document = self._prepare_document(
stream,
data,
stream_configs[stream]['primary_key'],
stream_configs[stream]['sync_mode']
)
if len(buffer[stream]) == 0:
logger.info(f"Document Structure for stream '{stream}': {document}")
buffer[stream].append(document)
if len(buffer[stream]) >= batch_size:
self._flush_buffer(collections, {stream: buffer[stream]}, stream_configs)
buffer[stream] = []
except Exception as e:
logger.error(f"Error processing record for stream {stream}: {str(e)}")
raise
# Flush any remaining messages
if any(buffer.values()):
try:
self._flush_buffer(collections, buffer, stream_configs)
except Exception as e:
logger.error(f"Error in final buffer flush: {str(e)}")
raise
Can json.dumps work with the message object?henryd
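On the json.dumps question above: in most Python CDK versions the protocol models (including AirbyteMessage) are Pydantic models, so they typically expose .dict()/.json() rather than to_dict(), and json.dumps(message.dict()) can still trip over enums and datetimes. A minimal sketch under that assumption (debug_repr is an illustrative helper, not CDK API; verify against the CDK version in use):
from airbyte_cdk.models import AirbyteMessage

def debug_repr(message: AirbyteMessage) -> str:
    # Prefer the model's own serializer over json.dumps on a hand-built dict.
    return message.json(exclude_unset=True)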
10/29/2024, 10:13 AMdavid balli
10/29/2024, 10:31 AMdavid balli
10/29/2024, 10:39 AMdavid balli
10/29/2024, 10:40 AMOisin McKnight
10/29/2024, 10:46 AMhenryd
10/29/2024, 10:49 AMMani Vellaisamy
10/29/2024, 12:16 PMAntonio Lee
10/29/2024, 2:38 PMYannick Sacherer
10/29/2024, 2:46 PM