announcement
Announcing the Python SDK for CloudQuery Plugin Development
Herman Schaaf •
We're excited to announce the first release of a Python SDK for CloudQuery plugin development! This SDK provides a high-level toolkit for developing CloudQuery plugins in Python.
Background #
CloudQuery is designed with a pluggable architecture and uses Apache Arrow over gRPC for communication between plugins. Source and destination plugins are independent of one another, and this architecture allows plugins to be written in different languages but still communicate with one another. Until now, we've only provided an SDK for writing plugins in Go, but we're excited to announce that we now have an SDK for writing plugins in Python as well.
Features #
The Python SDK provides a number of features to make plugin development easier, and make plugins written in Python performant.
Plugin Server #
The most basic functionality provided by the Python SDK is to start a gRPC plugin server that supports all the flags expected by the CloudQuery CLI. This allows you to write a plugin in Python and run it using the same command line interface as any other plugin.
The following example shows how to create a plugin server that runs a plugin called
MyPlugin
:import sys
from cloudquery.sdk import serve
from plugin import MyPlugin
def main():
p = MyPlugin()
serve.PluginCommand(p).run(sys.argv[1:])
if __name__ == "__main__":
main()
Here
MyPlugin
is a class that extends cloudquery.sdk.plugin.Plugin
. We'll look at the Plugin
class in more detail next.Plugin Class #
A CloudQuery Python source plugin, like
MyPlugin
above, should subclass cloudquery.sdk.plugin.Plugin
and needs to implement three methods: init
, get_tables
and sync
. The init
method is called when the plugin is started, and is where you can do any initialization work. The get_tables
method should return a list of tables that the plugin supports. The sync
method is called when a table needs to be synced, and is where the SDK scheduler can be used to manage the syncing of all the supported tables.Multi-threaded Scheduler #
The scheduler's main responsibilities are to manage concurrent execution of requests and the order in which tables are synced to avoid dependency issues. It also place limits on the number of concurrent requests and memory usage.
To invoke the scheduler, the
sync
method of a plugin should pass a list of its tables and options to the scheduler. The scheduler will take care of the rest. Here is an example from the Typeform plugin:from typing import Generator
from cloudquery.sdk import message
from cloudquery.sdk import plugin
from cloudquery.sdk.scheduler import TableResolver
# ...
class TypeformPlugin(plugin.Plugin):
# ...
def sync(
self, options: plugin.SyncOptions
) -> Generator[message.SyncMessage, None, None]:
resolvers: list[TableResolver] = []
for table in self.get_tables(
plugin.TableOptions(
tables=options.tables,
skip_tables=options.skip_tables,
skip_dependent_tables=options.skip_dependent_tables,
)
):
resolvers.append(table.resolver)
return self._scheduler.sync(
self._client, resolvers, options.deterministic_cq_id
)
Apache Arrow-based Type System with Custom Types #
Table columns are defined using the Apache Arrow type system, a powerful and flexible way to define data types. CloudQuery destinations support almost all Arrow types, and the Python SDK provides support for a few additional types, such as UUID, IP address and JSON. For example, here is the definition of the
typeform_forms
table from the Typeform plugin, that uses string
, timestamp
and JSON
types:import pyarrow as pa
from cloudquery.sdk.schema import Column
from cloudquery.sdk.schema import Table
from cloudquery.sdk.types import JSONType
from plugin.tables.form_responses import FormResponses
class Forms(Table):
def __init__(self) -> None:
super().__init__(
name="typeform_forms",
title="Typeform Forms",
columns=[
Column("id", pa.string(), primary_key=True),
Column("created_at", pa.timestamp(unit="s")),
Column("last_updated_at", pa.timestamp(unit="s")),
Column("self", JSONType()),
Column("type", pa.string()),
Column("settings", JSONType()),
Column("theme", JSONType()),
Column("title", pa.string()),
Column("_links", JSONType()),
],
relations=[FormResponses()],
)
OpenAPI Transformer #
The Python SDK introduces a transformer that can convert fields from an OpenAPI specification into CloudQuery-compatible table schemas. This can greatly reduce the manual work needed to develop plugins for APIs that have an OpenAPI spec. Check out the Square plugin source code for a great example of this in action.
Docker for Cross-Platform Distribution #
To support cross-platform packaging of Python plugins (and other languages in the future) in
v3.12.0
we introduced a new docker
registry type to the CloudQuery CLI. Where Go-based plugins are downloaded as binaries from GitHub releases, Python plugins are downloaded as Docker images from the specified Docker registry. This allows CloudQuery to support multiple platforms, and also makes it easier to distribute plugins that have dependencies on external libraries.Getting Started #
If you're keen to dive right in and develop a source plugin in Python, check out the Python Plugin Development Guide and sample plugins on GitHub:
We will also be adding more documentation and examples in the coming weeks, so stay tuned!
Future Work #
Work is already underway to add SDKs for more languages. We won't spoil the surprise here, but we're excited to share more details soon. Be sure to follow us on Twitter or subscribe to our newsletter below 👇 to get the latest updates.
The first release of the Python SDK only officially supports source plugins. Writing a destination plugin in Python is possible using the low-level gRPC APIs, but is not yet officially supported by the Python SDK.