Oracle CDC Source Connector for Confluent Platform

The Kafka Connect Oracle CDC Source connector captures each change to rows in a database and then represents the changes as change event records in Apache Kafka® topics. The connector uses Oracle LogMiner to read the database redo log.

Note

The connector requires a database user with permissions to use LogMiner and permissions to select from all of the tables captured by the connector. For additional information, see Oracle database prerequisites.

The connector can be configured to capture a subset of the tables in a single database, defined as all tables accessible by the user that match an include regular expression. It can also be configured to not capture tables that match a separate exclude regular expression.
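
For example, a minimal sketch of the table filters, assuming the include and exclude expressions are set through the table.inclusion.regex and table.exclusion.regex configuration properties (verify the property names against the connector configuration reference; the table names are illustrative):

table.inclusion.regex=dbo[.](Users|Orders)
table.exclusion.regex=dbo[.]Orders_Archive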

The connector writes the changes from each of the tables to Kafka topics, where the table-to-topic mapping is determined by the table.topic.name.template connector configuration property. This property defaults to the dot-delimited fully-qualified name of the table (for example, database_name.schema_name.table_name).

The connector recognizes literals and several variables (for example, ${tableName} and ${schemaName}) to customize the table-to-topic mapping. Variables are resolved at runtime. For example, with the following configuration property, changes to the ORCL.ADMIN.USERS table are written to the Kafka topic named my-prefix.ORCL.ADMIN.USERS. For a list of template variables, see Template variables.

table.topic.name.template=my-prefix.${databaseName}.${schemaName}.${tableName}

The connector writes all of the raw Oracle redo log records to one Kafka topic, logically referred to as the “redo log topic”. The redo.log.topic.name configuration property determines the name of this topic. The connector then consumes this topic to identify and produce the table-specific events written to the table-specific topics. To write only to the redo log topic without generating table-specific events, set the table.topic.name.template property to an empty string.
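
For example, to capture only the raw redo log records and skip the table-specific change events, a minimal configuration sketch (the topic name is illustrative) looks like this:

redo.log.topic.name=oracle-redo-log
table.topic.name.template=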

There are many other configuration properties. For example, the metadata associated with each database change event can be included in the Kafka record header or in extra fields (with user-defined field names) in the Kafka record value.
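
As a sketch of the record-value option, assuming the metadata field names are controlled by output.*.field.name properties as in recent connector versions (treat these property names as assumptions and confirm them in the configuration reference):

output.op.type.field.name=op_type
output.op.ts.field.name=op_ts
output.scn.field.name=scn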

Features

The Connect Oracle CDC Source connector provides the following features:

Redo log topic
The connector reads the Oracle database redo log and writes each raw redo log event as a separate Kafka record. The connector queries the V$LOGMNR_CONTENTS view, and each row in the result set that applies to one of the matched tables is converted to a record with a field for each column in the result set. The connector writes to this topic with an at-least-once guarantee, which means that, following an ungraceful stop of the Connect worker, the connector may rewrite a portion of the redo log event records upon restart.
Redo log corruption topic
The Oracle redo logs themselves may be corrupted. The connector does not terminate if LogMiner reports corrupted blocks or segments. You can configure the connector to write the corrupted block details to a separate Kafka topic, allowing downstream consumers to use this information to track and react to Oracle redo log file corruptions.
Table change event topics
The connector can turn raw logs into change events for each table and write these to Kafka topics using the configured table-to-topic mapping.
Pattern match tables to be captured
The connector configuration uses two regular expressions to identify the tables in the database that it should capture. The connector captures events from all tables in the database whose fully-qualified names (for example, dbo.Users) are matched by the include expression, unless explicitly excluded by matching the exclude expression.
Flexible mapping of tables to Kafka topics
The connector configuration specifies a template that identifies the name of the Kafka topic to which each event is written. This template is resolved into a topic name for every change event and can use literals or template variables, including the schema name, table name, database name, various timestamps, and transaction IDs. This gives users a flexible way of controlling the names of the Kafka topics where the change events are written.
Record keys
The records that the connector writes to Kafka topics have (by default) a key corresponding to the primary key column values for the corresponding row in the database. If the primary key consists of a single column, the Kafka record’s key will contain the value of the column for that row. If the primary key consists of multiple columns, the Kafka record’s key will be a STRUCT containing a field for each of the primary key’s columns. You can change this behavior by setting the key.template configuration property. As with other Connect source connectors, each record’s key determines the topic partition where the connector writes the record.
Snapshots

When a connector is first started, it attempts to obtain a snapshot of all existing rows in each table, writing these (as records) to the Kafka topic for the table, before starting to capture changes made to those rows. This results in the Kafka topic containing records for every row in the database table. However, if the Kafka topic should only contain records from a specific point in time, you can use the start.from configuration property to specify an SCN or timestamp. This will set the point where the connector will start capturing events for all tables.
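
For example, to skip the initial snapshot and begin capturing from a known position, you might set start.from to an SCN (the value shown is purely illustrative):

start.from=28974563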

Note

If the connector is interrupted, is stopped, or fails while performing a snapshot of any tables, upon recovery or restart the connector restarts all incomplete snapshots from the beginning. Unfortunately, it is currently not possible to resume a snapshot of a table that is changing while ensuring that all changes to that table have been captured.

Large Object (LOB) types

You can configure the connector to capture changes in tables that contain columns with BLOB, CLOB, and NCLOB types. These LOB values are written to separate LOB topics that can be consumed by downstream applications. To enable this feature, specify a template variable to use in the lob.topic.name.template configuration property (see Template variables for supported variables). When enabled, the connector writes LOB objects to a separate topic, with a record key consisting of the fully-qualified table name, the column name, and the primary key of the changed row.
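
For example, one illustrative template routes each LOB column to its own topic named after the table and column; other combinations of the supported template variables are possible:

lob.topic.name.template=${fullyQualifiedTableName}.${columnName}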

Note

Note the following:

  • A table that contains LOB type columns must include primary keys.
  • Be careful when updating primary key values for tables used in association with LOB topics. When an update to the primary key is processed, the connector emits the updated record to the change event topic, but does not retroactively update the key of previously written LOB records.
Auto-table set sync and task reconfiguration
Tables can be deleted and created in the Oracle database while the connector is running. The connector periodically checks for newly added or recently dropped tables that match the tables to be captured. When the connector identifies new or deleted tables, the connector automatically reconfigures its tasks to stop watching the deleted tables and begin capturing changes from new tables that match the table filter expressions.
Scalable database workloads
The connector is designed to scale from small to large database workloads using connector tasks. The connector can be configured to use as few as one task (tasks.max=1) or scale to as many tasks as required to capture all table changes.
Micro-rebalancing of task loads
This feature applies only to connectors in a Connect cluster running Confluent Platform 6.0 or later. Upon startup, the connector evenly distributes tables across its tasks. The connector monitors throughput variations for each table and the position of each task in the redo log. The connector automatically attempts to distribute the load across all of the connector’s tasks by assigning frequently-changing tables to different tasks.
Automatic creation of Kafka topics
This feature applies only to connectors in a Connect cluster running Confluent Platform 6.0 or later. You can include rules in your connector configuration that define the topic settings for any topic that the source connector writes to. If you are using an earlier version of Confluent Platform, either create the Kafka topics ahead of time or configure your Kafka brokers to automatically create topics (see the broker configuration properties).
Automated reconnection
The connector automatically reconnects when the connection to the database is disrupted or interrupted. When a connection is lost, the connector stops, logs disconnection warning or error messages, and attempts to reconnect using exponential backoff. Once the connection is re-established, the connector automatically resumes normal operation. Several connection properties control this behavior, including query.timeout.ms (defaults to 5 minutes) and max.retry.time.ms (defaults to 24 hours). You can change these values; set max.retry.time.ms to 0 to disable automated reconnection. A configuration sketch appears after this feature list.
Oracle multitenant CDB/PDB architecture support
Oracle provides multitenant architecture support in Oracle Database 12c and later. System tables are stored in a single container database (CDB), and user tables are stored in pluggable databases (PDBs) plugged into the CDB. Each connector instance can read user tables that reside in one PDB; the PDB name is configured using the oracle.pdb.name property. To read from system tables in the CDB, or from a legacy 11g database, leave the oracle.pdb.name configuration property blank. The oracle.sid property must be set to the Oracle system identifier (SID) to access a CDB, PDB, or legacy non-multitenant database (see the sketch after this list).
Kerberos Integration
Use the oracle.kerberos.cache.file configuration property to set the location of the Kerberos ticket cache file. For an example, see Using Kerberos authentication.
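
The following configuration sketch pulls together the connection-related settings referenced in the reconnection and multitenant features above. The SID and PDB names are placeholders, and the timeout values are simply the documented defaults expressed in milliseconds:

oracle.sid=ORCLCDB
oracle.pdb.name=ORCLPDB1
query.timeout.ms=300000
max.retry.time.ms=86400000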

Requirements and current limitations

The following sections provide usage requirements and current limitations.

Oracle versions

The connector works with the following Oracle versions:

  • Oracle 11g Enterprise Edition
  • Oracle 12c Enterprise Edition
  • Oracle 18c Enterprise Edition

Note

Note the following:

  • Currently, the connector does not support Oracle 19c (and later).
  • The connector has not been tested against Oracle RAC and Exadata.

Confluent Platform versions

The connector can be installed in Kafka Connect workers running Confluent Platform 5.3 (and later). It is recommended that you deploy the connector on Connect workers running Confluent Platform 6.0 (and later). In Confluent Platform 6.0 (and later), the connector can automatically distribute workloads across all of the connector’s tasks by assigning frequently-changing tables to different tasks.

Data types

For supported data types, see Supported data types.

The connector cannot differentiate between the numeric types INT, INTEGER, SMALLINT, DEC, DECIMAL, NUMBER, and NUMERIC; all of these types are mapped to the Connect Decimal logical type. For more about this, see Kafka Connect Deep Dive. The connector also cannot differentiate between the floating-point types DOUBLE PRECISION, REAL, and FLOAT; all of these types are mapped to Connect FLOAT64.

Note

Note the following:

  • You can use the numeric.mapping configuration property to map numeric types with known precision and scale to their best-matching primitive type (see the sketch after this list).
  • A few of the PL/SQL-exclusive data types are not supported.
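
A sketch of the numeric mapping override follows; the value best_fit is an assumption borrowed from the Connect JDBC-family connectors, so confirm the accepted values for your connector version before using it:

numeric.mapping=best_fit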

DDL statements

The connector recognizes and parses DDL statements applied to the database after the connector starts. These DDL statements are used to identify changes in the structure of captured tables and to adjust the schema of event records written to Kafka.

The connector’s DDL parser does not support the following DDL statements:

  • ALTER TABLE statements to add or remove a primary key constraint.
  • ALTER TABLE statements dropping multiple columns in a single statement.
  • ALTER TABLE adding columns of TIMESTAMP type with DEFAULT.
  • ALTER TABLE with columns that contain user-defined types.
  • ALTER TABLE statements to rename tables or columns.

Other considerations

  • If the Oracle redo log is corrupted, or if the Oracle redo log is incompatible with the current table schema, the connector sends the redo log block to the error log topic.
  • Currently, the connector only supports writing to the redo log topic with one partition. All converted redo logs are sent to the same partition. If you create the redo log topic manually, create it as a single partition topic.
  • Currently, the connector does not record transaction summary information.
  • Using single-message transformations (SMTs) is not recommended, unless the following conditions apply:
    • The SMT is insensitive to record schema.
    • The SMT does not alter the schema of records.
  • Renaming a column is not supported.
  • The connector does not currently support the Protobuf converter.

Oracle database prerequisites

Review the following sections and make sure the connector database user and the operating environment are configured correctly.

Configure database user privileges

The connector requires a database user with role privileges to use LogMiner and to select from all tables captured by the connector. For this reason, you may need to create a new database user for the connector. Note the following:

  • Create a local user for a standard database. The username must not start with c## or C##.
  • Create a common user for a multitenant database. The username must start with c## or C##.

Standard database: Oracle database version 12c or 18c

To set up a new user with the correct standard database privileges, log in as SYSDBA and enter the following commands to create a role for the user with the required privileges. You can use any preferred name for the role; CDC_PRIVS is the role name used in the following example steps.

  1. As SYSDBA, enter the following SQL commands to create the role and grant privileges to the role. See Step 3 if using a Database Vault and Step 4 for an Oracle 12c database.

    CREATE ROLE CDC_PRIVS;
    GRANT CREATE SESSION,
      EXECUTE_CATALOG_ROLE,
      SELECT ANY TRANSACTION,
      SELECT ANY DICTIONARY
      TO CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO CDC_PRIVS;
    
  2. Create a user, set a password, and grant privileges to the user. You can use any username and password.

    CREATE USER myuser IDENTIFIED BY password DEFAULT TABLESPACE USERS;
    GRANT CDC_PRIVS TO myuser;
    ALTER USER myuser QUOTA UNLIMITED ON USERS;
    
    GRANT CREATE SESSION TO myuser;
    GRANT CREATE TABLE TO myuser;
    GRANT CREATE SEQUENCE TO myuser;
    GRANT CREATE TRIGGER TO myuser;
    
  3. (If using a Database Vault) Omit EXECUTE_CATALOG_ROLE and enter the following commands:

    GRANT EXECUTE ON SYS.DBMS_LOGMNR TO CDC_PRIVS;
    GRANT EXECUTE ON SYS.DBMS_LOGMNR_D TO CDC_PRIVS;
    GRANT EXECUTE ON SYS.DBMS_LOGMNR_LOGREP_DICT TO CDC_PRIVS;
    GRANT EXECUTE ON SYS.DBMS_LOGMNR_SESSION TO CDC_PRIVS;
    
  4. (If using Oracle 12c) Enter the following command:

    GRANT LOGMINING TO CDC_PRIVS;
    

Multitenant database: Oracle database version 12c or 18c (PDB)

To set up a common user with the correct database privileges, log in as SYSDBA and enter the following commands to create a role for the user with the required privileges. You can use any preferred name for the role.

Note

Common user accounts are created in cdb$root and must use the convention c##<name> or C##<name>. The role C##CDC_PRIVS and the user C##myuser are used in the following example.

As SYSDBA, enter the following SQL commands:

CREATE ROLE C##CDC_PRIVS;
GRANT CREATE SESSION,
EXECUTE_CATALOG_ROLE,
SELECT ANY TRANSACTION,
SELECT ANY DICTIONARY TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO C##CDC_PRIVS;

CREATE USER C##myuser IDENTIFIED BY password CONTAINER=ALL;
GRANT C##CDC_PRIVS TO C##myuser CONTAINER=ALL;
ALTER USER C##myuser QUOTA UNLIMITED ON USERS;
ALTER USER C##myuser SET CONTAINER_DATA = (CDB$ROOT, <pdb name>) CONTAINER=CURRENT;

ALTER SESSION SET CONTAINER=CDB$ROOT;
GRANT CREATE SESSION, ALTER SESSION, SET CONTAINER, LOGMINING, EXECUTE_CATALOG_ROLE TO C##myuser CONTAINER=ALL;
GRANT SELECT ON GV_$DATABASE TO C##myuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##myuser CONTAINER=ALL;
GRANT SELECT ON GV_$ARCHIVED_LOG TO C##myuser CONTAINER=ALL;
GRANT CONNECT TO C##myuser CONTAINER=ALL;
GRANT CREATE TABLE TO C##myuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO C##myuser CONTAINER=ALL;
GRANT CREATE TRIGGER TO C##myuser CONTAINER=ALL;

Important

When you configure the connector, use this user account for the JDBC credentials. Use the entire user name, including the c##, as the JDBC user name.
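
For example, assuming the connector's JDBC credentials are supplied through the oracle.username and oracle.password properties (verify the property names in the configuration reference; the password and PDB name are placeholders):

oracle.username=C##myuser
oracle.password=password
oracle.pdb.name=<pdb name>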

Standard database: Oracle database version 11g

To set up a new user with the correct privileges, log in as SYSDBA and enter the following commands to create a role for the user with the required privileges. You can use any preferred name for the role; CDC_PRIVS is the role name used in the following example steps.

  1. As SYSDBA, enter the following SQL commands to create the role and grant privileges for the role.

    CREATE ROLE CDC_PRIVS;
    GRANT CREATE SESSION,
      EXECUTE_CATALOG_ROLE,
      SELECT ANY TRANSACTION,
      SELECT ANY DICTIONARY,
      SELECT ANY TABLE
      TO CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO CDC_PRIVS;
    
  2. Create a user, set a password, and grant privileges to the user. You can use any username and password.

    CREATE USER myuser IDENTIFIED BY password DEFAULT TABLESPACE USERS;
    GRANT CDC_PRIVS TO myuser;
    ALTER USER myuser QUOTA UNLIMITED ON USERS;
    
    GRANT CREATE SESSION TO myuser;
    GRANT CREATE TABLE TO myuser;
    GRANT CREATE SEQUENCE TO myuser;
    GRANT CREATE TRIGGER TO myuser;
    

Amazon RDS Oracle instance

Amazon RDS does not allow users to log in as SYSDBA, so it is not possible to create a role. You must create a user first and then grant the necessary privileges.

  1. Enter the following commands:

    CREATE USER myuser IDENTIFIED BY password DEFAULT TABLESPACE USERS;
    GRANT CREATE SESSION, SELECT ANY TRANSACTION, SELECT ANY DICTIONARY TO myuser;
    GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO myuser;
    GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO myuser;
    GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO myuser;
    GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO myuser;
    ALTER USER myuser QUOTA UNLIMITED ON USERS;
    
    GRANT CREATE SESSION TO myuser;
    GRANT CREATE TABLE TO myuser;
    GRANT CREATE SEQUENCE TO myuser;
    GRANT CREATE TRIGGER TO myuser;
    GRANT LOGMINING TO myuser;
    
  2. Enter the following command:

    exec rdsadmin.rdsadmin_util.grant_sys_object('DBMS_LOGMNR', 'MYUSER', 'EXECUTE');
    

Turn on ARCHIVELOG mode

To extract a LogMiner dictionary to the redo log files, the database must be open and in ARCHIVELOG mode. Note that completing the following steps requires shutting down the database.

Note

You can skip these steps if ARCHIVELOG mode is already enabled for the database. ARCHIVELOG mode is enabled by default for AWS RDS Oracle instances.

  1. Connect as a user with SYSDBA privileges.

  2. Shut down the database instance using the command SHUTDOWN IMMEDIATE.

    SHUTDOWN IMMEDIATE;
    
  3. Make a whole database backup including all data files and control files. You can use operating system commands or RMAN to perform this operation. This backup can be used in the future for recovery with archived redo log files created once the database is in ARCHIVELOG mode.

  4. Start the instance and mount the database using the command STARTUP MOUNT.

    STARTUP MOUNT;
    
  5. Place the database in ARCHIVELOG mode using the command ALTER DATABASE ARCHIVELOG and open the database using the command ALTER DATABASE OPEN.

    ALTER DATABASE ARCHIVELOG;
    ALTER DATABASE OPEN;
    

Enable supplemental logging for all columns

Enter the following commands to specify that, when a row is updated, all columns of that row (except for LOBs, LONGs, and ADTs) are placed in the redo log file. You must have the correct privileges to execute these commands.

  1. Set the session container (required only for multitenant databases).

    ALTER SESSION SET CONTAINER=cdb$root;
    
  2. Enter one of the following commands:

    To enable full supplemental logging for all tables:

    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    

    To enable full supplemental logging for specific tables:

    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
    ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    

Grant the user Flashback query privileges

To perform snapshots on captured tables, the connector requires the user to have privileges to perform Flashback queries (that is, SELECT AS OF) on the captured tables.

Enter the following command to grant the user privileges to perform flashback queries on the database:

GRANT FLASHBACK ANY TABLE TO myuser;

Note

This command is not required if you want to capture the redo log (without generating change events) or generate change events starting from a known System Change Number (SCN) or timestamp using start.from.

For example, enter the following command to grant the multitenant example user created above FLASHBACK ANY TABLE privileges:

GRANT FLASHBACK ANY TABLE TO C##myuser CONTAINER=ALL;

For an Amazon RDS for Oracle instance:

GRANT FLASHBACK ANY TABLE TO myuser;

Install the Oracle CDC Source connector

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-oracle-cdc-source:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-oracle-cdc-source:1.0.0

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

Confluent’s Oracle CDC Source connector is a Confluent Premium connector and requires an additional subscription, specifically for this connector.

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Oracle CDC Source Connector Configuration Properties.

Quick Start

For quick start instructions, see the testing scenarios and examples in Testing Scenarios and Examples.

Creating topics

Creating Kafka topics for records sourced from your database requires setting a few configuration properties.

Confluent Platform version 6.0 (or later)

If you are using Confluent Platform 6.0 (or later), you can configure your Connect worker to automatically create missing topics by adding properties to the worker and connector configuration.

  1. Add the following configuration property to the Connect worker and then restart the worker.

    topic.creation.enable=true
    
  2. Add the following configuration properties to the connector configuration:

    topic.creation.groups=redo
    topic.creation.redo.include=your-redo-log-topic
    topic.creation.redo.replication.factor=3
    topic.creation.redo.partitions=1
    topic.creation.redo.cleanup.policy=delete
    topic.creation.redo.retention.ms=1209600000
    topic.creation.default.replication.factor=3
    topic.creation.default.partitions=5
    topic.creation.default.cleanup.policy=compact
    

These properties define a topic creation rule called “redo” that creates a Kafka topic named your-redo-log-topic (the topic can have any name) with 1 partition and 3 replicas. The records for this topic can be deleted after 14 days (1209600000 milliseconds). You can change the replication factor and cleanup policy.

Note

The retention time needs to be longer than the maximum time the connector is allowed to be out of service.

All other topics are created with 5 partitions and 3 replicas. These topics have compaction enabled to remove any records for which there is a newer record with the same record key.

Confluent Platform version 5.5 (or earlier)

If you are using Confluent Platform 5.5 (or earlier) and the property auto.create.topics.enable=true is set in your Kafka broker configuration, the Kafka broker automatically creates any topics to which the Oracle CDC Source connector writes. The topics the broker creates are named by the following connector configuration properties:

  • redo.log.topic.name
  • redo.log.corruption.topic
  • table.topic.name.template

If you are using Confluent Platform 5.5 (or earlier) and the property auto.create.topics.enable=false is set in your Kafka broker configuration, you must create the topics manually before running the connector. The topics to create are named by the following connector configuration properties:

  • redo.log.topic.name
  • redo.log.corruption.topic
  • table.topic.name.template

Template variables

The connector uses template variables to create the name of the Kafka topic and the record key for each of the change events. The variables are similar to the Oracle GoldenGate Kafka Connect template variables which simplify migrating from Oracle GoldenGate to this connector. Variables are resolved at the task level and table level.

Connector and task variables

Variable keyword Description
${connectorName} Resolves to the name of the connector.
${databaseName} Resolves to the database name.
${emptyString} Resolves to an empty string.
${staticMap[]} Resolves to a static value, where the key is the fully-qualified table name. The keys and values are specified inside the square brackets in the following format: ${staticMap[dbo.table1=value1,dbo.table2=value2]}.
${currentTimestamp} or ${currentTimestamp[]} Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting (see the SimpleDateFormat class documentation). Examples: ${currentTimestamp}, ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}

Table variables

Variable keyword Description
${schemaName} Resolves to the schema name for the table.
${tableName} Resolves to the short table name.
${fullyQualifiedTableName} Resolves to the fully-qualified table name including the period (.) delimiter between the schema and table names. For example, dbo.table1.

Column variables

Variable keyword Description
${columnName} Resolves to the column name.

Record variables

Variable keyword Description
${opType} Resolves to the type of the operation: READ, INSERT, UPDATE, DELETE, or TRUNCATE.
${opTimestamp} Resolves to the operation timestamp from the redo log.
${rowId} Resolves to the ID of the changed row.
${primaryKey} Resolves to the concatenated primary key values delimited by an underscore (_) character.
${primaryKeyStruct} Resolves to a STRUCT with fields for each of the primary key column values.
${primaryKeyStructOrValue} Resolves to a STRUCT with fields for each of the primary key column values when the primary key has two or more columns, or to the column value when the primary key contains a single column.
${scn} Resolves to the system change number (SCN) when the change was made.
${cscn} Resolves to the system change number (SCN) when the change was committed.
${rbaseq} Resolves to the sequence number associated with the Redo Block Address (RBA) of the redo record associated with the change.
${rbablk} Resolves to the RBA block number within the log file.
${rbabyte} Resolves to the RBA byte offset within the block.
${currentTimestamp} or ${currentTimestamp[]} Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting (see the SimpleDateFormat class documentation). Examples: ${currentTimestamp}, ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}
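
For example, the following illustrative settings combine table and record variables: the key template reproduces the default record-key behavior described under Record keys, and the topic template prefixes each table topic with the connector name:

key.template=${primaryKeyStructOrValue}
table.topic.name.template=${connectorName}.${fullyQualifiedTableName}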

Supported data types

The following table lists data types and the associated Connect mapping.

Oracle data type SQL type code Connect mapping
CHAR or CHARACTER 1 STRING
LONG -1 STRING
VARCHAR 12 STRING
VARCHAR2 12 STRING
NCHAR -15 STRING
NVARCHAR2 -9 STRING
RAW -3 BYTES
LONG RAW -4 BYTES
INT or INTEGER 2 DECIMAL
SMALLINT 2 DECIMAL
DEC or DECIMAL 2 DECIMAL
NUMBER or NUMERIC 2 DECIMAL
DOUBLE PRECISION 6 FLOAT64
FLOAT 6 FLOAT64
REAL 6 FLOAT64
TIMESTAMP WITH TIME ZONE -101 TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE -102 TIMESTAMP
BLOB 2004 BYTES
CLOB 2005 BYTES
NCLOB 2011 BYTES
DATE 91 DATE

Note

The -101 and -102 codes for TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE are Oracle-specific. BLOB, CLOB, and NCLOB values are handled out-of-band with a separate LOB topic.

Using the connector with Confluent Cloud

To run this connector with Kafka topics in Confluent Cloud, see Configuration examples for running against Confluent Cloud. Note that the configuration examples are based on running the connector with Confluent Platform version 6.0 (and later).