Alex Johnson
07/26/2025, 12:55 AM

Alexei Kozhushkov
07/29/2025, 9:41 AM
DpathExtractor yaml config:
• Given the following array as input
[{
"name": {
"common": "Comoros",
"official": "Union of the Comoros",
"nativeName": {
"ara": {
"official": "الاتحاد القمري",
"common": "القمر"
},
"fra": {
"official": "Union des Comores",
"common": "Comores"
},
"zdj": {
"official": "Udzima wa Komori",
"common": "Komori"
}
}
},
"cca2": "KM",
"cca3": "COM"
}]
• how would one extract
[{
"name": { "common": "Comoros" },
"common_name": "Comoros",
"cca2": "KM",
"cca3": "COM"
}]
Please advise 🙏
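A minimal Python sketch of the mapping being asked for, using plain dict access on the sample record; the extract_country helper and its hard-coded paths are illustrative only, and in a declarative manifest the same reshaping would be expressed with record transformations rather than Python:

def extract_country(record: dict) -> dict:
    # Keep only name.common, promote it to a top-level common_name, and copy the codes.
    common = record["name"]["common"]  # dpath-style pointer: name/common
    return {
        "name": {"common": common},
        "common_name": common,
        "cca2": record["cca2"],
        "cca3": record["cca3"],
    }

sample = [{
    "name": {"common": "Comoros", "official": "Union of the Comoros"},
    "cca2": "KM",
    "cca3": "COM",
}]
print([extract_country(r) for r in sample])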
Olivia Natasha
07/29/2025, 3:51 PM

Mohith
07/31/2025, 9:46 AM
Columns with the ARRAY data type in our PostgreSQL source are being read as strings when synced to our Snowflake destination using the PostgreSQL connector.
Is this expected behavior? If not, could anyone familiar with the PostgreSQL connector share how they’ve handled array-type columns in similar setups?
Any insights or suggestions would be greatly appreciated.
Thanks in advance!

Prajjval Mishra
07/31/2025, 6:11 PM

Alexei Kozhushkov
08/01/2025, 7:21 AM

Morgan Kerle
08/05/2025, 12:27 AM

Alexei Kozhushkov
08/05/2025, 9:17 AM
• https://allegro.pl/auth/oauth/authorize?response_type=code&client_id=a21...6be&redirect_uri=http://exemplary.redirect.uri
• Token
curl -X POST \
  https://allegro.pl/auth/oauth/token \
  -H 'Authorization: Basic base64(clientId:secret)' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'grant_type=authorization_code&code=pOPEy9Tq94aEss540azzC7xL6nCJDWto&redirect_uri=http://exemplary.redirect.uri'
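For reference, the same token exchange as a small Python sketch; requests builds the Basic base64(clientId:secret) header from the auth tuple, and the client id, secret, and code below are placeholders taken from the curl example:

import requests

resp = requests.post(
    "https://allegro.pl/auth/oauth/token",
    auth=("clientId", "secret"),  # becomes Authorization: Basic base64(clientId:secret)
    data={
        "grant_type": "authorization_code",
        "code": "pOPEy9Tq94aEss540azzC7xL6nCJDWto",
        "redirect_uri": "http://exemplary.redirect.uri",
    },
    timeout=30,
)
resp.raise_for_status()
print(resp.json())  # access_token, refresh_token, expires_in, ...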
Thank you!

Grivine Ochieng'
08/05/2025, 2:54 PM
{
"url": "<https://backstage.taboola.com/backstage/api/1.0/sinoinc-nal-plaudus-sc/reports/campaign-summary/dimensions/day?start_date=2025-01-01&end_date=2025-08-05>",
"headers": {
"User-Agent": "python-requests/2.32.4",
"Accept-Encoding": "gzip, deflate",
"Accept": "*/*",
"Connection": "keep-alive",
"Authorization": "Bearer ****"
},
"http_method": "GET",
"body": ""
}
Response payload:
{"status": 200,
"body": {
"last-used-rawdata-update-time": "2025-08-05 05:00:00.0",
"last-used-rawdata-update-time-gmt-millisec": 1754395200000,
"timezone": "PDT",
"results": [
{
"date": "2025-08-05 00:00:00.0",
"date_end_period": "2025-08-05 00:00:00.0",
"clicks": 628,
"impressions": 271757,
"visible_impressions": 147123,
"spent": 105.26,
"conversions_value": 0,
"roas": 0,
"roas_clicks": 0,
"roas_views": 0,
"ctr": 0.2310888036002752,
"vctr": 0.4268537210361398,
"cpm": 0.39,
"vcpm": 0.72,
"cpc": 0.168,
"campaigns_num": 20,
"cpa": 6.192,
"cpa_clicks": 6.579,
"cpa_views": 105.264,
"cpa_actions_num": 17,
"cpa_actions_num_from_clicks": 16,
"cpa_actions_num_from_views": 1,
"cpa_conversion_rate": 2.7070063694267517,
"cpa_conversion_rate_clicks": 2.5477707006369426,
"cpa_conversion_rate_views": 0.1592356687898089,
"currency": "USD"
},
Carmela Beiro
08/05/2025, 3:54 PM
source-declarative-manifest as the base and copying the manifest.yaml generated with the Custom Builder UI? I can't find documentation about it.
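Not sure about the documented image workflow either, but for reference, a rough sketch of the Python-packaging alternative: wrapping the Builder-exported manifest.yaml in a YamlDeclarativeSource and running it through the CDK entrypoint. File names and layout here are assumptions, not the behaviour of the source-declarative-manifest image itself:

import sys

from airbyte_cdk.entrypoint import launch
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource


def run() -> None:
    # manifest.yaml is the file exported from the Connector Builder UI
    source = YamlDeclarativeSource(path_to_yaml="manifest.yaml")
    launch(source, sys.argv[1:])


if __name__ == "__main__":
    run()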
Mateo Colina
08/05/2025, 6:27 PM

Sebastian Miranda
08/07/2025, 8:25 PM

Patrick McCoy
08/11/2025, 4:12 PM

Ofek Eliahu
08/12/2025, 2:46 PM
{
"access_token": "c97d1fe52119f38c7f67f0a14db68d60caa35ddc86fd12401718b649dcfa9c68",
"token_type": "bearer",
"expires_in": 7200,
"refresh_token": "803c1fd487fec35562c205dac93e9d8e08f9d3652a24079d704df3039df1158f",
"created_at": 1628711391
}
To re-authenticate, I need to use the new refresh token from the response. I’m using the SingleUseRefreshTokenOauth2Authenticator to handle this, which saves the new refresh_token, access_token, and expire_time in memory for the next authentication.
The issue is that while these config values are correctly saved and used in memory, they are not being persisted in storage for future runs. When I create a new source, a validation check is performed, which passes and creates the source. However, after this check, the OAuth refresh token becomes invalid, and the new one isn’t saved to storage. As a result, I can’t create a new connection based on this source since the refresh token isn’t being updated in storage.
Has anyone faced this issue before and figured out how to solve it?
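For context, a rough sketch of how this authenticator is typically wired up; the endpoint URL is a placeholder and the exact constructor arguments and default config paths vary by CDK version. As far as I understand, the refreshed tokens are written back into the in-memory config and emitted as a connector-config control message, so whatever runs the connector (including a one-off check) has to capture and persist that message for the new refresh token to survive beyond memory:

from airbyte_cdk.sources.streams.http.requests_native_auth import (
    SingleUseRefreshTokenOauth2Authenticator,
)


def build_authenticator(config: dict) -> SingleUseRefreshTokenOauth2Authenticator:
    # Sketch only: by default the authenticator reads and updates the refresh token,
    # access token, and token expiry under the credentials block of `config`
    # (the paths are configurable), and emits the updated config as a control
    # message so the platform can persist it for the next run.
    return SingleUseRefreshTokenOauth2Authenticator(
        config,
        token_refresh_endpoint="https://api.example.com/oauth/token",  # placeholder URL
    )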
Will Skelton
08/12/2025, 3:30 PM

Colin
08/16/2025, 1:15 PM

Aidan Lister
08/17/2025, 9:02 PM
docker run --rm \
-v $(pwd)/connector.yaml:/airbyte/connector.yaml \
-v $(pwd)/config.json:/config.json \
airbyte/source-yaml-connector:latest \
check --config /config.json
Unable to find image 'airbyte/source-yaml-connector:latest' locally
docker: Error response from daemon: pull access denied for airbyte/source-yaml-connector, repository does not exist or may require 'docker login': denied: requested access to the resource is denied
Anthony
08/18/2025, 10:16 AM

Rafal Fronczyk
08/18/2025, 4:28 PM
AirbyteStateMessage that the destination emits. With a regular RabbitMQ queue (where messages are removed on ack), that makes it hard to ack only after the destination has safely persisted the record. If the source acks early and something fails downstream, messages could be lost.
• RabbitMQ Streams might be a better fit because they don’t delete messages on read. The source could emit state with an incremental offset (see the sketch below), and the destination could emit state once writes are durable. On recoveries, the source would resume from the last committed offset without relying on RabbitMQ acks.
◦ Open question: should the destination validate monotonic offsets / detect gaps (“holes”) to guard against missed messages, or does Airbyte’s internal delivery/ordering guarantee make that unnecessary in practice?
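If it helps, a rough sketch of what offset-based per-stream state could look like from the source side; this assumes the Python CDK protocol models (exact names vary by CDK version), and the stream name and offset key are made up:

from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteStateBlob,
    AirbyteStateMessage,
    AirbyteStateType,
    AirbyteStreamState,
    StreamDescriptor,
    Type,
)


def offset_state_message(stream_name: str, offset: int) -> AirbyteMessage:
    # STATE message recording the last RabbitMQ Streams offset that was read;
    # the platform checkpoints it and hands it back to the source on the next run.
    return AirbyteMessage(
        type=Type.STATE,
        state=AirbyteStateMessage(
            type=AirbyteStateType.STREAM,
            stream=AirbyteStreamState(
                stream_descriptor=StreamDescriptor(name=stream_name),
                stream_state=AirbyteStateBlob(offset=offset),
            ),
        ),
    )

# e.g. yield offset_state_message("rabbitmq_stream", 12345) after each durably written batch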
Replay / backfill:
Is it supported (and recommended) to “replay” from a specific timestamp/offset by editing the connection state in the UI/API? My understanding is that editing state is possible; is this the right mechanism for time/offset-based reprocessing with Streams?
Does this line of thinking make sense? In your experience, is Airbyte a good fit for this use case, and are there any challenges you’d foresee with this approach?
Jacob Dunning
08/18/2025, 11:25 PM

Carolina Buckler
08/19/2025, 7:32 PM

Carmela Beiro
08/20/2025, 2:41 PM
lookback_window of 32243 into the state that was not specified. Can this be changed?

Dennis Zwier
08/21/2025, 6:53 AM
Internal Server Error: java.net.SocketTimeoutException: timeout
What’s confusing is that sometimes the exact same flow works (even 10+ times in a row), and then suddenly the error pops up again during the polling phase. It looks like Airbyte is not even making the polling request when this happens, as if it times out internally before sending it.
We already tried increasing the polling timeout significantly (up to 2000 minutes), but the error still appears, especially on overnight runs.
Has anyone experienced similar issues with the Amazon Ads connector (or other async-reporting APIs) where Airbyte gives a SocketTimeoutException even though the API itself is responsive? Could this be related to Airbyte’s internal HTTP client or how long it keeps connections alive?
Any insights, workarounds, or configuration tips would be really appreciated.
Thanks!

yingting
08/22/2025, 4:35 PM
io.airbyte.cdk.integrations.source.relationaldb.state.FailedRecordIteratorException: java.lang.RuntimeException: java.lang.RuntimeException: org.postgresql.util.PSQLException: ERROR: feature not supported on beam relations
This does not happen when I am extracting from a normal table.
If I were to develop a fork of the Postgres connector that works with this beam engine, can anyone point me to where to start?
Thomas Niederberger
08/23/2025, 11:42 PM

Chinthana Jayasekara
08/26/2025, 10:21 PM

lenin
08/27/2025, 3:56 AM
Source: Custom Report API (custom connector)
Sync Requirements:
First sync: Full Refresh | Overwrite (pull all historical data)
Subsequent syncs: Append only (pull only previous fiscal year data)
Using Basic Normalization
Tracking sync state in external database (sync_completed_date)
Sample Code
from datetime import date  # needed for date.today()

class Report(ReportStream):
    report_name = "Report"
    primary_key = ""

    def path(self, **kwargs) -> str:
        if not self.sync_completed_date:
            # First sync - get all historical data
            start_year = date.today().year - 30
            start_date = f"{start_year}-01-01"
            end_date = date.today().isoformat()  # assumed: the historical pull runs up to today
        else:
            # Subsequent syncs - get only previous fiscal year data
            start_date, end_date = self.get_fiscal_year_dates(self.fiscal_month)
        return (
            f"/reports/{self.report_name}&"
            f"start_date={start_date}&"
            f"end_date={end_date}&"
            f"columns={self.output_columns}"
        )
Issue
When I set the stream to "Full Refresh | Overwrite" in the UI:
First sync works correctly (pulls all historical data)
Second sync:
Connector correctly pulls only previous fiscal year data
But normalization still performs overwrite, clearing all historical data
Results in losing data from before the previous fiscal year
Question
How can I control normalization behavior to append data after the first sync, even when the UI is set to "Full Refresh | Overwrite"?
Is there a recommended pattern for implementing this kind of mixed sync behaviour (full refresh first, then always append)?
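One pattern that might fit, sketched below under assumptions: ReportStream, get_fiscal_year_dates, fiscal_month, and output_columns come from the snippet above, and the cursor handling is deliberately simplified. The idea is to declare the stream incremental so the connection can run as Incremental | Append; with empty state the first sync pulls the full history, later syncs pull only the previous fiscal year, and normalization appends instead of overwriting:

from datetime import date
from typing import Any, Mapping


class Report(ReportStream):
    report_name = "Report"
    primary_key = ""
    cursor_field = "sync_completed_date"  # assumed cursor name exposed to the platform

    def path(self, stream_state: Mapping[str, Any] = None, **kwargs) -> str:
        state = stream_state or {}
        if not state.get(self.cursor_field):
            # No saved state yet: behave like the historical full pull
            start_date = f"{date.today().year - 30}-01-01"
            end_date = date.today().isoformat()
        else:
            # Saved state exists: only the previous fiscal year
            start_date, end_date = self.get_fiscal_year_dates(self.fiscal_month)
        return (
            f"/reports/{self.report_name}&"
            f"start_date={start_date}&"
            f"end_date={end_date}&"
            f"columns={self.output_columns}"
        )

    def get_updated_state(self, current_stream_state, latest_record) -> Mapping[str, Any]:
        # Mark that a sync has completed; the value is just a marker date here.
        return {self.cursor_field: date.today().isoformat()}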
Aymen NEGUEZ
09/01/2025, 10:02 AM

Ander Aranburu
09/01/2025, 3:12 PM

sumit raina
09/02/2025, 8:28 AM
import airbyte as ab
from airbyte.cloud import CloudWorkspace
from airbyte.cloud.connectors import CloudSource

# Local source definition (HubSpot, pinned connector version)
source: ab.Source = ab.get_source(
    "source-hubspot", version="5.8.20", docker_image=True, install_if_missing=True
)
source.set_config(
    config={
        "credentials": {
            "credentials_title": "Private App Credentials",
            "access_token": "[token]",
        }
    }
)
source.select_streams(streams=["contacts"])
source.read()
# Optionally, set a custom cursor field
source.set_cursor_key("contacts", "updatedAt")
# read_result = source.read()

# Local destination definition (S3, JSONL output)
destination: ab.Destination = ab.get_destination("destination-s3")
destination.set_config(
    config={
        "s3_bucket_region": "us-east-1",
        "format": {"format_type": "JSONL", "flattening": "No flattening"},
        "access_key_id": "ACCESS KEY",
        "secret_access_key": "SECRET KEY",
        "s3_bucket_name": "sumit-digi",
        "s3_bucket_path": "data_sync/hubspot_contacts/data",
        "file_name_pattern": "{date:yyyy_MM}_{timestamp}_{part_number}.{sync_id}",
    }
)

# Deploy everything to an Airbyte Cloud workspace and run the sync
workspace = CloudWorkspace(
    client_secret="secret",
    client_id="client id",
    workspace_id="workspace id",
    api_root="https://api.airbyte.com/v1",
)
cloudsource: CloudSource = workspace.deploy_source(source=source, name="sumit-final")
clouddest = workspace.deploy_destination(destination=destination, name="raina-final")
conn = workspace.deploy_connection(
    connection_name="sumit-raina-vastika",
    source=cloudsource,
    selected_streams=["contacts"],
    destination=clouddest,
)
conn.run_sync()
print("connection created")
Here we can select streams and set them on the source:
source.select_streams(streams=["contacts"])