<@U04SE163WC8> is this logic correct ```class Dest...
# ask-ai
@kapa.ai is this logic correct
# Imports assumed for this snippet (Airbyte CDK and Couchbase Python SDK)
import logging
import re
import time
from datetime import timedelta
from typing import Any, Iterable, List, Mapping, Optional
from uuid import uuid4

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Type
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import CouchbaseException, DocumentExistsException, KeyspaceNotFoundException
from couchbase.options import ClusterOptions, UpsertMultiOptions

logger = logging.getLogger("airbyte")


class DestinationCouchbase(Destination):

    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:

        cluster = self._get_cluster(config)
        bucket_name = config["bucket"]
        scope_name = config.get("scope", "_default")
        batch_size = config.get("batch_size", 1000)

        # Log connection config structure
        <http://logger.info|logger.info>(f"Connection Config Structure: {{'bucket': '{bucket_name}', 'scope': '{scope_name}'}}")

        # Get stream configurations including sync modes and primary keys
        stream_configs = {}
        for stream in configured_catalog.streams:
            stream_configs[stream.stream.name] = {
                'collection_name': self._sanitize_collection_name(stream.stream.name),
                'sync_mode': stream.destination_sync_mode,
                'primary_key': stream.stream.source_defined_primary_key
            }
        
        <http://logger.info|logger.info>(f"Starting write to Couchbase with {len(stream_configs)} streams")

        # Set up collections and clear data for overwrite mode
        collections = {}
        for stream_name, stream_config in stream_configs.items():
            collection = self._setup_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
            if stream_config['sync_mode'] == DestinationSyncMode.overwrite:
                self._clear_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
            collections[stream_name] = collection

        buffer = {}
        
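        # Process incoming messages: STATE messages trigger a flush of all buffered
        # records and are then echoed back; RECORD messages are buffered per stream
        # and flushed whenever a stream's buffer reaches batch_size.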
        for message in input_messages:
            if message.type == Type.STATE:
                try:
                    <http://logger.info|logger.info>("Received state message")
                    self._flush_buffer(collections, buffer, stream_configs)
                    yield message
                except Exception as e:
                    logger.error(f"Error processing state message: {str(e)}")
                    raise

            elif message.type == Type.RECORD:
                <http://logger.info|logger.info>(f"Processing record for stream: {message.record.stream}")
                data = message.record.data
                stream = message.record.stream
                
                try:
                    if stream not in stream_configs:
                        logger.warning(f"Encountered new stream {stream}. Adding to streams and creating collection.")
                        collection_name = self._sanitize_collection_name(stream)
                        stream_configs[stream] = {
                            'collection_name': collection_name,
                            'sync_mode': DestinationSyncMode.append,
                            'primary_key': []
                        }
                        collections[stream] = self._setup_collection(cluster, bucket_name, scope_name, collection_name)
                    
                    if stream not in buffer:
                        buffer[stream] = []
                    
                    document = self._prepare_document(
                        stream, 
                        data, 
                        stream_configs[stream]['primary_key'],
                        stream_configs[stream]['sync_mode']
                    )
                    
                    if len(buffer[stream]) == 0:
                        <http://logger.info|logger.info>(f"Document Structure for stream '{stream}': {document}")
                    
                    buffer[stream].append(document)

                    if len(buffer[stream]) >= batch_size:
                        self._flush_buffer(collections, {stream: buffer[stream]}, stream_configs)
                        buffer[stream] = []
                except Exception as e:
                    logger.error(f"Error processing record for stream {stream}: {str(e)}")
                    raise

        # Flush any remaining messages
        if any(buffer.values()):
            try:
                self._flush_buffer(collections, buffer, stream_configs)
            except Exception as e:
                logger.error(f"Error in final buffer flush: {str(e)}")
                raise

    @staticmethod
    def _get_cluster(config: Mapping[str, Any]) -> Cluster:
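        # Connect using password authentication and block until the cluster reports
        # ready (up to 5 seconds) so subsequent bucket and query calls do not race
        # the connection setup.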
        auth = PasswordAuthenticator(config["username"], config["password"])
        cluster = Cluster(config["connection_string"], ClusterOptions(auth))
        cluster.wait_until_ready(timedelta(seconds=5))
        return cluster

    @staticmethod
    def _sanitize_collection_name(name: str) -> str:
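        # Illustrative examples: "user-events" -> "user_events", "2024_logs" -> "c_2024_logs".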
        # Replace invalid characters with underscores
        sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', name)
        # Ensure the name starts with a letter
        if not sanitized[0].isalpha():
            sanitized = 'c_' + sanitized
        return sanitized

    @classmethod
    def _setup_collection(cls, cluster, bucket_name, scope_name, collection_name):
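        # Create the target collection if it does not exist, then ensure a primary
        # index is present (retrying while the new keyspace propagates) before
        # returning the collection handle.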
        try:
            bucket = cluster.bucket(bucket_name)
            bucket_manager = bucket.collections()
            
            # Check if collection exists, create if it doesn't
            collections = bucket_manager.get_all_scopes()
            collection_exists = any(
                scope.name == scope_name and collection_name in [col.name for col in scope.collections]
                for scope in collections
            )
            
            if not collection_exists:
                <http://logger.info|logger.info>(f"Collection '{collection_name}' does not exist. Creating it...")
                bucket_manager.create_collection(scope_name, collection_name)
                <http://logger.info|logger.info>(f"Collection '{collection_name}' created successfully.")
            else:
                <http://logger.info|logger.info>(f"Collection '{collection_name}' already exists. Skipping creation.")
            
            collection = bucket.scope(scope_name).collection(collection_name)
            
            # Ensure primary index exists with retry logic
            max_retries = 5
            retry_delay = 1  # seconds
            
            for attempt in range(max_retries):
                try:
                    query = f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{bucket_name}`.`{scope_name}`.`{collection_name}`"
                    cluster.query(query).execute()
                    <http://logger.info|logger.info>("Primary index present or created successfully.")
                    break
                except KeyspaceNotFoundException as e:
                    if attempt < max_retries - 1:
                        logger.warning(f"KeyspaceNotFoundException on attempt {attempt + 1}. Retrying in {retry_delay} seconds...")
                        time.sleep(retry_delay)
                        retry_delay *= 2  # Exponential backoff
                    else:
                        logger.error(f"Error creating primary index after {attempt + 1} attempts: {str(e)}")
                        raise
                except Exception as e:
                    logger.error(f"Error creating primary index: {str(e)}")
                    raise
                
            return collection
        except Exception as e:
            raise RuntimeError(f"Error setting up collection: {str(e)}")

    @staticmethod
    def _clear_collection(cluster, bucket_name, scope_name, collection_name):
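        # Used for overwrite mode: remove all existing documents via a N1QL DELETE.
        # Failures are only logged, since the collection may simply be empty.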
        try:
            query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
            cluster.query(query).execute()
            logger.info("All documents cleared from the collection.")
        except Exception as e:
            logger.warning(f"Error while clearing documents: {str(e)}. The collection might be empty.")

    @classmethod
    def _prepare_document(
        cls,
        stream: str, 
        data: Mapping[str, Any], 
        primary_key: List[List[str]],
        sync_mode: DestinationSyncMode = DestinationSyncMode.append
    ) -> Mapping[str, Any]:
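        # Build the document to upsert. For append_dedup with a usable primary key the
        # id is deterministic (f"{stream}::{composite_key}") so re-synced records replace
        # earlier versions; otherwise a random UUID-based id is used.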

        logger.debug(f"Preparing document for stream '{stream}' with sync mode {sync_mode}")
        
        # Only use primary key for append_dedup mode
        if sync_mode == DestinationSyncMode.append_dedup and primary_key:
            composite_key = cls._extract_composite_key(data, primary_key)
            if composite_key:
                doc_id = f"{stream}::{composite_key}"
                logger.debug(f"Created document ID using composite key: {doc_id}")
            else:
                doc_id = f"{stream}::{str(uuid4())}"
                logger.warning(f"Failed to extract composite key for append_dedup mode, falling back to UUID: {doc_id}")
        else:
            # For append and overwrite modes, just use UUID
            doc_id = f"{stream}::{str(uuid4())}"
            logger.debug(f"Using UUID for document ID (sync mode: {sync_mode})")

        document = {
            "id": doc_id,
            "type": "airbyte_record",
            "stream": stream,
            "data": data
        }
        return document
    
    @staticmethod
    def _extract_composite_key(data: Mapping[str, Any], primary_key: List[List[str]]) -> Optional[str]:
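        # Walk each key path in order (nested fields supported) and join the resulting
        # values with "::". Illustrative example: primary_key=[["profile", "id"], ["email"]]
        # with data={"profile": {"id": 7}, "email": "a@b.c"} yields "7::a@b.c"; any missing
        # or None part returns None so the caller falls back to a UUID-based id.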

        try:
            key_parts = []
            for key_field in primary_key:
                if not key_field:  # Skip empty key paths
                    continue
                    
                value = data
                # For each part of the key path (handling nested fields)
                for key_part in key_field:
                    if not isinstance(value, Mapping) or key_part not in value:
                        return None
                    value = value[key_part]
                
                if value is None:  # Skip None values
                    return None
                    
                key_parts.append(str(value))
                
            if not key_parts:  # If no valid key parts were found
                return None
                
            return "::".join(key_parts)
        except Exception as e:
            logger.error(f"Error extracting composite key: {str(e)}")
            return None

    @staticmethod
    def _flush_buffer(collections, buffer: Mapping[str, list], stream_configs: Mapping[str, dict]):
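        # Upsert each stream's buffered documents in one multi-operation keyed by the
        # prepared document id. Duplicate-key results are logged (and expected for
        # append_dedup) rather than raised; other per-document failures are logged as errors.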
        for stream, documents in buffer.items():
            if documents:
                collection = collections[stream]
                batch = {doc["id"]: doc for doc in documents}
                try:
                    # Set a longer timeout for the entire batch operation
                    timeout = timedelta(seconds=len(batch) * 2.5)  # 2.5 seconds per document
                    options = UpsertMultiOptions(timeout=timeout)
                    
                    result = collection.upsert_multi(batch, options)
                    if not result.all_ok:
                        for doc_id, ex in result.exceptions.items():
                            if isinstance(ex, DocumentExistsException):
                                if stream_configs[stream]['sync_mode'] == DestinationSyncMode.append_dedup:
                                    # For append_dedup, document exists is expected and not an error
                                    <http://logger.info|logger.info>(f"Skipping duplicate document '{doc_id}' for stream {stream}")
                                else:
                                    logger.warning(f"Document with ID '{doc_id}' already exists in the collection for stream {stream}")
                            else:
                                logger.error(f"Failed to upsert document '{doc_id}' for stream {stream}. Error: {ex}")
                    <http://logger.info|logger.info>(f"Successfully loaded {len(batch)} documents for stream {stream}")
                except CouchbaseException as e:
                    logger.error(f"Error occurred while loading documents for stream {stream}: {e}")
                    logger.error(f"Full exception details: {repr(e)}")

        buffer.clear()  # Clear the buffer after flushing
Sorry @Kaustav Ghosh, I seem to be having a bit of trouble. Don't worry though, I've pinged my support folks and they will try to fix me up! 🧑‍💻 Perhaps try again in 5mins?