Kaustav Ghosh
10/18/2024, 8:42 AM# Copyright (c) 2024 Couchbase, Inc., all rights reserved.
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, SyncMode, Type
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import CheckpointMixin
from couchbase.cluster import Cluster
from .queries import get_documents_query
class CouchbaseStream(Stream):
    """Base stream bound to one bucket/scope/collection of a Couchbase cluster."""

    # Field used as the primary key of emitted records.
    primary_key = "_id"

    def __init__(self, cluster: Cluster, bucket: str, scope: str, collection: str):
        """Store the cluster handle and the keyspace coordinates.

        Args:
            cluster: Cluster object the stream will use.
            bucket: Bucket name.
            scope: Scope name.
            collection: Collection name.
        """
        # The stream name is the dotted path "<bucket>.<scope>.<collection>".
        self._name = "{}.{}.{}".format(bucket, scope, collection)
        self.collection = collection
        self.scope = scope
        self.bucket = bucket
        self.cluster = cluster

    @property
    def name(self) -> str:
        """Return the dotted stream name built in ``__init__``."""
        return self._name
class DocumentStream(CouchbaseStream, CheckpointMixin):
    """Stream that reads the documents of one Couchbase collection.

    Rows come from the query built by ``get_documents_query``. In incremental
    mode the largest ``cursor_field`` value seen so far is tracked in ``state``.
    """

    # Record field used as the incremental cursor.
    cursor_field = "_ab_cdc_updated_at"

    def __init__(self, cluster: Cluster, bucket: str, scope: str, collection: str):
        """Initialise the base stream and start with an empty state mapping."""
        super().__init__(cluster, bucket, scope, collection)
        self._state: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Return the current state mapping of this stream."""
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        """Replace the current state mapping of this stream."""
        self._state = value

    def get_json_schema(self) -> Mapping[str, Any]:
        """Return the JSON schema describing records of this stream.

        The schema declares the ``_id`` string, the integer cursor field and
        an open-ended object stored under the collection name.
        """
        return {
            # Plain URI: the angle brackets previously wrapped around it were
            # a chat auto-link artifact and are not part of the identifier.
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "_id": {"type": "string"},
                self.cursor_field: {"type": "integer"},
                self.collection: {
                    "type": "object",
                    "additionalProperties": True,
                },
            },
        }

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: AirbyteRecordMessage) -> Mapping[str, Any]:
        """Return the state after taking ``latest_record`` into account.

        Args:
            current_stream_state: State mapping before this record.
            latest_record: Record whose ``data`` may carry a cursor value.

        Returns:
            A new mapping ``{cursor_field: int}`` when the record's cursor is
            present and greater than the stored one (or none is stored yet);
            otherwise ``current_stream_state`` unchanged.
        """
        latest_cursor_value = latest_record.data.get(self.cursor_field)
        if latest_cursor_value is None:
            # Record carries no cursor: nothing to advance.
            return current_stream_state

        current_cursor_value = current_stream_state.get(self.cursor_field)
        if current_cursor_value is None or int(latest_cursor_value) > int(current_cursor_value):
            return {self.cursor_field: int(latest_cursor_value)}
        return current_stream_state

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[AirbyteMessage]:
        """Query the collection and yield one RECORD message per row.

        Args:
            sync_mode: In incremental mode the query receives the stored
                cursor value and the stream state is advanced per record;
                otherwise the query receives ``None`` as the cursor value.
            cursor_field: Unused; the class-level ``cursor_field`` is used.
            stream_slice: Unused.
            stream_state: State to resume from; a missing or empty state
                means a cursor value of ``0``.

        Yields:
            ``AirbyteMessage`` objects of type ``RECORD``.
        """
        incremental = sync_mode == SyncMode.incremental
        cursor_value = stream_state.get(self.cursor_field, 0) if stream_state else 0
        query = get_documents_query(
            self.bucket,
            self.scope,
            self.collection,
            self.cursor_field,
            cursor_value if incremental else None,
        )
        for row in self.cluster.query(query):
            record = AirbyteRecordMessage(
                stream=self.name,
                data=row,
                # Scale to milliseconds BEFORE truncating; truncating first
                # would round every timestamp down to a whole second.
                emitted_at=int(datetime.now().timestamp() * 1000),
            )
            yield AirbyteMessage(type=Type.RECORD, record=record)
            if incremental:
                self.state = self.get_updated_state(self.state, record)
user
10/18/2024, 8:43 AM<destination schema>
, it might be because the namespace is not being properly set or propagated in your connector. In the code you provided, I don't see any explicit handling of the namespace. The namespace can be included in the `AirbyteRecordMessage`, as shown in this [GitHub issue](https://github.com/airbytehq/airbyte/issues/11180) where a user had a similar issue:
```python
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis, namespace=namespace)
```
However, you need to ensure that the namespace is correctly set and propagated throughout your connector. If you're using a custom namespace, make sure it's applied to your schema and propagated to the necessary functions/classes to support custom namespaces for Streams. If you're still having issues, I recommend checking out the [Airbyte documentation](https://docs.airbyte.com/using-airbyte/schema-change-management#review-detected-breaking-schema-changes) on schema change management for more information on how to handle schema changes and namespaces.
Kaustav Ghosh
10/18/2024, 8:56 AM{index_name}
ON {bucket}
.{scope}
.`{collection}`"
logging.debug(f"Executing query to ensure primary index: {query}")
try:
cluster.query(query).execute()
logging.debug(f"Successfully ensured primary index for {bucket}.{scope}.{collection}")
except Exception as e:
logging.warning(f"Failed to create primary index for {bucket}.{scope}.{collection}: {str(e)}")
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
    """Check that the configured cluster and bucket can be reached.

    Args:
        logger: Logger that receives the success or failure message.
        config: Connector configuration, handed to ``_set_config_values``.

    Returns:
        ``(True, None)`` when the bucket ping succeeds, otherwise
        ``(False, <error message>)``.
    """
    self._set_config_values(config)
    try:
        # Any failure while connecting, opening the bucket or pinging it is
        # reported as a failed check rather than raised to the caller.
        self._get_cluster().bucket(self.bucket_name).ping()
        logger.info("Successfully connected to Couchbase cluster and bucket")
        return True, None
    except Exception as e:
        failure = f"Connection check failed: {e}"
        logger.error(failure)
        return False, failure
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """Build one ``DocumentStream`` per collection of the configured bucket.

    For every collection in every scope, ``_ensure_primary_index`` is called
    first, then a stream for that collection is created and logged.

    Args:
        config: Connector configuration, handed to ``_set_config_values``.

    Returns:
        The list of created streams, in scope/collection iteration order.
    """
    self._set_config_values(config)
    cluster = self._get_cluster()
    all_scopes = cluster.bucket(self.bucket_name).collections().get_all_scopes()

    discovered = []
    for scope_spec in all_scopes:
        for collection_spec in scope_spec.collections:
            scope_name, collection_name = scope_spec.name, collection_spec.name
            # The index is ensured before the stream object is created.
            self._ensure_primary_index(cluster, self.bucket_name, scope_name, collection_name)
            discovered.append(
                DocumentStream(cluster, self.bucket_name, scope_name, collection_name)
            )
            logging.info(f"Added stream for {scope_name}.{collection_name}")

    logging.info(f"Generated {len(discovered)} streams")
    return discovered
Here is my source.py.
user
10/18/2024, 8:56 AMuser
10/18/2024, 8:56 AMSourceCouchbase
class, you're creating `DocumentStream` instances for each collection in each scope of your bucket. However, in the `DocumentStream` class definition provided in your previous message, there's no handling of the namespace. The namespace should be included in the `AirbyteRecordMessage`, as shown in this [GitHub issue](https://github.com/airbytehq/airbyte/issues/11180) where a user had a similar issue:
```python
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis, namespace=namespace)
```
In your case, you might want to consider using the bucket, scope, or collection name as the namespace, depending on your use case. You would need to modify the `DocumentStream` class to accept a namespace parameter and use it when creating `AirbyteRecordMessage` instances. If you're still having issues, I recommend checking out the [Airbyte documentation](https://docs.airbyte.com/using-airbyte/schema-change-management#review-detected-breaking-schema-changes) on schema change management for more information on how to handle schema changes and namespaces.