Airbyte ingestion as code#

This feature is considered experimental.

This guide provides an introduction to using Dagster to configure your Airbyte connections. This allows you to centralize the configuration for your data stack, specifying configuration in Python code. You can check-in and version your config with version control or programmatically generate config for more complex use cases.


Prerequisites#

To use this feature, you'll need to install the dagster-airbyte and dagster-managed-elements libraries:

pip install dagster-airbyte dagster-managed-elements

Available as dagster-me, the dagster-managed-elements library includes the base config reconciliation APIs and a CLI.


Step 1: Define a reconciler#

The config for your Airbyte instance is specified in an AirbyteManagedElementReconciler, which is pointed at a specific Airbyte instance using an Airbyte resource. The config is also provided with a list of connections to reconcile, which we'll set up later in the guide.

from dagster_airbyte import AirbyteManagedElementReconciler, airbyte_resource

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
        # If using basic auth, include username and password:
        "username": "airbyte",
        "password": {"env": "AIRBYTE_PASSWORD"},
    }
)

airbyte_reconciler = AirbyteManagedElementReconciler(
    airbyte=airbyte_instance,
    connections=[],
)

For more info on setting up an Airbyte resource, refer to the Airbyte guide. Additional configuration options for the reconciler are detailed below.

For more information on setting up secrets from the environment, refer to the Environment variables and secrets guide.


Step 2: Define sources and destinations#

Next, we'll define our sources and destinations. Sources and destinations can be constructed manually using the AirbyteSource and AirbyteDestination classes, respectively, but dagster-airbyte also provides generated, typed classes for specific source and destination types. Refer to the Airbyte API docs for the properties required to configure each source and destination type.

In this example, we'll configure a source that reads from a hosted CSV file and a destination that writes it to a local JSON file. To do this, we'll import the generated classes for the File source and Local JSON destination:

from dagster_airbyte.managed.generated.sources import FileSource
from dagster_airbyte.managed.generated.destinations import LocalJsonDestination

cereals_csv_source = FileSource(
    name="cereals-csv",
    url="https://docs.dagster.io/assets/cereal.csv",
    format="csv",
    provider=FileSource.HTTPSPublicWeb(),
    dataset_name="cereals",
)

local_json_destination = LocalJsonDestination(
    name="local-json",
    destination_path="/local/cereals_out.json",
)

Step 3: Define a connection#

Next, we'll define a connection between the source and destination using the AirbyteConnection class:

from dagster_airbyte import AirbyteConnection, AirbyteSyncMode

cereals_connection = AirbyteConnection(
    name="download-cereals",
    source=cereals_csv_source,
    destination=local_json_destination,
    stream_config={"cereals": AirbyteSyncMode.full_refresh_overwrite()},
)

Then, we'll supply the new connection to the reconciler we defined in Step 1:

airbyte_reconciler = AirbyteManagedElementReconciler(
    airbyte=airbyte_instance,
    connections=[cereals_connection],
)

Step 4. Validate changes#

Next, we'll use the dagster-me CLI to sanity-check our reconciler and apply any changes.

The check command prints out differences between the current state of the Airbyte instance and the desired state specified in the reconciler. To invoke the CLI, point it at a module containing the reconciler:

dagster-me check --module my_python_module.my_submodule:reconciler

Found 1 reconciler, checking...
+ cereals-csv:
  + url: https://docs.dagster.io/assets/cereal.csv
  + format: csv
  + dataset_name: cereals
  + provider:
    + user_agent: False
    + storage: HTTPS
+ local-json:
  + destination_path: /local/cereals_out.json
+ download-cereals:
  + source: cereals-csv
  + destination: local-json
  + normalize data: None
  + streams:
    + cereals: FULL_REFRESH_OVERWRITE

Step 5. Apply changes#

As the changes printed out by the check command in the previous step look like what we expect, we can now apply them:

dagster-me apply --module my_python_module.my_submodule:reconciler

Now, we should see our new connection in the Airbyte UI:

instance-overview

Step 6. Load connections into Dagster#

To load managed connections into Dagster, use the load_assets_from_connections utility method. This functions similarly to load_assets_from_airbyte_instance, but validates that the connections passed in match the connections defined in your Airbyte instance:

from dagster_airbyte import load_assets_from_connections, airbyte_resource

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": 8000,
        # If using basic auth, include username and password:
        "username": "airbyte",
        "password": {"env": "AIRBYTE_PASSWORD"},
    }
)

airbyte_assets = load_assets_from_connections(
    airbyte=airbyte_instance, connections=[cereals_connection]
)

For more info on managing Airbyte assets in Dagster, refer to the Airbyte guide.


Additional configuration options#

The Airbyte reconciler also supports some additional configuration options, which can be passed to the AirbyteManagedElementReconciler constructor.

By default, the reconciler will not modify any sources, destinations, or connections which are outside of those specified in the reconciler. This allows you to adopt the reconciler incrementally, without having to reconcile all of your existing Airbyte configuration.

To reconcile all of your existing Airbyte configuration, pass delete_unmentioned_resources=True to the reconciler constructor:

airbyte_reconciler = AirbyteManagedElementReconciler(
    airbyte=airbyte_instance, connections=[...], delete_unmentioned_resources=True
)

This tells the reconciler to clean up any sources, destinations, or connections which are not explicitly defined in Python code.