PostgreSQL Source Connector (Debezium) Configuration Properties

The Postgres Source Connector can be configured using a variety of configuration properties.

plugin.name

The name of the Postgres logical decoding plugin installed on the server. Supported values are decoderbufs, wal2json, and wal2json_rds. Two additional options are supported since 0.8.0.Beta1: wal2json_streaming and wal2json_rds_streaming. When a processed transaction is very large, the JSON batch event containing all changes in the transaction may not fit into the hard-coded 1 GB memory buffer. In such cases it is possible to switch to a streaming mode in which every change in the transaction is sent as a separate message from PostgreSQL to Debezium.

  • Type: String
  • Importance: Medium
  • Default: decoderbufs
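For example, a deployment that regularly processes very large transactions on Amazon RDS might select the streaming variant of the wal2json plugin. A minimal sketch in .properties form (the plugin choice is an assumption; the equivalent JSON keys can be used with the Connect REST API):

    # Send each change in a large transaction as a separate message
    # rather than one JSON batch event (requires wal2json on the server)
    plugin.name=wal2json_rds_streaming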
slot.name

The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance. Values must conform to Postgres replication slot naming rules which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”

  • Type: String
  • Importance: Medium
  • Default: debezium
slot.drop_on_stop

Determines whether the logical replication slot is dropped when the connector stops in an orderly manner. Should only be set to true in testing or development environments. Dropping the slot allows WAL segments to be discarded by the database. If set to true, the connector may not be able to resume from the WAL position where it left off after a restart.

  • Type: String
  • Importance: Low
  • Default: false
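For development or testing, the two slot properties are often combined so that the replication slot is cleaned up automatically. A sketch with a placeholder slot name; do not use slot.drop_on_stop=true in production:

    # Use a dedicated slot for this connector and drop it on orderly shutdown
    # so WAL segments can be discarded (testing/development only)
    slot.name=debezium_dev
    slot.drop_on_stop=true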
database.hostname

IP address or hostname of the PostgreSQL database server.

  • Type: String
  • Importance: High
database.port

Integer port number of the PostgreSQL database server.

  • Type: Integer
  • Importance: Low
  • Default: 5432
database.user

Username to use when connecting to the PostgreSQL database server.

  • Type: String
  • Importance: High
database.password

Password to use when connecting to the PostgreSQL database server.

  • Type: Password
  • Importance: High
database.dbname

The name of the PostgreSQL database from which to stream the changes.

  • Type: String
  • Importance: High
database.server.name

Logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored. The logical name should be unique across all other connectors, since it is used as a prefix for all Kafka topic names coming from this connector. Defaults to host:port/dbname, where host is the value of the database.hostname property, port is the value of the database.port property, and dbname is the value of the database.dbname property. Confluent recommends using a meaningful and logical name instead of the default.

  • Type: String
  • Importance: High
  • Default: database.hostname:database.port/database.dbname
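Taken together, the connection and naming properties form the core of a connector configuration. A minimal sketch in .properties form; the host, credentials, database name, and server name below are placeholders:

    name=inventory-connector
    connector.class=io.debezium.connector.postgresql.PostgresConnector
    # Placeholder connection details
    database.hostname=192.0.2.10
    database.port=5432
    database.user=debezium
    database.password=dbz-secret
    database.dbname=inventory
    # Logical name used as the prefix for all Kafka topic names
    database.server.name=dbserver1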
schema.whitelist

An optional comma-separated list of regular expressions that match schema names to be monitored. Any schema name not included in the whitelist will be excluded from monitoring. By default all non-system schemas are monitored. May not be used with schema.blacklist.

  • Type: List of Strings
  • Importance: Low
schema.blacklist

An optional comma-separated list of regular expressions that match schema names to be excluded from monitoring. Any schema name not included in the blacklist will be monitored, with the exception of system schemas. May not be used with schema.whitelist.

  • Type: List of Strings
  • Importance: Low
table.whitelist

An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored. Any table not included in the whitelist is excluded from monitoring. Each identifier is in the form schemaName.tableName. By default the connector will monitor every non-system table in each monitored schema. May not be used with table.blacklist.

  • Type: List of Strings
  • Importance: Low
table.blacklist

An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring. Any table not included in the blacklist is monitored. Each identifier is in the form schemaName.tableName. May not be used with table.whitelist.

  • Type: List of Strings
  • Importance: Low
column.blacklist

An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.

  • Type: List of Strings
  • Importance: Low
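For example, the filtering properties can be combined to limit capture to a couple of schemas while excluding individual tables and columns. A sketch with placeholder schema, table, and column names:

    # Monitor only these schemas
    schema.whitelist=public,audit
    # Exclude specific tables within the monitored schemas
    table.blacklist=public.orders_log,audit.debug_events
    # Remove a sensitive column from change event values
    column.blacklist=public.customers.ssn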
time.precision.mode

Time, date, and timestamps can be represented with different kinds of precision, including:

adaptive (the default) which captures the time and timestamp values exactly as they are in the database. adaptive uses either millisecond, microsecond, or nanosecond precision values based on the database column type;

adaptive_time_microseconds which captures the date, datetime, and timestamp values exactly as they are in the database. adaptive_time_microseconds uses either millisecond, microsecond, or nanosecond precision values based on the database column type, with the exception of TIME type fields, which are always captured as microseconds;

connect which always represents time and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp. connect uses millisecond precision regardless of database column precision.

See temporal values.

  • Type: String
  • Importance: High
  • Default: adaptive
decimal.handling.mode

Specifies how the connector should handle values for DECIMAL and NUMERIC columns:

precise (the default) represents values precisely using java.math.BigDecimal, which are represented in change events in binary form;

double which represents them using double values. double may result in a loss of precision but is easier to use;

string which encodes values as formatted strings. The string option is easy to consume, but semantic information about the real type is lost.

See Decimal Values.

  • Type: String
  • Importance: High
  • Default: precise
hstore.handling.mode

Specifies how the connector should handle values for hstore columns:

map (the default) represents values using MAP;

json represents them using JSON strings. The json option encodes values as formatted strings such as {"key":"val"}.

See HStore Values.

  • Type: String
  • Importance: Low
  • Default: map
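The three value-handling modes above are often tuned together for the needs of downstream consumers. A sketch that favors ease of consumption over exact fidelity (an assumption about the consumers, not a recommendation):

    # Millisecond precision using Kafka Connect built-in temporal types
    time.precision.mode=connect
    # DECIMAL/NUMERIC as doubles; simpler to consume, may lose precision
    decimal.handling.mode=double
    # hstore columns as JSON-formatted strings
    hstore.handling.mode=json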
database.sslmode

Sets whether or not to use an encrypted connection to the PostgreSQL server. Options include:

disable (the default) to use an unencrypted connection;

require to use a secure (encrypted) connection. Fails if one cannot be established;

verify-ca is similar to require, but additionally verifies the server TLS certificate against the configured Certificate Authority (CA) certificates. Fails if no valid matching CA certificates are found;

verify-full is similar to verify-ca but additionally verifies that the server certificate matches the host to which the connection is attempted.

See the PostgreSQL documentation for more information.

  • Type: String
  • Importance: Low
  • Default: disable
database.sslcert

The path to the file containing the SSL Certificate for the client. See the PostgreSQL documentation for more information.

  • Type: String
  • Importance: High
database.sslpassword

The password to access the client private key from the file specified by database.sslkey. See the PostgreSQL documentation for more information.

  • Type: String
  • Importance: Low
database.sslrootcert

The path to the file containing the root certificate(s) against which the server is validated. See the PostgreSQL documentation for more information.

  • Type: String
  • Importance: Low
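A typical encrypted setup combines the SSL properties above (including database.sslkey, which is referenced by database.sslpassword but not listed separately here). The file paths and passphrase below are placeholders:

    # Require TLS and verify both the CA chain and the server hostname
    database.sslmode=verify-full
    database.sslrootcert=/etc/ssl/certs/postgres-ca.pem
    database.sslcert=/etc/ssl/certs/debezium-client.pem
    database.sslkey=/etc/ssl/private/debezium-client.pk8
    database.sslpassword=client-key-passphrase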
database.tcpKeepAlive

Enables the TCP keep-alive probe to verify that the database connection is still alive (enabled by default). See the PostgreSQL documentation for more information.

  • Type: String
  • Importance: Low
tombstones.on.delete

Controls whether a tombstone event should be generated after a delete event. When true, delete operations are represented by a delete event and a subsequent tombstone event. When false, only a delete event is sent. Emitting a tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.

  • Type: String
  • Importance: High
  • Default: true
column.propagate.source.type

An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages. The schema parameters __debezium.source.column.type, __debezium.source.column.length and __debezium.source.column.scale are used to propagate the original type name and length (for variable-width types), respectively. Useful to properly size corresponding columns in sink databases. Fully-qualified names for columns are in the form databaseName.tableName.columnName, or databaseName.schemaName.tableName.columnName.

  • Type: List of Strings
  • Importance: Low
  • Default: n/a

The following are advanced configuration properties:

snapshot.mode

The criteria for running a snapshot upon startup of the connector.

  • Type: String
  • Importance: Medium
  • Default: initial
  • Valid values: [always, initial, initial_only, never, custom]
snapshot.lock.timeout.ms

Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail. See snapshots.

  • Type: String
  • Importance: Low
  • Default: 10000
snapshot.select.statement.overrides

Controls which rows from tables are included in the snapshot. This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). SELECT statements for the individual tables are specified in additional configuration properties, one for each table, identified by the ID snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]. The value of these properties is the SELECT statement to use when retrieving data from the specific table during the snapshot process. A possible use case for large append-only tables is setting a specific point at which to start (resume) the snapshot process, in case a previous snapshot process was interrupted.

Note: This setting affects snapshots only. Events generated by the logical decoder are not affected by it.

  • Type: List of Strings
  • Importance: Low
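For example, to resume a snapshot of a large append-only table from a known position, an override can be defined for just that table. The table name, column name, and resume point below are placeholders:

    # Only this table gets a custom snapshot SELECT statement
    snapshot.select.statement.overrides=inventory.orders
    # Resume the snapshot above the last id that was already copied
    snapshot.select.statement.overrides.inventory.orders=SELECT * FROM inventory.orders WHERE id > 1000000 ORDER BY id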
rows.fetch.size

Positive integer value that specifies the maximum number of rows that should be read at one time from each table while taking a snapshot. The connector reads the table contents in multiple batches of this size.

  • Type: Integer
  • Importance: Low
  • Default: 10240
max.queue.size

Positive integer value that specifies the maximum size of the blocking queue into which change events received via streaming replication are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slower or if Kafka is not available.

  • Type: Integer
  • Importance: Low
  • Default: 8192
max.batch.size

Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.

  • Type: Integer
  • Importance: Low
  • Default: 2048
poll.interval.ms

Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 500 milliseconds.

  • Type: Integer
  • Importance: Low
  • Default: 500
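The queue, batch, and polling properties are usually tuned together, with max.queue.size typically kept larger than max.batch.size. The values below are illustrative only:

    # Allow more events to buffer when Kafka writes are slow
    max.queue.size=16384
    # Process larger batches per iteration
    max.batch.size=4096
    # Poll for new change events more frequently
    poll.interval.ms=100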
include.unknown.datatypes

When Debezium encounters a field whose data type is unknown, the field is omitted from the change event and a warning is logged (the default). In some cases it may be preferable to include the field and send it downstream to clients in an opaque binary representation so the clients can decode it. Set this property to false to filter unknown data from events, or to true to keep it in binary format.

Note: Clients risk backward compatibility issues with this setting. Not only may the database-specific binary representation change between releases, but when the datatype is eventually supported, it will be sent downstream as a logical type, requiring adjustments by consumers. In general, when encountering unsupported data types, please file a feature request so that support can be added.

  • Type: Boolean
  • Importance: Low
  • Default: false
database.initial.statements

A semicolon separated list of SQL statements to be executed when a JDBC connection (not the transaction log reading connection) to the database is established. Use a double semicolon (;;) to use a semicolon as a character and not as a delimiter.

Note: The connector may establish JDBC connections at its own discretion. This setting is typically used only for configuring session parameters. It should not be used for executing DML statements.

  • Type: List of Strings
  • Importance: Low
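For example, a session parameter can be set whenever a JDBC connection is opened. In the sketch below (statement contents are placeholders), the doubled semicolon inside the quoted value is a literal semicolon, while the single semicolon separates the two statements:

    # Two statements; the ;; inside the quoted value is a literal semicolon
    database.initial.statements=SET application_name = 'debezium;;cdc';SET statement_timeout = 0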
heartbeat.interval.ms

Controls how frequently heartbeat messages are sent. This property (which is disabled by default) contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic. Heartbeat messages can be used to monitor whether the connector is still receiving change events from the database. They are also useful when only records in non-captured tables change for a long period of time. In that case, the connector continues to read the log from the database but never emits any change messages into Kafka, which means that no offset updates are committed to Kafka.

This causes WAL files to be retained by the database longer than needed, because the connector has already processed the files but did not flush the latest retrieved Log Sequence Number (LSN) to the database. Using heartbeat messages may also result in more re-sent change events after a connector restart. Set this parameter to 0 to disable heartbeat messages.

  • Type: Integer
  • Importance: Low
  • Default: 0
heartbeat.topics.prefix

Sets the name of the topic to which heartbeat messages are sent. The topic is named according to the pattern <heartbeat.topics.prefix>.<server.name>.

  • Type: String
  • Importance: Low
  • Default: __debezium-heartbeat
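For example, enabling a ten-second heartbeat keeps offsets (and therefore the flushed LSN) advancing even when only non-captured tables change. The interval is an illustrative value:

    # Send a heartbeat message every 10 seconds
    heartbeat.interval.ms=10000
    # Heartbeat topic becomes __debezium-heartbeat.<database.server.name>
    heartbeat.topics.prefix=__debezium-heartbeat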
schema.refresh.mode

Specify the conditions that trigger a refresh of the in-memory schema for a table. columns_diff (the default) is the safest mode. This setting ensures the in-memory schema stays in-sync with the database table schema.

columns_diff_exclude_unchanged_toast instructs the connector to refresh the in-memory schema cache if there is a discrepancy between it and the schema derived from the incoming message, unless unchanged TOASTable data fully accounts for the discrepancy.

This setting can improve connector performance significantly if there are frequent table updates for tables that have TOASTed data which are rarely part of the updates. However, it is possible for the in-memory schema to become outdated if TOASTable columns are dropped from the table.

  • Type: String
  • Importance: Low
  • Default: columns_diff
snapshot.delay.ms

An interval in milliseconds that the connector should wait before taking a snapshot after starting up. This setting can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which can cause connector re-balancing.

  • Type: Integer
  • Importance: Low
slot.stream.params

Optional list of parameters to be passed to the configured logical decoding plugin. For example, this list can be used to enable server-side table filtering when using the wal2json plugin. Allowed values depend on the chosen plugin and are separated by semicolons (for example, add-tables=public.table,public.table2;include-lsn=true).

  • Type: List of Strings
  • Importance: Low
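A sketch that passes wal2json parameters for server-side filtering, using the example values from the description above (table names are placeholders):

    # Restrict decoding to two tables and include the LSN in each message
    slot.stream.params=add-tables=public.table,public.table2;include-lsn=true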

Auto topic creation

For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.

Note

Configuration properties accept regular expressions (regex) that are defined as Java regex.

topic.creation.groups

A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor

The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.partitions

The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.include

A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and apply this group’s specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude

A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group’s specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}

Any of the topic-level broker configuration properties (see Changing Broker Configurations Dynamically) for the version of the Kafka broker where the records will be written. The broker’s topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.

  • Type: property values
  • Default: Kafka broker value
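Putting the topic creation properties together, the sketch below defines the required default group plus a hypothetical "compacted" group; the replication factors, partition counts, and topic pattern are placeholders:

    # Required settings for the default group
    topic.creation.default.replication.factor=3
    topic.creation.default.partitions=10
    # An additional group named "compacted" (hypothetical)
    topic.creation.groups=compacted
    # Apply this group's settings to topics matching the pattern
    topic.creation.compacted.include=dbserver1.inventory.*
    topic.creation.compacted.replication.factor=3
    topic.creation.compacted.partitions=1
    # Any Kafka topic-level config can be set per group
    topic.creation.compacted.cleanup.policy=compact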

More details can be found in the Debezium connector properties documentation.

Note

Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.