Stream Changes from PostgreSQL to Any Destination with Change Data Capture
In this blog we will talk about how to stream changes from PostgreSQL to any destination with CloudQuery.
Yevgeny Pats • Feb 20, 2023
One of the most powerful features of PostgreSQL is its ability to replicate data, and in this article we'll take a deep dive into the topic of logical replication (For running CloudQuery PostgreSQL CDC source plugin take a look at our docs).
CDC (Change-Data-Capture) is a set of design patterns in databases where one can track changes to the data and act upon it. In PostgreSQL, CDC can be achieved using the logical replication feature.
PostgreSQL supports two types of replication: physical replication and logical replication. Physical replication creates a complete copy of the data, including the underlying storage structure, whereas logical replication only replicates changes to the data, such as insertions, updates, and deletions of rows in tables. Logical replication provides a more flexible and efficient way to replicate data as it only replicates changes to the data, rather than the entire dataset. This can help to minimize the overhead of replication and reduce the amount of disk space required to store the replicated data.
In PostgreSQL, logical replication is achieved by replicating changes made to the data in the form of write-ahead log (
WAL) records. These records contain all the necessary information to recreate the changes that were made to the data, including the type of change (insert, update, or delete), the time the change was made, and the actual data that was changed.
The WAL records are written to disk on the master server, and a separate process, known as the WAL sender, sends the WAL records to the replica server. The replica server then applies the changes to its own copy of the data. This process happens in real-time, so the replica server's copy of the data is always up-to-date with the master server's copy.
CloudQuery PostgreSQL source plugin provides the ability to sync data and changes in real time from a PostgreSQL database, directly to any of the supported destinations, without the need for Kafka or any other intermediary. (You can still stream to Kafka or Google PubSub.) This gives a much simpler setup to operate: just run one CloudQuery binary!
In the following section, we will go through how our CloudQuery PostgreSQL source plugin works internally. (For running and configuring, refer to the docs.)
To use logical replication, you will need to make sure you started your database with
See our documentation on how to set
wal_levelto logical in various environments.
CloudQuery creates a publication for all or a subset of the specified tables via the
CREATE PUBLICATIONcommand. This publication will then be used in the next step when we set up logical replication.
Now we are ready to start replicating. CloudQuery creates a replication slot and exports a snapshot via
CREATE_REPLICATION_SLOT slot_name LOGICAL pgoutput EXPORT_SNAPSHOT.
The above command creates two things:
- Replication slot name: We will use it when we start replicating and subscribing to changes
- Snapshot name: We will use this snapshot to replicate the initial content of the database at the exact point in time before the replication started.
Via the snapshot in the previous step, CloudQuery will extract all rows from all the specified tables, transform them into CloudQuery's type system, and send them to the specified CloudQuery destination.
Now that the initial snapshot of the database is synced, we can start sending the changes we receive from the logical replication slot.
We start listening to the replication changes via the START_REPLICATION command. This is a low-level command of the streaming replication protocol, so it's not really possible to start it via
psql, and we use
For each received message, we parse it per the protocol spec defined here and transform it to CloudQuery's type system to be sent to the destination.
Every X messages that are sent to the destination, we need to send back to the Postgres the last position that was received so it will know not to resend that data again. This is needed in case the process is stopped, so we can resume it from the exact same location.
We send the acknowledgement position every X messages or Y timeout to avoid bombarding Postgres with messages and basically batch our
ackcommand is sent via the Status Update Message.
That's it! It's quite a lot of PostgreSQL internals, so we hope you enjoyed it. The good news is that, by using CloudQuery source plugin, all those internals are pretty much abstracted away so you can focus on just what you want to replicate and where to replicate it!