tutorial
Creating a New JavaScript Source Plugin from Scratch
In this blog post, I will walk you through the steps to create a new source plugin using CloudQuery JavaScript SDK. The purpose is to give you a basic overview of how to create tables and columns and how to sync the actual data.
What You Need to Know #
Make sure you go over the core concepts of CloudQuery and the JavaScript SDK.
I will be using TypeScript for IDE autocompletion and code type safety.
Prerequisites #
To build and release the plugin for public use, you will need Docker to be installed locally.
The Plan #
At the high-level, this is what I would like to achieve:
- The plugin should support reading a set of CSV files from a directory.
- Each file will be synced as a single table in the destination.
- The column names will be read from the first row.
- The plugin should recognize basic types - I will start with numbers and everything else will be a string.
Here's what I imagine the configuration will look like:
kind: source
spec:
name: 'text-file'
registry: 'grpc' # for local testing only, this will change once the plugin is published
path: 'localhost:7777' # same as above
version: 'v1.0.0'
tables: ['*']
destinations:
- 'sqlite'
spec:
path: 'test_data/' # specify folder or a single file
csvDelimiter: ';'
---
kind: destination
spec:
name: sqlite
path: cloudquery/sqlite
registry: cloudquery
version: 'v2.9.17'
spec:
connection_string: ./db.sql
Getting Started #
We will not start completely from scratch but rather use a template that needs filling in the blanks. There is a javascript-plugin-template starter repository that you can clone.
To install all dependencies and test whether it is running, run the following commands:
npm install
npm run dev # this will run the plugin in dev mode as a server listening on localhost:7777
cloudquery sync sync.yml #run this in a new terminal tab
This will create a new SQLite database file named
db.sql
with table Names
and two records. You can open this file using DB Browser for SQLite or any other SQLite client.What's inside #
The main content is in the
src
folder:main.ts
- The main wrapper for the plugin. It's responsibility is to start the plugin the CLI will communicate with when running the sync.plugin.ts
- The body of the plugin. The main responsibility of this module is to return an initialized instance of the plugin to serve.spec.ts
- This module is responsible for handling and validating plugin configuration.tables.ts
- This is the module where the main work happens and the one we'll be working with the most.tables.test.ts
- Sample unit tests for the module above.
How it all works #
When CloudQuery runs a sync, it will call the plugin clients'
sync
function and expect the plugin to load the tables. We will be implementing the functions to return the tables back to the plugin client.There are other functions that the The SDK provides a basic implementation for the plugins so we will not need to implement anything else and we will rely on the implementation returned by the
newPlugin
function.The code below (see
plugin.ts
) takes care of creating a newClient
function that will be called by the SDK and it will connect the plugin configuration passed in spec
with the getTables
where the actual implementation will happen.const newClient: NewClientFunction = async (logger, spec, { noConnection }) => {
pluginClient.spec = parseSpec(spec);
pluginClient.client = { id: () => 'cq-js-sample' };
if (noConnection) {
pluginClient.allTables = [];
return pluginClient;
}
pluginClient.allTables = await getTables();
return pluginClient;
};
pluginClient.plugin = newPlugin('cq-js-sample', version, newClient);
The
getTables
is a function exported from the tables
module. It is an asynchronous function returning an array of tables that will be passed to the destination plugin. In this sample plugin, it only returns one table.// tables.ts
export const getTables = async (): Promise<Table[]> => {
const table = await getTable();
return [table];
};
The table returned is specified by a definition of columns and a
tableResolver
, which takes care of writing to a stream provided by the SDK.const columnNames = ['First Name', 'Last Name'];
const tableRecords = [
{ 'First Name': 'Jack', 'Last Name': 'Bauer' },
{ 'First Name': 'Thomas', 'Last Name': 'Kirkman' },
];
// ...
const tableResolver: TableResolver = (clientMeta, parent, stream) => {
for (const r of tableRecords) stream.write(r);
return Promise.resolve();
};
return createTable({ name: 'Names', columns: columnDefinitions, resolver: tableResolver });
We will need to change this to get the
tableRecords
from a CSV file.Let's Code! #
Plugin configuration #
We will start with the plugin configuration. To parse a set of CSV files, we'll need the following:
- a path to where the files are; mandatory
- a CSV delimiter; defaults to ","
This is what the spec will look like in the YAML for CloudQuery CLI:
spec:
path: 'test_data/'
csvDelimiter: ';'
We will define the spec in the
spec.ts
file:const spec = {
type: "object",
properties: {
concurrency: { type: "integer" },
path: { type: "string" },
csvDelimiter: {type: "string" },
},
required: ["path"],
};
const ajv = new Ajv.default();
const validate = ajv.compile(spec);
export type Spec = {
concurrency: number;
path: string;
csvDelimiter: string;
};
export const parseSpec = (spec: string): Spec => {
const parsed = JSON.parse(spec) as Partial<Spec>;
const valid = validate(parsed);
if (!valid) {
throw new Error(`Invalid spec: ${JSON.stringify(validate.errors)}`);
}
const {
concurrency = 10_000,
path = "",
csvDelimiter = ",",
} = camelcaseKeys(parsed);
! return { concurrency, path, csvDelimiter };
};
This will now get passed to our
getTables
function in tables.ts
, so we can add it as an argument there. We'll also add a logger and pass it from the plugin initialization.export const getTables = async (logger: Logger, spec: Spec): Promise<Table[]> => {
const table = await getTable();
return [table];
};
Reading the CSV files #
This should be fairly straightforward. We will use the
fs
module to get the list of files in the provided path
and then load the files using csv-parse
. We will add a function to get the CSV file paths and another one to parse a file.Note: Install the
csv-parse
package using npm install csv-parse
.//update imports:
import type { Logger } from 'winston';
import fs from 'node:fs/promises';
import Path from 'node:path';
// ...
const getCsvFiles = async (logger: Logger, path: string): Promise<string[]> => {
const stats = await fs.stat(path);
if (stats.isDirectory()) {
const files = await fs.readdir(path, { withFileTypes: true });
return files.filter((f) => f.isFile()).map((f) => Path.join(path, f.name));
}
logger.error('Target path is not a directory.');
return [];
};
const parseCsvFile = async (path: string, csvDelimiter: string): Promise<string[][]> => {
const content = await fs.readFile(path);
return new Promise<string[][]>((resolve, reject) => {
parse(content, { delimiter: csvDelimiter }, (error, records) => {
if (error) {
reject(error);
return;
}
resolve(records);
});
});
};
Converting the raw CSV data to a table #
Our
getTables
function needs to an array of Table
objects so we will need to convert our raw data. We can use the SDK's createTable
function which requires us to pass a few options:- table name
- column definitions
- table resolver - a function that will write the actual records to a provided stream
We will start with the column definitions. First, we need to prepare a column resolver function. Column resolvers are responsible for mapping your data into the columns of the table. In most cases, you will not need to implement anything special, but in some cases, you may need to implement a custom column resolver to fetch additional data or do custom transformations.
In our case, we will just record the data as it comes (this resolver is already in place):
const getColumnResolver = (c: string): ColumnResolver => {
return (meta, resource) => {
const dataItem = resource.getItem();
resource.setColumData(c, (dataItem as Record<string, unknown>)[c]);
return Promise.resolve();
};
};
We will use the first row of the CSV file to get the column names. We will use the
Utf8
as the default column type for now.The starter project already contains a function
getTable
that serves as an example so let's modify it to write the data from the CSV file into the stream:const getTable = async (rows: string[][], tableName: string): Promise<Table> => {
if (rows.length === 0) {
throw new Error('No rows found');
}
const getRecordObjectFromRow = (row: string[]) => {
const record: Record<string, string> = {};
for (const [index, element] of row.entries()) {
record[columnNames[index]] = element;
}
return record;
};
const columnNames = rows[0];
// convert all rows except column definitions to an array of Record<string, string> objects
const tableRecords = rows.filter((_, index) => index > 0).map((r) => getRecordObjectFromRow(r));
const columnDefinitions: Column[] = columnNames.map((c) => ({
name: c,
type: new Utf8(),
description: '',
primaryKey: false,
notNull: false,
incrementalKey: false,
unique: false,
ignoreInTests: false,
resolver: getColumnResolver(c),
}));
const tableResolver: TableResolver = (clientMeta, parent, stream) => {
for (const r of tableRecords) stream.write(r);
return Promise.resolve();
};
return createTable({ name: tableName, columns: columnDefinitions, resolver: tableResolver });
};
Connecting it all together #
We now have all the pieces in place. We just need to modify the
getTables
function to load the CSV files and convert them to tables. This is the place where I will use pMap
to use the concurrency provided in the spec. It is probably not necessary in this case but it is a good practice to use it when you are doing any IO operations.import pMap from 'p-map';
// ...
export const getTables = async (logger: Logger, spec: Spec): Promise<Table[]> => {
const { path, csvDelimiter, concurrency } = spec;
const files = await getCsvFiles(logger, path);
logger.info(`done discovering files. Found ${files.length} files`);
const allTables = await pMap(
files,
async (filePath) => {
const csvFile = await parseCsvFile(filePath, csvDelimiter);
return getTable(csvFile, Path.basename(filePath, '.csv'));
},
{
concurrency,
},
);
return allTables;
};
We also need to go back to
plugin.ts
and update the call to getTables
to pass the spec:const newClient: NewClientFunction = async (logger, spec, { noConnection }) => {
pluginClient.spec = parseSpec(spec);
pluginClient.client = { id: () => 'cq-js-sample' };
if (noConnection) {
pluginClient.allTables = [];
return pluginClient;
}
pluginClient.allTables = await getTables(logger, pluginClient.spec);
return pluginClient;
};
First test #
Let's see if this all works. First, we need to modify our
sync.yml
to add the required configuration:kind: source
spec:
name: 'text-file'
registry: 'grpc'
path: 'localhost:7777'
version: 'v1.0.0'
tables: ['*']
destinations:
- 'sqlite'
spec:
path: 'test_data'
csvDelimiter: ','
#...
Make sure you specify a path to a directory with some CSV files. Here's an example CSV file you can use:
Name,Count,Age
Peter,3,1.5
Mike,5,2.3
Jack,2,3.2
Now we can start the plugin in a listening mode and then run cloudquery with the sync.yml in the project.
Start the plugin:
$ npm run dev
> @cloudquery/[email protected] dev
> ts-node --esm src/main.ts serve
[2023-09-28T09:57:13.324Z] info server running on port: 7777
And in a new terminal window, run
$ cloudquery sync sync.yml
Loading spec(s) from sync.yml
Starting sync for: text-file (grpc@localhost:7777) -> [sqlite (v2.4.9)]
/ Syncing resources... (12772/-, 2141 resources/s) [11s]
Sync completed successfully. Resources: 22463, Errors: 0, Warnings: 0, Time: 14s
Once the sync finished, the
db.sql
file should contain new tables with the data from the CSV files. If you imported the example above, it should be created as a table with CloudQuery columns _cq_sync_time
and _cq_source_name
and three columns of type TEXT
with the names Name
, Count
, and Age
.Custom column types #
Let's do one last thing: we will improve the plugin by adding an automatic detection of columns with numbers. We will do this for files that have at least one row with data (i.e. they have at least two rows) and we will determine the column types by reading the values from the second row.
The types for column definitions are Apache Arrow types exported from CloudQuery JS SDK. We will use the
Float64
type for floating-point numbers and Int64
for integers.First, we'll create two functions to convert a row represented by an array of strings to an array of
DataType
objects.import type { DataType } from '@cloudquery/plugin-sdk-javascript/arrow';
import { Utf8, Int64, Float64 } from '@cloudquery/plugin-sdk-javascript/arrow';
//...
const getColumnType = (value: string): DataType => {
const number = Number(value);
if (Number.isNaN(number)) return new Utf8();
if (Number.isInteger(number)) return new Int64();
return new Float64();
};
const getColumnTypes = (row: string[]): DataType[] => {
return row.map((value) => getColumnType(value));
};
Then we will update the
getTable
function to use the getColumnTypes
and to determine the proper type for the data records:const getTable = async (
rows: string[][],
tableName: string,
): Promise<Table> => {
if (rows.length === 0) {
throw new Error("No rows found");
}
const columnNames = rows[0];
const getRecordObjectFromRow = (row: string[]) => {
const record: Record<string, string | number> = {};
for (const [index, element] of row.entries()) {
record[columnNames[index]] = Number.isNaN(Number(element)) ? element : Number(element);
}
return record;
};
const columnTypes = rows.length > 1 ? getColumnTypes(rows[1]) : rows[0].map(()=>new Utf8());
// convert all rows except column definitions to an array of Record<string, string> objects
const tableRecords = rows.filter((_, index) => index > 0).map((r)=>getRecordObjectFromRow(r));
const columnDefinitions: Column[] = columnNames.map((c, index) => ({
name: c,
type: columnTypes[index],
description: "",
primaryKey: false,
notNull: false,
incrementalKey: false,
unique: false,
ignoreInTests: false,
resolver: getColumnResolver(c),
}));
That's it! Now delete the
db.sql
file, restart the plugin, and test the sync again. This time, the db.sql
file should contain the right type for your number columns: TEXT
, INTEGER
, and REAL
.Release #
JavaScript plugins are published and executed using Docker containers. Our starter project already contains a Dockerfile that packages your plugin into a container. Use the
npm run package:container
command to build it.To point CloudQuery to the plugin container, make the following changes to the
sync.yml
file:spec:
name: "cq-js-sample"
registry: "docker"
path: "cq-js-sample:latest" # this is the name and tag of the docker image to be pulled.
version: "v1.0.0"
tables:
["*"]
// ...
To find out how to release your plugin for use by a wider CloudQuery community, see Releasing and Deploying your Plugin in our documentation.
Resources #
Starter repository for this blog post and for new plugins:
JavaScript Plugin Template
Check out the
csv-file-sync
branch of the repository above to get to the final code for our CSV plugin.Other JavaScript Source Plugin Examples #
Airtable Plugin is another (a bit more advanced) example of a plugin using our JavaScript SDK.
Written by Michal Brutvan
Michal is CloudQuery's senior product manager and has responsibility for new features and CloudQuery's product roadmap. He has had a wealth of product ownership roles and prior to that, worked as a software engineer.