Shubham
10/25/2024, 7:20 AM
Configuration check failed
"Forbidden. You don't have permission to access this resource."
Arif Chaudhary
10/25/2024, 7:21 AM
Workload dbf246d7-8seg-4c20-b226-2d0fec5f6454_135_0_sync is pending
Phạm Mạnh Hùng
10/25/2024, 7:23 AM

Phạm Mạnh Hùng
10/25/2024, 7:42 AM

Henrik Nilsson
10/25/2024, 7:44 AM

Euan Blackledge
10/25/2024, 7:49 AM

Euan Blackledge
10/25/2024, 7:50 AM

user
10/25/2024, 7:50 AM
DEBUG logs for any non-testing use of Self-Managed Airbyte. Failing to do so while running at-scale syncs may result in the server pod being overloaded, preventing most of the deployment from operating as normal.
2. Logging Level for JDBC Sources: There's an ongoing issue related to excessive logging in SQL integrations (MySQL, Postgres, BigQuery, etc.). A proposal has been made to change the logging level from INFO to DEBUG for these sources to reduce the number of log entries. This is discussed in a [GitHub issue](https://github.com/airbytehq/airbyte/issues/24983).
3. API Rate Limiting: If you're making a large number of API calls, you might be hitting rate limits. This could potentially cause errors and increased logging. As mentioned in one of the issues, a user experienced problems when triggering around 200 syncs/second through the Airbyte API [GitHub issue](https://github.com/airbytehq/airbyte/issues/44833).
To address your specific situation:
1. Verify that DEBUG logs are indeed turned off, as you mentioned setting the logging level to error.
2. Check if you're using any JDBC sources, as these might be contributing to the log flood.
3. Review your API usage and consider implementing rate limiting or spreading out your sync job requests if you're triggering many syncs in a short period (see the throttling sketch after this message).
4. If the problem persists, you might want to consider opening a new issue on the Airbyte GitHub repository or reaching out to their support team with your specific error messages and configuration details.
Remember, while these suggestions are based on similar issues reported by other users, your specific situation might require a more tailored solution. If the problem continues, it would be best to engage directly with Airbyte support or the community for more targeted assistance.
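For illustration, a minimal sketch of point 3 above: client-side pacing of sync triggers through the Airbyte Public API's POST /v1/jobs endpoint. The base URL, token, connection IDs, and 2-second spacing are all placeholder assumptions to adapt to your deployment:

import time
import requests

AIRBYTE_API_URL = "https://api.airbyte.example/v1"  # placeholder base URL
API_TOKEN = "REPLACE_ME"                            # placeholder bearer token
CONNECTION_IDS = ["conn-1", "conn-2", "conn-3"]     # placeholder connection IDs
SECONDS_BETWEEN_TRIGGERS = 2.0                      # assumed pacing, tune as needed

def trigger_sync(connection_id: str) -> dict:
    # One sync trigger per call; raises on 4xx/5xx (including 429 rate limits).
    response = requests.post(
        f"{AIRBYTE_API_URL}/jobs",
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        json={"connectionId": connection_id, "jobType": "sync"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()

for cid in CONNECTION_IDS:
    trigger_sync(cid)
    # Space out requests instead of firing hundreds of syncs per second,
    # the failure mode described in the linked issue.
    time.sleep(SECONDS_BETWEEN_TRIGGERS)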
Julian Andersen
10/25/2024, 8:24 AM

Torsten Fraust
10/25/2024, 8:38 AM

Balaji Seetharaman
10/25/2024, 9:00 AM
Invalid Protocol Schema: The endpoint that data is being requested from is using an invalid or insecure. Exception: requests.exceptions.InvalidSchema
Traceback (most recent call last):
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py", line 133, in read
    yield from self._read_stream(
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py", line 239, in _read_stream
    for record_data_or_message in record_iterator:
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/streams/core.py", line 192, in read
    for record_data_or_message in records:
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 137, in read_records
    yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 378, in read_records
    for stream_data in self._read_pages(record_generator, self.state, _slice):
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 301, in _read_pages
    response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 487, in _fetch_next_page
    return self.requester.send_request(
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/requesters/http_requester.py", line 305, in send_request
    request, response = self._http_client.send_request(
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/streams/http/http_client.py", line 406, in send_request
    response: requests.Response = self._send_with_retry(
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/streams/http/http_client.py", line 236, in _send_with_retry
    response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(request, request_kwargs, log_formatter=log_formatter, exit_on_rate_limit=exit_on_rate_limit)  # type: ignore # mypy can't infer that backoff_handler wraps _send
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/backoff/_sync.py", line 105, in retry
    ret = target(*args, **kwargs)
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/backoff/_sync.py", line 105, in retry
    ret = target(*args, **kwargs)
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/backoff/_sync.py", line 105, in retry
    ret = target(*args, **kwargs)
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/streams/http/http_client.py", line 290, in _send
    self._handle_error_resolution(
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/streams/http/http_client.py", line 326, in _handle_error_resolution
    raise MessageRepresentationAirbyteTracedErrors(
airbyte_cdk.sources.streams.http.http_client.MessageRepresentationAirbyteTracedErrors: Invalid Protocol Schema: The endpoint that data is being requested from is using an invalid or insecure. Exception: requests.exceptions.InvalidSchema
ERROR
None
Traceback (most recent call last):
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/connector_builder/message_grouper.py", line 302, in _read_stream
    yield from AirbyteEntrypoint(source).read(source.spec(self.logger), config, configured_catalog, state)
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py", line 191, in read
    for message in self.source.read(self.logger, config, catalog, state):
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py", line 165, in read
    yield from super().read(logger, config, catalog, state)
  File "/home/airbyte/.pyenv/versions/3.10.14/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py", line 177, in read
    raise AirbyteTracedException(message=error_message, failure_type=FailureType.config_error)
airbyte_cdk.utils.traced_exception.AirbyteTracedException: None
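For context on the exception above: requests raises InvalidSchema when a URL's scheme has no registered connection adapter, which usually means a typo'd or missing http(s):// prefix in the connector's URL base. A minimal reproduction (the URL is a made-up example):

import requests

try:
    # "htp" is not a registered scheme, so no connection adapter matches and
    # InvalidSchema is raised before any network I/O is attempted.
    requests.get("htp://api.example.com/v1/items")
except requests.exceptions.InvalidSchema as exc:
    print(f"InvalidSchema: {exc}")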
Kaustav Ghosh
10/25/2024, 9:35 AM
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
import logging
import re
from datetime import timedelta
from typing import Any, Iterable, Mapping
from uuid import uuid4

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import DocumentExistsException, CouchbaseException
from couchbase.options import ClusterOptions, UpsertMultiOptions

logger = logging.getLogger("airbyte")


class DestinationCouchbase(Destination):
    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """
        Reads the input stream of messages, config, and catalog to write data to Couchbase.
        """
        cluster = self._get_cluster(config)
        bucket_name = config["bucket"]
        scope_name = config.get("scope", "_default")
        streams = {s.stream.name: self._sanitize_collection_name(s.stream.name) for s in configured_catalog.streams}
        logger.info(f"Starting write to Couchbase with {len(streams)} streams")

        collections = {
            stream: self._setup_collection(cluster, bucket_name, scope_name, sanitized_name)
            for stream, sanitized_name in streams.items()
        }

        buffer = {}
        buffer_size = 1000  # Adjust as needed

        for message in input_messages:
            if message.type == Type.STATE:
                # Flush everything buffered so far before yielding the state
                # message, so checkpoints only cover persisted records.
                self._flush_buffer(collections, buffer)
                yield message
            elif message.type == Type.RECORD:
                data = message.record.data
                stream = message.record.stream
                if stream not in streams:
                    logger.warning(f"Encountered new stream {stream}. Adding to streams and creating collection.")
                    sanitized_name = self._sanitize_collection_name(stream)
                    streams[stream] = sanitized_name
                    collections[stream] = self._setup_collection(cluster, bucket_name, scope_name, sanitized_name)
                if stream not in buffer:
                    buffer[stream] = []
                buffer[stream].append(self._prepare_document(stream, data))
                if len(buffer[stream]) >= buffer_size:
                    self._flush_buffer(collections, {stream: buffer[stream]})
                    buffer[stream] = []

        # Flush any remaining messages
        self._flush_buffer(collections, buffer)

    @staticmethod
    def _get_cluster(config: Mapping[str, Any]) -> Cluster:
        auth = PasswordAuthenticator(config["username"], config["password"])
        cluster = Cluster(config["connection_string"], ClusterOptions(auth))
        cluster.wait_until_ready(timedelta(seconds=5))
        return cluster

    @staticmethod
    def _sanitize_collection_name(name: str) -> str:
        # Replace invalid characters with underscores
        sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', name)
        # Ensure the name is non-empty and starts with a letter
        if not sanitized or not sanitized[0].isalpha():
            sanitized = 'c_' + sanitized
        return sanitized

    @classmethod
    def _setup_collection(cls, cluster, bucket_name, scope_name, collection_name):
        try:
            bucket = cluster.bucket(bucket_name)
            bucket_manager = bucket.collections()

            # Check if collection exists, create if it doesn't
            collections = bucket_manager.get_all_scopes()
            collection_exists = any(
                scope.name == scope_name and collection_name in [col.name for col in scope.collections]
                for scope in collections
            )
            if not collection_exists:
                logger.info(f"Collection '{collection_name}' does not exist. Creating it...")
                bucket_manager.create_collection(scope_name, collection_name)
                logger.info(f"Collection '{collection_name}' created successfully.")
            else:
                logger.info(f"Collection '{collection_name}' already exists. Skipping creation.")

            collection = bucket.scope(scope_name).collection(collection_name)

            # Ensure primary index exists
            try:
                cluster.query(f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{bucket_name}`.`{scope_name}`.`{collection_name}`").execute()
                logger.info("Primary index present or created successfully.")
            except Exception as e:
                logger.warning(f"Error creating primary index: {str(e)}")

            # Clear all documents in the collection
            try:
                query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
                cluster.query(query).execute()
                logger.info("All documents cleared from the collection.")
            except Exception as e:
                logger.warning(f"Error while clearing documents: {str(e)}. The collection might be empty.")

            return collection
        except Exception as e:
            raise RuntimeError(f"Error setting up collection: {str(e)}")

    @staticmethod
    def _prepare_document(stream: str, data: Mapping[str, Any]) -> Mapping[str, Any]:
        return {
            "id": f"{stream}::{str(uuid4())}",
            "type": "airbyte_record",
            "stream": stream,
            "data": data
        }

    @staticmethod
    def _flush_buffer(collections, buffer: Mapping[str, list]):
        for stream, documents in buffer.items():
            if documents:
                collection = collections[stream]
                batch = {doc["id"]: doc for doc in documents}
                try:
                    # Set a longer timeout for the entire batch operation
                    timeout = timedelta(seconds=len(batch) * 2.5)  # 2.5 seconds per document
                    options = UpsertMultiOptions(timeout=timeout)
                    result = collection.upsert_multi(batch, options)
                    if not result.all_ok:
                        for doc_id, ex in result.exceptions.items():
                            if isinstance(ex, DocumentExistsException):
                                logger.warning(f"Document with ID '{doc_id}' already exists in the collection for stream {stream}")
                            else:
                                logger.error(f"Failed to upsert document '{doc_id}' for stream {stream}. Error: {ex}")
                    else:
                        logger.info(f"Successfully loaded {len(batch)} documents for stream {stream}")
                except CouchbaseException as e:
                    logger.error(f"Error occurred while loading documents for stream {stream}: {e}")
                    logger.error(f"Full exception details: {repr(e)}")
        buffer.clear()  # Clear the buffer after flushing
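A quick sanity check of the _sanitize_collection_name helper above, using made-up stream names (the last case exercises the empty-name guard):

print(DestinationCouchbase._sanitize_collection_name("users"))        # -> users
print(DestinationCouchbase._sanitize_collection_name("raw-events!"))  # -> raw_events_
print(DestinationCouchbase._sanitize_collection_name("2024_stats"))   # -> c_2024_stats
print(DestinationCouchbase._sanitize_collection_name(""))             # -> c_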
Slackbot
10/25/2024, 9:52 AM

Ananth Kumar
10/25/2024, 10:25 AM

Yannick Sacherer
10/25/2024, 10:26 AM

Kaustav Ghosh
10/25/2024, 10:50 AM
java.lang.NullPointerException: Cannot invoke "io.airbyte.protocol.models.AirbyteGlobalState.getStreamStates()" because the return value of "io.airbyte.protocol.models.AirbyteStateMessage.getGlobal()" is null
at io.airbyte.workers.internal.bookkeeping.ParallelStreamStatsTracker.updateDestinationStateStats(ParallelStreamStatsTracker.kt:130) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.internal.syncpersistence.SyncPersistenceImpl.updateDestinationStateStats(SyncPersistence.kt:322) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.internal.bookkeeping.AirbyteMessageTracker.acceptFromDestination(AirbyteMessageTracker.kt:65) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.general.ReplicationWorkerHelper.internalProcessMessageFromDestination(ReplicationWorkerHelper.kt:443) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.general.ReplicationWorkerHelper.processMessageFromDestination(ReplicationWorkerHelper.kt:317) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:488) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:215) ~[io.airbyte-airbyte-commons-worker-1.1.0.jar:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
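The NPE above fires because a state message reached the stats tracker with a type implying GLOBAL while its global payload was null. For reference, a minimal sketch of a well-formed GLOBAL state message built with the airbyte_cdk protocol models (model and field names as I understand the generated Python models; cursor values are made up):

from airbyte_cdk.models import (
    AirbyteGlobalState,
    AirbyteStateBlob,
    AirbyteStateMessage,
    AirbyteStateType,
    AirbyteStreamState,
    StreamDescriptor,
)

# type=GLOBAL must be paired with a populated global payload; emitting GLOBAL
# with a null payload is exactly what trips getGlobal() in the worker above.
state = AirbyteStateMessage(
    type=AirbyteStateType.GLOBAL,
    global_=AirbyteGlobalState(  # serialized as "global" in the protocol JSON
        shared_state=AirbyteStateBlob(cdc_lsn="0/16B3748"),  # made-up CDC cursor
        stream_states=[
            AirbyteStreamState(
                stream_descriptor=StreamDescriptor(name="users"),
                stream_state=AirbyteStateBlob(updated_at="2024-10-25T00:00:00Z"),
            )
        ],
    ),
)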
Makar Emeliyanov
10/25/2024, 11:05 AM
read_records get called?

Michael Schaidnagel
10/25/2024, 11:06 AM
value is not empty && cannot patch "airbyte-cron" with kind Deployment: Deployment.apps "airbyte-cron" is invalid: spec.template.spec.containers[0].env[9].valueFrom: Invalid value: "": may not be specified when value is not empty && failed to create patch: The order in patch list:
[map[name:TRACKING_STRATEGY value:logging] map[name:TRACKING_STRATEGY valueFrom:map[configMapKeyRef:map[key:TRACKING_STRATEGY name:airbyte-airbyte-env]]] map[name:SECRET_PERSISTENCE value:<nil>]]
doesn't match $setElementOrder list:
[map[name:LOG_LEVEL] map[name:LOG4J_CONFIGURATION_FILE] map[name:AIRBYTE_API_HOST] map[name:AIRBYTE_VERSION] map[name:AIRBYTE_EDITION] map[name:AIRBYTE_URL] map[name:CONFIG_ROOT] map[name:MICROMETER_METRICS_ENABLED] map[name:MICROMETER_METRICS_STATSD_FLAVOR] map[name:MICRONAUT_ENVIRONMENTS] map[name:SEGMENT_WRITE_KEY] map[name:STATSD_HOST] map[name:STATSD_PORT] map[name:TRACKING_STRATEGY] map[name:WORKER_ENVIRONMENT] map[name:WORKSPACE_ROOT] map[name:WEBAPP_URL] map[name:TEMPORAL_HOST] map[name:JOB_MAIN_CONTAINER_CPU_REQUEST] map[name:JOB_MAIN_CONTAINER_CPU_LIMIT] map[name:JOB_MAIN_CONTAINER_MEMORY_REQUEST] map[name:JOB_MAIN_CONTAINER_MEMORY_LIMIT] map[name:CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION] map[name:JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION] map[name:KEYCLOAK_INTERNAL_HOST] map[name:CONNECTOR_BUILDER_SERVER_API_HOST] map[name:AIRBYTE_API_AUTH_HEADER_NAME] map[name:AIRBYTE_API_AUTH_HEADER_VALUE] map[name:ENTERPRISE_SOURCE_STUBS_URL] map[name:SECRET_PERSISTENCE] map[name:S3_PATH_STYLE_ACCESS] map[name:STORAGE_TYPE] map[name:STORAGE_BUCKET_ACTIVITY_PAYLOAD] map[name:STORAGE_BUCKET_LOG] map[name:STORAGE_BUCKET_STATE] map[name:STORAGE_BUCKET_WORKLOAD_OUTPUT] map[name:AZURE_STORAGE_CONNECTION_STRING] map[name:DATABASE_HOST] map[name:DATABASE_PORT] map[name:DATABASE_DB] map[name:DATABASE_USER] map[name:DATABASE_PASSWORD] map[name:DATABASE_URL] map[name:TRACKING_STRATEGY]]
&& cannot patch "airbyte-webapp" with kind Deployment: Deployment.apps "airbyte-webapp" is invalid: spec.template.spec.containers[0].env[0].valueFrom: Invalid value: "": may not be specified when value is not empty && failed to create patch: The order in patch list:
[map[name:TRACKING_STRATEGY value:logging] map[name:TRACKING_STRATEGY valueFrom:map[configMapKeyRef:map[key:TRACKING_STRATEGY name:airbyte-airbyte-env]]] map[name:SECRET_PERSISTENCE value:<nil>]]
doesn't match $setElementOrder list:
[map[name:AIRBYTE_VERSION] map[name:CONFIG_ROOT] map[name:LOG_LEVEL] map[name:LOG4J_CONFIGURATION_FILE] map[name:MICROMETER_METRICS_ENABLED] map[name:MICROMETER_METRICS_STATSD_FLAVOR] map[name:SEGMENT_WRITE_KEY] map[name:STATSD_HOST] map[name:STATSD_PORT] map[name:TRACKING_STRATEGY] map[name:WORKSPACE_DOCKER_MOUNT] map[name:WORKSPACE_ROOT] map[name:LOCAL_ROOT] map[name:WEBAPP_URL] map[name:TEMPORAL_HOST] map[name:TEMPORAL_WORKER_PORTS] map[name:JOB_KUBE_NAMESPACE] map[name:JOB_KUBE_SERVICEACCOUNT] map[name:JOB_MAIN_CONTAINER_CPU_REQUEST] map[name:JOB_MAIN_CONTAINER_CPU_LIMIT] map[name:JOB_MAIN_CONTAINER_MEMORY_REQUEST] map[name:JOB_MAIN_CONTAINER_MEMORY_LIMIT] map[name:INTERNAL_API_HOST] map[name:WORKLOAD_API_HOST] map[name:WORKLOAD_API_BEARER_TOKEN] map[name:CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION] map[name:JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION] map[name:METRIC_CLIENT] map[name:OTEL_COLLECTOR_ENDPOINT] map[name:ACTIVITY_MAX_ATTEMPT] map[name:ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS] map[name:ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS] map[name:WORKFLOW_FAILURE_RESTART_DELAY_SECONDS] map[name:SHOULD_RUN_NOTIFY_WORKFLOWS] map[name:MICRONAUT_ENVIRONMENTS] map[name:SECRET_PERSISTENCE] map[name:S3_PATH_STYLE_ACCESS] map[name:STORAGE_TYPE] map[name:STORAGE_BUCKET_ACTIVITY_PAYLOAD] map[name:STORAGE_BUCKET_LOG] map[name:STORAGE_BUCKET_STATE] map[name:STORAGE_BUCKET_WORKLOAD_OUTPUT] map[name:AZURE_STORAGE_CONNECTION_STRING] map[name:DATABASE_HOST] map[name:DATABASE_PORT] map[name:DATABASE_DB] map[name:DATABASE_USER] map[name:DATABASE_PASSWORD] map[name:DATABASE_URL] map[name:CONTAINER_ORCHESTRATOR_ENABLED] map[name:TRACKING_STRATEGY]]
&& cannot patch "airbyte-workload-launcher" with kind Deployment: Deployment.apps "airbyte-workload-launcher" is invalid: spec.template.spec.containers[0].env[18].valueFrom: Invalid value: "": may not be specified when value is not empty

Michael Schaidnagel
10/25/2024, 11:07 AM

Anthony LEBLEU
10/25/2024, 11:41 AM

Brian Webb
10/25/2024, 12:07 PM

Yannick Sacherer
10/25/2024, 12:44 PM

Eric Gottschalk
10/25/2024, 1:17 PM

Henrik Nilsson
10/25/2024, 1:54 PM

Yannick Sacherer
10/25/2024, 2:13 PM

Brian Webb
10/25/2024, 2:53 PM

Yannick Sacherer
10/25/2024, 3:05 PM

Lubomyr Kachko
10/25/2024, 3:11 PM

Oisin McKnight
10/25/2024, 5:20 PM

Tarcisio Pacheco
10/25/2024, 7:19 PM