engineering

Using CloudQuery Within Apache Airflow

Mariano Gappa

Mariano Gappa

If your team's workflow is based on Apache Airflow, introducing CloudQuery syncs into it is simple!
In this blog post, we'll go through setting up a basic Airflow DAG that syncs the whole catalog of XKCD comics into a local SQLite database. This DAG can be run entirely locally, and you don't need to pre-download CloudQuery, SQLite or log in to anything.
We're gonna define two tasks in our DAG:
  • Download CloudQuery binary: it will inspect the host's OS & ARCH and get the appropriate binary for the supplied version.
  • Run the Sync: it will read our spec and run the CloudQuery sync.
The DAG will require two parameters:
  1. The version of the CloudQuery binary. It defaults to the current latest release: v6.4.1.
  2. The path to the sync's specification file. It defaults to the folder where the dag file is.
Let's use this basic spec, which uses XKCD as a source and SQLite as a destination
---
kind: source
spec:
  name: "xkcd"
  path: "cloudquery/xkcd"
  version: "v1.0.6"
  tables: ['*']
  destinations:
    - "sqlite"
  spec:
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.9.4"
  spec:
    # Make sure to use an absolute path here, because it's
    # unlikely that Airflow will run this job as your user.
    connection_string: "/your/user/path/Desktop/newdatabase.db"
The code required to run the sync is surprisingly simple:
# Task to run the CloudQuery sync command
@task(
  dag=dag,
  task_id='run_xkcd_to_sqlite_sync',
  doc_md="""### Run XKCD to SQLite Sync
This task runs the CloudQuery sync command using the provided `spec_file_path`
parameter to sync XKCD data into a SQLite database.
It uses the path of the CloudQuery binary from the previous task.
""")
def run_xkcd_to_sqlite_sync(spec_file_path: str, cloudquery_path: str):
    result = subprocess.run(
      [cloudquery_path, 'sync', spec_file_path], capture_output=True
    )
    if result.returncode != 0:
        raise Exception(
          f"CloudQuery sync failed with return code {result.returncode}." + 
          " Output: {result.stdout.decode()}"
        )
Upon running the DAG in the Airflow UI, the download task succeeds:
You can visualize the DAG and wait until it completes:
And voilà! The whole XKCD catalog is in our local SQLite database!
Subscribe to product updates

Be the first to know about new features.