Creating a New Destination Integration in Go
This guide builds a SQLite destination integration from scratch in stages. At each stage you’ll have something you can actually run — starting with a gRPC server that serves but does nothing, then adding the ability to create tables, then write rows, then handle schema changes.
The same patterns apply to any destination backend: PostgreSQL, BigQuery, a custom database, or an external API that accepts writes. The official SQLite destination follows this exact structure and is a good reference once you’ve finished this guide.
Before starting, review core concepts and the Creating a New Integration overview. The terminology (tables, syncs, write modes) is introduced there.
Prerequisites:
- Go 1.21+ installed (Go Tutorial)
- CloudQuery CLI installed
- No CGO or C compiler required for this guide
How Destinations Differ from Sources
A source integration produces data — it fetches records from a third-party API and sends them to CloudQuery. A destination integration consumes data — it receives a stream of write messages from the CloudQuery engine and stores them somewhere.
When a user runs cloudquery sync, data flows into your destination like this:
cloudquery sync
│
▼
[CloudQuery Engine] ──▶ Write(ctx, res <-chan WriteMessage)
│
[batchwriter] ← SDK helper: buffers and routes
┌─────────┼──────────────────┐
▼ ▼ ▼
MigrateTables() WriteTableBatch() DeleteStale()
before rows batched rows after rows
│ │ │
└────┬────┴───────────────────┘
▼
[Your Database]Your destination receives one call to Write. You pass that channel straight to the SDK’s batchwriter helper, which takes care of buffering and routing. It calls three methods on your client:
| Method | When called | What it does |
|---|---|---|
MigrateTables | Before rows for any table | Create or update the table schema |
WriteTableBatch | When a batch of rows is ready | Insert or upsert the rows |
DeleteStale | After rows, in overwrite-delete-stale mode | Remove rows older than this sync |
You’ll implement these one at a time, building up the destination’s capabilities as you go.
Project Structure
cq-destination-sqlite/
├── main.go
├── go.mod
├── resources/
│ └── plugin/
│ └── plugin.go # Name, Kind, Team, Version constants
└── client/
├── client.go # Client struct, constructor, Write
├── spec.go # Configuration spec
├── migrate.go # MigrateTables
├── write.go # WriteTableBatch, DeleteRecord
└── deletestale.go # DeleteStaleScaffold the Integration
There is no cq-scaffold equivalent for destination integrations — you’ll create the files manually. There aren’t many of them, and this guide walks through each one.
Build the minimum needed to get a gRPC server running. By the end of this section you can start your destination and CloudQuery can connect to it — even though it won’t create tables or write data yet.
Integration Constants
package plugin
var (
Name = "sqlite"
Kind = "destination"
Team = "your-team"
Version = "development"
)Kind must be "destination". These values appear in log output and on the CloudQuery Hub.
Configuration Spec
package client
import "fmt"
const (
defaultBatchSize = int64(10_000)
defaultBatchSizeBytes = int64(10 * 1024 * 1024) // 10 MiB
)
// Spec holds the user-provided configuration for this destination.
type Spec struct {
// Path to the SQLite file, e.g. "./mydb.db"
ConnectionString string `json:"connection_string"`
// Maximum rows per write batch. Default: 10,000.
BatchSize int64 `json:"batch_size,omitempty"`
// Maximum bytes per write batch. Default: 10 MiB.
BatchSizeBytes int64 `json:"batch_size_bytes,omitempty"`
}
func (s *Spec) SetDefaults() {
if s.BatchSize == 0 {
s.BatchSize = defaultBatchSize
}
if s.BatchSizeBytes == 0 {
s.BatchSizeBytes = defaultBatchSizeBytes
}
}
func (s *Spec) Validate() error {
if s.ConnectionString == "" {
return fmt.Errorf("connection_string is required")
}
return nil
}A user’s destination YAML block looks like:
kind: destination
spec:
name: my-sqlite
registry: grpc
path: localhost:7777
spec:
connection_string: ./cloudquery.dbClient
The Client struct holds your database connection and the batchwriter instance. Write delegates to the batchwriter and doesn’t change as you add features.
package client
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/apache/arrow-go/v18/arrow"
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/cloudquery/plugin-sdk/v4/writers/batchwriter"
"github.com/rs/zerolog"
// Pure-Go SQLite driver — no C compiler required.
// Registers the "sqlite" driver name with database/sql.
_ "modernc.org/sqlite"
)
type Client struct {
plugin.UnimplementedSource
db *sql.DB
logger zerolog.Logger
spec Spec
writer *batchwriter.BatchWriter
}
// Read satisfies the plugin.Client interface. Return an error if your
// destination doesn't need to support reading rows back.
func (c *Client) Read(_ context.Context, _ *schema.Table, _ chan<- arrow.RecordBatch) error {
return fmt.Errorf("read not supported by this destination")
}
// New is the constructor passed to plugin.NewPlugin.
func New(_ context.Context, logger zerolog.Logger, specBytes []byte, _ plugin.NewClientOptions) (plugin.Client, error) {
c := &Client{
logger: logger.With().Str("module", "sqlite-dest").Logger(),
}
if err := json.Unmarshal(specBytes, &c.spec); err != nil {
return nil, fmt.Errorf("failed to unmarshal spec: %w", err)
}
c.spec.SetDefaults()
if err := c.spec.Validate(); err != nil {
return nil, err
}
var err error
c.writer, err = batchwriter.New(c,
batchwriter.WithLogger(c.logger),
batchwriter.WithBatchSize(c.spec.BatchSize),
batchwriter.WithBatchSizeBytes(c.spec.BatchSizeBytes),
)
if err != nil {
return nil, fmt.Errorf("failed to create batchwriter: %w", err)
}
// modernc.org/sqlite uses the driver name "sqlite" (vs "sqlite3" for mattn/go-sqlite3).
c.db, err = sql.Open("sqlite", c.spec.ConnectionString)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
return c, nil
}
func (c *Client) Close(_ context.Context) error {
if c.db == nil {
return fmt.Errorf("client already closed")
}
err := c.db.Close()
c.db = nil
return err
}
// Write is called once per sync. It passes the message channel to the
// batchwriter, which routes messages to MigrateTables, WriteTableBatch,
// and DeleteStale. Always call Flush — the batchwriter may hold buffered
// rows that haven't been flushed yet.
func (c *Client) Write(ctx context.Context, res <-chan message.WriteMessage) error {
if err := c.writer.Write(ctx, res); err != nil {
return fmt.Errorf("failed to write: %w", err)
}
if err := c.writer.Flush(ctx); err != nil {
return fmt.Errorf("failed to flush: %w", err)
}
return nil
}Why modernc.org/sqlite? The official CloudQuery SQLite destination uses github.com/mattn/go-sqlite3, which requires a C compiler. This guide uses modernc.org/sqlite — a pure-Go port that compiles with a standard go build on any platform. The API is identical; only the import path and driver name differ.
Starter Stubs
Create these two files now. They give the batchwriter valid methods to call so the whole thing compiles. You’ll replace them with real implementations in the next steps.
package client
import (
"context"
"github.com/cloudquery/plugin-sdk/v4/message"
)
func (c *Client) MigrateTables(_ context.Context, _ message.WriteMigrateTables) error {
return nil // Step 2: create tables
}package client
import (
"context"
"fmt"
"github.com/cloudquery/plugin-sdk/v4/message"
)
func (c *Client) WriteTableBatch(_ context.Context, _ string, _ message.WriteInserts) error {
return nil // Step 3: write rows
}
func (c *Client) DeleteStale(_ context.Context, _ message.WriteDeleteStales) error {
return nil // Step 4: delete stale rows
}
func (c *Client) DeleteRecord(_ context.Context, _ message.WriteDeleteRecords) error {
return fmt.Errorf("DeleteRecord is not supported by this destination")
}Entry Point
package main
import (
"context"
"log"
"github.com/cloudquery/<your-org>/cq-destination-sqlite/client"
internalPlugin "github.com/cloudquery/<your-org>/cq-destination-sqlite/resources/plugin"
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/serve"
)
func main() {
p := plugin.NewPlugin(
internalPlugin.Name,
internalPlugin.Version,
client.New,
plugin.WithKind(internalPlugin.Kind), // "destination"
plugin.WithTeam(internalPlugin.Team),
)
if err := serve.Plugin(p, serve.WithDestinationV0V1Server()).Serve(context.Background()); err != nil {
log.Fatalf("failed to serve plugin: %v", err)
}
}serve.WithDestinationV0V1Server() adds gRPC endpoints compatible with older source integration protocol versions. Include it in every destination for broad compatibility.
go.mod
module github.com/<your-org>/cq-destination-sqlite
go 1.21
require (
github.com/apache/arrow-go/v18 v18.0.0
github.com/cloudquery/plugin-sdk/v4 v4.0.0
github.com/rs/zerolog v1.33.0
modernc.org/sqlite v1.34.0
)The versions above are placeholders. Fetch the latest before running:
go get github.com/cloudquery/plugin-sdk/v4@latest
go get modernc.org/sqlite@latest
go mod tidyRun It
go mod tidy
go build -o cq-destination-sqlite .
./cq-destination-sqlite serveYou should see:
Plugin server listening address=127.0.0.1:7777 plugin=your-team/destination/sqlite@developmentYour destination is running. CloudQuery can connect to it. It won’t create tables or write data yet — that’s what the next two sections add.
Create Tables
Implement MigrateTables so your destination creates a table the first time rows arrive for it.
MigrateTables is called once per table before any rows are written. At a minimum, it needs to create the table if it doesn’t exist. Replace client/migrate.go with this:
package client
import (
"context"
"fmt"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/schema"
)
func (c *Client) MigrateTables(ctx context.Context, msgs message.WriteMigrateTables) error {
for _, msg := range msgs {
table := msg.Table
c.logger.Info().Str("table", table.Name).Msg("Migrating table")
exists, err := c.tableExists(table.Name)
if err != nil {
return err
}
if !exists {
if err := c.createTable(ctx, table); err != nil {
return err
}
}
}
return nil
}
func (c *Client) tableExists(tableName string) (bool, error) {
rows, err := c.db.Query(fmt.Sprintf("PRAGMA table_info('%s')", tableName))
if err != nil {
return false, err
}
defer rows.Close()
return rows.Next(), rows.Err()
}
func (c *Client) createTable(ctx context.Context, table *schema.Table) error {
var sb strings.Builder
sb.WriteString(`CREATE TABLE IF NOT EXISTS "`)
sb.WriteString(table.Name)
sb.WriteString(`" (`)
var primaryKeys []string
for i, col := range table.Columns {
sb.WriteString(`"` + col.Name + `" `)
sb.WriteString(arrowTypeToSQLite(col.Type))
if col.NotNull {
sb.WriteString(" NOT NULL")
}
if i < len(table.Columns)-1 {
sb.WriteString(", ")
}
if col.PrimaryKey {
primaryKeys = append(primaryKeys, `"`+col.Name+`"`)
}
}
// Composite primary key — SQLite requires this syntax when there are multiple keys.
if len(primaryKeys) > 0 {
sb.WriteString(`, CONSTRAINT "`)
sb.WriteString(table.Name)
sb.WriteString(`_cqpk" PRIMARY KEY (`)
sb.WriteString(strings.Join(primaryKeys, ", "))
sb.WriteString(")")
}
sb.WriteString(")")
_, err := c.db.ExecContext(ctx, sb.String())
return err
}
// arrowTypeToSQLite maps an Arrow DataType to a SQLite storage class.
// SQLite has four storage classes: TEXT, INTEGER, REAL, BLOB.
func arrowTypeToSQLite(t arrow.DataType) string {
switch t.ID() {
case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64,
arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64,
arrow.BOOL:
return "INTEGER"
case arrow.FLOAT32, arrow.FLOAT64:
return "REAL"
case arrow.BINARY, arrow.LARGE_BINARY:
return "BLOB"
case arrow.TIMESTAMP, arrow.DATE32, arrow.DATE64, arrow.TIME32, arrow.TIME64:
return "TIMESTAMP"
default:
// UTF8, LARGE_UTF8, lists, structs, maps → TEXT
return "TEXT"
}
}Try It
Start your destination, then run a sync against any source. Here’s a config using the public xkcd source (no API key needed):
kind: source
spec:
name: xkcd
path: cloudquery/xkcd
registry: cloudquery
version: "v1.5.38"
tables: ["*"]
destinations:
- my-sqlite
---
kind: destination
spec:
name: my-sqlite
registry: grpc
path: localhost:7777
spec:
connection_string: ./test.db./cq-destination-sqlite serve &
cloudquery sync config.yaml
sqlite3 test.db ".tables"
# → xkcd_comicsThe table exists. No rows yet — WriteTableBatch still returns nil without writing anything. That’s what the next section adds.
Write Rows
Implement WriteTableBatch so rows actually land in the database.
When the batchwriter has accumulated enough rows for a table (up to batch_size), it calls WriteTableBatch with all the accumulated WriteInsert messages. You get the table name and the messages — each message contains an Apache Arrow RecordBatch (a columnar block of rows).
Replace client/write.go with this:
package client
import (
"context"
"database/sql"
"fmt"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/schema"
)
func (c *Client) WriteTableBatch(ctx context.Context, name string, msgs message.WriteInserts) error {
if len(msgs) == 0 {
return nil
}
// Wrap all inserts in a single transaction — much faster than one per row.
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
c.logger.Error().Err(rbErr).Str("table", name).Msg("rollback failed")
}
}
}()
for _, msg := range msgs {
if err = c.insertRows(ctx, tx, msg); err != nil {
return err
}
}
return tx.Commit()
}
func (c *Client) insertRows(ctx context.Context, tx *sql.Tx, msg *message.WriteInsert) error {
table := msg.GetTable()
sc := msg.Record.Schema()
// Tables with primary keys use INSERT OR REPLACE (upsert).
// Tables without primary keys use plain INSERT (append).
var sqlStr string
if len(table.PrimaryKeys()) == 0 {
sqlStr = buildInsert(sc)
} else {
sqlStr = buildUpsert(sc)
}
for _, row := range recordToRows(msg.Record) {
if _, err := tx.ExecContext(ctx, sqlStr, row...); err != nil {
return fmt.Errorf("insert failed for table %s: %w", table.Name, err)
}
}
return nil
}
func buildInsert(sc *arrow.Schema) string {
return buildSQL("insert into", sc)
}
// buildUpsert uses INSERT OR REPLACE, SQLite's upsert syntax.
// On a primary key conflict it deletes the old row and inserts the new one.
func buildUpsert(sc *arrow.Schema) string {
return buildSQL("insert or replace into", sc)
}
func buildSQL(verb string, sc *arrow.Schema) string {
tableName, _ := sc.Metadata().GetValue(schema.MetadataTableName)
var sb strings.Builder
sb.WriteString(verb + ` "` + tableName + `" (`)
fields := sc.Fields()
for i, f := range fields {
sb.WriteString(`"` + f.Name + `"`)
if i < len(fields)-1 {
sb.WriteString(", ")
}
}
sb.WriteString(") VALUES (")
for i := range fields {
sb.WriteString(fmt.Sprintf("$%d", i+1))
if i < len(fields)-1 {
sb.WriteString(", ")
}
}
sb.WriteString(")")
return sb.String()
}
func (c *Client) DeleteStale(_ context.Context, _ message.WriteDeleteStales) error {
return nil // Step 4: implement stale data deletion
}
func (c *Client) DeleteRecord(_ context.Context, _ message.WriteDeleteRecords) error {
return fmt.Errorf("DeleteRecord is not supported by this destination")
}Arrow Records to Rows
CloudQuery passes row data as Apache Arrow RecordBatch values — a columnar in-memory format. To call sql.ExecContext, you need to convert from columnar Arrow format to row-oriented []any slices.
Add these helpers at the bottom of client/write.go:
// recordToRows converts a columnar Arrow RecordBatch into row-oriented slices
// for sql.ExecContext.
func recordToRows(rec arrow.RecordBatch) [][]any {
numRows := int(rec.NumRows())
numCols := int(rec.NumCols())
rows := make([][]any, numRows)
for i := range rows {
rows[i] = make([]any, numCols)
}
for colIdx, col := range rec.Columns() {
for rowIdx := 0; rowIdx < numRows; rowIdx++ {
if col.IsNull(rowIdx) {
rows[rowIdx][colIdx] = nil
continue
}
rows[rowIdx][colIdx] = arrowValueAt(col, rowIdx)
}
}
return rows
}
// arrowValueAt extracts a single value from an Arrow array as a Go-native type
// suitable for use as a SQL parameter.
func arrowValueAt(col arrow.Array, i int) any {
switch c := col.(type) {
case *array.String:
return c.Value(i)
case *array.LargeString:
return c.Value(i)
case *array.Int8:
return int64(c.Value(i))
case *array.Int16:
return int64(c.Value(i))
case *array.Int32:
return int64(c.Value(i))
case *array.Int64:
return c.Value(i)
case *array.Uint8:
return int64(c.Value(i))
case *array.Uint16:
return int64(c.Value(i))
case *array.Uint32:
return int64(c.Value(i))
case *array.Uint64:
return int64(c.Value(i))
case *array.Float32:
return float64(c.Value(i))
case *array.Float64:
return c.Value(i)
case *array.Boolean:
if c.Value(i) {
return int64(1)
}
return int64(0)
case *array.Binary:
return c.Value(i)
case *array.LargeBinary:
return c.Value(i)
default:
// For complex types (timestamps, lists, maps, structs) fall back
// to the string representation. Add explicit cases as needed.
return col.ValueStr(i)
}
}CloudQuery also uses custom Arrow extension types (UUID, MAC address, Inet, etc.) on top of the standard types. The default case above handles them via ValueStr. For production use, see the official write.go for the complete mapping.
Try It
Rebuild and run the sync again:
go build -o cq-destination-sqlite . && ./cq-destination-sqlite serve &
cloudquery sync config.yamlVerify rows landed:
# Check total row count — should be 3,000+ comics
sqlite3 test.db "SELECT COUNT(*) FROM xkcd_comics;"
# Spot-check the most recent entries
sqlite3 test.db "SELECT num, title FROM xkcd_comics ORDER BY num DESC LIMIT 5;"Expected output:
3228
3228|Sunscreen Misconceptions
3227|Research Ethics
3226|Deepest Lake
3225|Sitting Room
3224|LosslessYour destination is now fully functional for the most common case: append and overwrite write modes.
Advanced: Schema Migration
The MigrateTables implementation in Step 2 creates tables but doesn’t handle changes to an existing schema. If a new column is added to a source, or you change a column type, the destination needs to either add the column safely or force-recreate the table.
Replace the entire contents of client/migrate.go with this full implementation:
package client
import (
"context"
"fmt"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/schema"
)
func (c *Client) MigrateTables(ctx context.Context, msgs message.WriteMigrateTables) error {
for _, msg := range msgs {
table := msg.Table
c.logger.Info().Str("table", table.Name).Msg("Migrating table")
existing, err := c.getExistingColumns(table.Name)
if err != nil {
return err
}
if existing == nil {
// Table doesn't exist — create it.
if err := c.createTable(ctx, table); err != nil {
return err
}
continue
}
// Table exists. Compute what changed.
changes := existing.GetChanges(table)
if len(changes) == 0 {
continue // Nothing to do.
}
if msg.MigrateForce {
// Force mode: drop and recreate.
if err := c.recreateTable(ctx, table); err != nil {
return err
}
continue
}
// Safe mode: only apply additive, non-breaking changes.
if !canAutoMigrate(changes) {
return fmt.Errorf("cannot safely migrate table %s: breaking changes detected. "+
"Use migrate_mode: forced to allow destructive migrations.\n%s",
table.Name, schema.GetChangesSummary(map[string][]schema.TableColumnChange{table.Name: changes}))
}
if err := c.autoMigrate(ctx, table, changes); err != nil {
return err
}
}
return nil
}
// canAutoMigrate returns true if all changes can be applied without data loss.
func canAutoMigrate(changes []schema.TableColumnChange) bool {
for _, change := range changes {
switch change.Type {
case schema.TableColumnChangeTypeAdd:
if change.Current.PrimaryKey || change.Current.NotNull {
return false
}
case schema.TableColumnChangeTypeRemove:
if change.Previous.PrimaryKey || change.Previous.NotNull {
return false
}
case schema.TableColumnChangeTypeRemoveUniqueConstraint:
// Safe.
default:
return false
}
}
return true
}
func (c *Client) recreateTable(ctx context.Context, table *schema.Table) error {
if _, err := c.db.ExecContext(ctx, `DROP TABLE IF EXISTS "`+table.Name+`"`); err != nil {
return fmt.Errorf("failed to drop table %s: %w", table.Name, err)
}
return c.createTable(ctx, table)
}
func (c *Client) autoMigrate(ctx context.Context, table *schema.Table, changes []schema.TableColumnChange) error {
for _, change := range changes {
if change.Type == schema.TableColumnChangeTypeAdd {
sql := fmt.Sprintf(`ALTER TABLE "%s" ADD COLUMN "%s" %s`,
table.Name, change.Current.Name, arrowTypeToSQLite(change.Current.Type))
if _, err := c.db.ExecContext(ctx, sql); err != nil {
return err
}
}
}
return nil
}
// getExistingColumns reads the live schema from SQLite using PRAGMA table_info.
// Returns nil if the table does not exist.
func (c *Client) getExistingColumns(tableName string) (*schema.Table, error) {
rows, err := c.db.Query(fmt.Sprintf("PRAGMA table_info('%s')", tableName))
if err != nil {
return nil, err
}
defer rows.Close()
var columns []schema.Column
for rows.Next() {
var cid int
var name, typ string
var notNull bool
var dflt any
var pk int
if err := rows.Scan(&cid, &name, &typ, ¬Null, &dflt, &pk); err != nil {
return nil, err
}
columns = append(columns, schema.Column{
Name: name,
Type: sqliteTypeToArrow(strings.ToLower(typ)),
NotNull: notNull,
PrimaryKey: pk != 0,
})
}
if err := rows.Err(); err != nil {
return nil, err
}
if len(columns) == 0 {
return nil, nil // Table does not exist.
}
return &schema.Table{Name: tableName, Columns: columns}, nil
}
// sqliteTypeToArrow maps PRAGMA type strings back to Arrow types.
func sqliteTypeToArrow(typ string) arrow.DataType {
switch typ {
case "integer":
return arrow.PrimitiveTypes.Int64
case "real":
return arrow.PrimitiveTypes.Float64
case "blob":
return arrow.BinaryTypes.Binary
case "timestamp":
return arrow.FixedWidthTypes.Timestamp_us
default:
return arrow.BinaryTypes.String
}
}
// createTable and arrowTypeToSQLite remain unchanged from Step 2.The official SQLite destination has fuller type handling — it normalizes Arrow types before comparison and handles CloudQuery-specific extension types. See migrate.go for the complete implementation.
Try It
Rebuild with the updated migrate.go and run a sync:
go build -o cq-destination-sqlite . && ./cq-destination-sqlite serve &
cloudquery sync config.yamlInspect the table schema to confirm all columns were created correctly:
sqlite3 test.db ".schema xkcd_comics"You should see all source columns plus the CloudQuery system columns (_cq_id, _cq_source_name, _cq_sync_time) with a composite primary key constraint:
CREATE TABLE "xkcd_comics" (
"_cq_id" TEXT NOT NULL,
"_cq_source_name" TEXT NOT NULL,
"_cq_sync_time" TIMESTAMP NOT NULL,
"num" INTEGER,
"title" TEXT,
"year" TEXT,
...
CONSTRAINT "xkcd_comics_cqpk" PRIMARY KEY ("_cq_id")
);Verify safe migration mode: Delete the database file and sync again to create a fresh table. Then sync a second time without deleting — no error means the “no changes” path is working:
rm test.db
cloudquery sync config.yaml # creates the table
cloudquery sync config.yaml # should complete without error (no schema changes)Verify forced migration: Add migrate_mode: forced to your destination spec, then sync. This triggers a drop-and-recreate even when no changes are needed — useful to confirm the force path runs without errors:
kind: destination
spec:
name: my-sqlite
path: localhost:7777
registry: grpc
migrate_mode: forced
spec:
connection_string: ./test.dbcloudquery sync config.yaml # drops and recreates xkcd_comics
sqlite3 test.db "SELECT COUNT(*) FROM xkcd_comics;" # rows are backAdvanced: Delete Stale Data
When a user configures write_mode: overwrite-delete-stale, the CloudQuery engine sends WriteDeleteStale messages after rows have been written. These identify rows that were present in a previous sync but are no longer in the source — the destination should delete them.
Create client/deletestale.go and update client/write.go to remove the stub:
package client
import (
"context"
"fmt"
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/schema"
)
func (c *Client) DeleteStale(ctx context.Context, msgs message.WriteDeleteStales) error {
for _, msg := range msgs {
sql := fmt.Sprintf(
`DELETE FROM "%s" WHERE "%s" = $1 AND datetime(%s) < datetime($2)`,
msg.TableName,
schema.CqSourceNameColumn.Name, // "_cq_source_name"
schema.CqSyncTimeColumn.Name, // "_cq_sync_time"
)
if _, err := c.db.ExecContext(ctx, sql, msg.SourceName, msg.SyncTime); err != nil {
return fmt.Errorf("failed to delete stale rows from %s: %w", msg.TableName, err)
}
}
return nil
}The _cq_source_name and _cq_sync_time columns are added automatically to every CloudQuery table by the SDK. This query deletes rows from the current source whose sync time is older than the current sync — anything that no longer exists upstream.
Now remove the DeleteStale stub from client/write.go. If you leave it in, Go will fail with “method redeclared” because DeleteStale is now defined in two files. The end of client/write.go should have only DeleteRecord:
// ... (WriteTableBatch, insertRows, buildInsert, buildUpsert, buildSQL,
// recordToRows, arrowValueAt — all unchanged)
// DeleteRecord is the only method that stays in write.go.
// DeleteStale has moved to deletestale.go.
func (c *Client) DeleteRecord(_ context.Context, _ message.WriteDeleteRecords) error {
return fmt.Errorf("DeleteRecord is not supported by this destination")
}Try It
Update your config to use overwrite-delete-stale mode. This tells the engine to send WriteDeleteStale messages after each sync:
kind: source
spec:
name: xkcd
path: cloudquery/xkcd
registry: cloudquery
version: "v1.5.38"
tables: ["*"]
destinations:
- my-sqlite
---
kind: destination
spec:
name: my-sqlite
registry: grpc
path: localhost:7777
write_mode: overwrite-delete-stale
spec:
connection_string: ./test.dbRebuild and sync twice:
go build -o cq-destination-sqlite . && ./cq-destination-sqlite serve &
# First sync — populates the table
cloudquery sync config.yaml
sqlite3 test.db "SELECT COUNT(*) FROM xkcd_comics;"
# → 3228 (or however many exist today)
# Second sync — overwrites existing rows and deletes any that are no longer in the source
cloudquery sync config.yaml
sqlite3 test.db "SELECT COUNT(*) FROM xkcd_comics;"
# → 3228 (same count — rows were overwritten, not duplicated or deleted)The count should be identical after both syncs. If rows were duplicating instead, you’d be missing DeleteStale. If rows were disappearing, your datetime() comparison in the SQL query is off.
To confirm _cq_sync_time is being updated on each sync (which is what makes deletion work correctly):
sqlite3 test.db "SELECT num, _cq_sync_time FROM xkcd_comics ORDER BY num LIMIT 3;"After the second sync, _cq_sync_time should be a more recent timestamp than after the first sync.
Write Modes
Your destination now supports all three write modes:
write_mode | Messages your destination receives |
|---|---|
append | WriteMigrateTable, WriteInsert (plain INSERT) |
overwrite | WriteMigrateTable, WriteInsert (INSERT OR REPLACE on tables with primary keys) |
overwrite-delete-stale | WriteMigrateTable, WriteInsert, WriteDeleteStale |
Your destination does not read write_mode directly. The engine controls which messages it sends. The behavior is determined by how you handle WriteInsert (INSERT vs INSERT OR REPLACE) and whether DeleteStale does anything.
Testing
The SDK ships a write test suite that exercises MigrateTables, WriteTableBatch, DeleteStale, and schema migration scenarios — no test cases to write yourself.
package client
import (
"context"
"encoding/json"
"testing"
"github.com/cloudquery/plugin-sdk/v4/plugin"
)
func TestPlugin(t *testing.T) {
ctx := context.Background()
p := plugin.NewPlugin("sqlite", "development", New)
spec := Spec{
ConnectionString: ":memory:",
}
specBytes, err := json.Marshal(spec)
if err != nil {
t.Fatal(err)
}
if err := p.Init(ctx, specBytes, plugin.NewClientOptions{}); err != nil {
t.Fatal(err)
}
plugin.TestWriterSuiteRunner(t,
p,
plugin.WriterTestSuiteTests{
SafeMigrations: plugin.SafeMigrations{
AddColumn: true,
RemoveColumn: true,
},
})
}go test ./client/...TestWriterSuiteRunner creates tables, inserts rows in all three write modes, applies schema migrations, and verifies correctness. The SafeMigrations block tells the suite which schema changes your destination handles without data loss — set these to match your canAutoMigrate logic.
This pattern is taken directly from the official SQLite destination test.
Troubleshooting
“Integration server listening” never appears after ./cq-destination-sqlite serve
The binary didn’t build correctly, or the previous go build succeeded but the binary wasn’t updated. Run go build ./... and check for compile errors. A common cause is a missing method — if your struct doesn’t implement all of batchwriter.Client (MigrateTables, WriteTableBatch, DeleteStale, DeleteRecord), the build will fail with a type error.
cloudquery sync fails with connection refused
Your destination isn’t running, or the port in your config doesn’t match the one it’s listening on. Make sure ./cq-destination-sqlite serve is running in a separate terminal, and that the path in your YAML (localhost:7777) matches the address shown in the server output.
failed to unmarshal spec error on sync
The spec block in your YAML has a field name mismatch or indentation problem. Check that the keys match the JSON field names in your Spec struct exactly (e.g., connection_string, not connectionString).
failed to open database error on sync
The directory in your connection_string path doesn’t exist. SQLite can create the file but not the parent directory. Create the directory first, or use a path that already exists (e.g., ./test.db in the current directory).
Sync completes successfully but the database is empty
You’re missing the writer.Flush(ctx) call at the end of Write. The batchwriter holds rows in memory until a batch fills or the flush is called explicitly. Without Flush, the last partial batch is never written.
cannot safely migrate table: breaking changes detected
A column was removed, renamed, or had its type changed between syncs. In safe migration mode, only additive changes (new optional columns) are allowed. Either:
- Add
migrate_mode: forcedto your destination spec to allow destructive migrations - Delete the database file and start fresh
- Restore the previous schema
Rows appear duplicated after multiple syncs
Your table has no primary keys, so the destination uses plain INSERT on every sync. This is correct for append mode. If you want deduplication, make sure your source table defines primary keys — the destination will automatically switch to INSERT OR REPLACE.
Common Pitfalls
Avoid these common mistakes:
- Always call
writer.Flush(ctx)at the end ofWrite. The batchwriter holds buffered rows until a batch fills or a timeout fires. If you skipFlush, the last batch may never be written. - Wrap writes in a transaction. Without a transaction, each row is its own implicit transaction — extremely slow for large datasets. See
WriteTableBatchabove. - Always rollback on error. Use
deferwith an error check as shown. Skipping rollback can leave partially-written batches in the database. - Handle
nilfor NULL values. Checkcol.IsNull(i)and passnilas the SQL parameter — not an empty string or zero. INSERT OR REPLACEis a delete + insert, not an update. It resets all columns, not only the conflicting ones. This is correct for CloudQuery’s use case (full resource snapshots) but worth understanding if you expect partial updates.
Next Steps
- Publish to the Hub — make your destination available to others
- Go Source Guide — build a source integration that produces the data your destination consumes
- Running Locally — detailed guide on local registry, gRPC, and Docker modes
- Performance Tuning — batch size, concurrency, and write optimization
- PostgreSQL Destination — a more complex example using
mixedbatchwriterwith bulk insert support
Real-World Examples
- SQLite Destination — the official reference this guide is based on (
batchwriter, CGO-based) - PostgreSQL Destination — uses
mixedbatchwriterfor bulk operations - BigQuery Destination — streaming insert pattern
- All destination integrations
Resources
- CloudQuery Community
- Go SDK Source Code (
plugin-sdk/v4) - batchwriter interface — the four methods your client must implement
- WriteMessage types — full list of message types your destination can receive
- Apache Arrow Go docs — Arrow type system reference