Using CloudQuery Within Apache Airflow
Mariano Gappa
If your team's workflow is based on Apache Airflow, introducing CloudQuery syncs into it is simple!
In this blog post, we'll go through setting up a basic Airflow DAG that syncs the whole catalog of XKCD comics into a local SQLite database. This DAG can be run entirely locally, and you don't need to pre-download CloudQuery or SQLite, or log in to anything.
We're gonna define two tasks in our DAG (sketched after the next list):
- Download the CloudQuery binary: it will inspect the host's OS and architecture and download the appropriate binary for the supplied version.
- Run the sync: it will read our spec and run the CloudQuery sync.
The DAG will require two parameters:
- The version of the CloudQuery binary. It defaults to the current latest release, v6.4.1.
- The path to the sync's specification file. It defaults to the folder where the DAG file is.
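Here's a minimal sketch of what the DAG skeleton and the download task could look like. Treat everything below as illustrative: the `dag_id`, the parameter names, the download location, and the GitHub release URL scheme are assumptions to adapt to your setup (the repository linked at the end of this post has the actual code):

```python
import os
import platform
import stat
import urllib.request
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param

dag = DAG(
    dag_id="cloudquery_xkcd_to_sqlite",  # hypothetical name
    start_date=datetime(2024, 1, 1),
    schedule=None,  # run on demand from the UI
    params={
        # The CloudQuery binary version to download.
        "cloudquery_version": Param("v6.4.1", type="string"),
        # Where to find the sync spec; defaults to the DAG file's folder.
        "spec_file_path": Param(
            os.path.join(os.path.dirname(__file__), "spec.yml"),
            type="string",
        ),
    },
)


@task(dag=dag, task_id="download_cloudquery_binary")
def download_cloudquery_binary(version: str) -> str:
    os_name = platform.system().lower()  # e.g. "linux" or "darwin"
    arch = platform.machine().lower()    # e.g. "x86_64" or "arm64"
    arch = {"x86_64": "amd64", "aarch64": "arm64"}.get(arch, arch)
    # Assumed release URL scheme; verify against CloudQuery's releases page.
    url = (
        "https://github.com/cloudquery/cloudquery/releases/download/"
        f"cli-{version}/cloudquery_{os_name}_{arch}"
    )
    binary_path = f"/tmp/cloudquery_{version}"
    urllib.request.urlretrieve(url, binary_path)
    # The downloaded file isn't executable by default, so flip the bit.
    os.chmod(binary_path, os.stat(binary_path).st_mode | stat.S_IXUSR)
    return binary_path
```

Returning `binary_path` from the task publishes it to XCom, which is how the sync task will pick it up later.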
Let's use this basic spec, which uses XKCD as a source and SQLite as a destination:
```yaml
---
kind: source
spec:
  name: "xkcd"
  path: "cloudquery/xkcd"
  version: "v1.0.6"
  tables: ['*']
  destinations:
    - "sqlite"
  spec:
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.9.4"
  spec:
    # Make sure to use an absolute path here, because it's
    # unlikely that Airflow will run this job as your user.
    connection_string: "/your/user/path/Desktop/newdatabase.db"
```
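Since the sync task just shells out to the CLI, you can sanity-check this spec before involving Airflow at all by running `cloudquery sync spec.yml` in a terminal (assuming you've already downloaded the binary).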
The code required to run the sync is surprisingly simple:
```python
import subprocess

# Task to run the CloudQuery sync command
@task(
    dag=dag,
    task_id='run_xkcd_to_sqlite_sync',
    doc_md="""### Run XKCD to SQLite Sync
    This task runs the CloudQuery sync command using the provided `spec_file_path`
    parameter to sync XKCD data into a SQLite database.
    It uses the path of the CloudQuery binary from the previous task.
    """)
def run_xkcd_to_sqlite_sync(spec_file_path: str, cloudquery_path: str):
    result = subprocess.run(
        [cloudquery_path, 'sync', spec_file_path], capture_output=True
    )
    if result.returncode != 0:
        raise Exception(
            f"CloudQuery sync failed with return code {result.returncode}."
            f" Output: {result.stdout.decode()}"
        )
```
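To wire the two tasks together, it's enough to pass the download task's return value into the sync task: with the TaskFlow API, consuming a task's return value both sets the dependency and delivers the path via XCom. A sketch, reusing the hypothetical parameter names from above (Airflow renders the Jinja templates in the string arguments):

```python
# Consuming download_cloudquery_binary's return value makes the sync
# task run after it and receive the binary path via XCom.
cloudquery_path = download_cloudquery_binary(
    version="{{ params.cloudquery_version }}"
)
run_xkcd_to_sqlite_sync(
    spec_file_path="{{ params.spec_file_path }}",
    cloudquery_path=cloudquery_path,
)
```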
Upon running the DAG in the Airflow UI, the download task succeeds. You can then visualize the DAG and wait until it completes.
And voilà! The whole XKCD catalog is in our local SQLite database!
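If you want to double-check, a quick count against the database will do. This is a sketch: the table name `xkcd_comics` is an assumption, so list the tables first if it doesn't match:

```python
import sqlite3

# Connect to the database the destination spec wrote to.
conn = sqlite3.connect("/your/user/path/Desktop/newdatabase.db")
# "xkcd_comics" is assumed; run "SELECT name FROM sqlite_master" to confirm.
count = conn.execute("SELECT COUNT(*) FROM xkcd_comics").fetchone()[0]
print(f"Synced {count} comics")
conn.close()
```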
Check the full example here: https://github.com/cloudquery/airflow-dag-cloudquery