The EXPERIMENTAL CHANGEFEED FOR
statement creates a new basic changefeed, which streams row-level changes to the client indefinitely until the underlying connection is closed or the changefeed is canceled. A basic changefeed can watch one table or multiple tables in a comma-separated list.
For more information, see Change Data Capture Overview.
This feature is in preview. This feature is subject to change. To share feedback and/or issues, contact Support.
Required privileges
In v22.2 and above, CockroachDB introduces a new system-level privilege model that provides more fine-grained control over a user's privileges to work with the cluster, including the ability to create and manage changefeeds.
There is continued support for the legacy privilege model for changefeeds in v23.1, however it will be removed in a future release of CockroachDB. We recommend implementing the new privilege model that follows in this section for all changefeeds.
To create a changefeed with EXPERIMENTAL CHANGEFEED FOR
, a user must have the SELECT
privilege on the changefeed's source tables.
You can grant a user the SELECT
privilege to allow them to create basic changefeeds on a specific table:
GRANT SELECT ON TABLE example_table TO user;
Legacy privilege model
Changefeeds can only be created by superusers, i.e., members of the admin
role. The admin role exists by default with root
as the member.
Considerations
Because basic changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client; however, result buffering is automatically turned off for basic changefeeds. basic changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a
CANCEL QUERY
statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default.This cancellation behavior (i.e., close the underlying connection to cancel the changefeed) also extends to client driver usage; in particular, when a client driver calls
Rows.Close()
after encountering errors for a stream of rows. The pgwire protocol requires that the rows be consumed before the connection is again usable, but in the case of a basic changefeed, the rows are never consumed. It is therefore critical that you close the connection, otherwise the application will be blocked forever onRows.Close()
.In most cases, each version of a row will be emitted once. However, some infrequent conditions (e.g., node failures, network partitions) will cause them to be repeated. This gives our changefeeds an at-least-once delivery guarantee. For more information, see Ordering Guarantees.
As of v22.1, changefeeds filter out
VIRTUAL
computed columns from events by default. This is a backward-incompatible change. To maintain the changefeed behavior in previous versions whereNULL
values are emitted for virtual computed columns, see thevirtual_columns
option for more detail.
Synopsis
> EXPERIMENTAL CHANGEFEED FOR table_name [ WITH (option [= value] [, ...]) ];
Parameters
Parameter | Description |
---|---|
table_name |
The name of the table (or tables in a comma separated list) to create a changefeed for. |
option / value |
For a list of available options and their values, see Options below. |
Options
Option | Value | Description |
---|---|---|
confluent_schema_registry |
Schema Registry address | The Schema Registry address is required to use avro . |
cursor |
Timestamp | Emits any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing a consistent scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.cursor can be used to start a new changefeed where a previous changefeed ended.Example: CURSOR=1536242855577149065.0000000000 |
end_time |
Timestamp | Indicate the timestamp up to which the changefeed will emit all events and then complete with a successful status. Provide a future timestamp to end_time in number of nanoseconds since the Unix epoch. For example, end_time="1655402400000000000" . |
envelope |
wrapped / bare / key_only / row |
wrapped the default envelope structure for changefeed messages containing an array of the primary key, a top-level field for the type of message, and the current state of the row (or null for deleted rows).bare removes the after key from the changefeed message. When used with avro format, record will replace the after key.key_only emits only the key and no value, which is faster if you only need to know the key of the changed row.row emits the row without any additional metadata fields in the message. row does not support avro format.Refer to Responses for more detail on message format. Default: envelope=wrapped . |
format |
json / avro / csv / parquet |
Format of the emitted message. avro : For mappings of CockroachDB types to Avro types, refer to the table and detail on Avro limitations. Note: confluent_schema_registry is required with format=avro . csv : You cannot combine format=csv with the diff or resolved options. Changefeeds use the same CSV format as the EXPORT statement. Refer to Export data with changefeeds for details using these options to create a changefeed as an alternative to EXPORT . Note: initial_scan = 'only' is required with format=csv . parquet : Cloud storage is the only supported sink. The topic_in_value option is not compatible with parquet format.Default: format=json . |
initial_scan / no_initial_scan / initial_scan_only |
N/A | Control whether or not an initial scan will occur at the start time of a changefeed. initial_scan_only will perform an initial scan and then the changefeed job will complete with a successful status. You cannot use end_time and initial_scan_only simultaneously.If none of these options are specified, an initial scan will occur if there is no cursor , and will not occur if there is one. This preserves the behavior from previous releases. You cannot specify initial_scan and no_initial_scan or no_initial_scan and initial_scan_only simultaneously.Default: initial_scan If used in conjunction with cursor , an initial scan will be performed at the cursor timestamp. If no cursor is specified, the initial scan is performed at now() . |
min_checkpoint_frequency |
Duration string | Controls how often nodes flush their progress to the coordinating changefeed node. Changefeeds will wait for at least the specified duration before a flushing. This can help you control the flush frequency to achieve better throughput. If this is set to 0s , a node will flush as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then min_checkpoint_frequency is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time. Note: resolved messages will not be emitted more frequently than the configured min_checkpoint_frequency (but may be emitted less frequently). Since min_checkpoint_frequency defaults to 30s , you must configure min_checkpoint_frequency to at least the desired resolved message frequency if you require resolved messages more frequently than 30s .Default: 30s |
mvcc_timestamp |
N/A | Include the MVCC timestamp for each emitted row in a changefeed. With the mvcc_timestamp option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill. |
resolved |
INTERVAL |
Emits resolved timestamp events for the changefeed. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time. Set an optional minimal duration between emitting resolved timestamps. Example: resolved='10s' . This option will emit a resolved timestamp event only if the timestamp has advanced and at least the optional duration has elapsed. If no duration is specified, all resolved timestamps are emitted as the high-water mark advances.Note: If you set resolved lower than 30s , then you must also set the min_checkpoint_frequency option to at minimum the same value as resolved , because resolved messages may be emitted less frequently than min_checkpoint_frequency , but cannot be emitted more frequently.Refer to Resolved messages for more detail. |
split_column_families |
N/A | Target a table with multiple columns families. Emit messages for each column family in the target table. Each message will include the label: table.family . |
updated |
N/A | Include updated timestamps with each row. |
virtual_columns |
STRING |
Changefeeds omit virtual computed columns from emitted messages by default. To maintain the behavior of previous CockroachDB versions where the changefeed would emit NULL values for virtual computed columns, set virtual_columns = "null" when you start a changefeed. You may also define virtual_columns = "omitted" , though this is already the default behavior for v22.1+. If you do not set "omitted" on a table with virtual computed columns when you create a changefeed, you will receive a warning that changefeeds will filter out virtual computed values. Default: "omitted" |
Avro limitations
Creating a changefeed using Avro is available in Core and Enterprise changefeeds with the confluent_schema_registry
option.
Below are clarifications for particular SQL types and values for Avro changefeeds:
- Decimals must have precision specified.
BYTES
(or its aliasesBYTEA
andBLOB
) are often used to store machine-readable data. When you stream these types through a changefeed withformat=avro
, CockroachDB does not encode or change the data. However, Avro clients can often include escape sequences to present the data in a printable format, which can interfere with deserialization. A potential solution is to hex-encodeBYTES
values when initially inserting them into CockroachDB. This will ensure that Avro clients can consistently decode the hexadecimal. Note that hex-encoding values at insertion will increase record size.BIT
andVARBIT
types are encoded as arrays of 64-bit integers.For efficiency, CockroachDB encodes
BIT
andVARBIT
bitfield types as arrays of 64-bit integers. That is, base-2 (binary format)BIT
andVARBIT
data types are converted to base 10 and stored in arrays. Encoding in CockroachDB is big-endian, therefore the last value may have many trailing zeroes. For this reason, the first value of each array is the number of bits that are used in the last value of the array.For instance, if the bitfield is 129 bits long, there will be 4 integers in the array. The first integer will be
1
; representing the number of bits in the last value, the second integer will be the first 64 bits, the third integer will be bits 65–128, and the last integer will either be0
or9223372036854775808
(i.e., the integer with only the first bit set, or1000000000000000000000000000000000000000000000000000000000000000
when base 2).This example is base-10 encoded into an array as follows:
{"array": [1, <first 64 bits>, <second 64 bits>, 0 or 9223372036854775808]}
For downstream processing, it is necessary to base-2 encode every element in the array (except for the first element). The first number in the array gives you the number of bits to take from the last base-2 number — that is, the most significant bits. So, in the example above this would be
1
. Finally, all the base-2 numbers can be appended together, which will result in the original number of bits, 129.In a different example of this process where the bitfield is 136 bits long, the array would be similar to the following when base-10 encoded:
{"array": [8, 18293058736425533439, 18446744073709551615, 13690942867206307840]}
To then work with this data, you would convert each of the elements in the array to base-2 numbers, besides the first element. For the above array, this would convert to:
[8, 1111110111011011111111111111111111111111111111111111111111111111, 1111111111111111111111111111111111111111111111111111111111111111, 1011111000000000000000000000000000000000000000000000000000000000]
Next, you use the first element in the array to take the number of bits from the last base-2 element,
10111110
. Finally, you append each of the base-2 numbers together — in the above array, the second, third, and truncated last element. This results in 136 bits, the original number of bits.A changefeed in Avro format will not be able to serialize user-defined composite (tuple) types. #102903
Examples
Create a changefeed
To start a changefeed:
EXPERIMENTAL CHANGEFEED FOR cdc_test;
In the terminal where the basic changefeed is streaming, the output will appear:
table,key,value
cdc_test,[0],"{""after"": {""a"": 0}}"
For step-by-step guidance on creating a basic changefeed, see the Changefeed Examples page.
Create a changefeed with Avro
To start a changefeed in Avro format:
EXPERIMENTAL CHANGEFEED FOR cdc_test WITH format = avro, confluent_schema_registry = 'http://localhost:8081';
In the terminal where the basic changefeed is streaming, the output will appear:
table,key,value
cdc_test,\000\000\000\000\001\002\000,\000\000\000\000\002\002\002\000
For step-by-step guidance on creating a basic changefeed with Avro, see the Changefeed Examples page.
Create a changefeed on a table with column families
To create a changefeed on a table with column families, use the FAMILY
keyword for a specific column family:
EXPERIMENTAL CHANGEFEED FOR TABLE cdc_test FAMILY f1;
To create a changefeed on a table and output changes for each column family, use the split_column_families
option:
EXPERIMENTAL CHANGEFEED FOR TABLE cdc_test WITH split_column_families;
For step-by-step guidance creating a basic changefeed on a table with multiple column families, see the Changefeed Examples page.