engineering
integration
tutorial

How To Run Efficient Embedded ELT CloudQuery Workflows Inside Dagster

Yevgeny Pats

Yevgeny Pats, Joe Karlsson

In this post, you will learn how to run an open-source high-performance extract, load, transform (ELT) framework, CloudQuery, with an orchestrator, in this case, Dagster. The cool part here is that you will be running CloudQuery completely inside Dagster itself, using Dagster resource management and scheduling capabilities. By running it completely locally, this means that you won’t be incurring any additional cloud costs. This is mostly possible due to CloudQuery and CloudQuery plugins design that can be run as single binaries without any third-party dependencies.

Pre-requirements #

It is recommended that you have a basic understanding of both Dagster and CloudQuery.

Writing a simple data pipeline #

In the first step, you will write the simplest pipeline consisting of two assets one for ingesting HackerNews data and one for creating a table that analyzes the most recent comments using DuckDB.
Full source available here, but let’s go step by step:

Configuring CloudQuery #

Firstly, let’s configure an ingestion pipeline with CloudQuery that ingests data from HackerNews to DuckDB. Configuration will look the following:
kind: source
spec:
name: "hackernews"
path: "cloudquery/hackernews"
registry: "cloudquery"
version: "v3.1.9"
tables: ["*"]
destinations:
  - "duckdb"
spec:
  item_concurrency: 100
  start_time: "2024-02-02T00:00:00Z"
\---
kind: destination
spec:
  name: duckdb
  path: cloudquery/duckdb
  registry: cloudquery
  version: "v5.6.3"
  spec:
    connection_string:  ./example.db
You can run the following simple configuration with CloudQuery locally:
cloudquery sync ./config.yml
This will start fetching all Hacker News items and comments starting from February 2, 2024. This process might take a while, so feel free to cancel the process while it’s running.
If you want to use your MotherDuck account then you just need to make a change to connection_string to be md:?motherduck_token=<token>, as described here.

Creating a software defined asset in Dagster running CloudQuery #

To run CloudQuery inside Dagster we are going to write a software defined asset that will execute CloudQuery with the same configuration as above. The code is available here.
@asset
def ingest_hn(context: AssetExecutionContext) -> None:
 ret = None
 with tempfile.NamedTemporaryFile(mode='w+t') as temp_file:
   temp_file.write(CQ_HN_SPEC)
   temp_file.flush()
   if CQ_MIGRATE_ONLY:
       ret = execute_shell_command(f"cloudquery migrate --log-console {temp_file.name}", output_logging="STREAM", log=context.log)
   else:
       ret = execute_shell_command(f"cloudquery sync --log-console {temp_file.name}", output_logging="STREAM", log=context.log)
   if ret[1] != 0:
     raise Exception(f"cloudquery command failed with exit code {ret[1]}")
In this Dagster asset, you will use [execute_shell_command](https://docs.dagster.io/_apidocs/libraries/dagster-shell#dagster_shell.execute_shell_command) which executes cloudquery sync just like in the prior step with the same configuration. This makes it more convenient to debug, avoids any additional abstraction layer, and gets CloudQuery running inside Dagster with the output being redirected to Dagster logging.
For testing purposes, this asset includes an additional environment variable called CQ_MIGRATE_ONLY which is useful to enable in testing so the pipeline can run CloudQuery with the same config and only create the tables without running a full sync, this will also enable downstream assets to run successfully, as the schema will be available (just without any data).
The spec is defined statically in CQ_HN_SPEC in constants.py and is parametrized using an environment variable we can control called DUCKDB_CONNECTION_STRING and we can set it locally to a local file and in production to MotherDuck database.

Creating a downstream analytics asset #

Now we can create a simple view/table each time we sync new items from CloudQuery by utilizing the standard Dagster DuckDB resource and executing this simple query in a dependent asset:
CREATE OR REPLACE TABLE hackernews_oss_items AS
SELECT
    title,
    score
FROM
    hackernews_items
WHERE
    parent = 0
    AND (
        title ILIKE '%open source%'
        OR
        title ILIKE '%open-source%'
    )
ORDER BY
    score DESC
LIMIT 20;

Testing #

The cool part is that we can run simple smoke tests with a few lines of code and this will execute the whole pipeline 100% locally and with almost identical configuration. This can save tons of money during development and CI (imagine spinning up compute just to run smoke tests for every PR  💸).

Running #

To run this locally, follow the README and just execute dagster dev. To run this in the cloud please check out Dagster Cloud documentation.

Resource Management #

Apart from being able to run everything locally, speed up development, and save costs, we can also take advantage of Dagster CPU management. For example, if you are using the ECS launcher you can define the CPU and memory for each ingestion/CloudStep as described here (Note: This is an advanced Dagster feature, so proceed with caution).

Summary #

This tutorial demonstrated how to run CloudQuery entirely within Dagster, leveraging its resource management and scheduling capabilities to ingest, store, and analyze data locally with DuckDB. This setup accelerated development, reduced cloud costs, and maintained flexibility for various databases, showcasing an efficient and scalable approach to data pipeline orchestration. This example is not limited only to DuckDB, but can work well with any other database that can run locally or in production easily such as PostgreSQL, ClickHouse, and many others.
If you have questions about this post or about CloudQuery, the best way to get connected with us is on the CloudQuery Community Discord.
The best way to get started is by trying CloudQuery Cloud for FREE, or by setting up CloudQuery locally.

FAQs #

Q: Can I use other databases other than DuckDB with CloudQuery? A: Yes, the setup is flexible and can work with other databases that can run locally or in production, such as PostgreSQL, ClickHouse, and others. You just need to adjust the configuration settings to match your chosen database.
Q: How can I test my pipeline without incurring cloud costs? A: You can run the entire pipeline locally using Dagster and CloudQuery, which allows you to test without additional cloud costs. Utilize the CQ_MIGRATE_ONLY environment variable for testing to create the necessary tables without running a full data sync.
Q: What are the benefits of using Dagster with CloudQuery? A: Using Dagster with CloudQuery enables you to use your own orchestrator (especially if you already use Dagster) and use its  enhanced resource management, scheduling capabilities, and the ability to run everything locally. This integration speeds up development, reduces costs, and provides a robust framework for managing and orchestrating data pipelines.
Subscribe to product updates

Be the first to know about new features.