Announcing the Python SDK for CloudQuery Plugin Development
Herman Schaaf • Aug 17, 2023
We're excited to announce the first release of a Python SDK for CloudQuery plugin development! This SDK provides a high-level toolkit for developing CloudQuery plugins in Python.
CloudQuery is designed with a pluggable architecture and uses Apache Arrow over gRPC for communication between plugins. Source and destination plugins are independent of one another, so plugins written in different languages can still communicate with each other. Until now, we've only provided an SDK for writing plugins in Go; with this release, plugins can be written in Python as well.
The Python SDK provides a number of features to make plugin development easier, and make plugins written in Python performant.
The most basic functionality provided by the Python SDK is to start a gRPC plugin server that supports all the flags expected by the CloudQuery CLI. This allows you to write a plugin in Python and run it using the same command line interface as any other plugin.
The following example shows how to create a plugin server that runs a plugin called `MyPlugin`:

```python
import sys

from cloudquery.sdk import serve

from plugin import MyPlugin


def main():
    p = MyPlugin()
    serve.PluginCommand(p).run(sys.argv[1:])


if __name__ == "__main__":
    main()
```
`MyPlugin` is a class that extends `cloudquery.sdk.plugin.Plugin`. We'll look at the `Plugin` class in more detail next.
A CloudQuery Python source plugin, like `MyPlugin` above, should subclass `cloudquery.sdk.plugin.Plugin` and needs to implement three methods: `init`, `get_tables` and `sync`. The `init` method is called when the plugin is started, and is where you can do any initialization work. The `get_tables` method should return a list of tables that the plugin supports. The `sync` method is called when a table needs to be synced, and is where the SDK scheduler can be used to manage the syncing of all the supported tables.
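To make the division of labour between the three methods concrete, here is a minimal, stdlib-only sketch. It does not import the SDK: the `Plugin` and `Table` classes below are hypothetical stand-ins defined inline, and the method signatures are simplified assumptions rather than the SDK's real ones.

```python
import json
from dataclasses import dataclass, field
from typing import Generator, List


@dataclass
class Table:
    """Hypothetical stand-in for an SDK table: a name plus child tables."""
    name: str
    relations: List["Table"] = field(default_factory=list)


class Plugin:
    """Hypothetical stand-in for cloudquery.sdk.plugin.Plugin (not the real class)."""

    def init(self, spec: bytes) -> None:
        raise NotImplementedError

    def get_tables(self) -> List[Table]:
        raise NotImplementedError

    def sync(self) -> Generator[str, None, None]:
        raise NotImplementedError


class MyPlugin(Plugin):
    def init(self, spec: bytes) -> None:
        # One-time setup: parse the configuration and declare the tables.
        self._spec = json.loads(spec or b"{}")
        self._tables = [Table("my_items", relations=[Table("my_item_details")])]

    def get_tables(self) -> List[Table]:
        # Report the top-level tables this plugin supports.
        return self._tables

    def sync(self) -> Generator[str, None, None]:
        # A real plugin would delegate to the SDK scheduler here;
        # this sketch simply walks each table and its child tables in order.
        for table in self.get_tables():
            yield f"synced {table.name}"
            for child in table.relations:
                yield f"synced {child.name}"


plugin = MyPlugin()
plugin.init(b'{"token": "example"}')
messages = list(plugin.sync())
```

In this sketch `sync` is a generator, mirroring the generator-based `sync` signature used by the Typeform plugin, so results can be streamed as they are produced.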
The scheduler's main responsibilities are to manage concurrent execution of requests and the order in which tables are synced to avoid dependency issues. It also places limits on the number of concurrent requests and memory usage.
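As a rough illustration of these responsibilities, the sketch below resolves parent tables before their children and caps concurrency with a thread pool. It is not the SDK's scheduler: `Resolver`, `run_sync` and `max_workers` are hypothetical names invented for this example, and memory limits are not modelled.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class Resolver:
    """Hypothetical resolver: fetches rows, optionally for a given parent row."""
    name: str
    fetch: Callable[[Optional[dict]], List[dict]]
    children: List["Resolver"] = field(default_factory=list)


def run_sync(resolvers: List[Resolver], max_workers: int = 4) -> Dict[str, List[dict]]:
    results: Dict[str, List[dict]] = {}
    # Each task pairs a resolver with the parent row it depends on
    # (None for top-level tables).
    tasks = [(resolver, None) for resolver in resolvers]
    # The pool size caps how many fetches run at the same time.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while tasks:
            futures = [(r, pool.submit(r.fetch, parent)) for r, parent in tasks]
            tasks = []
            for resolver, future in futures:
                rows = future.result()
                results.setdefault(resolver.name, []).extend(rows)
                # Children are scheduled only after their parent rows exist.
                tasks.extend(
                    (child, row) for child in resolver.children for row in rows
                )
    return results


forms = Resolver(
    "forms",
    fetch=lambda parent: [{"id": "a"}, {"id": "b"}],
    children=[
        Resolver("responses", fetch=lambda parent: [{"form_id": parent["id"]}])
    ],
)
synced = run_sync([forms], max_workers=2)
```

Processing one dependency level at a time keeps the ordering guarantee simple, at the cost of waiting for a whole level to finish before its children start.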
To invoke the scheduler, the `sync` method of a plugin should pass a list of its tables and options to the scheduler. The scheduler will take care of the rest. Here is an example from the Typeform plugin:
```python
from typing import Generator

from cloudquery.sdk import message
from cloudquery.sdk import plugin
from cloudquery.sdk.scheduler import TableResolver

# ...


class TypeformPlugin(plugin.Plugin):
    # ...

    def sync(
        self, options: plugin.SyncOptions
    ) -> Generator[message.SyncMessage, None, None]:
        # Collect one resolver per table selected by the sync options.
        resolvers: list[TableResolver] = []
        for table in self.get_tables(
            plugin.TableOptions(
                tables=options.tables,
                skip_tables=options.skip_tables,
                skip_dependent_tables=options.skip_dependent_tables,
            )
        ):
            resolvers.append(table.resolver)

        # Hand the resolvers to the SDK scheduler, which runs the sync.
        return self._scheduler.sync(
            self._client, resolvers, options.deterministic_cq_id
        )
```
Table columns are defined using the Apache Arrow type system, a powerful and flexible way to define data types. CloudQuery destinations support almost all Arrow types, and the Python SDK provides support for a few additional types, such as UUID, IP address and JSON. For example, here is the definition of the `typeform_forms` table from the Typeform plugin, which uses the SDK's `JSONType` alongside standard Arrow types:
```python
import pyarrow as pa

from cloudquery.sdk.schema import Column
from cloudquery.sdk.schema import Table
from cloudquery.sdk.types import JSONType

from plugin.tables.form_responses import FormResponses


class Forms(Table):
    def __init__(self) -> None:
        super().__init__(
            name="typeform_forms",
            title="Typeform Forms",
            columns=[
                Column("id", pa.string(), primary_key=True),
                Column("created_at", pa.timestamp(unit="s")),
                Column("last_updated_at", pa.timestamp(unit="s")),
                Column("self", JSONType()),
                Column("type", pa.string()),
                Column("settings", JSONType()),
                Column("theme", JSONType()),
                Column("title", pa.string()),
                Column("_links", JSONType()),
            ],
            relations=[FormResponses()],
        )
```
The Python SDK introduces a transformer that can convert fields from an OpenAPI specification into CloudQuery-compatible table schemas. This can greatly reduce the manual work needed to develop plugins for APIs that have an OpenAPI spec. Check out the Square plugin source code for a great example of this in action.
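The general idea behind such a transformer can be sketched in a few lines. The example below is not the SDK's transformer: `openapi_to_columns`, the type-name mapping and the sample schema are all assumptions made for illustration, and column types are shown as plain strings rather than Arrow types.

```python
from typing import Any, Dict, List, Tuple

# Assumed mapping from OpenAPI primitive types to column type names.
_TYPE_MAP = {
    "string": "string",
    "integer": "int64",
    "number": "float64",
    "boolean": "bool",
}


def openapi_to_columns(schema: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Turn an OpenAPI object schema into (column_name, column_type) pairs."""
    columns = []
    for name, prop in schema.get("properties", {}).items():
        if prop.get("type") == "string" and prop.get("format") == "date-time":
            col_type = "timestamp"
        else:
            # Objects, arrays and unrecognised types fall back to JSON.
            col_type = _TYPE_MAP.get(prop.get("type"), "json")
        columns.append((name, col_type))
    return columns


# Hypothetical schema fragment, as it might appear in an OpenAPI document.
payment_schema = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "amount": {"type": "integer"},
        "created_at": {"type": "string", "format": "date-time"},
        "metadata": {"type": "object"},
    },
}
columns = openapi_to_columns(payment_schema)
```

Falling back to a JSON column for nested objects keeps the mapping total, so every property in the spec yields a column even when no direct scalar type applies.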
To support cross-platform packaging of Python plugins (and other languages in the future), in `v3.12.0` we introduced a new `docker` registry type to the CloudQuery CLI. Where Go-based plugins are downloaded as binaries from GitHub releases, Python plugins are downloaded as Docker images from the specified Docker registry. This allows CloudQuery to support multiple platforms, and also makes it easier to distribute plugins that have dependencies on external libraries.
If you're keen to dive right in and develop a source plugin in Python, check out the Python Plugin Development Guide and the sample plugins on GitHub.
We will also be adding more documentation and examples in the coming weeks, so stay tuned!
Work is already underway to add SDKs for more languages. We won't spoil the surprise here, but we're excited to share more details soon. Be sure to follow us on Twitter or subscribe to our newsletter to get the latest updates.
The first release of the Python SDK only officially supports source plugins. Writing a destination plugin in Python is possible using the low-level gRPC APIs, but is not yet officially supported by the Python SDK.