Creating a New Destination Integration in Go

This guide builds a SQLite destination integration from scratch in stages. At each stage you’ll have something you can actually run — starting with a gRPC server that serves but does nothing, then adding the ability to create tables, then write rows, then handle schema changes.

The same patterns apply to any destination backend: PostgreSQL, BigQuery, a custom database, or an external API that accepts writes. The official SQLite destination follows this exact structure and is a good reference once you’ve finished this guide.

Before starting, review core concepts and the Creating a New Integration overview. The terminology (tables, syncs, write modes) is introduced there.

Prerequisites:

  • Go 1.21+ installed (Go Tutorial)
  • CloudQuery CLI installed
  • No CGO or C compiler required for this guide

How Destinations Differ from Sources

A source integration produces data — it fetches records from a third-party API and sends them to CloudQuery. A destination integration consumes data — it receives a stream of write messages from the CloudQuery engine and stores them somewhere.

When a user runs cloudquery sync, data flows into your destination like this:

cloudquery sync
      │
      ▼
[CloudQuery Engine] ──▶ Write(ctx, res <-chan WriteMessage)
                              │
                        [batchwriter]   ← SDK helper: buffers and routes
              ┌───────────────┼───────────────┐
              ▼               ▼               ▼
      MigrateTables()  WriteTableBatch()  DeleteStale()
        before rows      batched rows      after rows
              │               │               │
              └───────────────┴───────────────┘
                              │
                              ▼
                       [Your Database]

Your destination receives one call to Write. You pass that channel straight to the SDK’s batchwriter helper, which takes care of buffering and routing. It calls three methods on your client:

Method             When called                                  What it does
MigrateTables      Before rows for any table                    Create or update the table schema
WriteTableBatch    When a batch of rows is ready                Insert or upsert the rows
DeleteStale        After rows, in overwrite-delete-stale mode   Remove rows older than this sync

You’ll implement these one at a time, building up the destination’s capabilities as you go.

Project Structure

cq-destination-sqlite/
├── main.go
├── go.mod
├── resources/
│   └── plugin/
│       └── plugin.go        # Name, Kind, Team, Version constants
└── client/
    ├── client.go            # Client struct, constructor, Write
    ├── spec.go              # Configuration spec
    ├── migrate.go           # MigrateTables
    ├── write.go             # WriteTableBatch, DeleteRecord
    └── deletestale.go       # DeleteStale

Scaffold the Integration

There is no cq-scaffold equivalent for destination integrations — you’ll create the files manually. There aren’t many of them, and this guide walks through each one.

Build the minimum needed to get a gRPC server running. By the end of this section you can start your destination and CloudQuery can connect to it — even though it won’t create tables or write data yet.

Integration Constants

resources/plugin/plugin.go
package plugin

var (
	Name    = "sqlite"
	Kind    = "destination"
	Team    = "your-team"
	Version = "development"
)

Kind must be "destination". These values appear in log output and on the CloudQuery Hub.

Configuration Spec

client/spec.go
package client

import "fmt"

const (
	defaultBatchSize      = int64(10_000)
	defaultBatchSizeBytes = int64(10 * 1024 * 1024) // 10 MiB
)

// Spec holds the user-provided configuration for this destination.
type Spec struct {
	// Path to the SQLite file, e.g. "./mydb.db"
	ConnectionString string `json:"connection_string"`
	// Maximum rows per write batch. Default: 10,000.
	BatchSize int64 `json:"batch_size,omitempty"`
	// Maximum bytes per write batch. Default: 10 MiB.
	BatchSizeBytes int64 `json:"batch_size_bytes,omitempty"`
}

func (s *Spec) SetDefaults() {
	if s.BatchSize == 0 {
		s.BatchSize = defaultBatchSize
	}
	if s.BatchSizeBytes == 0 {
		s.BatchSizeBytes = defaultBatchSizeBytes
	}
}

func (s *Spec) Validate() error {
	if s.ConnectionString == "" {
		return fmt.Errorf("connection_string is required")
	}
	return nil
}

A user’s destination YAML block looks like:

kind: destination
spec:
  name: my-sqlite
  registry: grpc
  path: localhost:7777
  spec:
    connection_string: ./cloudquery.db

Client

The Client struct holds your database connection and the batchwriter instance. Write delegates to the batchwriter and doesn’t change as you add features.

client/client.go
package client

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/cloudquery/plugin-sdk/v4/message"
	"github.com/cloudquery/plugin-sdk/v4/plugin"
	"github.com/cloudquery/plugin-sdk/v4/schema"
	"github.com/cloudquery/plugin-sdk/v4/writers/batchwriter"
	"github.com/rs/zerolog"

	// Pure-Go SQLite driver — no C compiler required.
	// Registers the "sqlite" driver name with database/sql.
	_ "modernc.org/sqlite"
)

type Client struct {
	plugin.UnimplementedSource
	db     *sql.DB
	logger zerolog.Logger
	spec   Spec
	writer *batchwriter.BatchWriter
}

// Read satisfies the plugin.Client interface. Return an error if your
// destination doesn't need to support reading rows back.
func (c *Client) Read(_ context.Context, _ *schema.Table, _ chan<- arrow.RecordBatch) error {
	return fmt.Errorf("read not supported by this destination")
}

// New is the constructor passed to plugin.NewPlugin.
func New(_ context.Context, logger zerolog.Logger, specBytes []byte, _ plugin.NewClientOptions) (plugin.Client, error) {
	c := &Client{
		logger: logger.With().Str("module", "sqlite-dest").Logger(),
	}
	if err := json.Unmarshal(specBytes, &c.spec); err != nil {
		return nil, fmt.Errorf("failed to unmarshal spec: %w", err)
	}
	c.spec.SetDefaults()
	if err := c.spec.Validate(); err != nil {
		return nil, err
	}

	var err error
	c.writer, err = batchwriter.New(c,
		batchwriter.WithLogger(c.logger),
		batchwriter.WithBatchSize(c.spec.BatchSize),
		batchwriter.WithBatchSizeBytes(c.spec.BatchSizeBytes),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create batchwriter: %w", err)
	}

	// modernc.org/sqlite uses the driver name "sqlite" (vs "sqlite3" for mattn/go-sqlite3).
	c.db, err = sql.Open("sqlite", c.spec.ConnectionString)
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %w", err)
	}
	return c, nil
}

func (c *Client) Close(_ context.Context) error {
	if c.db == nil {
		return fmt.Errorf("client already closed")
	}
	err := c.db.Close()
	c.db = nil
	return err
}

// Write is called once per sync. It passes the message channel to the
// batchwriter, which routes messages to MigrateTables, WriteTableBatch,
// and DeleteStale. Always call Flush — the batchwriter may hold buffered
// rows that haven't been flushed yet.
func (c *Client) Write(ctx context.Context, res <-chan message.WriteMessage) error {
	if err := c.writer.Write(ctx, res); err != nil {
		return fmt.Errorf("failed to write: %w", err)
	}
	if err := c.writer.Flush(ctx); err != nil {
		return fmt.Errorf("failed to flush: %w", err)
	}
	return nil
}

Why modernc.org/sqlite? The official CloudQuery SQLite destination uses github.com/mattn/go-sqlite3, which requires a C compiler. This guide uses modernc.org/sqlite — a pure-Go port that compiles with a standard go build on any platform. The API is identical; only the import path and driver name differ.

Starter Stubs

Create these two files now. They give the batchwriter valid methods to call so the whole thing compiles. You’ll replace them with real implementations in the next steps.

client/migrate.go
package client

import (
	"context"

	"github.com/cloudquery/plugin-sdk/v4/message"
)

func (c *Client) MigrateTables(_ context.Context, _ message.WriteMigrateTables) error {
	return nil // Step 2: create tables
}
client/write.go
package client

import (
	"context"
	"fmt"

	"github.com/cloudquery/plugin-sdk/v4/message"
)

func (c *Client) WriteTableBatch(_ context.Context, _ string, _ message.WriteInserts) error {
	return nil // Step 3: write rows
}

func (c *Client) DeleteStale(_ context.Context, _ message.WriteDeleteStales) error {
	return nil // Step 4: delete stale rows
}

func (c *Client) DeleteRecord(_ context.Context, _ message.WriteDeleteRecords) error {
	return fmt.Errorf("DeleteRecord is not supported by this destination")
}

Entry Point

main.go
package main

import (
	"context"
	"log"

	"github.com/<your-org>/cq-destination-sqlite/client"
	internalPlugin "github.com/<your-org>/cq-destination-sqlite/resources/plugin"
	"github.com/cloudquery/plugin-sdk/v4/plugin"
	"github.com/cloudquery/plugin-sdk/v4/serve"
)

func main() {
	p := plugin.NewPlugin(
		internalPlugin.Name,
		internalPlugin.Version,
		client.New,
		plugin.WithKind(internalPlugin.Kind), // "destination"
		plugin.WithTeam(internalPlugin.Team),
	)
	if err := serve.Plugin(p, serve.WithDestinationV0V1Server()).Serve(context.Background()); err != nil {
		log.Fatalf("failed to serve plugin: %v", err)
	}
}

serve.WithDestinationV0V1Server() adds gRPC endpoints compatible with older destination protocol versions. Include it in every destination for broad compatibility.

go.mod

go.mod
module github.com/<your-org>/cq-destination-sqlite

go 1.21

require (
	github.com/apache/arrow-go/v18 v18.0.0
	github.com/cloudquery/plugin-sdk/v4 v4.0.0
	github.com/rs/zerolog v1.33.0
	modernc.org/sqlite v1.34.0
)

The versions above are placeholders. Fetch the latest before running:

go get github.com/cloudquery/plugin-sdk/v4@latest
go get modernc.org/sqlite@latest
go mod tidy

Run It

go mod tidy
go build -o cq-destination-sqlite .
./cq-destination-sqlite serve

You should see:

Plugin server listening address=127.0.0.1:7777 plugin=your-team/destination/sqlite@development

Your destination is running. CloudQuery can connect to it. It won’t create tables or write data yet — that’s what the next two sections add.

Create Tables

Implement MigrateTables so your destination creates a table the first time rows arrive for it.

MigrateTables is called once per table before any rows are written. At a minimum, it needs to create the table if it doesn’t exist. Replace client/migrate.go with this:

client/migrate.go
package client

import (
	"context"
	"fmt"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/cloudquery/plugin-sdk/v4/message"
	"github.com/cloudquery/plugin-sdk/v4/schema"
)

func (c *Client) MigrateTables(ctx context.Context, msgs message.WriteMigrateTables) error {
	for _, msg := range msgs {
		table := msg.Table
		c.logger.Info().Str("table", table.Name).Msg("Migrating table")
		exists, err := c.tableExists(table.Name)
		if err != nil {
			return err
		}
		if !exists {
			if err := c.createTable(ctx, table); err != nil {
				return err
			}
		}
	}
	return nil
}

func (c *Client) tableExists(tableName string) (bool, error) {
	rows, err := c.db.Query(fmt.Sprintf("PRAGMA table_info('%s')", tableName))
	if err != nil {
		return false, err
	}
	defer rows.Close()
	return rows.Next(), rows.Err()
}

func (c *Client) createTable(ctx context.Context, table *schema.Table) error {
	var sb strings.Builder
	sb.WriteString(`CREATE TABLE IF NOT EXISTS "`)
	sb.WriteString(table.Name)
	sb.WriteString(`" (`)
	var primaryKeys []string
	for i, col := range table.Columns {
		sb.WriteString(`"` + col.Name + `" `)
		sb.WriteString(arrowTypeToSQLite(col.Type))
		if col.NotNull {
			sb.WriteString(" NOT NULL")
		}
		if i < len(table.Columns)-1 {
			sb.WriteString(", ")
		}
		if col.PrimaryKey {
			primaryKeys = append(primaryKeys, `"`+col.Name+`"`)
		}
	}
	// Composite primary key — SQLite requires this syntax when there are multiple keys.
	if len(primaryKeys) > 0 {
		sb.WriteString(`, CONSTRAINT "`)
		sb.WriteString(table.Name)
		sb.WriteString(`_cqpk" PRIMARY KEY (`)
		sb.WriteString(strings.Join(primaryKeys, ", "))
		sb.WriteString(")")
	}
	sb.WriteString(")")
	_, err := c.db.ExecContext(ctx, sb.String())
	return err
}

// arrowTypeToSQLite maps an Arrow DataType to a SQLite storage class.
// SQLite has four storage classes: TEXT, INTEGER, REAL, BLOB.
func arrowTypeToSQLite(t arrow.DataType) string {
	switch t.ID() {
	case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64,
		arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64, arrow.BOOL:
		return "INTEGER"
	case arrow.FLOAT32, arrow.FLOAT64:
		return "REAL"
	case arrow.BINARY, arrow.LARGE_BINARY:
		return "BLOB"
	case arrow.TIMESTAMP, arrow.DATE32, arrow.DATE64, arrow.TIME32, arrow.TIME64:
		return "TIMESTAMP"
	default:
		// UTF8, LARGE_UTF8, lists, structs, maps → TEXT
		return "TEXT"
	}
}

Try It

Start your destination, then run a sync against any source. Here’s a config using the public xkcd source (no API key needed):

config.yaml
kind: source
spec:
  name: xkcd
  path: cloudquery/xkcd
  registry: cloudquery
  version: "v1.5.38"
  tables: ["*"]
  destinations:
    - my-sqlite
---
kind: destination
spec:
  name: my-sqlite
  registry: grpc
  path: localhost:7777
  spec:
    connection_string: ./test.db
./cq-destination-sqlite serve &
cloudquery sync config.yaml
sqlite3 test.db ".tables"
# → xkcd_comics

The table exists. No rows yet — WriteTableBatch still returns nil without writing anything. That’s what the next section adds.

Write Rows

Implement WriteTableBatch so rows actually land in the database.

When the batchwriter has accumulated enough rows for a table (up to batch_size), it calls WriteTableBatch with all the accumulated WriteInsert messages. You get the table name and the messages — each message contains an Apache Arrow RecordBatch (a columnar block of rows).

Replace client/write.go with this:

client/write.go
package client

import (
	"context"
	"database/sql"
	"fmt"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/cloudquery/plugin-sdk/v4/message"
	"github.com/cloudquery/plugin-sdk/v4/schema"
)

func (c *Client) WriteTableBatch(ctx context.Context, name string, msgs message.WriteInserts) error {
	if len(msgs) == 0 {
		return nil
	}
	// Wrap all inserts in a single transaction — much faster than one per row.
	tx, err := c.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			if rbErr := tx.Rollback(); rbErr != nil {
				c.logger.Error().Err(rbErr).Str("table", name).Msg("rollback failed")
			}
		}
	}()
	for _, msg := range msgs {
		if err = c.insertRows(ctx, tx, msg); err != nil {
			return err
		}
	}
	return tx.Commit()
}

func (c *Client) insertRows(ctx context.Context, tx *sql.Tx, msg *message.WriteInsert) error {
	table := msg.GetTable()
	sc := msg.Record.Schema()
	// Tables with primary keys use INSERT OR REPLACE (upsert).
	// Tables without primary keys use plain INSERT (append).
	var sqlStr string
	if len(table.PrimaryKeys()) == 0 {
		sqlStr = buildInsert(sc)
	} else {
		sqlStr = buildUpsert(sc)
	}
	for _, row := range recordToRows(msg.Record) {
		if _, err := tx.ExecContext(ctx, sqlStr, row...); err != nil {
			return fmt.Errorf("insert failed for table %s: %w", table.Name, err)
		}
	}
	return nil
}

func buildInsert(sc *arrow.Schema) string {
	return buildSQL("insert into", sc)
}

// buildUpsert uses INSERT OR REPLACE, SQLite's upsert syntax.
// On a primary key conflict it deletes the old row and inserts the new one.
func buildUpsert(sc *arrow.Schema) string {
	return buildSQL("insert or replace into", sc)
}

func buildSQL(verb string, sc *arrow.Schema) string {
	tableName, _ := sc.Metadata().GetValue(schema.MetadataTableName)
	var sb strings.Builder
	sb.WriteString(verb + ` "` + tableName + `" (`)
	fields := sc.Fields()
	for i, f := range fields {
		sb.WriteString(`"` + f.Name + `"`)
		if i < len(fields)-1 {
			sb.WriteString(", ")
		}
	}
	sb.WriteString(") VALUES (")
	for i := range fields {
		sb.WriteString(fmt.Sprintf("$%d", i+1))
		if i < len(fields)-1 {
			sb.WriteString(", ")
		}
	}
	sb.WriteString(")")
	return sb.String()
}

func (c *Client) DeleteStale(_ context.Context, _ message.WriteDeleteStales) error {
	return nil // Step 4: implement stale data deletion
}

func (c *Client) DeleteRecord(_ context.Context, _ message.WriteDeleteRecords) error {
	return fmt.Errorf("DeleteRecord is not supported by this destination")
}

Arrow Records to Rows

CloudQuery passes row data as Apache Arrow RecordBatch values — a columnar in-memory format. To call sql.ExecContext, you need to convert from columnar Arrow format to row-oriented []any slices.

Add these helpers at the bottom of client/write.go:

client/write.go
// recordToRows converts a columnar Arrow RecordBatch into row-oriented slices
// for sql.ExecContext.
func recordToRows(rec arrow.RecordBatch) [][]any {
	numRows := int(rec.NumRows())
	numCols := int(rec.NumCols())
	rows := make([][]any, numRows)
	for i := range rows {
		rows[i] = make([]any, numCols)
	}
	for colIdx, col := range rec.Columns() {
		for rowIdx := 0; rowIdx < numRows; rowIdx++ {
			if col.IsNull(rowIdx) {
				rows[rowIdx][colIdx] = nil
				continue
			}
			rows[rowIdx][colIdx] = arrowValueAt(col, rowIdx)
		}
	}
	return rows
}

// arrowValueAt extracts a single value from an Arrow array as a Go-native type
// suitable for use as a SQL parameter.
func arrowValueAt(col arrow.Array, i int) any {
	switch c := col.(type) {
	case *array.String:
		return c.Value(i)
	case *array.LargeString:
		return c.Value(i)
	case *array.Int8:
		return int64(c.Value(i))
	case *array.Int16:
		return int64(c.Value(i))
	case *array.Int32:
		return int64(c.Value(i))
	case *array.Int64:
		return c.Value(i)
	case *array.Uint8:
		return int64(c.Value(i))
	case *array.Uint16:
		return int64(c.Value(i))
	case *array.Uint32:
		return int64(c.Value(i))
	case *array.Uint64:
		return int64(c.Value(i))
	case *array.Float32:
		return float64(c.Value(i))
	case *array.Float64:
		return c.Value(i)
	case *array.Boolean:
		if c.Value(i) {
			return int64(1)
		}
		return int64(0)
	case *array.Binary:
		return c.Value(i)
	case *array.LargeBinary:
		return c.Value(i)
	default:
		// For complex types (timestamps, lists, maps, structs) fall back
		// to the string representation. Add explicit cases as needed.
		return col.ValueStr(i)
	}
}

CloudQuery also uses custom Arrow extension types (UUID, MAC address, Inet, etc.) on top of the standard types. The default case above handles them via ValueStr. For production use, see the official write.go for the complete mapping.

Try It

Rebuild and run the sync again:

go build -o cq-destination-sqlite . && ./cq-destination-sqlite serve &
cloudquery sync config.yaml

Verify rows landed:

# Check total row count — should be 3,000+ comics
sqlite3 test.db "SELECT COUNT(*) FROM xkcd_comics;"

# Spot-check the most recent entries
sqlite3 test.db "SELECT num, title FROM xkcd_comics ORDER BY num DESC LIMIT 5;"

Expected output:

3228
3228|Sunscreen Misconceptions
3227|Research Ethics
3226|Deepest Lake
3225|Sitting Room
3224|Lossless

Your destination is now fully functional for the most common case: append and overwrite write modes.

Advanced: Schema Migration

The MigrateTables implementation in Step 2 creates tables but doesn’t handle changes to an existing schema. If a new column is added to a source, or you change a column type, the destination needs to either add the column safely or force-recreate the table.

Replace the entire contents of client/migrate.go with this full implementation:

client/migrate.go
package client

import (
	"context"
	"fmt"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/cloudquery/plugin-sdk/v4/message"
	"github.com/cloudquery/plugin-sdk/v4/schema"
)

func (c *Client) MigrateTables(ctx context.Context, msgs message.WriteMigrateTables) error {
	for _, msg := range msgs {
		table := msg.Table
		c.logger.Info().Str("table", table.Name).Msg("Migrating table")

		existing, err := c.getExistingColumns(table.Name)
		if err != nil {
			return err
		}
		if existing == nil {
			// Table doesn't exist — create it.
			if err := c.createTable(ctx, table); err != nil {
				return err
			}
			continue
		}

		// Table exists. Compute what changed.
		changes := existing.GetChanges(table)
		if len(changes) == 0 {
			continue // Nothing to do.
		}

		if msg.MigrateForce {
			// Force mode: drop and recreate.
			if err := c.recreateTable(ctx, table); err != nil {
				return err
			}
			continue
		}

		// Safe mode: only apply additive, non-breaking changes.
		if !canAutoMigrate(changes) {
			return fmt.Errorf("cannot safely migrate table %s: breaking changes detected. "+
				"Use migrate_mode: forced to allow destructive migrations.\n%s",
				table.Name,
				schema.GetChangesSummary(map[string][]schema.TableColumnChange{table.Name: changes}))
		}
		if err := c.autoMigrate(ctx, table, changes); err != nil {
			return err
		}
	}
	return nil
}

// canAutoMigrate returns true if all changes can be applied without data loss.
func canAutoMigrate(changes []schema.TableColumnChange) bool {
	for _, change := range changes {
		switch change.Type {
		case schema.TableColumnChangeTypeAdd:
			if change.Current.PrimaryKey || change.Current.NotNull {
				return false
			}
		case schema.TableColumnChangeTypeRemove:
			if change.Previous.PrimaryKey || change.Previous.NotNull {
				return false
			}
		case schema.TableColumnChangeTypeRemoveUniqueConstraint:
			// Safe.
		default:
			return false
		}
	}
	return true
}

func (c *Client) recreateTable(ctx context.Context, table *schema.Table) error {
	if _, err := c.db.ExecContext(ctx, `DROP TABLE IF EXISTS "`+table.Name+`"`); err != nil {
		return fmt.Errorf("failed to drop table %s: %w", table.Name, err)
	}
	return c.createTable(ctx, table)
}

func (c *Client) autoMigrate(ctx context.Context, table *schema.Table, changes []schema.TableColumnChange) error {
	for _, change := range changes {
		if change.Type == schema.TableColumnChangeTypeAdd {
			sql := fmt.Sprintf(`ALTER TABLE "%s" ADD COLUMN "%s" %s`,
				table.Name, change.Current.Name, arrowTypeToSQLite(change.Current.Type))
			if _, err := c.db.ExecContext(ctx, sql); err != nil {
				return err
			}
		}
	}
	return nil
}

// getExistingColumns reads the live schema from SQLite using PRAGMA table_info.
// Returns nil if the table does not exist.
func (c *Client) getExistingColumns(tableName string) (*schema.Table, error) {
	rows, err := c.db.Query(fmt.Sprintf("PRAGMA table_info('%s')", tableName))
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var columns []schema.Column
	for rows.Next() {
		var cid int
		var name, typ string
		var notNull bool
		var dflt any
		var pk int
		if err := rows.Scan(&cid, &name, &typ, &notNull, &dflt, &pk); err != nil {
			return nil, err
		}
		columns = append(columns, schema.Column{
			Name:       name,
			Type:       sqliteTypeToArrow(strings.ToLower(typ)),
			NotNull:    notNull,
			PrimaryKey: pk != 0,
		})
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	if len(columns) == 0 {
		return nil, nil // Table does not exist.
	}
	return &schema.Table{Name: tableName, Columns: columns}, nil
}

// sqliteTypeToArrow maps PRAGMA type strings back to Arrow types.
func sqliteTypeToArrow(typ string) arrow.DataType {
	switch typ {
	case "integer":
		return arrow.PrimitiveTypes.Int64
	case "real":
		return arrow.PrimitiveTypes.Float64
	case "blob":
		return arrow.BinaryTypes.Binary
	case "timestamp":
		return arrow.FixedWidthTypes.Timestamp_us
	default:
		return arrow.BinaryTypes.String
	}
}

// createTable and arrowTypeToSQLite remain unchanged from Step 2.

The official SQLite destination has fuller type handling — it normalizes Arrow types before comparison and handles CloudQuery-specific extension types. See migrate.go for the complete implementation.

Try It

Rebuild with the updated migrate.go and run a sync:

go build -o cq-destination-sqlite . && ./cq-destination-sqlite serve &
cloudquery sync config.yaml

Inspect the table schema to confirm all columns were created correctly:

sqlite3 test.db ".schema xkcd_comics"

You should see all source columns plus the CloudQuery system columns (_cq_id, _cq_source_name, _cq_sync_time) with a composite primary key constraint:

CREATE TABLE "xkcd_comics" (
  "_cq_id" TEXT NOT NULL,
  "_cq_source_name" TEXT NOT NULL,
  "_cq_sync_time" TIMESTAMP NOT NULL,
  "num" INTEGER,
  "title" TEXT,
  "year" TEXT,
  ...
  CONSTRAINT "xkcd_comics_cqpk" PRIMARY KEY ("_cq_id")
);

Verify safe migration mode: Delete the database file and sync again to create a fresh table. Then sync a second time without deleting — no error means the “no changes” path is working:

rm test.db
cloudquery sync config.yaml   # creates the table
cloudquery sync config.yaml   # should complete without error (no schema changes)

Verify forced migration: Add migrate_mode: forced to your destination spec, then sync. In forced mode, breaking schema changes trigger a drop-and-recreate instead of an error; with an unchanged schema the sync should simply complete, which confirms the force path is wired up:

kind: destination
spec:
  name: my-sqlite
  path: localhost:7777
  registry: grpc
  migrate_mode: forced
  spec:
    connection_string: ./test.db
cloudquery sync config.yaml   # completes; breaking changes would now drop and recreate
sqlite3 test.db "SELECT COUNT(*) FROM xkcd_comics;"   # rows are still present

Advanced: Delete Stale Data

When a user configures write_mode: overwrite-delete-stale, the CloudQuery engine sends WriteDeleteStale messages after rows have been written. These identify rows that were present in a previous sync but are no longer in the source — the destination should delete them.

Create client/deletestale.go and update client/write.go to remove the stub:

client/deletestale.go
package client

import (
	"context"
	"fmt"

	"github.com/cloudquery/plugin-sdk/v4/message"
	"github.com/cloudquery/plugin-sdk/v4/schema"
)

func (c *Client) DeleteStale(ctx context.Context, msgs message.WriteDeleteStales) error {
	for _, msg := range msgs {
		sql := fmt.Sprintf(
			`DELETE FROM "%s" WHERE "%s" = $1 AND datetime(%s) < datetime($2)`,
			msg.TableName,
			schema.CqSourceNameColumn.Name, // "_cq_source_name"
			schema.CqSyncTimeColumn.Name,   // "_cq_sync_time"
		)
		if _, err := c.db.ExecContext(ctx, sql, msg.SourceName, msg.SyncTime); err != nil {
			return fmt.Errorf("failed to delete stale rows from %s: %w", msg.TableName, err)
		}
	}
	return nil
}

The _cq_source_name and _cq_sync_time columns are added automatically to every CloudQuery table by the SDK. This query deletes rows from the current source whose sync time is older than the current sync — anything that no longer exists upstream.
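To see the comparison in isolation, here's a minimal sketch run directly against the sqlite3 CLI. The table contents are hypothetical, and the query's $1/$2 parameters are inlined as literals:

```shell
sqlite3 :memory: <<'SQL'
CREATE TABLE "xkcd_comics" ("_cq_source_name" TEXT, "_cq_sync_time" TIMESTAMP, "num" INTEGER);
-- One row left over from an earlier sync (stale), one from the current sync:
INSERT INTO "xkcd_comics" VALUES ('xkcd', '2024-01-01 00:00:00', 1);
INSERT INTO "xkcd_comics" VALUES ('xkcd', '2024-06-01 00:00:00', 2);
-- The DeleteStale query with $1 = 'xkcd' and $2 = the current sync time:
DELETE FROM "xkcd_comics" WHERE "_cq_source_name" = 'xkcd'
  AND datetime("_cq_sync_time") < datetime('2024-06-01 00:00:00');
SELECT num FROM "xkcd_comics";
SQL
# → 2  (only the current-sync row survives)
```

Only the row whose sync time predates the current sync is deleted; rows written (or rewritten) by the current sync are untouched.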

Now remove the DeleteStale stub from client/write.go. If you leave it in, Go will fail with “method redeclared” because DeleteStale is now defined in two files. The end of client/write.go should have only DeleteRecord:

client/write.go
// ... (WriteTableBatch, insertRows, buildInsert, buildUpsert, buildSQL,
//      recordToRows, arrowValueAt — all unchanged)

// DeleteRecord is the only method that stays in write.go.
// DeleteStale has moved to deletestale.go.
func (c *Client) DeleteRecord(_ context.Context, _ message.WriteDeleteRecords) error {
	return fmt.Errorf("DeleteRecord is not supported by this destination")
}

Try It

Update your config to use overwrite-delete-stale mode. This tells the engine to send WriteDeleteStale messages after each sync:

config.yaml
kind: source
spec:
  name: xkcd
  path: cloudquery/xkcd
  registry: cloudquery
  version: "v1.5.38"
  tables: ["*"]
  destinations:
    - my-sqlite
---
kind: destination
spec:
  name: my-sqlite
  registry: grpc
  path: localhost:7777
  write_mode: overwrite-delete-stale
  spec:
    connection_string: ./test.db

Rebuild and sync twice:

go build -o cq-destination-sqlite . && ./cq-destination-sqlite serve &

# First sync — populates the table
cloudquery sync config.yaml
sqlite3 test.db "SELECT COUNT(*) FROM xkcd_comics;"
# → 3228 (or however many exist today)

# Second sync — overwrites existing rows and deletes any that are no longer in the source
cloudquery sync config.yaml
sqlite3 test.db "SELECT COUNT(*) FROM xkcd_comics;"
# → 3228 (same count — rows were overwritten, not duplicated or deleted)

The count should be identical after both syncs. If the count grows with every sync, rows are duplicating — check that the table has primary keys so WriteTableBatch uses INSERT OR REPLACE. If rows are disappearing, the datetime() comparison in your DeleteStale query is off.

To confirm _cq_sync_time is being updated on each sync (which is what makes deletion work correctly):

sqlite3 test.db "SELECT num, _cq_sync_time FROM xkcd_comics ORDER BY num LIMIT 3;"

After the second sync, _cq_sync_time should be a more recent timestamp than after the first sync.

Write Modes

Your destination now supports all three write modes:

write_mode                Messages your destination receives
append                    WriteMigrateTable, WriteInsert (plain INSERT)
overwrite                 WriteMigrateTable, WriteInsert (INSERT OR REPLACE on tables with primary keys)
overwrite-delete-stale    WriteMigrateTable, WriteInsert, WriteDeleteStale

Your destination does not read write_mode directly. The engine controls which messages it sends. The behavior is determined by how you handle WriteInsert (INSERT vs INSERT OR REPLACE) and whether DeleteStale does anything.

Testing

The SDK ships a write test suite that exercises MigrateTables, WriteTableBatch, DeleteStale, and schema migration scenarios — no test cases to write yourself.

client/client_test.go
package client

import (
	"context"
	"encoding/json"
	"testing"

	"github.com/cloudquery/plugin-sdk/v4/plugin"
)

func TestPlugin(t *testing.T) {
	ctx := context.Background()
	p := plugin.NewPlugin("sqlite", "development", New)
	spec := Spec{
		ConnectionString: ":memory:",
	}
	specBytes, err := json.Marshal(spec)
	if err != nil {
		t.Fatal(err)
	}
	if err := p.Init(ctx, specBytes, plugin.NewClientOptions{}); err != nil {
		t.Fatal(err)
	}
	plugin.TestWriterSuiteRunner(t, p, plugin.WriterTestSuiteTests{
		SafeMigrations: plugin.SafeMigrations{
			AddColumn:    true,
			RemoveColumn: true,
		},
	})
}
go test ./client/...

TestWriterSuiteRunner creates tables, inserts rows in all three write modes, applies schema migrations, and verifies correctness. The SafeMigrations block tells the suite which schema changes your destination handles without data loss — set these to match your canAutoMigrate logic.

This pattern is taken directly from the official SQLite destination test.

Troubleshooting

“Plugin server listening” never appears after ./cq-destination-sqlite serve

The binary didn’t build correctly, or the previous go build succeeded but the binary wasn’t updated. Run go build ./... and check for compile errors. A common cause is a missing method — if your struct doesn’t implement all of batchwriter.Client (MigrateTables, WriteTableBatch, DeleteStale, DeleteRecord), the build will fail with a type error.

cloudquery sync fails with connection refused

Your destination isn’t running, or the port in your config doesn’t match the one it’s listening on. Make sure ./cq-destination-sqlite serve is running in a separate terminal, and that the path in your YAML (localhost:7777) matches the address shown in the server output.

failed to unmarshal spec error on sync

The spec block in your YAML has a field name mismatch or indentation problem. Check that the keys match the JSON field names in your Spec struct exactly (e.g., connection_string, not connectionString).

failed to open database error on sync

The directory in your connection_string path doesn’t exist. SQLite can create the file but not the parent directory. Create the directory first, or use a path that already exists (e.g., ./test.db in the current directory).
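The behavior is easy to reproduce with the sqlite3 CLI directly (the paths below are illustrative):

```shell
# Fails: SQLite can create the .db file, but not a missing parent directory.
sqlite3 ./no-such-dir/test.db "CREATE TABLE t (x);" || echo "open failed"

# Works once the directory exists:
mkdir -p ./data
sqlite3 ./data/test.db "CREATE TABLE t (x); SELECT 'ok';"
# → ok
```

A `mkdir -p` before starting the destination (or an `os.MkdirAll` on the parent of `connection_string` in your constructor) sidesteps the error entirely.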

Sync completes successfully but the database is empty

You’re missing the writer.Flush(ctx) call at the end of Write. The batchwriter holds rows in memory until a batch fills or the flush is called explicitly. Without Flush, the last partial batch is never written.

cannot safely migrate table: breaking changes detected

A column was removed, renamed, or had its type changed between syncs. In safe migration mode, only additive changes (new optional columns) are allowed. Either:

  • Add migrate_mode: forced to your destination spec to allow destructive migrations
  • Delete the database file and start fresh
  • Restore the previous schema

Rows appear duplicated after multiple syncs

Your table has no primary keys, so the destination uses plain INSERT on every sync. This is correct for append mode. If you want deduplication, make sure your source table defines primary keys — the destination will automatically switch to INSERT OR REPLACE.

Common Pitfalls

Avoid these common mistakes:

  • Always call writer.Flush(ctx) at the end of Write. The batchwriter holds buffered rows until a batch fills or a timeout fires. If you skip Flush, the last batch may never be written.
  • Wrap writes in a transaction. Without a transaction, each row is its own implicit transaction — extremely slow for large datasets. See WriteTableBatch above.
  • Always rollback on error. Use defer with an error check as shown. Skipping rollback can leave partially-written batches in the database.
  • Handle nil for NULL values. Check col.IsNull(i) and pass nil as the SQL parameter — not an empty string or zero.
  • INSERT OR REPLACE is a delete + insert, not an update. It resets all columns, not only the conflicting ones. This is correct for CloudQuery’s use case (full resource snapshots) but worth understanding if you expect partial updates.
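The last point is easy to demonstrate with the sqlite3 CLI (hypothetical table; column b is omitted from the upsert and comes back as NULL, not its old value):

```shell
sqlite3 :memory: <<'SQL'
CREATE TABLE t ("id" INTEGER PRIMARY KEY, "a" TEXT, "b" TEXT);
INSERT INTO t VALUES (1, 'a1', 'b1');
-- "Partial update" via INSERT OR REPLACE: the old row is deleted first,
-- so every column not listed here is reset to its default, not preserved.
INSERT OR REPLACE INTO t ("id", "a") VALUES (1, 'a2');
SELECT "id", "a", ifnull("b", 'NULL') FROM t;
SQL
# → 1|a2|NULL
```

Because CloudQuery always sends full rows for every column, this reset never loses data in practice — but it would if you tried to reuse the upsert for partial updates.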

