Kaustav Ghosh
10/29/2024, 8:57 AM

# Imports assumed for this excerpt (airbyte_cdk plus Couchbase Python SDK 4.x); adjust to the module's actual imports.
import logging
import re
import time
from datetime import timedelta
from typing import Any, Iterable, List, Mapping, Optional
from uuid import uuid4

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Type
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import CouchbaseException, DocumentExistsException, KeyspaceNotFoundException
from couchbase.options import ClusterOptions, UpsertMultiOptions

logger = logging.getLogger("airbyte")


class DestinationCouchbase(Destination):
    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        cluster = self._get_cluster(config)
        bucket_name = config["bucket"]
        scope_name = config.get("scope", "_default")
        batch_size = config.get("batch_size", 1000)

        # Log connection config structure
        logger.info(f"Connection Config Structure: {{'bucket': '{bucket_name}', 'scope': '{scope_name}'}}")

        # Get stream configurations including sync modes and primary keys
        stream_configs = {}
        for stream in configured_catalog.streams:
            stream_configs[stream.stream.name] = {
                'collection_name': self._sanitize_collection_name(stream.stream.name),
                'sync_mode': stream.destination_sync_mode,
                'primary_key': stream.stream.source_defined_primary_key
            }
        logger.info(f"Starting write to Couchbase with {len(stream_configs)} streams")

        # Set up collections and clear data for overwrite mode
        collections = {}
        for stream_name, stream_config in stream_configs.items():
            collection = self._setup_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
            if stream_config['sync_mode'] == DestinationSyncMode.overwrite:
                self._clear_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
            collections[stream_name] = collection

        buffer = {}
        for message in input_messages:
            if message.type == Type.STATE:
                try:
                    logger.info("Received state message")
                    self._flush_buffer(collections, buffer, stream_configs)
                    yield message
                except Exception as e:
                    logger.error(f"Error processing state message: {str(e)}")
                    raise
            elif message.type == Type.RECORD:
                logger.info(f"Processing record for stream: {message.record.stream}")
                data = message.record.data
                stream = message.record.stream
                try:
                    # Streams missing from the configured catalog are added on the fly in append mode
                    if stream not in stream_configs:
                        logger.warning(f"Encountered new stream {stream}. Adding to streams and creating collection.")
                        collection_name = self._sanitize_collection_name(stream)
                        stream_configs[stream] = {
                            'collection_name': collection_name,
                            'sync_mode': DestinationSyncMode.append,
                            'primary_key': []
                        }
                        collections[stream] = self._setup_collection(cluster, bucket_name, scope_name, collection_name)
                    if stream not in buffer:
                        buffer[stream] = []
                    document = self._prepare_document(
                        stream,
                        data,
                        stream_configs[stream]['primary_key'],
                        stream_configs[stream]['sync_mode']
                    )
                    # Log the structure of the first document in each batch for visibility
                    if len(buffer[stream]) == 0:
                        logger.info(f"Document Structure for stream '{stream}': {document}")
                    buffer[stream].append(document)
                    if len(buffer[stream]) >= batch_size:
                        self._flush_buffer(collections, {stream: buffer[stream]}, stream_configs)
                        buffer[stream] = []
                except Exception as e:
                    logger.error(f"Error processing record for stream {stream}: {str(e)}")
                    raise

        # Flush any remaining buffered documents
        if any(buffer.values()):
            try:
                self._flush_buffer(collections, buffer, stream_configs)
            except Exception as e:
                logger.error(f"Error in final buffer flush: {str(e)}")
                raise

    @staticmethod
    def _get_cluster(config: Mapping[str, Any]) -> Cluster:
        auth = PasswordAuthenticator(config["username"], config["password"])
        cluster = Cluster(config["connection_string"], ClusterOptions(auth))
        cluster.wait_until_ready(timedelta(seconds=5))
        return cluster
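
    # Illustrative only: a minimal config shape this connector appears to expect,
    # inferred from the keys read in write() and _get_cluster(). The values below
    # are placeholders, not defaults from any published spec.
    #
    #   {
    #       "connection_string": "couchbase://localhost",
    #       "username": "Administrator",
    #       "password": "password",
    #       "bucket": "airbyte",
    #       "scope": "_default",     # optional, defaults to "_default"
    #       "batch_size": 1000       # optional, defaults to 1000
    #   }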

    @staticmethod
    def _sanitize_collection_name(name: str) -> str:
        # Replace invalid characters with underscores
        sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', name)
        # Ensure the name starts with a letter
        if not sanitized[0].isalpha():
            sanitized = 'c_' + sanitized
        return sanitized
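
    # Illustrative examples of the sanitization above (hypothetical stream names):
    #   "user-profiles" -> "user_profiles"
    #   "2024_orders"   -> "c_2024_orders"   (prefixed because it starts with a digit)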

    @classmethod
    def _setup_collection(cls, cluster, bucket_name, scope_name, collection_name):
        try:
            bucket = cluster.bucket(bucket_name)
            bucket_manager = bucket.collections()

            # Check if collection exists, create if it doesn't
            collections = bucket_manager.get_all_scopes()
            collection_exists = any(
                scope.name == scope_name and collection_name in [col.name for col in scope.collections]
                for scope in collections
            )
            if not collection_exists:
                logger.info(f"Collection '{collection_name}' does not exist. Creating it...")
                bucket_manager.create_collection(scope_name, collection_name)
                logger.info(f"Collection '{collection_name}' created successfully.")
            else:
                logger.info(f"Collection '{collection_name}' already exists. Skipping creation.")

            collection = bucket.scope(scope_name).collection(collection_name)

            # Ensure primary index exists with retry logic
            max_retries = 5
            retry_delay = 1  # seconds
            for attempt in range(max_retries):
                try:
                    query = f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{bucket_name}`.`{scope_name}`.`{collection_name}`"
                    cluster.query(query).execute()
                    logger.info("Primary index present or created successfully.")
                    break
                except KeyspaceNotFoundException as e:
                    # A newly created collection may not be queryable yet; back off and retry
                    if attempt < max_retries - 1:
                        logger.warning(f"KeyspaceNotFoundException on attempt {attempt + 1}. Retrying in {retry_delay} seconds...")
                        time.sleep(retry_delay)
                        retry_delay *= 2  # Exponential backoff
                    else:
                        logger.error(f"Error creating primary index after {attempt + 1} attempts: {str(e)}")
                        raise
                except Exception as e:
                    logger.error(f"Error creating primary index: {str(e)}")
                    raise

            return collection
        except Exception as e:
            raise RuntimeError(f"Error setting up collection: {str(e)}")

    @staticmethod
    def _clear_collection(cluster, bucket_name, scope_name, collection_name):
        try:
            query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
            cluster.query(query).execute()
            logger.info("All documents cleared from the collection.")
        except Exception as e:
            logger.warning(f"Error while clearing documents: {str(e)}. The collection might be empty.")

    @classmethod
    def _prepare_document(
        cls,
        stream: str,
        data: Mapping[str, Any],
        primary_key: List[List[str]],
        sync_mode: DestinationSyncMode = DestinationSyncMode.append
    ) -> Mapping[str, Any]:
        logger.debug(f"Preparing document for stream '{stream}' with sync mode {sync_mode}")

        # Only use primary key for append_dedup mode
        if sync_mode == DestinationSyncMode.append_dedup and primary_key:
            composite_key = cls._extract_composite_key(data, primary_key)
            if composite_key:
                doc_id = f"{stream}::{composite_key}"
                logger.debug(f"Created document ID using composite key: {doc_id}")
            else:
                doc_id = f"{stream}::{str(uuid4())}"
                logger.warning(f"Failed to extract composite key for append_dedup mode, falling back to UUID: {doc_id}")
        else:
            # For append and overwrite modes, just use UUID
            doc_id = f"{stream}::{str(uuid4())}"
            logger.debug(f"Using UUID for document ID (sync mode: {sync_mode})")

        document = {
            "id": doc_id,
            "type": "airbyte_record",
            "stream": stream,
            "data": data
        }
        return document
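
    # Illustrative shape of a prepared document for a hypothetical "users" stream synced
    # in append_dedup mode with primary_key [["id"]] and data {"id": 42, "name": "Ada"}:
    #
    #   {
    #       "id": "users::42",
    #       "type": "airbyte_record",
    #       "stream": "users",
    #       "data": {"id": 42, "name": "Ada"}
    #   }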

    @staticmethod
    def _extract_composite_key(data: Mapping[str, Any], primary_key: List[List[str]]) -> Optional[str]:
        try:
            key_parts = []
            for key_field in primary_key:
                if not key_field:  # Skip empty key paths
                    continue
                value = data
                # For each part of the key path (handling nested fields)
                for key_part in key_field:
                    if not isinstance(value, Mapping) or key_part not in value:
                        return None
                    value = value[key_part]
                if value is None:  # Bail out if any key value is None
                    return None
                key_parts.append(str(value))
            if not key_parts:  # If no valid key parts were found
                return None
            return "::".join(key_parts)
        except Exception as e:
            logger.error(f"Error extracting composite key: {str(e)}")
            return None
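
    # Illustrative walk of a nested primary key through the method above (hypothetical values):
    #   primary_key = [["profile", "user_id"], ["updated_at"]]
    #   data        = {"profile": {"user_id": 7}, "updated_at": "2024-10-29"}
    #   result      = "7::2024-10-29"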

    @staticmethod
    def _flush_buffer(collections, buffer: Mapping[str, list], stream_configs: Mapping[str, dict]):
        for stream, documents in buffer.items():
            if documents:
                collection = collections[stream]
                batch = {doc["id"]: doc for doc in documents}
                try:
                    # Set a longer timeout for the entire batch operation
                    timeout = timedelta(seconds=len(batch) * 2.5)  # 2.5 seconds per document
                    options = UpsertMultiOptions(timeout=timeout)
                    result = collection.upsert_multi(batch, options)
                    if not result.all_ok:
                        for doc_id, ex in result.exceptions.items():
                            if isinstance(ex, DocumentExistsException):
                                if stream_configs[stream]['sync_mode'] == DestinationSyncMode.append_dedup:
                                    # For append_dedup, an existing document is expected and not an error
                                    logger.info(f"Skipping duplicate document '{doc_id}' for stream {stream}")
                                else:
                                    logger.warning(f"Document with ID '{doc_id}' already exists in the collection for stream {stream}")
                            else:
                                logger.error(f"Failed to upsert document '{doc_id}' for stream {stream}. Error: {ex}")
                    logger.info(f"Successfully loaded {len(batch)} documents for stream {stream}")
                except CouchbaseException as e:
                    logger.error(f"Error occurred while loading documents for stream {stream}: {e}")
                    logger.error(f"Full exception details: {repr(e)}")
        buffer.clear()  # Clear the buffer after flushing
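
    # Note on the batch timeout above (illustrative arithmetic): it scales linearly with
    # batch size, so a full default batch of 1000 documents gets a 1000 * 2.5 s = 2500 s timeout.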
user
10/29/2024, 8:57 AM