# ask-ai
  • c

    Christopher Greene

    10/28/2024, 8:20 PM
    assume my deployment is air-gapped and can't pull from Docker Hub. What are all the images I need to mirror into my environment to make sure the custom connector builder can work?
  • m

    Mounika Naga

    10/28/2024, 8:46 PM
    @kapa.ai How do I set the cursor_field for an Airbyte connection while using Terraform?
  • f

    Francisco García

    10/28/2024, 9:54 PM
    how to upgrade Airbyte from version 0.44.5 to 1.1
  • v

    Vasil Boshnakov

    10/28/2024, 10:26 PM
    @kapa.ai I'm trying to create a connection between a Redshift source and destination. Both were created successfully, but when I try to create the connection I receive the following error: "Discovering schema failed: Something went wrong in the connector. See the logs for more details."
  • m

    Marco Rodriguez

    10/28/2024, 10:59 PM
    Airbyte Helm chart v1.1.1 is throwing this error when running a sync:
    ERROR i.a.c.ConnectorWatcher(handleException):175 - Error performing operation: io.airbyte.workers.workload.exception.DocStoreAccessException
    io.airbyte.workers.workload.exception.DocStoreAccessException: Unable to write output for decd338e-5647-4c0b-adf4-da0e75f5a750_4821_4_check
        at io.airbyte.workers.workload.JobOutputDocStore.writeOutput(JobOutputDocStore.kt:72) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.workers.workload.JobOutputDocStore.write(JobOutputDocStore.kt:38) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
        at io.airbyte.connectorSidecar.ConnectorWatcher.saveConnectorOutput(ConnectorWatcher.kt:163) [io.airbyte-airbyte-connector-sidecar-1.1.0.jar:?]
        at io.airbyte.connectorSidecar.ConnectorWatcher.run(ConnectorWatcher.kt:72) [io.airbyte-airbyte-connector-sidecar-1.1.0.jar:?]
        at io.airbyte.connectorSidecar.ApplicationKt.main(Application.kt:32) [io.airbyte-airbyte-connector-sidecar-1.1.0.jar:?]
        at io.airbyte.connectorSidecar.ApplicationKt.main(Application.kt) [io.airbyte-airbyte-connector-sidecar-1.1.0.jar:?]
    Caused by: software.amazon.awssdk.services.s3.model.S3Exception: The authorization header is malformed; the region 'us-east-1' is wrong; expecting 'eu-west-1' (Service: S3, Status Code: 400, Request ID: CES6NK545QS4ZJAR, Extended Request ID: 4cGYQArQH+hhUsH+pDYRPQzgDYuFZpOl+RmGy/fDqG95w2qYnI3PzY6UctmCiCPSuc72+E4poQE=)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156) ~[aws-xml-protocol-2.27.8.jar:?]
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108) ~[aws-xml-protocol-2.27.8.jar:?]
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85) ~[aws-xml-protocol-2.27.8.jar:?]
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43) ~[aws-xml-protocol-2.27.8.jar:?]
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:93) ~[aws-core-2.27.8.jar:?]
        at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279) ~[sdk-core-2.27.8.jar:?]
        at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50) ~[sdk-core-2.27.8.jar:?]
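    The "Caused by" line above points to an S3 region mismatch: the client writing to the job-output doc store is signing requests for us-east-1 while the bucket lives in eu-west-1, so the region configured for Airbyte's log/state storage needs to match the bucket's actual region. A minimal sketch, assuming boto3 is installed and AWS credentials are available, for confirming where a bucket actually lives (the bucket name below is a placeholder):
    Copy code
    import boto3

    def bucket_region(bucket_name: str) -> str:
        """Return the region an S3 bucket lives in (S3 reports us-east-1 as None)."""
        s3 = boto3.client("s3")
        constraint = s3.get_bucket_location(Bucket=bucket_name)["LocationConstraint"]
        return constraint or "us-east-1"

    # Hypothetical bucket name; replace with the bucket configured for Airbyte's storage.
    print(bucket_region("my-airbyte-storage-bucket"))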
  • c

    Carlos Bernal Carvajal

    10/29/2024, 12:01 AM
    Hello @kapa.ai, One of our sync jobs suddenly started failing with a not so clear error message. It simply says
    Source process read attempt failed
    . Taking a look at the logs, it seems like the job is stuck on an infinite loop. The following 3 messages are very frequent, until the job eventually fails:
    Copy code
    2024-10-28 19:11:26 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=97, polls=0
    2024-10-28 19:11:26 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT10.022736437S after its previous call which was also logged.
    2024-10-28 19:11:26 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: no progress since last heartbeat.
    I have refreshed the connection multiple times, but it always fails on the next sync. Can someone help me figure out what the problem is? Attached you'll find the attempt logs. Airbyte version: v0.63.13 Source: Postgres - v3.6.22 Destination: Postgres - v2.4.0 (edited)
  • b

    Ben Rohald

    10/29/2024, 12:05 AM
    @kapa.ai I am using jinja templating to inject the pagination variable
    "Limit": "{{ next_page_token['next_page_token'] }}"
    into my request. Even when I have pagination enabled, the variable is empty when sending the request
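    One detail worth keeping in mind, which the generic sketch below illustrates: a next-page token only exists after the first response has been parsed, so the very first request is always sent with an empty token. This is plain requests against a hypothetical API, not the Connector Builder's internals, and the next_page_token response field name is an assumption:
    Copy code
    import requests

    def fetch_all(base_url: str) -> list:
        """Cursor pagination: the token is absent on the first call and is only
        injected once a previous response has provided one."""
        records, next_page_token = [], None
        while True:
            params = {}
            if next_page_token:  # first request: nothing to inject yet
                params["Limit"] = next_page_token
            payload = requests.get(base_url, params=params, timeout=30).json()
            records.extend(payload.get("items", []))
            next_page_token = payload.get("next_page_token")  # assumed response field
            if not next_page_token:
                return records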
  • p

    Phạm Mạnh Hùng

    10/29/2024, 1:39 AM
    Can you help me write a custom destination function in Airbyte to write to S3 and merge all synced data into a single file?
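    Airbyte's S3 destination typically writes one object per batch/part, so merging usually happens as a post-processing step rather than inside the destination itself. A minimal post-sync sketch, assuming boto3 and placeholder bucket, prefix and key names, that concatenates every synced object under a prefix into a single file:
    Copy code
    import boto3

    def merge_objects(bucket: str, prefix: str, merged_key: str) -> None:
        """Download every object under `prefix` and re-upload them as one object.

        Note: this buffers everything in memory, which is fine for modest volumes only.
        """
        s3 = boto3.client("s3")
        chunks = []
        for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                if obj["Key"] != merged_key:
                    chunks.append(s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read())
        s3.put_object(Bucket=bucket, Key=merged_key, Body=b"".join(chunks))

    # Hypothetical names; adjust to wherever the S3 destination writes its files.
    merge_objects("my-sync-bucket", "airbyte/my_stream/", "airbyte/my_stream/merged.jsonl")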
  • a

    Andrew Nada

    10/29/2024, 3:23 AM
    In Airbyte, when I sync BigQuery to Pub/Sub the value in the stream comes as a string, but when I sync GitHub or HubSpot to Pub/Sub the value comes as a sequence. What dictates the type of the value sent to a connector? How can I ensure it's consistent, or at least predict it?
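    One way to make the downstream type predictable regardless of what a source emits is to normalise every value to a canonical JSON encoding before it is published. A stdlib-only sketch of that idea (a defensive pattern on the producing side, not Airbyte's own behaviour):
    Copy code
    import json
    from typing import Any

    def to_canonical_payload(value: Any) -> bytes:
        """Coerce strings, bytes, sequences and mappings to one canonical JSON encoding."""
        if isinstance(value, bytes):
            value = value.decode("utf-8")
        if isinstance(value, str):
            try:
                value = json.loads(value)  # already-serialised JSON stays structured
            except json.JSONDecodeError:
                pass                       # plain string: keep as-is
        return json.dumps(value, sort_keys=True, default=str).encode("utf-8")

    print(to_canonical_payload('{"id": 1}'))   # b'{"id": 1}'
    print(to_canonical_payload([("id", 1)]))   # b'[["id", 1]]'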
  • q

    Quang Nguyen

    10/29/2024, 3:45 AM
    Copy code
    at io.airbyte.api.client.auth.AirbyteAuthHeaderInterceptor.intercept(AirbyteAuthHeaderInterceptor.kt:28) ~[io.airbyte.airbyte-api-commons-0.63.14.jar:?]
                    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109) ~[okhttp-4.12.0.jar:?]
                    at io.airbyte.workload.api.client.auth.WorkloadApiAuthenticationInterceptor.intercept(WorkloadApiAuthenticationInterceptor.kt:36) ~[io.airbyte.airbyte-api-workload-api-0.63.14.jar:?]
                    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109) ~[okhttp-4.12.0.jar:?]
                    at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201) ~[okhttp-4.12.0.jar:?]
                    at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154) ~[okhttp-4.12.0.jar:?]
                    at io.airbyte.workload.api.client.generated.WorkloadApi.workloadHeartbeatWithHttpInfo(WorkloadApi.kt:2748) ~[io.airbyte.airbyte-api-workload-api-0.63.14.jar:?]
                    at io.airbyte.workload.api.client.generated.WorkloadApi.workloadHeartbeat(WorkloadApi.kt:429) ~[io.airbyte.airbyte-api-workload-api-0.63.14.jar:?]
                    at io.airbyte.workers.general.ReplicationWorkerHelper.getWorkloadStatusHeartbeat$lambda$1(ReplicationWorkerHelper.kt:153) ~[io.airbyte-airbyte-commons-worker-0.63.14.jar:?]
                    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
                    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
                    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
                    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
            Caused by: java.io.EOFException: \n not found: limit=0 content=…
                    at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.kt:335) ~[okio-jvm-3.6.0.jar:?]
                    at okhttp3.internal.http1.HeadersReader.readLine(HeadersReader.kt:29) ~[okhttp-4.12.0.jar:?]
                    at okhttp3.internal.http1.Http1ExchangeCodec.readResponseHeaders(Http1ExchangeCodec.kt:180) ~[okhttp-4.12.0.jar:?]
                    ... 26 more
    Caused by: java.net.ConnectException: Connection refused
            at java.base/sun.nio.ch.Net.pollConnect(Native Method) ~[?:?]
            at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:682) ~[?:?]
            at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:549) ~[?:?]
            at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:592) ~[?:?]
            at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327) ~[?:?]
            at java.base/java.net.Socket.connect(Socket.java:751) ~[?:?]
            at okhttp3.internal.platform.Platform.connectSocket(Platform.kt:128) ~[okhttp-4.12.0.jar:?]
            at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.kt:295) ~[okhttp-4.12.0.jar:?]
            ... 28 more
  • j

    james 98

    10/29/2024, 8:01 AM
    at io.airbyte.workers.process.DockerProcessFactory.create(DockerProcessFactory.java:117) ~[io.airbyte-airbyte-commons-worker-0.63.8.jar:?]
    at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:147) ~[io.airbyte-airbyte-commons-worker-0.63.8.jar:?]
    at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:71) ~[io.airbyte-airbyte-commons-worker-0.63.8.jar:?]
    at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:44) ~[io.airbyte-airbyte-commons-worker-0.63.8.jar:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:138) ~[io.airbyte-airbyte-workers-0.63.8.jar:?]
    at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.lambda$runWithJobOutput$1(CheckConnectionActivityImpl.java:230) ~[io.airbyte-airbyte-workers-0.63.8.jar:?]
    at io.airbyte.commons.temporal.HeartbeatUtils.withBackgroundHeartbeat(HeartbeatUtils.java:57) ~[io.airbyte-airbyte-commons-temporal-core-0.63.8.jar:?]
    at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.runWithJobOutput(CheckConnectionActivityImpl.java:215) ~[io.airbyte-airbyte-workers-0.63.8.jar:?]
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[?:?]
    at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[?:?]
    at io.temporal.internal.activity.RootActivityInboundCallsInterceptor$POJOActivityInboundCallsInterceptor.executeActivity(RootActivityInboundCallsInterceptor.java:64) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.activity.RootActivityInboundCallsInterceptor.execute(RootActivityInboundCallsInterceptor.java:43) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor.execute(ActivityTaskExecutors.java:107) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.activity.ActivityTaskHandlerImpl.handle(ActivityTaskHandlerImpl.java:124) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:278) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:243) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:216) ~[temporal-sdk-1.22.3.jar:?]
    at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:105) ~[temporal-sdk-1.22.3.jar:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
    2024-10-29 075854 platform > (postgres destination)
  • l

    Leo Salayog

    10/29/2024, 8:07 AM
    When I get the tasks from clickup API, i get this error:
    Copy code
    Client error : 400 Bad Request {"exceptionStack": "Traceback (most recent call last):
      File "/home/airbyte/.pyenv/versions/3.9.19/lib/python3.9/site-packages/airbyte_cdk/connector_builder/connector_builder_handler.py", line 67, in read_stream
        stream_read = handler.get_message_groups(source, config, configured_catalog, state, limits.max_records)
      File "/home/airbyte/.pyenv/versions/3.9.19/lib/python3.9/site-packages/airbyte_cdk/connector_builder/message_grouper.py", line 125, in get_message_groups
        schema = schema_inferrer.get_stream_schema(configured_stream.stream.name)
      File "/home/airbyte/.pyenv/versions/3.9.19/lib/python3.9/site-packages/airbyte_cdk/utils/schema_inferrer.py", line 245, in get_stream_schema
        self._add_required_properties(self._clean(self.stream_to_builder[stream_name].to_schema()))
      File "/home/airbyte/.pyenv/versions/3.9.19/lib/python3.9/site-packages/airbyte_cdk/utils/schema_inferrer.py", line 144, in _clean
  • j

    james 98

    10/29/2024, 8:20 AM
    how to check if Airbyte is running on Linux (Ubuntu)
  • g

    Gerald

    10/29/2024, 8:25 AM
    how to find the location of the local JSON data in Docker
  • g

    Geoffrey LOUASSE

    10/29/2024, 8:33 AM
    Hey, could you tell me what this error is related to?
    2024-10-29 082442 source > WARN pool-3-thread-1 i.d.p.ChangeEventSourceCoordinator(stop):320 Coordinator didn't stop in the expected time, shutting down executor now
    2024-10-29 082453 source > ERROR debezium-sqlserverconnector-BDDWH_PRD_LZ_MER-change-event-source-coordinator i.d.p.ErrorHandler(setProducerThrowable):52 Producer failure org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {event_serial_no=2, commit_lsn=000018bf000162480014, change_lsn=000018bf000161d000e0}
  • q

    Quentin CHURET

    10/29/2024, 8:37 AM
    Hi, is it possible to create a Client ID/Secret that is not linked to a user account? More like a service account with some specific rights?
  • k

    Kaustav Ghosh

    10/29/2024, 8:53 AM
    @kapa.ai
    Copy code
    {
      "id": "assignees::38b29dcc-e683-425d-84a5-db91a2de3999",
      "type": "airbyte_record",
      "stream": "assignees",
      "data": {
        "login": "teetangh",
        "id": 44238657,
        "node_id": "MDQ6VXNlcjQ0MjM4NjU3",
        "avatar_url": "https://avatars.githubusercontent.com/u/44238657?v=4",
        "gravatar_id": "",
        "url": "https://api.github.com/users/teetangh",
        "html_url": "https://github.com/teetangh",
        "followers_url": "https://api.github.com/users/teetangh/followers",
        "following_url": "https://api.github.com/users/teetangh/following{/other_user}",
        "gists_url": "https://api.github.com/users/teetangh/gists{/gist_id}",
        "starred_url": "https://api.github.com/users/teetangh/starred{/owner}{/repo}",
        "subscriptions_url": "https://api.github.com/users/teetangh/subscriptions",
        "organizations_url": "https://api.github.com/users/teetangh/orgs",
        "repos_url": "https://api.github.com/users/teetangh/repos",
        "events_url": "https://api.github.com/users/teetangh/events{/privacy}",
        "received_events_url": "https://api.github.com/users/teetangh/received_events",
        "type": "User",
        "user_view_type": "public",
        "site_admin": false,
        "repository": "Kaggle-Workspace/Gradient-Descent-Algorithms"
      }
    }
    Is this the correct format in which the destination must receive the data? Tell me some improvements.
  • k

    Kaustav Ghosh

    10/29/2024, 8:57 AM
    @kapa.ai is this logic correct
    Copy code
    class DestinationCouchbase(Destination):
    
        def write(
            self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
        ) -> Iterable[AirbyteMessage]:
    
            cluster = self._get_cluster(config)
            bucket_name = config["bucket"]
            scope_name = config.get("scope", "_default")
            batch_size = config.get("batch_size", 1000)
    
            # Log connection config structure
            logger.info(f"Connection Config Structure: {{'bucket': '{bucket_name}', 'scope': '{scope_name}'}}")
    
            # Get stream configurations including sync modes and primary keys
            stream_configs = {}
            for stream in configured_catalog.streams:
                stream_configs[stream.stream.name] = {
                    'collection_name': self._sanitize_collection_name(stream.stream.name),
                    'sync_mode': stream.destination_sync_mode,
                    'primary_key': stream.stream.source_defined_primary_key
                }
            
            logger.info(f"Starting write to Couchbase with {len(stream_configs)} streams")
    
            # Set up collections and clear data for overwrite mode
            collections = {}
            for stream_name, stream_config in stream_configs.items():
                collection = self._setup_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
                if stream_config['sync_mode'] == DestinationSyncMode.overwrite:
                    self._clear_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
                collections[stream_name] = collection
    
            buffer = {}
            
            for message in input_messages:
                if message.type == Type.STATE:
                    try:
                        logger.info("Received state message")
                        self._flush_buffer(collections, buffer, stream_configs)
                        yield message
                    except Exception as e:
                        logger.error(f"Error processing state message: {str(e)}")
                        raise
    
                elif message.type == Type.RECORD:
                    logger.info(f"Processing record for stream: {message.record.stream}")
                    data = message.record.data
                    stream = message.record.stream
                    
                    try:
                        if stream not in stream_configs:
                            logger.warning(f"Encountered new stream {stream}. Adding to streams and creating collection.")
                            collection_name = self._sanitize_collection_name(stream)
                            stream_configs[stream] = {
                                'collection_name': collection_name,
                                'sync_mode': DestinationSyncMode.append,
                                'primary_key': []
                            }
                            collections[stream] = self._setup_collection(cluster, bucket_name, scope_name, collection_name)
                        
                        if stream not in buffer:
                            buffer[stream] = []
                        
                        document = self._prepare_document(
                            stream, 
                            data, 
                            stream_configs[stream]['primary_key'],
                            stream_configs[stream]['sync_mode']
                        )
                        
                        if len(buffer[stream]) == 0:
                            logger.info(f"Document Structure for stream '{stream}': {document}")
                        
                        buffer[stream].append(document)
    
                        if len(buffer[stream]) >= batch_size:
                            self._flush_buffer(collections, {stream: buffer[stream]}, stream_configs)
                            buffer[stream] = []
                    except Exception as e:
                        logger.error(f"Error processing record for stream {stream}: {str(e)}")
                        raise
    
            # Flush any remaining messages
            if any(buffer.values()):
                try:
                    self._flush_buffer(collections, buffer, stream_configs)
                except Exception as e:
                    logger.error(f"Error in final buffer flush: {str(e)}")
                    raise
    
        @staticmethod
        def _get_cluster(config: Mapping[str, Any]) -> Cluster:
            auth = PasswordAuthenticator(config["username"], config["password"])
            cluster = Cluster(config["connection_string"], ClusterOptions(auth))
            cluster.wait_until_ready(timedelta(seconds=5))
            return cluster
    
        @staticmethod
        def _sanitize_collection_name(name: str) -> str:
            # Replace invalid characters with underscores
            sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', name)
            # Ensure the name starts with a letter
            if not sanitized[0].isalpha():
                sanitized = 'c_' + sanitized
            return sanitized
    
        @classmethod
        def _setup_collection(cls, cluster, bucket_name, scope_name, collection_name):
            try:
                bucket = cluster.bucket(bucket_name)
                bucket_manager = bucket.collections()
                
                # Check if collection exists, create if it doesn't
                collections = bucket_manager.get_all_scopes()
                collection_exists = any(
                    scope.name == scope_name and collection_name in [col.name for col in scope.collections]
                    for scope in collections
                )
                
                if not collection_exists:
                    logger.info(f"Collection '{collection_name}' does not exist. Creating it...")
                    bucket_manager.create_collection(scope_name, collection_name)
                    logger.info(f"Collection '{collection_name}' created successfully.")
                else:
                    logger.info(f"Collection '{collection_name}' already exists. Skipping creation.")
                
                collection = bucket.scope(scope_name).collection(collection_name)
                
                # Ensure primary index exists with retry logic
                max_retries = 5
                retry_delay = 1  # seconds
                
                for attempt in range(max_retries):
                    try:
                        query = f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{bucket_name}`.`{scope_name}`.`{collection_name}`"
                        cluster.query(query).execute()
                        logger.info("Primary index present or created successfully.")
                        break
                    except KeyspaceNotFoundException as e:
                        if attempt < max_retries - 1:
                            logger.warning(f"KeyspaceNotFoundException on attempt {attempt + 1}. Retrying in {retry_delay} seconds...")
                            time.sleep(retry_delay)
                            retry_delay *= 2  # Exponential backoff
                        else:
                            logger.error(f"Error creating primary index after {attempt + 1} attempts: {str(e)}")
                            raise
                    except Exception as e:
                        logger.error(f"Error creating primary index: {str(e)}")
                        raise
                    
                return collection
            except Exception as e:
                raise RuntimeError(f"Error setting up collection: {str(e)}")
    
        @staticmethod
        def _clear_collection(cluster, bucket_name, scope_name, collection_name):
            try:
                query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
                cluster.query(query).execute()
                logger.info("All documents cleared from the collection.")
            except Exception as e:
                logger.warning(f"Error while clearing documents: {str(e)}. The collection might be empty.")
    
        @classmethod
        def _prepare_document(
            cls,
            stream: str, 
            data: Mapping[str, Any], 
            primary_key: List[List[str]],
            sync_mode: DestinationSyncMode = DestinationSyncMode.append
        ) -> Mapping[str, Any]:
    
            logger.debug(f"Preparing document for stream '{stream}' with sync mode {sync_mode}")
            
            # Only use primary key for append_dedup mode
            if sync_mode == DestinationSyncMode.append_dedup and primary_key:
                composite_key = cls._extract_composite_key(data, primary_key)
                if composite_key:
                    doc_id = f"{stream}::{composite_key}"
                    logger.debug(f"Created document ID using composite key: {doc_id}")
                else:
                    doc_id = f"{stream}::{str(uuid4())}"
                    logger.warning(f"Failed to extract composite key for append_dedup mode, falling back to UUID: {doc_id}")
            else:
                # For append and overwrite modes, just use UUID
                doc_id = f"{stream}::{str(uuid4())}"
                logger.debug(f"Using UUID for document ID (sync mode: {sync_mode})")
    
            document = {
                "id": doc_id,
                "type": "airbyte_record",
                "stream": stream,
                "data": data
            }
            return document
        
        @staticmethod
        def _extract_composite_key(data: Mapping[str, Any], primary_key: List[List[str]]) -> Optional[str]:
    
            try:
                key_parts = []
                for key_field in primary_key:
                    if not key_field:  # Skip empty key paths
                        continue
                        
                    value = data
                    # For each part of the key path (handling nested fields)
                    for key_part in key_field:
                        if not isinstance(value, Mapping) or key_part not in value:
                            return None
                        value = value[key_part]
                    
                    if value is None:  # Skip None values
                        return None
                        
                    key_parts.append(str(value))
                    
                if not key_parts:  # If no valid key parts were found
                    return None
                    
                return "::".join(key_parts)
            except Exception as e:
                logger.error(f"Error extracting composite key: {str(e)}")
                return None
    
        @staticmethod
        def _flush_buffer(collections, buffer: Mapping[str, list], stream_configs: Mapping[str, dict]):
            for stream, documents in buffer.items():
                if documents:
                    collection = collections[stream]
                    batch = {doc["id"]: doc for doc in documents}
                    try:
                        # Set a longer timeout for the entire batch operation
                        timeout = timedelta(seconds=len(batch) * 2.5)  # 2.5 seconds per document
                        options = UpsertMultiOptions(timeout=timeout)
                        
                        result = collection.upsert_multi(batch, options)
                        if not result.all_ok:
                            for doc_id, ex in result.exceptions.items():
                                if isinstance(ex, DocumentExistsException):
                                    if stream_configs[stream]['sync_mode'] == DestinationSyncMode.append_dedup:
                                        # For append_dedup, document exists is expected and not an error
                                        logger.info(f"Skipping duplicate document '{doc_id}' for stream {stream}")
                                    else:
                                        logger.warning(f"Document with ID '{doc_id}' already exists in the collection for stream {stream}")
                                else:
                                    logger.error(f"Failed to upsert document '{doc_id}' for stream {stream}. Error: {ex}")
                        logger.info(f"Successfully loaded {len(batch)} documents for stream {stream}")
                    except CouchbaseException as e:
                        logger.error(f"Error occurred while loading documents for stream {stream}: {e}")
                        logger.error(f"Full exception details: {repr(e)}")
    
            buffer.clear()  # Clear the buffer after flushing
  • k

    Kaustav Ghosh

    10/29/2024, 9:40 AM
    @kapa.ai
    Copy code
    def write(
            self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
        ) -> Iterable[AirbyteMessage]:
            """
            Reads the input stream of messages, config, and catalog to write data to Couchbase.
            """
            cluster = self._get_cluster(config)
            bucket_name = config["bucket"]
            scope_name = config.get("scope", "_default")
            batch_size = config.get("batch_size", 1000)
    
            # Log connection config structure
            logger.info(f"Connection Config Structure: {{'bucket': '{bucket_name}', 'scope': '{scope_name}'}}")
    
            # Get stream configurations including sync modes and primary keys
            stream_configs = {}
            for stream in configured_catalog.streams:
                stream_configs[stream.stream.name] = {
                    'collection_name': self._sanitize_collection_name(stream.stream.name),
                    'sync_mode': stream.destination_sync_mode,
                    'primary_key': stream.stream.source_defined_primary_key
                }
            
            logger.info(f"Starting write to Couchbase with {len(stream_configs)} streams")
    
            # Set up collections and clear data for overwrite mode
            collections = {}
            for stream_name, stream_config in stream_configs.items():
                collection = self._setup_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
                if stream_config['sync_mode'] == DestinationSyncMode.overwrite:
                    self._clear_collection(cluster, bucket_name, scope_name, stream_config['collection_name'])
                collections[stream_name] = collection
    
            buffer = {}
            
            for message in input_messages:
                # json dumps for logging purposes
                logger.debug(f"Processing message: {json.dumps(message.to_dict())}")
                if message.type == Type.STATE:
                    try:
                        self._flush_buffer(collections, buffer, stream_configs)
                        yield message
                    except Exception as e:
                        logger.error(f"Error processing state message: {str(e)}")
                        raise
    
                elif message.type == Type.RECORD:
                    data = message.record.data
                    stream = message.record.stream
                    
                    try:
                        if stream not in stream_configs:
                            logger.warning(f"Encountered new stream {stream}. Adding to streams and creating collection.")
                            collection_name = self._sanitize_collection_name(stream)
                            stream_configs[stream] = {
                                'collection_name': collection_name,
                                'sync_mode': DestinationSyncMode.append,
                                'primary_key': []
                            }
                            collections[stream] = self._setup_collection(cluster, bucket_name, scope_name, collection_name)
                        
                        if stream not in buffer:
                            buffer[stream] = []
                        
                        document = self._prepare_document(
                            stream, 
                            data, 
                            stream_configs[stream]['primary_key'],
                            stream_configs[stream]['sync_mode']
                        )
                        
                        if len(buffer[stream]) == 0:
                            logger.info(f"Document Structure for stream '{stream}': {document}")
                        
                        buffer[stream].append(document)
    
                        if len(buffer[stream]) >= batch_size:
                            self._flush_buffer(collections, {stream: buffer[stream]}, stream_configs)
                            buffer[stream] = []
                    except Exception as e:
                        logger.error(f"Error processing record for stream {stream}: {str(e)}")
                        raise
    
            # Flush any remaining messages
            if any(buffer.values()):
                try:
                    self._flush_buffer(collections, buffer, stream_configs)
                except Exception as e:
                    logger.error(f"Error in final buffer flush: {str(e)}")
                    raise
    can json.dumps work with message here?
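    Whether json.dumps(message.to_dict()) works depends on which model classes the installed CDK ships: the long-standing pydantic-based AirbyteMessage models do not expose to_dict(), they expose .dict() and .json() (newer CDK releases moved to a dedicated serializer instead). A minimal sketch under that pydantic assumption:
    Copy code
    import json

    from airbyte_cdk.models import AirbyteMessage  # assumption: pydantic-based models

    def message_to_json(message: AirbyteMessage) -> str:
        """Serialise an AirbyteMessage for debug logging.

        json.dumps(message) fails because the model object itself is not
        JSON-serialisable; convert it to a plain dict first and let default=str
        handle enums and timestamps.
        """
        return json.dumps(message.dict(exclude_unset=True), default=str)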
  • h

    henryd

    10/29/2024, 10:13 AM
    @kapa.ai Is it worth it to deploy on Kubernetes (EKS on AWS), or is scaling vertically with EC2 enough?
  • d

    david balli

    10/29/2024, 10:31 AM
    can we extract metadata from excel file names that we import ?
  • d

    david balli

    10/29/2024, 10:39 AM
    can we import excel files using wildcards ?
  • d

    david balli

    10/29/2024, 10:40 AM
    while importing multiple files using wildcards, is the filename metadata available in the data ?
  • o

    Oisin McKnight

    10/29/2024, 10:46 AM
    Review Changes in Airbyte not saving after updating Airbyte Instance
  • h

    henryd

    10/29/2024, 10:49 AM
    @kapa.ai Is abctl local install --values secrets-manager.yml correct if I want to redeploy? And what if I already have an existing Airbyte on EC2? Basically I want to move to using a secrets manager.
  • m

    Mani Vellaisamy

    10/29/2024, 12:16 PM
    @kapa.ai What are all the support offerings provided by Airbyte for the open-source Airbyte community, and how is the price calculated for each support offering? Give me detailed pricing for support.
  • a

    Antonio Lee

    10/29/2024, 2:38 PM
    @kapa.ai The normalization queries made by Airbyte on the Snowflake destination connector run on the Snowflake Python connector 2.7.1, which is no longer supported by Snowflake. Is there a way to upgrade the connector version?
  • y

    Yannick Sacherer

    10/29/2024, 2:46 PM
    @kapa.ai Where would I usually configure the limit & offset for the API request?
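    In the Connector Builder these two values are typically set in the stream's pagination section (an offset-increment paginator with a page size, injected into request parameters). The sketch below, against a hypothetical endpoint with assumed parameter names, only shows what limit and offset translate to on the wire:
    Copy code
    import requests

    def fetch_page(base_url: str, limit: int, offset: int) -> list:
        """Offset pagination: `limit` caps the page size, `offset` skips prior rows."""
        response = requests.get(
            base_url,
            params={"limit": limit, "offset": offset},  # assumed parameter names
            timeout=30,
        )
        response.raise_for_status()
        return response.json().get("items", [])

    # Walk a hypothetical API 100 records at a time until an empty page is returned.
    records, offset, limit = [], 0, 100
    while True:
        page = fetch_page("https://api.example.com/records", limit, offset)
        if not page:
            break
        records.extend(page)
        offset += limit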