# ask-ai
@kapa.ai in my namespace, I get `<destination schema>`. How do I fix this?
# Copyright (c) 2024 Couchbase, Inc., all rights reserved.

from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping

from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, SyncMode, Type
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import CheckpointMixin
from couchbase.cluster import Cluster

from .queries import get_documents_query

class CouchbaseStream(Stream):
    primary_key = "_id"

    def __init__(self, cluster: Cluster, bucket: str, scope: str, collection: str):
        self.cluster = cluster
        self.bucket = bucket
        self.scope = scope
        self.collection = collection
        self._name = f"{bucket}.{scope}.{collection}"

    @property
    def name(self) -> str:
        return self._name

class DocumentStream(CouchbaseStream, CheckpointMixin):
    cursor_field = "_ab_cdc_updated_at"

    def __init__(self, cluster: Cluster, bucket: str, scope: str, collection: str):
        super().__init__(cluster, bucket, scope, collection)
        self._state: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        self._state = value

    def get_json_schema(self) -> Mapping[str, Any]:
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "_id": {"type": "string"},
                self.cursor_field: {"type": "integer"},
                self.collection: {
                    "type": "object",
                    "additionalProperties": True
                }
            }
        }

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: AirbyteRecordMessage) -> Mapping[str, Any]:
        latest_cursor_value = latest_record.data.get(self.cursor_field)
        current_cursor_value = current_stream_state.get(self.cursor_field)

        if latest_cursor_value is not None and (current_cursor_value is None or int(latest_cursor_value) > int(current_cursor_value)):
            return {self.cursor_field: int(latest_cursor_value)}
        return current_stream_state

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[AirbyteMessage]:
        cursor_value = stream_state.get(self.cursor_field, 0) if stream_state else 0

        query = get_documents_query(
            self.bucket, 
            self.scope, 
            self.collection, 
            self.cursor_field, 
            cursor_value if sync_mode == SyncMode.incremental else None
        )

        for row in self.cluster.query(query):
            record = AirbyteRecordMessage(
                stream=self.name,
                data=row,
                emitted_at=int(datetime.now().timestamp() * 1000)  # milliseconds since epoch
            )
            yield AirbyteMessage(type=Type.RECORD, record=record)
            if sync_mode == SyncMode.incremental:
                self.state = self.get_updated_state(self.state, record)
It seems like you're having an issue with the namespace in your Airbyte connector. The namespace is typically used to determine the schema where the data will be written in the destination. If you're seeing the literal `<destination schema>`, it might be because the namespace is not being set or propagated in your connector. In the code you provided, I don't see any explicit handling of the namespace. The namespace can be included in the `AirbyteRecordMessage`, as shown in this [Github issue](https://github.com/airbytehq/airbyte/issues/11180) where a user had a similar issue:

```python
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis, namespace=namespace)
```

However, you need to ensure that the namespace is correctly set and propagated throughout your connector. If you're using a custom namespace, make sure it's applied to your schema and passed through the functions and classes that build records. If you're still having issues, I recommend the [Airbyte documentation](https://docs.airbyte.com/using-airbyte/schema-change-management#review-detected-breaking-schema-changes) on schema change management for more on handling schema changes and namespaces.
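A minimal, self-contained sketch of that pattern follows. Note that the `AirbyteRecordMessage` here is a stand-in dataclass used only so the example runs without the `airbyte_cdk` dependency; the real model is a pydantic class that likewise exposes an optional `namespace` field.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional

# Stand-in for airbyte_cdk.models.AirbyteRecordMessage, used only so this
# sketch runs standalone; the real pydantic model has the same fields.
@dataclass
class AirbyteRecordMessage:
    stream: str
    data: Mapping[str, Any]
    emitted_at: int
    namespace: Optional[str] = None

def build_record(stream_name: str, data: Mapping[str, Any],
                 now_millis: int, namespace: Optional[str]) -> AirbyteRecordMessage:
    # Setting `namespace` on each record is what routes it to the intended
    # destination schema; leaving it None falls back to the connection's
    # default namespace setting.
    return AirbyteRecordMessage(stream=stream_name, data=data,
                                emitted_at=now_millis, namespace=namespace)

record = build_record("airline", {"_id": "doc_1"}, 1700000000000, "inventory")
print(record.namespace)  # -> inventory
```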
@kapa.ai that doesnt fix it, here is my source.py:

```python
# Copyright (c) 2024 Couchbase, Inc., all rights reserved.

import logging
from datetime import timedelta
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions

from .streams import DocumentStream

class SourceCouchbase(AbstractSource):
    def __init__(self):
        super().__init__()
        self.connection_string = None
        self.username = None
        self.password = None
        self.bucket_name = None

    @property
    def name(self) -> str:
        return "Couchbase"

    def _set_config_values(self, config: Mapping[str, Any]):
        self.connection_string = config['connection_string']
        self.username = config['username']
        self.password = config['password']
        self.bucket_name = config['bucket']

    def _get_cluster(self) -> Cluster:
        auth = PasswordAuthenticator(self.username, self.password)
        options = ClusterOptions(auth)
        options.apply_profile('wan_development')
        cluster = Cluster(self.connection_string, options)
        cluster.wait_until_ready(timedelta(seconds=5))
        return cluster

    @staticmethod
    def _ensure_primary_index(cluster: Cluster, bucket: str, scope: str, collection: str):
        index_name = f"{bucket}_{scope}_{collection}_primary_index"
        query = f"CREATE PRIMARY INDEX IF NOT EXISTS `{index_name}` ON `{bucket}`.`{scope}`.`{collection}`"
        logging.debug(f"Executing query to ensure primary index: {query}")
        try:
            cluster.query(query).execute()
            logging.debug(f"Successfully ensured primary index for {bucket}.{scope}.{collection}")
        except Exception as e:
            logging.warning(f"Failed to create primary index for {bucket}.{scope}.{collection}: {str(e)}")

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        self._set_config_values(config)
        try:
            cluster = self._get_cluster()
            bucket = cluster.bucket(self.bucket_name)
            bucket.ping()
            logger.info("Successfully connected to Couchbase cluster and bucket")
            return True, None
        except Exception as e:
            logger.error(f"Connection check failed: {str(e)}")
            return False, f"Connection check failed: {str(e)}"

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        self._set_config_values(config)
        cluster = self._get_cluster()
        bucket = cluster.bucket(self.bucket_name)
        streams = []
        for scope in bucket.collections().get_all_scopes():
            for collection in scope.collections:
                self._ensure_primary_index(cluster, self.bucket_name, scope.name, collection.name)
                stream = DocumentStream(cluster, self.bucket_name, scope.name, collection.name)
                streams.append(stream)
                logging.info(f"Added stream for {scope.name}.{collection.name}")
        logging.info(f"Generated {len(streams)} streams")
        return streams
```
A message from kapa.ai
From the code you've provided, it seems like you're building a custom Airbyte connector for Couchbase but are still having issues with the namespace. In your `SourceCouchbase` class, you create `DocumentStream` instances for each collection in each scope of your bucket. However, the `DocumentStream` class from your previous message still has no handling of the namespace. The namespace should be included in the `AirbyteRecordMessage`, as shown in this [Github issue](https://github.com/airbytehq/airbyte/issues/11180) where a user had a similar issue:

```python
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis, namespace=namespace)
```

In your case, you might want to use the bucket, scope, or collection name as the namespace, depending on your use case. You would need to modify the `DocumentStream` class to accept a namespace parameter and use it when creating `AirbyteRecordMessage` instances. If you're still having issues, see the [Airbyte documentation](https://docs.airbyte.com/using-airbyte/schema-change-management#review-detected-breaking-schema-changes) on schema change management for more on handling schema changes and namespaces.