How Does an Enterprise Changefeed Work?


When an Enterprise changefeed is started on a node, that node becomes the coordinator for the changefeed job (Node 2 in the diagram). The coordinator node acts as an administrator: it keeps track of all other nodes during job execution and of the changefeed work as it completes. The changefeed job runs across all nodes in the cluster to access changed data in the watched table. Typically, the node holding the leaseholder for a particular range (or a replica of that range) emits the changefeed data for that range.

Note:

New in v23.1.23: You can enable the changefeed.random_replica_selection.enabled cluster setting to change the way in which a changefeed distributes work across the cluster. With changefeed.random_replica_selection.enabled set to true, the job will evenly distribute changefeed work across the cluster by assigning it to any replica for a particular range. For changefeed.random_replica_selection.enabled to take effect on changefeed jobs, ensure you enable the cluster setting and then pause and resume existing changefeeds.
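
For example, the following statements show one way to apply this setting to an existing changefeed. The job ID is a placeholder; you can find yours with SHOW CHANGEFEED JOBS.

```sql
-- Enable random replica selection for changefeed work distribution.
SET CLUSTER SETTING changefeed.random_replica_selection.enabled = true;

-- Pause and then resume the changefeed so the setting takes effect.
-- 12345 is a placeholder job ID.
PAUSE JOB 12345;
RESUME JOB 12345;
```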

Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. The high-water mark acts as a checkpoint for the changefeed's job progress and guarantees that all changes before (or at) that timestamp have been emitted. In the unlikely event that the changefeed's coordinating node fails during the job, that role moves to a different node and the changefeed restarts from the last checkpoint. If restarted, the changefeed may re-emit messages from the high-water mark time to the current time. Refer to Ordering Guarantees for detail on CockroachDB's at-least-once delivery guarantee and how per-key message ordering is applied.
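
As a rough illustration of the checkpoint, you can inspect a changefeed's high-water mark from the jobs output. The job ID is a placeholder; the column names follow the standard SHOW CHANGEFEED JOBS output.

```sql
-- high_water_timestamp reports the changefeed's checkpoint: all changes
-- at or before this timestamp have been emitted to the sink.
SELECT job_id, high_water_timestamp
FROM [SHOW CHANGEFEED JOBS]
WHERE job_id = 12345;
```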

[Diagram: Changefeed process in a 3-node cluster]

With resolved specified when a changefeed is started, the coordinator will send the resolved timestamp (i.e., the high-water mark) to each endpoint in the sink. For example, when using Kafka this will be sent as a message to each partition; for cloud storage, this will be emitted as a resolved timestamp file.
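
A minimal sketch of a changefeed started with the resolved option follows; the table name and Kafka sink URI are placeholders.

```sql
-- Emit resolved timestamp messages to the sink at most every 10 seconds.
CREATE CHANGEFEED FOR TABLE movr.rides
  INTO 'kafka://localhost:9092'
  WITH resolved = '10s';
```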

As rows are updated, added, and deleted in the targeted table(s), the node sends the row changes through the rangefeed mechanism to the changefeed encoder, which encodes these changes into the final message format. The encoder then emits the message to the sink; the message can go to any endpoint in the sink. In the diagram example, this means that messages can be emitted to any Kafka broker.
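
The options below sketch how the encoder's output can be shaped; the table name and sink URI are placeholders, and the commented message shape is only illustrative of the wrapped JSON envelope.

```sql
-- Encode messages as JSON using the wrapped envelope, and include the
-- update timestamp in each message.
CREATE CHANGEFEED FOR TABLE movr.rides
  INTO 'kafka://localhost:9092'
  WITH format = 'json', envelope = 'wrapped', updated;

-- An emitted message then looks roughly like:
-- {"after": {"id": "...", "status": "..."}, "updated": "<timestamp>"}
```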

Note:

When you create a changefeed using change data capture queries, the optimizer evaluates and optimizes the query before the job sends the selected row changes to the changefeed encoder. The optimizer can restrict the rows a changefeed job considers during table scans, initial scans, and catch-up scans, which can make these scans more efficient.
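
A minimal sketch of a CDC query changefeed follows; the table, columns, predicate, and sink URI are placeholders.

```sql
-- Only the selected columns of rows matching the predicate reach the
-- changefeed encoder; the optimizer evaluates the query before emission.
CREATE CHANGEFEED INTO 'kafka://localhost:9092'
  AS SELECT id, status
  FROM movr.rides
  WHERE status = 'completed';
```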

If you are running changefeeds from a multi-region cluster, you may want to define which nodes take part in running the changefeed job. You can use the execution_locality option with key-value pairs to specify the locality requirements nodes must meet. See Job coordination using the execution locality option for detail on how a changefeed job works with this option.
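
A sketch of the execution_locality option follows; the locality key-value pair, table name, and sink URI are placeholders.

```sql
-- Restrict changefeed job execution to nodes whose locality matches
-- region=us-east-1.
CREATE CHANGEFEED FOR TABLE movr.rides
  INTO 'kafka://localhost:9092'
  WITH execution_locality = 'region=us-east-1';
```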

See the following for more detail on changefeed setup and use:

