Salesforce Bulk API Sink Connector for Confluent Platform

The Kafka Connect Salesforce Bulk API Sink Connector integrates Salesforce with Apache Kafka®. The connector reads records from Kafka topics and uses them to perform CRUD operations (insert, update, and delete) on Salesforce SObjects. These records can be written to a Kafka topic by the Salesforce PushTopic Source Connector.

For example, consider two Salesforce.com organizations (instances): Instance A and Instance B. Suppose a data engineer wants to synchronize Salesforce Lead objects from A to B. The data engineer can configure and deploy the PushTopic source connector to stream Leads from Instance A into a single Kafka topic, and configure the sink connector to stream Leads from that topic into Instance B. Depending on the configuration, all changes to Lead SObjects can be synchronized across organizations. This connector can be used with standalone and distributed Connect workers.

Note

The connector’s salesforce.object property indicates the name of the SObject to operate on. The structure and format of input messages for the sink connector are identical to the output format of the Salesforce PushTopic Source Connector for Confluent Platform.
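
For example, a sink connector that operates on Salesforce Lead objects (as in the quick start below) would include the following property in its configuration:

    "salesforce.object" : "Lead"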

Configuration Properties

For a complete list of configuration properties for the sink connector, see Salesforce Bulk API Sink Connector Configuration Properties.

Considerations

Note the following when using the Salesforce Bulk API Sink Connector.

Unexpected errors

When the connector performs operations on Salesforce SObjects, unexpected errors can occur and are reported. Several reasons why errors may occur are listed below:

  • Attempting to insert a duplicate record. Rules for determining duplicates are configurable in Salesforce.
  • Attempting to delete, update, or upsert a record that does not exist because the Id field does not match.
  • Attempting an operation on a field where the Id field value matches a previously deleted Id field value.

ID field semantics

When the Salesforce Bulk API Sink Connector consumes records from Kafka topics that originated from the Salesforce PushTopic Source Connector for Confluent Platform, an Id field is included as a sibling of the other fields in the body of the SObject. Note that the Id is only valid within the Salesforce organization from which the record was streamed. For upsert, delete, and update operations, relying on the Id field causes failures when the operation targets a different Salesforce organization. Inserts always ignore the Id field because Id fields are managed internally by Salesforce. Upsert operations must be used with the external ID configuration options salesforce.use.custom.id.field=true and salesforce.custom.id.field.name=<externalIdField>.

Caution

For update and delete operations across Salesforce organizations, an external ID must be configured in Salesforce. Also, a custom ID field must be marked as an external ID in both organizations.
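
For example, an upsert that matches records on a custom external ID field might include the following configuration properties. The field name MyExternalId__c is hypothetical; substitute the API name of the external ID field defined in your Salesforce organizations.

    "salesforce.use.custom.id.field" : "true",
    "salesforce.custom.id.field.name" : "MyExternalId__c"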

Input topic record format

The input topic record format is expected to be the same as the record format written to output topics by the Salesforce PushTopic Source Connector for Confluent Platform. A Kafka record key is not required. Refer to the Salesforce PushTopic Source Connector for Confluent Platform documentation for an example.
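
For illustration only, a Lead record value consumed by the sink connector might resemble the following. The field names and values are hypothetical and depend on the SObject; the _EventType field is described in the Event Type section below.

    {
      "Id" : "00Q8d000001a2bCEAQ",
      "FirstName" : "Jane",
      "LastName" : "Doe",
      "Company" : "Example Corp",
      "_EventType" : "created"
    }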

Read-Only fields

Salesforce SObject fields may not be writable by an insert, update, or upsert operation because the fields are set with creatable=false or updatable=false attributes within Salesforce. If a write is attempted on a field with these attributes set, the sink connector excludes the field from the operation rather than failing the entire operation. This behavior is not configurable.

Event Type

The Kafka record format consumed by the Salesforce Bulk API sink connector contains an _EventType field. This field describes the type of PushTopic event that generated the record, if the record was created by the Salesforce PushTopic Source Connector for Confluent Platform. The types are created, updated, and deleted. When processing records, the sink connector (by default) maps the _EventType to an insert, update, or delete operation on the configured SObject. This behavior can be overridden using the override.event.type=true and salesforce.sink.object.operation=<sink operation> configuration properties. Overriding the event type ignores the _EventType field in the record and applies the salesforce.sink.object.operation to every record.
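
For example, to ignore the _EventType field and treat every record as an insert, the configuration might include the following properties (the operation value shown is illustrative; choose the operation that fits your use case):

    "override.event.type" : "true",
    "salesforce.sink.object.operation" : "insert"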

API Limits

  • The Salesforce Bulk API sink connector is limited by the number of batches that can be executed, the number of records per batch, and the size of each batch. For detailed limitations, see Bulk API Limits.
  • The Salesforce Bulk API supports the upsert operation only with an external ID field.

Quick Start

In this quick start, the Salesforce PushTopic source connector is used to get data into Kafka and the Salesforce Bulk API sink connector is used to export data from Kafka to Salesforce.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Populate a Kafka topic

Complete the steps in the Example: Configure Salesforce PushTopic Source Connector to get data into Kafka. Note the following:

  • After creating the Lead and before checking the topic for the Lead object, edit the name of the Lead. Two records should be present when you check the contents of the Kafka topic: one with _EventType created and another with _EventType updated.
  • Complete the Salesforce Account steps to create a second account in a separate Salesforce organization. This account is used to configure the sink connector.

Configure the Connector

Prerequisites

Note

If your records were generated by the Salesforce Bulk API Source connector, add the following Single Message Transform (SMT) to the sink connector configuration. The SMT inserts a static _EventType field with the value created into each record.

"transforms" : "InsertField",
"transforms.InsertField.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field" : "_EventType",
"transforms.InsertField.static.value" : "created"
  1. Create a configuration file named salesforce-bulk-api-leads-sink-config.json with the following contents. Make sure to enter a real username, password, security token, consumer key, and consumer secret. For details about configuration properties, see Configuration Properties.

    {
       "name" : "SalesforceBulkApiSinkConnector",
       "config" : {
    
         "connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
         "tasks.max" : "1",
         "topics" : "sfdc-pushtopic-lead",
         "salesforce.object" : "Lead",
         "salesforce.password" : "< Required Configuration >",
         "salesforce.password.token" : "< Required Configuration >",
         "salesforce.username" : "< Required Configuration: secondary organization username >",
         "reporter.result.topic.replication.factor" : "1",
         "reporter.error.topic.replication.factor" : "1",
         "reporter.bootstrap.servers" : "localhost:9092",
         "confluent.topic.bootstrap.servers": "localhost:9092",
         "confluent.topic.replication.factor": "1",
         "confluent.license": " Omit to enable trial mode "
       }
     }
    

    Note

    For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

  2. Enter the Confluent CLI command to start the Salesforce sink connector.

    Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local load SalesforceBulkApiSinkConnector -- -d salesforce-bulk-api-leads-sink-config.json
    

    Your output should resemble:

    {
       "name": "SalesforceBulkApiSinkConnector",
        "config": {
            "connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
            "tasks.max" : "1",
            "topics" : "sfdc-pushtopic-leads",
            "salesforce.object" : "Lead",
            "salesforce.username" : "<Required>"
            "salesforce.password" : "<Required>",
            "salesforce.password.token" : "<Required>",
            "reporter.result.topic.replication.factor" : "1",
            "reporter.error.topic.replication.factor" : "1",
            "reporter.bootstrap.servers" : "localhost:9092",
            "confluent.topic.bootstrap.servers": "localhost:9092",
            "confluent.topic.replication.factor": "1",
            "confluent.license": " Omit to enable trial mode "
        },
        "tasks": [
            ...
        ],
        "type": null
    }
    

    Tip

The tasks field may include information about the task that was started.

View Leads in Salesforce

Log in to the secondary Salesforce organization and verify that the Lead object exists with the correct name. It should match the Lead in the primary Salesforce organization.

Running both connectors concurrently

  • Running both the sink connector and the source connector in separate workers allows SObject changes to be synchronized in near real time. If running in standalone mode, add a custom port to one of the workers using the rest.port configuration property.
  • Add and change Leads as necessary in the primary organization. The source connector captures your changes and writes them to the topic that the sink connector reads from and uses as the source for its operations.