Oracle CDC Source Connector Configuration Properties

The following sections define each connector configuration property.

High Importance

oracle.server

The host name or address for the Oracle server.

  • Type: string
  • Valid Values: Valid IPv4/IPv6 address or host name
  • Importance: high
oracle.port

The port number used to connect to Oracle.

  • Type: int
  • Default: 1521
  • Valid Values: 1 to 65535 (inclusive)
  • Importance: high
oracle.sid

The Oracle system identifier (SID) of a multitenant container database (CDB) or non-multitenant database where tables reside.

  • Type: string
  • Valid Values: Non-empty string
  • Importance: high
oracle.pdb.name

The name of the pluggable database (PDB).

  • Type: string
  • Valid Values: Non-empty string
  • Importance: high
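
For example, a connector reading from a pluggable database inside a container database might combine the connection properties above as follows; the host, SID, and PDB names are hypothetical placeholders:

oracle.server=oracle1.example.com
oracle.port=1521
oracle.sid=ORCLCDB
oracle.pdb.name=ORCLPDB1
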
oracle.username

The name of the Oracle database user.

  • Type: string
  • Importance: high
oracle.password

The password for the Oracle database user.

  • Type: string
  • Importance: high
key.converter

Converter class used to serialize and format all record keys written to Kafka and deserialize all record keys read from Kafka. Examples of common formats include JSON, Avro, and Protobuf. The connector requires the key converter to be specified in the connector configuration. The converter may require more key.converter.* properties. For more details, see Configuring Key and Value Converters.

value.converter

Converter class used to serialize and format all record values written to Kafka and deserialize all record values read from Kafka. Examples of common formats include JSON, Avro, and Protobuf. The connector requires the value converter to be specified in the connector configuration. The converter may require more value.converter.* properties. For more details, see Configuring Key and Value Converters.
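
For example, a configuration that serializes keys and values as Avro with Schema Registry might include the following; the Schema Registry URL is a placeholder, and the schema.registry.url entries are examples of the additional key.converter.* and value.converter.* properties mentioned above:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
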

table.inclusion.regex

The regular expression that matches the schema-qualified names of the tables to be included, in the format schema.table. For example, dbo[.](table1|orders|customers). The period delimiter between the schema name and the table name must be escaped by surrounding it with square brackets (shown in the example) or by adding double backslashes \\ before the period (dbo\\.(table1|orders|customers)).

  • Type: string
  • Valid Values: A valid regular expression or a blank string when the table.topic.name.template is blank.
  • Importance: high
table.exclusion.regex

The regular expression that matches the schema-qualified names of the tables to be excluded, in the format schema.table. The exclusions are applied after the inclusions. A blank regex (the default) implies no exclusions. The period delimiter between the schema name and the table name must be escaped by surrounding it with square brackets (dbo[.](table1|orders|customers)) or by adding double backslashes \\ before it (dbo\\.(table1|orders|customers)).

  • Type: string
  • Default: blank
  • Valid Values: A blank string or a valid regular expression.
  • Importance: high
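
For example, a hypothetical configuration that captures every table in the SALES schema except audit tables might use the following; the schema and table names are placeholders:

table.inclusion.regex=SALES[.].*
table.exclusion.regex=SALES[.]AUDIT_.*
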
table.topic.name.template

The template that defines the name of the Kafka topic where the change event is written. This can be left blank only if the connector will write to the redo log topic and no other topics.

  • Type: string
  • Default: ${fullyQualifiedTableName}
  • Valid Values: Template variables which resolve to a valid topic name or a blank string. Valid topic names consist of 1 to 249 alphanumeric, +, ., _, \, and - characters.
  • Importance: high
redo.log.topic.name

The name of the Kafka topic where the connector records all redo log events.

  • Type: string
  • Default: ${connectorName}-${databaseName}-redo-log
  • Valid Values: Template variables which resolve to a valid topic name. Valid topic names consist of 1 to 249 alphanumeric, +, ., _, \, and - characters.
  • Importance: high
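
For instance, to prefix each per-table change topic with a constant while keeping the default redo log topic template, a configuration might use the following sketch; the cdc prefix is an arbitrary choice:

table.topic.name.template=cdc.${fullyQualifiedTableName}
redo.log.topic.name=${connectorName}-${databaseName}-redo-log
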
redo.log.consumer.bootstrap.server

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster with the redo log topic. This list is only used to connect to the initial hosts to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... This list does not need to contain the full set of servers. You should enter more than one host/port pair in case a server is offline.

  • Type: string
  • Valid Values: Kafka cluster server host/port pairs
  • Importance: high
redo.log.consumer.fetch.min.bytes

The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for that minimum amount of data to accumulate before answering. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available, or when the fetch request times out waiting for data to arrive. Setting this to something greater than 1 causes the server to wait for a larger amount of data to accumulate, which can improve server throughput at the cost of additional latency.

  • Type: int
  • Default: 1
  • Importance: high
redo.log.consumer.max.partition.fetch.bytes

The maximum amount of per-partition data the server will return. Records are fetched in batches by the consumer. If the first record batch (in the first fetched non-empty partition) is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. (This is not an absolute maximum.) The maximum record batch size accepted by the broker is defined using message.max.bytes in the Kafka broker configuration or max.message.bytes in the topic configuration. See fetch.max.bytes for limiting the consumer request size.

  • Type: int
  • Default: 1048576
  • Importance: high
redo.log.corruption.topic

The name of the Kafka topic where the connector records events that describe errors and corrupted portions of the Oracle redo log (missed data). This can optionally use the template variables ${connectorName}, ${databaseName}, and ${schemaName}. A blank topic name (the default) designates that this information is not written to Kafka.

  • Type: string
  • Default: blank
  • Importance: high
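
Putting the high-importance properties together, a minimal configuration fragment might look like the following sketch; all host names, credentials, and table names are placeholders:

oracle.server=oracle1.example.com
oracle.port=1521
oracle.sid=ORCLCDB
oracle.pdb.name=ORCLPDB1
oracle.username=CDC_USER
oracle.password=<cdc-user-password>
table.inclusion.regex=SALES[.].*
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
redo.log.consumer.bootstrap.server=localhost:9092
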

Medium Importance

start.from

When starting for the first time, this is the position in the redo log where the connector should start. Specifies an Oracle System Change Number (SCN) or a database timestamp with format yyyy-MM-dd HH:mm:SS in the database time zone. Defaults to the literal snapshot, which instructs the connector to perform an initial snapshot of each captured table prior to capturing changes.

  • Type: string
  • Default: snapshot
  • Importance: medium
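
For example, to begin from a specific redo log position instead of the default snapshot, either form below can be used; the SCN and timestamp values are placeholders:

# Start from a specific SCN:
start.from=8689909
# Or start from a database timestamp in the database time zone:
# start.from=2021-06-01 00:00:00
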
key.template

The template that defines the Kafka record key for each change event. By default the record key contains a concatenated primary key value delimited by an underscore (_) character. Use ${primaryKeyStructOrValue} to contain either the sole column value of a single-column primary key or a STRUCT with the multi-column primary key fields (or null if the table has no primary or unique key). Use ${primaryKeyStruct} to always use a STRUCT for primary keys that have one or more columns (or null if there is no primary or unique key). If the template contains variables or string literals, the record key is the string with the variables resolved and replaced.

  • Type: string
  • Default: ${primaryKeyStructOrValue}
  • Importance: medium
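
The two template variables described above can be used as shown in this sketch:

# Default: the sole primary key value, or a STRUCT for multi-column keys
key.template=${primaryKeyStructOrValue}
# Alternative: always produce a STRUCT, even for single-column primary keys
# key.template=${primaryKeyStruct}
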
max.batch.size

The maximum number of records to be returned in a single batch from the connector to Kafka Connect. The connector may return fewer records if no additional records are available.

  • Type: int
  • Default: 1000
  • Importance: medium
query.timeout.ms

The timeout in milliseconds (ms) for any query submitted to Oracle. The default is 300000 ms (5 minutes).

  • Type: long
  • Default: 300000
  • Importance: medium
max.retry.time.ms

The maximum total time in milliseconds (ms) the connector tries to reconnect after receiving a retriable exception. A negative value disables retries. The default is 86400000 ms (24 hours).

  • Type: long
  • Default: 86400000
  • Importance: medium
redo.log.poll.interval.ms

The interval between polls to retrieve the database logs. This has no effect when continuous mine is available and enabled for the database.

  • Type: int
  • Default: 1000
  • Importance: medium
snapshot.row.fetch.size

The number of rows to provide as a hint to the JDBC driver when fetching table rows in a snapshot. A value of 0 disables this hint.

  • Type: int
  • Default: 2000
  • Importance: medium
redo.log.row.fetch.size

The number of rows to provide as a hint to the JDBC driver when fetching rows from the redo log. A value of 0 disables this hint.

  • Type: int
  • Default: 10
  • Importance: medium
redo.log.consumer.fetch.max.bytes

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch (in the first non-empty partition of the fetch) is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. (This is not an absolute maximum.) The maximum record batch size accepted by the broker is defined via message.max.bytes in the Kafka broker configuration or max.message.bytes in the topic configuration. Note that the consumer performs multiple fetches in parallel.

  • Type: long
  • Default: 52428800
  • Importance: medium
redo.log.consumer.max.poll.records

The maximum number of records returned in a single call to poll().

  • Type: int
  • Default: 500
  • Importance: medium
redo.log.consumer.receive.buffer.bytes

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the Operating System default buffer size is used.

  • Type: int
  • Default: 65536
  • Importance: medium
redo.log.consumer.request.timeout.ms

Controls the maximum amount of time the client waits for the request response. If the response is not received before the timeout elapses, the client resends the request (if necessary) or fails the request if retries are exhausted.

  • Type: int
  • Default: 30000
  • Importance: medium
redo.log.consumer.send.buffer.bytes

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the Operating System default buffer size is used.

  • Type: int
  • Default: 131072
  • Importance: medium
oracle.ssl.truststore.file

If using SSL for encryption and server authentication, set this to the location of the trust store containing server certificates that should be trusted.

  • Type: string
  • Default: “”
  • Importance: medium
oracle.ssl.truststore.password

If using SSL for encryption and server authentication, the password of the trust store containing server certificates that should be trusted.

  • Type: string
  • Default: “”
  • Importance: medium
oracle.kerberos.cache.file

If using Kerberos 5 authentication, set this to the location of the Kerberos 5 ticket cache file on all the Connect workers.

  • Type: string
  • Default: “”
  • Importance: medium

Low Importance

max.batch.timeout.ms

The maximum time to wait for a record before returning an empty batch. Must be at least 1000 milliseconds (one second). The default is 60000 milliseconds (one minute).

  • Type: long
  • Default: 60000
  • Importance: low
lob.topic.name.template

The template that defines the name of the Kafka topic where the connector writes LOB objects. The value can be a constant if the connector writes all LOB objects from all captured tables to one topic. Or, the value can include any supported template variables (for example, ${columnName}, ${databaseName}, ${schemaName}, ${tableName}, and ${connectorName}). The default is empty, which ignores all LOB type columns if any exist on captured tables. Special-meaning characters \, $, {, and } must be escaped with \ when not intended to be part of a template variable.

  • Type: string
  • Default: “”
  • Importance: low
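
For example, to write LOB values to a per-column topic name built from the supported template variables listed above, a configuration might set:

lob.topic.name.template=${connectorName}.${tableName}.${columnName}
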
numeric.mapping

Map NUMERIC values by precision and optionally scale to integral or decimal types.

  • Use none if all NUMERIC columns should be represented by the Connect DECIMAL logical type.
  • Use best_fit if NUMERIC columns should be cast to the Connect INT8, INT16, INT32, INT64, or FLOAT64 types based upon the column’s precision and scale.
  • Use precision_only to map NUMERIC columns based only on the column’s precision assuming the column’s scale is 0.
  • The none option is the default, but may lead to serialization issues with Avro since the Connect DECIMAL type is mapped to its binary representation. best_fit is often preferred since it maps to the most appropriate primitive type.
  • Type: string
  • Default: “none”
  • Importance: low
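
For example, when producing Avro records it is common to avoid the Connect DECIMAL binary representation:

# Map NUMERIC columns to the closest Connect primitive type instead of DECIMAL
numeric.mapping=best_fit
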
oracle.fan.events.enable

Whether the connection should allow using Oracle RAC Fast Application Notification (FAN) events. This is disabled by default, meaning FAN events will not be used even if they are supported by the database. You should only enable this feature when using Oracle RAC set up with FAN events. Enabling the feature may cause connection issues when the database is not set up to use FAN events.

  • Type: boolean
  • Default: false
  • Importance: low
table.task.reconfig.checking.interval.ms

The interval for the background monitoring thread to examine changes to tables and reconfigure table placement if necessary. The default is 300000 milliseconds (5 minutes).

  • Type: long
  • Default: 300000
  • Importance: low
table.rps.logging.interval.ms

The interval for the background thread to log current requests per second (RPS) for each table.

  • Type: long
  • Default: 60000
  • Importance: low
output.table.name.field

The name of the field in the change record written to Kafka that contains the schema-qualified name of the affected Oracle table. A blank value signals that this field should not be included in the change records.

  • Type: string
  • Default: table
  • Importance: low
output.scn.field

The name of the field in the change record written to Kafka that contains the Oracle system change number (SCN) where this change was made. A blank value indicates the field should not be included in the change records.

  • Type: string
  • Default: scn
  • Importance: low
output.op.type.field

The name of the field in the change record written to Kafka that contains the operation type for this change event. A blank value indicates the field should not be included in the change records.

  • Type: string
  • Default: op_type
  • Importance: low
output.op.timestamp.field

The name of the field in the change record written to Kafka that contains the operation timestamp for the change event. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the operation timestamp as a header with the given name.

  • Type: string
  • Default: op_ts
  • Importance: low
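
To illustrate the two addressing forms described above, the following sketch uses arbitrary example names; an unescaped dot nests the value inside a struct, while the header: prefix writes it as a record header instead:

# Write the operation timestamp into the nested field "op_ts" within a "source" struct
output.op.timestamp.field=source.op_ts
# Or emit it as a record header named "op_ts" rather than a value field
# output.op.timestamp.field=header:op_ts
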
output.current.timestamp.field

The name of the field in the change record written to Kafka that contains the current timestamp of the Kafka Connect worker when this change event was processed. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the current timestamp as a header with the given name.

  • Type: string
  • Default: current_ts
  • Importance: low
output.row.id.field

The name of the field in the change record written to Kafka that contains the row ID of the changed row. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the row ID as a header with the given name.

  • Type: string
  • Default: row_id
  • Importance: low
output.username.field

The name of the field in the change record written to Kafka that contains the name of the Oracle user that executed the transaction. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the username as a header with the given name.

  • Type: string
  • Default: username
  • Importance: low
output.redo.field

The name of the field in the change record written to Kafka that contains the original redo data manipulation language (DML) statement from which this change record was created. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the redo statement as a header with the given name.

  • Type: string
  • Default: “”
  • Importance: low
output.undo.field

The name of the field in the change record written to Kafka that contains the original undo DML statement that effectively undoes this change and represents the “before” state of the row. A blank value indicates the field should not be included in the change records. Use unescaped . characters to designate nested fields within structs, or prefix with header: to write the undo statement as a header with the given name.

  • Type: string
  • Default: “”
  • Importance: low
output.op.type.read.value

The value of the operation type for a read (snapshot) change event. By default this is R (read).

  • Type: string
  • Default: R
  • Importance: low
output.op.type.insert.value

The value of the operation type for an insert change event. By default this is I (insert).

  • Type: string
  • Default: I
  • Importance: low
output.op.type.update.value

The value of the operation type for an update change event. By default this is U (update).

  • Type: string
  • Default: U
  • Importance: low
output.op.type.delete.value

The value of the operation type for a delete change event. By default this is D (delete).

  • Type: string
  • Default: D
  • Importance: low
output.op.type.truncate.value

The value of the operation type for a truncate change event. By default this is T (truncate).

  • Type: string
  • Default: T
  • Importance: low
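
As a sketch, the operation-type codes can be remapped for downstream consumers that expect different values; the lowercase letters here are arbitrary examples:

output.op.type.read.value=r
output.op.type.insert.value=i
output.op.type.update.value=u
output.op.type.delete.value=d
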
redo.log.startup.polling.limit.ms

The amount of time to wait for the redo log to be present on connector startup. This is only relevant when the connector is configured to capture change events. The default is 300000 milliseconds (5 minutes).

  • Type: long
  • Default: 300000
  • Importance: low

Confluent Platform license

confluent.topic.bootstrap.servers

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster used for licensing. All servers in the cluster will be discovered from the initial connection. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

  • Type: list
  • Importance: high
confluent.topic

Name of the Kafka topic used for Confluent Platform configuration, including licensing information.

  • Type: string
  • Default: _confluent-command
  • Importance: low
confluent.topic.replication.factor

The replication factor for the Kafka topic used for Confluent Platform configuration, including licensing information. This is used only if the topic does not already exist, and the default of 3 is appropriate for production use. If you are using a development environment with fewer than 3 brokers, you must set this to the number of brokers (often 1).

  • Type: int
  • Default: 3
  • Importance: low

Confluent license properties

Tip

While it is possible to include license-related properties in the connector configuration, starting with Confluent Platform version 6.0, you can now put license-related properties in the Connect worker configuration instead of in each connector configuration.

Note

This connector is proprietary and requires a license. The license information is stored in the _confluent-command topic. If the broker requires SSL for connections, you must include the security-related confluent.topic.* properties as described below.

confluent.license

Confluent issues enterprise license keys to each subscriber. The license key is text that you can copy and paste as the value for confluent.license. A trial license allows using the connector for a 30-day trial period. A developer license allows using the connector indefinitely for single-broker development environments.

If you are a subscriber, please contact Confluent Support for more information.

  • Type: string
  • Default: “”
  • Valid Values: Confluent Platform license
  • Importance: high
confluent.topic.ssl.truststore.location

The location of the trust store file.

  • Type: string
  • Default: null
  • Importance: high
confluent.topic.ssl.truststore.password

The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.

  • Type: password
  • Default: null
  • Importance: high
confluent.topic.ssl.keystore.location

The location of the key store file. This is optional for the client and can be used for two-way client authentication.

  • Type: string
  • Default: null
  • Importance: high
confluent.topic.ssl.keystore.password

The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured.

  • Type: password
  • Default: null
  • Importance: high
confluent.topic.ssl.key.password

The password of the private key in the key store file. This is optional for the client.

  • Type: password
  • Default: null
  • Importance: high
confluent.topic.security.protocol

Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

  • Type: string
  • Default: “PLAINTEXT”
  • Importance: medium

License topic configuration

A Confluent enterprise license is stored in the _confluent-command topic. This topic is created by default and contains the license that corresponds to the license key supplied through the confluent.license property.

Note

No public keys are stored in Kafka topics.

The following describes how the default _confluent-command topic is generated under different scenarios:

  • A 30-day trial license is automatically generated for the _confluent-command topic if you do not add the confluent.license property or leave this property empty (for example, confluent.license=).
  • Adding a valid license key (for example, confluent.license=<valid-license-key>) adds a valid license in the _confluent-command topic.

Here is an example of the minimal properties for development and testing.

You can change the name of the _confluent-command topic using the confluent.topic property (for instance, if your environment has strict naming conventions). The example below shows this change and the configured Kafka bootstrap server.

confluent.topic=foo_confluent-command
confluent.topic.bootstrap.servers=localhost:9092

The example above shows the minimally required bootstrap server property that you can use for development and testing. For a production environment, you add the normal producer, consumer, and topic configuration properties to the connector properties, prefixed with confluent.topic..

License topic ACLs

The _confluent-command topic contains the license that corresponds to the license key supplied through the confluent.license property. It is created by default. Connectors that access this topic require the following ACLs configured:

  • CREATE and DESCRIBE on the resource cluster, if the connector needs to create the topic.
  • DESCRIBE, READ, and WRITE on the _confluent-command topic.

You can provide access either individually for each principal that will use the license or use a wildcard entry to allow all clients. The following examples show commands that you can use to configure ACLs for the resource cluster and _confluent-command topic.

  1. Set a CREATE and DESCRIBE ACL on the resource cluster:

    kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
    --add --allow-principal User:<principal> \
    --operation CREATE --operation DESCRIBE --cluster
    
  2. Set a DESCRIBE, READ, and WRITE ACL on the _confluent-command topic:

    kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
    --add --allow-principal User:<principal> \
    --operation DESCRIBE --operation READ --operation WRITE --topic _confluent-command
    

Overriding Default Configuration Properties

You can override the replication factor using confluent.topic.replication.factor. For example, when using a Kafka cluster as a destination with fewer than three brokers (for development and testing) you should set the confluent.topic.replication.factor property to 1.

You can override producer-specific properties by using the confluent.topic.producer. prefix and consumer-specific properties by using the confluent.topic.consumer. prefix.

You can use the defaults or customize the other properties as well. For example, the confluent.topic.client.id property defaults to the name of the connector with -licensing suffix. You can specify the configuration settings for brokers that require SSL or SASL for client connections using this prefix.
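
As a sketch, overrides for a development cluster whose brokers require SSL might look like the following; the truststore path and password are placeholders, and compression.type and isolation.level are ordinary Kafka producer and consumer settings passed through the prefixes described above:

confluent.topic.replication.factor=1
confluent.topic.security.protocol=SSL
confluent.topic.ssl.truststore.location=/etc/kafka/secrets/client.truststore.jks
confluent.topic.ssl.truststore.password=<truststore-password>
confluent.topic.producer.compression.type=lz4
confluent.topic.consumer.isolation.level=read_committed
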

You cannot override the cleanup policy of a topic because the topic always has a single partition and is compacted. Also, do not specify serializers and deserializers using this prefix; they are ignored if added.