Ian Hansen
07/23/2025, 4:26 PM
kapa.ai
07/23/2025, 4:26 PM
```python
from dagster import job
from dagster_airbyte import airbyte_resource, airbyte_sync_op

# Configure the Airbyte resource with your Airbyte instance details
my_airbyte_resource = airbyte_resource.configured(
    {
        "host": {"env": "AIRBYTE_HOST"},
        "port": {"env": "AIRBYTE_PORT"},
    }
)

# Configure the first Airbyte sync op
sync_first = airbyte_sync_op.configured(
    {"connection_id": "first-connection-uuid"}, name="sync_first"
)

# Configure the second Airbyte sync op
sync_second = airbyte_sync_op.configured(
    {"connection_id": "second-connection-uuid"}, name="sync_second"
)

@job(resource_defs={"airbyte": my_airbyte_resource})
def my_sequential_airbyte_job():
    first_result = sync_first()
    sync_second(first_result)
```
In this setup:
- `sync_first` triggers the first Airbyte sync.
- `sync_second` will only run after `sync_first` completes, because it takes the output of `sync_first` as its input, establishing the dependency.

Replace `"first-connection-uuid"` and `"second-connection-uuid"` with your actual Airbyte connection IDs. You can find these in the Airbyte UI by clicking on a connection and copying the last part of the URL. To execute the job, run: `dagster job execute -f your_dagster_file.py`

This approach ensures that the second sync only starts after the first one has finished, as required by your dependency chain. For more details, see the [official Dagster integration documentation](https://docs.airbyte.com/platform/operator-guides/using-dagster-integration) and the [tutorial on orchestrating Airbyte pipelines with Dagster](https://airbyte.com/tutorials/orchestrate-data-ingestion-and-transformation-pipelines#step-4).

Ian Hansen
07/23/2025, 4:27 PM
kapa.ai
07/23/2025, 4:27 PM
You can use the `load_assets_from_connections` or `load_assets_from_airbyte_instance` functions from the `dagster_airbyte` library. This approach treats each Airbyte connection as a Dagster asset, allowing you to specify dependencies between them. Here’s an example of how to define two Airbyte syncs as assets, with the second depending on the first:
```python
from dagster_airbyte import load_assets_from_connections, AirbyteResource
from dagster import Definitions, AssetKey

# Define your Airbyte instance
airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
)

# Define the two Airbyte connections as assets
first_connection = {"connection_id": "first-connection-uuid"}
second_connection = {"connection_id": "second-connection-uuid"}

first_asset = load_assets_from_connections(
    airbyte=airbyte_instance,
    connections=[first_connection],
    key_prefix=["airbyte"],
)

second_asset = load_assets_from_connections(
    airbyte=airbyte_instance,
    connections=[second_connection],
    key_prefix=["airbyte"],
    # Specify dependency on the first asset
    upstream_assets={AssetKey(["airbyte", "first-connection-uuid"])},
)

defs = Definitions(
    assets=[first_asset, second_asset]
)
```
In this setup:
- Each Airbyte connection is loaded as a Dagster asset.
- The second asset (`second_asset`) is configured to depend on the first asset by specifying `upstream_assets`.
- When you materialize the second asset in Dagster, it will ensure the first sync runs and completes successfully before starting the second.

For more details and examples, see the [official Airbyte tutorial on configuring Airbyte connections as Dagster assets](https://airbyte.com/tutorials/configure-airbyte-with-python-dagster) and the [AI stack tutorial](https://airbyte.com/tutorials/ai-stack-with-airbyte-langchain-and-dagster).

> Note: The exact parameter for specifying dependencies (`upstream_assets`) may require adjustment based on your Dagster version and the API. The knowledge sources provide the general approach, but do not show a full example with explicit asset dependencies between Airbyte connections. If you need more advanced dependency management, you may need to consult the [Dagster documentation](https://docs.dagster.io/concepts/assets/software-defined-assets) for the latest API details.

Ian Hansen
07/23/2025, 4:28 PM
kapa.ai
07/23/2025, 4:28 PM
1. Start Dagster: Run `dagster dev` to start the Dagster webserver.
2. Access the Dagster UI: Open your browser and go to [http://127.0.0.1:3000](http://127.0.0.1:3000). You should see your Airbyte assets listed.
3. Materialize Assets:
   - In the Dagster UI, you can select both assets and click "Materialize".
   - If you select the downstream asset (the one that depends on the first), Dagster will automatically materialize both in the correct order.

This process is described in several tutorials, where after defining Airbyte assets, you use the Dagster UI to "Materialize all" or select specific assets to run. Dagster handles the orchestration and dependencies for you, so you do not need to define a separate job for running the assets together; materializing the downstream asset will trigger the upstream one as needed [Building an E-commerce Data Pipeline: A Hands-On Guide to Using Airbyte, dbt, Dagster, and BigQuery](https://airbyte.com/tutorials/building-an-e-commerce-data-pipeline-a-hands-on-guide-to-using-airbyte-dbt-dagster-and-bigquery), [Customer Segmentation Analytics Stack With Shopify, Airbyte, dbt, Dagster and BigQuery](https://airbyte.com/tutorials/customer-segmentation-analytics-stack-with-shopify-airbyte-dbt-dagster-and-bigquery).

If you prefer the CLI, you can use Dagster's asset materialization commands, but the UI is the most straightforward way to trigger and monitor asset runs.

> The knowledge sources do not provide a code example for running these assets as a job, but they do confirm that materializing assets in the Dagster UI is the standard approach.
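As a rough, generic illustration of that ordering behaviour (plain Python assets rather than Airbyte-backed ones, and `raw_orders`/`enriched_orders` are made-up names), a minimal sketch using Dagster's `materialize` helper:
```python
from dagster import asset, materialize

@asset
def raw_orders():
    # Stand-in for an upstream step such as an Airbyte sync.
    return [1, 2, 3]

@asset
def enriched_orders(raw_orders):
    # Declares raw_orders as an input, so Dagster always runs it first.
    return [order * 10 for order in raw_orders]

if __name__ == "__main__":
    # Materializing both assets in one call executes them in dependency order:
    # raw_orders, then enriched_orders.
    result = materialize([raw_orders, enriched_orders])
    assert result.success
```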
Ian Hansen
07/23/2025, 4:30 PM
kapa.ai
07/23/2025, 4:31 PM
The documented pattern is to define the Airbyte connections as assets and include them in `Definitions`:
```python
from dagster import Definitions
from dagster_airbyte import load_assets_from_connections, AirbyteResource

airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
)

first_asset = load_assets_from_connections(
    airbyte=airbyte_instance,
    connections=[{"connection_id": "first-connection-uuid"}],
    key_prefix=["airbyte"],
)

second_asset = load_assets_from_connections(
    airbyte=airbyte_instance,
    connections=[{"connection_id": "second-connection-uuid"}],
    key_prefix=["airbyte"],
    # (Dependency management is handled by Dagster if you set up upstream assets)
)

defs = Definitions(
    assets=[first_asset, second_asset]
)
```
Once these assets are defined, you use the Dagster UI to materialize them, and Dagster will handle the execution order based on dependencies [Configure Airbyte Connections with Python (Dagster)](https://airbyte.com/tutorials/configure-airbyte-with-python-dagster).

There is no example in the knowledge sources of wrapping these assets in a custom job and then including that job in `Definitions`. The asset-based orchestration is the recommended and documented approach. If you need to run both assets together, you simply materialize the downstream asset (or both) in the Dagster UI, and Dagster will ensure the correct execution order. If you need more advanced orchestration (such as custom jobs that include assets), the knowledge sources do not provide enough information to show how to do this with Airbyte assets.

Summary: I'm sorry, but the knowledge sources do not contain enough information to show how to package two Airbyte sync assets into a custom Dagster job and include that job in a Definitions object. The documented approach is to define Airbyte syncs as assets and use Dagster's asset materialization features to orchestrate them.
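That said, if you do want a named job that runs both syncs, general Dagster APIs (not covered by the sources cited above) let you select assets into a job with `define_asset_job`. A hedged sketch, assuming `first_asset` and `second_asset` are the Airbyte assets from the snippet above:
```python
from dagster import AssetSelection, Definitions, define_asset_job

# Select every asset in this code location into one runnable job. Narrow the
# selection (for example by key prefix or group) if you also define non-Airbyte assets.
airbyte_sync_job = define_asset_job(
    name="airbyte_sync_job",
    selection=AssetSelection.all(),
)

defs = Definitions(
    assets=[first_asset, second_asset],  # the assets defined in the snippet above
    jobs=[airbyte_sync_job],
)
```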
Ian Hansen
07/23/2025, 4:32 PM
kapa.ai
07/23/2025, 4:32 PM
Here’s how this works:

1. Airbyte Syncs as Assets: Each Airbyte connection is loaded as a Dagster asset using `load_assets_from_connections`. This makes each sync a first-class citizen in Dagster’s asset graph.
2. Dependency Management: If you specify dependencies between assets (for example, making one Airbyte sync depend on another), Dagster will understand the execution order. When you materialize the downstream asset, Dagster will automatically materialize any upstream assets first.
3. Materialization via UI: In the Dagster UI, you can select one or more assets and click "Materialize". If you select the downstream asset, Dagster will trigger the upstream asset(s) first, ensuring the correct order of execution. This is described in multiple tutorials, where you are instructed to "Materialize all" or select specific assets, and Dagster handles the orchestration for you [Building an E-commerce Data Pipeline: A Hands-On Guide to Using Airbyte, dbt, Dagster, and BigQuery](https://airbyte.com/tutorials/building-an-e-commerce-data-pipeline-a-hands-on-guide-to-using-airbyte-dbt-dagster-and-bigquery), [Customer Segmentation Analytics Stack With Shopify, Airbyte, dbt, Dagster and BigQuery](https://airbyte.com/tutorials/customer-segmentation-analytics-stack-with-shopify-airbyte-dbt-dagster-and-bigquery).
4. Lineage Visualization: The UI also provides a "global asset lineage" view, letting you see how your Airbyte assets (and any other assets, like dbt models) are connected and how data flows through your pipeline.
5. No Custom Job Required: You do not need to define a custom job to orchestrate asset execution. Dagster’s asset materialization system will handle the orchestration based on the dependencies you define between assets.

Summary: By defining your Airbyte syncs as assets and specifying dependencies, Dagster’s asset materialization features will automatically orchestrate their execution in the correct order when you trigger materialization from the UI or CLI. This approach is the standard and recommended way to manage Airbyte syncs in Dagster according to the knowledge sources.
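For instance, in recent Dagster versions a downstream asset can declare an Airbyte-produced table as an upstream dependency via `deps`. A minimal sketch (the `airbyte`/`orders` asset key and the `orders_summary` asset are assumptions for illustration):
```python
from dagster import AssetKey, asset

# Hypothetical downstream asset that depends on an "orders" table produced by an
# Airbyte sync loaded under the "airbyte" key prefix. The deps declaration tells
# Dagster the execution order when the assets are materialized together.
@asset(deps=[AssetKey(["airbyte", "orders"])])
def orders_summary() -> None:
    # A real implementation would read the synced "orders" table here.
    ...
```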
Ian Hansen
07/23/2025, 4:33 PM
kapa.ai
07/23/2025, 4:33 PM
Ian Hansen
07/23/2025, 6:11 PM
kapa.ai
07/23/2025, 6:11 PM
Ian Hansen
07/23/2025, 6:13 PM
Ian Hansen
07/23/2025, 6:13 PM
kapa.ai
07/23/2025, 6:13 PM
Dagster’s Airbyte integration handles this for you through `airbyte_sync_op`. According to the knowledge sources, when you use `airbyte_sync_op` in your Dagster job, it will trigger the Airbyte sync and wait until it completes. The op will fail if the Airbyte sync fails, causing the Dagster job to fail as well. This means that the success or failure of the Airbyte sync is automatically checked by Dagster when using this op.

Here’s an example from the documentation:
```python
from dagster import job
from dagster_airbyte import airbyte_resource, airbyte_sync_op

my_airbyte_resource = airbyte_resource.configured(
    {
        "host": {"env": "AIRBYTE_HOST"},
        "port": {"env": "AIRBYTE_PORT"},
    }
)

sync_foobar = airbyte_sync_op.configured(
    {"connection_id": "your-connection-uuid"}, name="sync_foobar"
)

@job(resource_defs={"airbyte": my_airbyte_resource})
def my_simple_airbyte_job():
    sync_foobar()
```
When this job runs, Dagster will only mark the op as successful if the Airbyte sync completes successfully. If the sync fails, the op (and thus the job) will fail, and you can see this in the Dagster UI or logs. This behavior is described in the [official Dagster integration documentation](https://docs.airbyte.com/platform/operator-guides/using-dagster-integration). If you are using software-defined assets (SDAs) with Airbyte, the same principle applies: materializing the asset will fail if the underlying Airbyte sync fails.

Summary: If you use Dagster’s Airbyte integration ops (`airbyte_sync_op` or assets loaded via `dagster_airbyte`), Dagster will automatically check for Airbyte sync success and fail the pipeline if the sync fails. You do not need to add extra checks; this is handled for you by the integration. If you want to see more details, you can always check the Airbyte job logs in the Airbyte UI for further troubleshooting [Using the Dagster Integration](https://docs.airbyte.com/platform/operator-guides/using-dagster-integration).

Ian Hansen
07/23/2025, 10:02 PM
Ian Hansen
07/23/2025, 10:02 PM
kapa.ai
07/23/2025, 10:02 PM
You can add authorization by configuring the `airbyte_resource` in Dagster, where you can specify the `username` and `password` fields. The password can be securely provided via an environment variable (e.g., `AIRBYTE_PASSWORD`). Here’s how you can rewrite the example to include authorization:
```python
from dagster import job
from dagster_airbyte import airbyte_resource, airbyte_sync_op

my_airbyte_resource = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
        "username": "airbyte",
        "password": {"env": "AIRBYTE_PASSWORD"},
    }
)

sync_foobar = airbyte_sync_op.configured(
    {"connection_id": "your-connection-uuid"}, name="sync_foobar"
)

@job(resource_defs={"airbyte": my_airbyte_resource})
def my_simple_airbyte_job():
    sync_foobar()
```
Note:
- Set the `AIRBYTE_PASSWORD` environment variable on your machine before running Dagster.
- The default username is usually `airbyte`, and the default password is `password` for local instances, but use your actual credentials if they differ.

This approach is demonstrated in the [Configure Airbyte Connections with Python (Dagster)](https://airbyte.com/tutorials/configure-airbyte-with-python-dagster) tutorial.

Ian Hansen
07/23/2025, 10:05 PM
kapa.ai
07/23/2025, 10:05 PM
You can retrieve the current credentials using the `abctl local credentials` command if you are running Airbyte locally. If you are running Airbyte on Kubernetes, you can retrieve the credentials from the `airbyte-auth-secrets` secret and decode them as described in the documentation [Authentication](https://docs.airbyte.com/platform/deploying-airbyte/integrations/authentication).

If you are receiving an "unauthorized" message, it is recommended to:
- Double-check that you are using the correct username and password.
- Retrieve the current credentials using the appropriate method for your deployment.
- Ensure that the credentials are correctly passed to Dagster, for example, via environment variables.

If the credentials are correct and you still receive an unauthorized error, you may want to try logging into the Airbyte UI directly with those credentials to confirm they work outside of Dagster. Unfortunately, the knowledge sources do not mention a specific "validate connection" feature for authentication beyond these steps.
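If it helps, here is a quick, hypothetical way to check the credentials outside Dagster with Python's `requests` library, assuming a deployment that protects the Airbyte API with HTTP basic auth and is reachable on `localhost:8000` (adjust host, port, and endpoint to your setup):
```python
import os

import requests

USERNAME = os.environ.get("AIRBYTE_USERNAME", "airbyte")
PASSWORD = os.environ["AIRBYTE_PASSWORD"]

# workspaces/list is a lightweight call: a 401 response points at bad credentials,
# while a 200 means the username/password pair works outside of Dagster.
response = requests.post(
    "http://localhost:8000/api/v1/workspaces/list",
    auth=(USERNAME, PASSWORD),
    json={},
    timeout=30,
)
print(response.status_code)
```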
Ian Hansen
07/23/2025, 10:13 PM
kapa.ai
07/23/2025, 10:13 PM
You can list your workspaces by calling the `/api/public/v1/workspaces` endpoint on your EC2 machine where Airbyte is running.

Steps:
1. Ensure Airbyte is accessible: Make sure the `airbyte-server` service is running and accessible from your network. If you are running Airbyte on EC2, you may need to open the appropriate ports in your security group.
2. Obtain an access token: For Airbyte OSS, you need to authenticate using an access token. First, create an application in the Airbyte UI (Settings → Account → Applications) to get a `client_id` and `client_secret`. Then, request an access token:
```bash
curl -X POST "http://<YOUR_EC2_PUBLIC_IP>:8000/api/v1/applications/token" \
  -H "Content-Type: application/json" \
  -d '{"client_id": "<YOUR_CLIENT_ID>", "client_secret": "<YOUR_CLIENT_SECRET>"}'
```
The response will include an `access_token`.
3. List workspaces using the public API: Use the access token to call the public API endpoint:
```bash
curl -X GET "http://<YOUR_EC2_PUBLIC_IP>:8000/api/public/v1/workspaces" \
  -H "Authorization: Bearer <YOUR_ACCESS_TOKEN>" \
  -H "Content-Type: application/json"
```
This will return a list of your workspaces.

Note:
- Replace `<YOUR_EC2_PUBLIC_IP>`, `<YOUR_CLIENT_ID>`, `<YOUR_CLIENT_SECRET>`, and `<YOUR_ACCESS_TOKEN>` with your actual values.
- The `/api/public/v1` path prefix is required for OSS users [API documentation](https://docs.airbyte.com/platform/api-documentation).

For more details, see the official documentation on [Configuring API Access](https://docs.airbyte.com/platform/using-airbyte/configuring-api-access) and [API documentation](https://docs.airbyte.com/platform/api-documentation).

Ian Hansen
07/23/2025, 10:14 PM
kapa.ai
07/23/2025, 10:14 PM
You can set up an SSH tunnel from your local machine to the EC2 instance, for example:
```bash
ssh -L 8000:<ec2-private-ip>:8000 ec2-user@<ec2-public-ip> -i <your-key.pem>
```
You can then make API requests to `http://localhost:8000` on your local machine, and they will be forwarded to the Airbyte server running on your EC2 instance. So, to list your workspaces, you would:

1. Obtain an access token:
```bash
curl -X POST "http://localhost:8000/api/v1/applications/token" \
  -H "Content-Type: application/json" \
  -d '{"client_id": "<YOUR_CLIENT_ID>", "client_secret": "<YOUR_CLIENT_SECRET>"}'
```
2. List workspaces:
```bash
curl -X GET "http://localhost:8000/api/public/v1/workspaces" \
  -H "Authorization: Bearer <YOUR_ACCESS_TOKEN>" \
  -H "Content-Type: application/json"
```
This approach is confirmed in the knowledge sources, where users set up SSH tunnels to access the Airbyte UI and APIs via `localhost` on their local machine [Octavia CLI connect to Airbyte instance through bastion host](https://discuss.airbyte.io/t/1546), [Unable to access Airbyte UI](https://discuss.airbyte.io/t/1314). Just ensure your SSH tunnel is active and mapped to the correct port.

Ian Hansen
07/23/2025, 10:18 PM
kapa.ai
07/23/2025, 10:18 PM
You can trigger a sync for a specific connection by passing its `connectionId` in your API request. After setting up your SSH tunnel and obtaining an access token as described earlier, you can trigger a sync for a specific connection with the following command:
```bash
curl -X POST "http://localhost:8000/api/v1/connections/sync" \
  -H "Authorization: Bearer <YOUR_ACCESS_TOKEN>" \
  -H "Content-Type: application/json" \
  -d '{"connectionId":"<YOUR_CONNECTION_ID>"}'
```
Replace `<YOUR_ACCESS_TOKEN>` with the token you obtained, and `<YOUR_CONNECTION_ID>` with the ID of the Airbyte connection you want to trigger. You can find the connection ID in the Airbyte UI URL when viewing the connection details.

This method is confirmed in the tutorials, which show how to trigger a sync for a specific connection using the API by providing the `connectionId` in the request body [How to use Airflow and Airbyte together](https://airbyte.com/tutorials/how-to-use-airflow-and-airbyte-together).
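For reference, the same call can be made from Python with the `requests` library; a minimal sketch, assuming the SSH tunnel from the earlier step is active on `localhost:8000` and you already hold a valid access token:
```python
import requests

ACCESS_TOKEN = "<YOUR_ACCESS_TOKEN>"    # placeholder, as in the curl example
CONNECTION_ID = "<YOUR_CONNECTION_ID>"  # placeholder, as in the curl example

# Same request as the curl command above: trigger a sync for one connection.
response = requests.post(
    "http://localhost:8000/api/v1/connections/sync",
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
    json={"connectionId": CONNECTION_ID},
    timeout=30,
)
response.raise_for_status()
# The response body describes the sync job that was started.
print(response.json())
```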
Ian Hansen
07/23/2025, 10:19 PM
kapa.ai
07/23/2025, 10:20 PM
When you obtain an `access_token` using your `client_id` and `client_secret`, the token is a Bearer Token that is valid for a limited time:

- In most recent Airbyte OSS and Self-Managed deployments, the `access_token` is valid for 3 minutes. After it expires, you must request a new token from the `/api/v1/applications/token` endpoint to continue making API requests ([Using Airbyte: Configuring API Access](https://docs.airbyte.com/platform/using-airbyte/configuring-api-access)).
- In some Self-Managed Enterprise deployments, the token may be valid for 60 minutes ([Enterprise Setup: API Access](https://docs.airbyte.com/platform/enterprise-setup/api-access-config)).

Always check the documentation for your specific Airbyte version and deployment type to confirm the token lifetime.

> Access tokens are short-lived (3 minutes) so we recommend fetching a new token before each request.
> — [Using Airbyte: Configuring API Access](https://docs.airbyte.com/platform/using-airbyte/configuring-api-access)

Once the token expires, you must generate a new one to continue accessing the API.
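Given how quickly these tokens expire, a small helper that fetches a fresh token before each request can be handy. A minimal sketch, assuming the endpoints shown earlier and placeholder credentials:
```python
import requests

AIRBYTE_URL = "http://localhost:8000"   # adjust to your deployment
CLIENT_ID = "<YOUR_CLIENT_ID>"          # placeholder
CLIENT_SECRET = "<YOUR_CLIENT_SECRET>"  # placeholder


def get_access_token() -> str:
    """Request a new short-lived access token from the applications/token endpoint."""
    response = requests.post(
        f"{AIRBYTE_URL}/api/v1/applications/token",
        json={"client_id": CLIENT_ID, "client_secret": CLIENT_SECRET},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["access_token"]


def list_workspaces() -> dict:
    """Call the public API with a freshly fetched token, since tokens expire quickly."""
    token = get_access_token()
    response = requests.get(
        f"{AIRBYTE_URL}/api/public/v1/workspaces",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()


if __name__ == "__main__":
    print(list_workspaces())
```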