Salesforce Bulk API Sink Connector for Confluent Platform
The Kafka Connect Salesforce Bulk API Sink Connector integrates Salesforce with Apache Kafka®. The connector reads records from Kafka topics and uses them to perform CRUD operations (insert, update, delete) on Salesforce SObjects. These records can be written to a Kafka topic using the Salesforce PushTopic Source Connector.
For example, consider two Salesforce.com organizations, or instances: Instance A
and Instance B. Suppose a data engineer wants to synchronize Salesforce Lead
objects from A to B. The data engineer can configure and deploy the PushTopic
Source connector to stream Salesforce Lead objects from Instance A into a single
Kafka topic, while the sink connector can be configured to stream Lead objects
from that topic into Instance B. Depending on the configuration, all changes to
Lead SObjects may be synchronized across organizations. This connector can be
used with standalone and distributed Connect workers.
Considerations
Note the following when using the Salesforce Bulk API Sink Connector.
Unexpected errors
When the connector performs operations on Salesforce SObjects, unexpected
errors can occur and are reported. The following are several reasons why
errors may occur:
- Attempting to insert a duplicate record. Rules for determining duplicates are configurable in Salesforce.
- Attempting to delete, update, or upsert a record that does not exist because the Id field does not match.
- Attempting an operation on a record whose Id field value matches a previously deleted Id field value.
ID field semantics
When the Salesforce Bulk API Sink Connector consumes records from Kafka topics
that originated from the Salesforce PushTopic Source Connector for Confluent
Platform, each record includes an Id field that is a sibling of the other fields
in the body of the SObject. Note that the Id is only valid within the Salesforce
organization from which the record was streamed. For upsert, delete, and update
operations, attempting to rely on the Id field causes failures when used on
different Salesforce organizations. Inserts always ignore the Id field because
Id fields are internally managed in Salesforce. Upsert operations must be used
with the external ID configuration options salesforce.use.custom.id.field=true
and salesforce.custom.id.field.name=<externalIdField>.
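The rules above can be summarized in a short sketch. This is an illustration only, not the connector's implementation: the helper name match_field and the field name External_Id__c are hypothetical, and the handling of update and delete with a custom ID is an assumption based on the caution that follows. Only the two configuration keys come from this page.

```python
from typing import Dict, Optional


def match_field(operation: str, config: Dict[str, str]) -> Optional[str]:
    """Return the field used to locate the target record, or None for inserts.

    Hypothetical helper for illustration; not part of the connector.
    """
    use_custom = config.get("salesforce.use.custom.id.field", "false").lower() == "true"
    custom_name = config.get("salesforce.custom.id.field.name")

    if operation == "insert":
        # Inserts ignore Id because Salesforce manages Id values internally.
        return None
    if operation == "upsert":
        # Upserts must be used with the external ID configuration options.
        if not (use_custom and custom_name):
            raise ValueError("upsert requires an external ID field to be configured")
        return custom_name
    if operation in ("update", "delete"):
        # Assumption: when a custom ID is configured, it is used for matching;
        # otherwise Id is used, which is only valid in the originating organization.
        return custom_name if (use_custom and custom_name) else "Id"
    raise ValueError(f"unknown operation: {operation}")


# Example settings for writing to a different organization.
cross_org_config = {
    "salesforce.use.custom.id.field": "true",
    "salesforce.custom.id.field.name": "External_Id__c",  # hypothetical field name
}
```

With cross_org_config, an upsert in this sketch matches on External_Id__c, while an insert has no match field at all.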
Caution
For update and delete operations across Salesforce organizations, an external
ID must be configured in Salesforce. Also, a custom ID must always be marked
as an external ID in both organizations.
Read-Only fields
Salesforce SObject fields may not be writable by insert, update, or upsert
operations because the fields are set with the creatable=false or
updatable=false attributes within Salesforce. If a write is attempted to a
field with these attributes set, the sink connector excludes the field from the
operation rather than failing the entire operation. This behavior is not
configurable.
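The filtering described above might look roughly like the following sketch. It is not the connector's code: the helper drop_read_only, the sample metadata in LEAD_ATTRS, and the rule that an upsert needs both attributes are assumptions made for illustration, as is treating fields without metadata as writable.

```python
from typing import Any, Dict

# Attributes a field must have for each operation.
# Assumption: an upsert may create or update, so it requires both.
REQUIRED_ATTRS = {
    "insert": ("creatable",),
    "update": ("updatable",),
    "upsert": ("creatable", "updatable"),
}

# Hypothetical field metadata for a Lead SObject.
LEAD_ATTRS = {
    "LastName": {"creatable": True, "updatable": True},
    "LeadSource": {"creatable": True, "updatable": False},
    "CreatedDate": {"creatable": False, "updatable": False},
}


def drop_read_only(record: Dict[str, Any], operation: str,
                   field_attrs: Dict[str, Dict[str, bool]]) -> Dict[str, Any]:
    """Return a copy of record without fields that the operation cannot write."""
    required = REQUIRED_ATTRS[operation]
    return {
        name: value
        for name, value in record.items()
        # Fields with no metadata are treated as writable (assumption).
        if all(field_attrs.get(name, {}).get(attr, True) for attr in required)
    }


lead = {"LastName": "Doe", "LeadSource": "Web", "CreatedDate": "2020-01-01T00:00:00Z"}
insert_body = drop_read_only(lead, "insert", LEAD_ATTRS)
update_body = drop_read_only(lead, "update", LEAD_ATTRS)
```

In this sketch the operation still proceeds with the remaining fields, which mirrors the behavior of excluding a field instead of failing the whole write.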
Event Type
The Kafka record format used by the Salesforce Bulk API sink connector contains
an _EventType field. If the record was created by the Salesforce PushTopic
Source Connector for Confluent Platform, this field describes the type of
PushTopic event that generated the record. Types are created, updated, and
deleted. When processing records, the sink connector (by default) maps the
_EventType to an insert, update, or delete operation on the configured SObject.
This behavior can be overridden using the override.event.type=true and
salesforce.sink.object.operation=<sink operation> properties. Overriding the
event type ignores the _EventType field in the record and applies the
salesforce.sink.object.operation to every record.
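As a minimal sketch of this mapping, the following function resolves the operation for a record. The function name resolve_operation and the dictionary-based record and configuration are assumptions for illustration; the property names and event types are the ones described above.

```python
from typing import Any, Dict

# Default mapping from PushTopic event type to sink operation.
EVENT_TO_OPERATION = {
    "created": "insert",
    "updated": "update",
    "deleted": "delete",
}


def resolve_operation(record: Dict[str, Any], config: Dict[str, str]) -> str:
    """Pick the sink operation for one record (illustrative helper only)."""
    if config.get("override.event.type", "false").lower() == "true":
        # The override ignores _EventType and applies one operation to every record.
        return config["salesforce.sink.object.operation"]
    return EVENT_TO_OPERATION[record["_EventType"]]


default_op = resolve_operation({"_EventType": "updated"}, {})
forced_op = resolve_operation(
    {"_EventType": "created"},
    {"override.event.type": "true", "salesforce.sink.object.operation": "upsert"},
)
```

Here default_op follows the record's event type, while forced_op comes from the configuration regardless of what the record says.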
API Limits
- The Salesforce Bulk API sink connector is limited by the number of batches it can execute, the number of records per batch, and the length of each batch. For detailed limitations, see Bulk API Limits.
- The Salesforce Bulk API supports the upsert operation only with an external ID field.
Quick Start
In this quick start, the Salesforce PushTopic source connector is used to get data into Kafka and the Salesforce Bulk API sink connector is used to export data from Kafka to Salesforce.
Populate a Kafka topic
Complete the steps in the Example: Configure Salesforce PushTopic Source Connector to get data into Kafka. Note the following:
- After creating the Lead and before checking the topic for the Lead object, edit the name of the Lead. Two records should be present when you check the contents of the Kafka topic: a record of _EventType created and another of updated.
- Complete the Salesforce Account steps to create a second account in a separate Salesforce organization from the sink connector. This account is used to configure the sink connector.
View Leads in Salesforce
Log in to the secondary Salesforce organization and verify that the Lead object exists with the correct name. It should match the Lead in the primary Salesforce organization.
Running both connectors concurrently
- Running the sink connector and the source connector in separate workers allows SObject changes to be synchronized in near real time. If running in standalone mode, add a custom port to one of the workers using the rest.port configuration property.
- Add and change leads as necessary in the primary organization. The source connector captures your changes and writes them to the topic that the sink connector reads from and uses as the source for its operations.