Example: Configure Salesforce PushTopic Source Connector

The Salesforce PushTopic source connector is used to subscribe to PushTopics and capture the create, update, delete, and undelete events related to Salesforce Objects (SObjects).

Salesforce Account

  1. Use https://login.salesforce.com/ to log into an existing account or create a new one.

    Tip

    You can also use the https://test.salesforce.com/ test environment instead. Make sure to add Leads in the same environment and set salesforce.instance=https://test.salesforce.com in your connector configurations.

  2. Create a new Connected App.

    1. Select the gear icon in the upper-right corner and choose Setup.
    2. Enter App in the Quick Find search box, and choose App Manager in the filtered results.
    3. Click the New Connected App button in the upper right corner of the Setup panel.
    4. Supply a Connected App Name, API Name, and Contact Email.
    5. Select the Enable OAuth Settings checkbox and the Enable for Device Flow checkbox. These selections enable the connector to use the Salesforce API.
    6. Under the Select OAuth Scopes field, select all of the items under Available OAuth scopes and add them to the Selected OAuth Scopes.
    7. Save the new app and press Continue at the prompt.
    8. Look for the Consumer Key and Consumer Secret in the displayed form. Save these values so you can enter them in the connector configuration file.
  3. Find your Security Token (emailed to you from Salesforce.com). If you need to reset your token or view your profile on Salesforce.com, select Reset My Security Token and follow the instructions.

To use the OAuth JWT bearer token flow instead, complete the following steps:

  1. Log into the account at https://login.salesforce.com/.
  2. Go to the setup area (the gear icon, top right).
  3. Go to Apps > App Manager (in the side navigation area).
  4. Click New Connected App.
  5. Populate the required Basic Information fields.
  6. In the API (Enable OAuth Settings) section:
    • Check Enable OAuth Settings.
    • Callback URL is unused in the JWT flow, but it still requires a value. Use “http://localhost/” or any other sample host URL.
    • Check Use digital signatures. Upload the salesforce.crt that was generated earlier.
    • For Selected OAuth Scopes, add Access and manage your data (api) and Perform requests on your behalf at any time (refresh_token, offline_access).
  7. Click Save. If there are any errors, upload salesforce.crt again.
  8. On the resulting app page, click Manage.
    • Click Edit Policies.
    • In the OAuth policies section, change Permitted Users to Admin approved users are pre-authorized.
    • Click Save.
  9. In the Profiles section (on the app page), click Manage Profiles.
  10. On the Application Profile Assignment page, assign the user profiles that may access the app.
  11. Complete the following steps to get the Salesforce app Consumer Key:
    1. Log into the account at https://login.salesforce.com/.
    2. Go to the setup area (the gear icon, top right).
    3. Go to Apps > App Manager (in the side navigation area).
    4. In the list, find the app that you created in the previous steps.
    5. Click View from the app row drop-down selections. The Consumer Key is in the API (Enable OAuth Settings) section.
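
The JWT steps above upload a salesforce.crt file, but this section does not show how that file is produced. The following is a minimal sketch of one way to create a self-signed certificate and a matching keystore with OpenSSL. The file names, subject, alias, validity period, and password are all placeholders, and using a PKCS12 keystore for salesforce.jwt.keystore.path is an assumption; check the connector's configuration reference for the keystore type it expects.

```shell
#!/bin/sh
# Sketch only: every name and password below is a placeholder (assumption),
# not a value required by the connector.
set -e

KEYSTORE_PASSWORD="changeit"   # assumption: replace with your own secret

# Create a 2048-bit RSA private key and a self-signed certificate valid for one year.
openssl req -x509 -newkey rsa:2048 -nodes -days 365 \
  -subj "/CN=salesforce-connector" \
  -keyout salesforce.key -out salesforce.crt

# Bundle the key and certificate into a PKCS12 keystore.
# Assumption: the connector can read this file through salesforce.jwt.keystore.path.
openssl pkcs12 -export -in salesforce.crt -inkey salesforce.key \
  -name salesforce -out salesforce.p12 \
  -passout "pass:${KEYSTORE_PASSWORD}"
```

The certificate (salesforce.crt) is the file uploaded under Use digital signatures; the private key and keystore stay on the Connect worker.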

Install the Salesforce Connector Plugin

Prerequisites
  1. Navigate to your Confluent Platform installation directory and run this command to install the latest version of the Salesforce connector.

    confluent-hub install confluentinc/kafka-connect-salesforce:latest
    

    Tip

    To install a specific version of the connector, replace latest with the version number (for example, 1.1.0-preview) in the command above.

  2. Restart Connect to pick up the new plugin.

    confluent local services connect stop && confluent local services connect start
    
  3. Check if the Salesforce plugin has been installed correctly and picked up by the plugin loader.

    curl -sS localhost:8083/connector-plugins | jq -c '.[] | select( .class | contains("Salesforce") )'
    

    This command should print the new Salesforce connector plugins available to the worker.

    {"class": "io.confluent.salesforce.SalesforcePlatformEventSourceConnector","type": "source","version": "1.1.0-preview"}
    {"class": "io.confluent.salesforce.SalesforcePushTopicSourceConnector","type": "source","version": "1.1.0-preview"}
    {"class": "io.confluent.salesforce.SalesforceSourceConnector","type": "source","version": "1.1.0-preview"}
    

    Tip

    The SalesforceSourceConnector class is deprecated. SalesforcePushTopicSourceConnector behaves identically and should be used for new connectors. When convenient, update your existing configurations to use it.
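
For scripted setups, the plugin check in step 3 can be reduced to a yes/no test. The following is a minimal sketch; the function name has_pushtopic_plugin is hypothetical, and it matches the class name textually instead of parsing the JSON.

```shell
#!/bin/sh
# has_pushtopic_plugin: hypothetical helper. Reads a /connector-plugins response
# on stdin and succeeds if the PushTopic source connector class is listed.
has_pushtopic_plugin() {
  grep -q 'io\.confluent\.salesforce\.SalesforcePushTopicSourceConnector'
}

# Example against a saved response (the sample mirrors the output shown above).
sample='{"class": "io.confluent.salesforce.SalesforcePushTopicSourceConnector","type": "source","version": "1.1.0-preview"}'
if printf '%s\n' "$sample" | has_pushtopic_plugin; then
  echo "PushTopic plugin found"
else
  echo "PushTopic plugin missing"
fi
```

Against a running worker, pipe the output of curl -sS localhost:8083/connector-plugins into the function instead of the sample string.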

Add a Lead to Salesforce

  1. In the Salesforce UI, find and click on the App launcher icon in the upper left of the page.
  2. Find and start the “Leads” app.
  3. Click the New button, fill out the form, and save it.

Configure Source Connector

Prerequisites

Note

Set the following connector configuration properties to enable OAuth JWT bearer token support:

  • salesforce.username
  • salesforce.consumer.key
  • salesforce.jwt.keystore.path
  • salesforce.jwt.keystore.password
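
As an illustration, the four properties listed above could be collected in a fragment and then merged into the "config" object of the connector configuration. This is only a sketch: the file name and keystore path are hypothetical, every value is a placeholder, and this section does not state whether the password-based properties can be dropped when JWT is used.

```shell
#!/bin/sh
# Sketch: write the four JWT-related properties to a scratch file.
# The file name and every value are placeholders (assumptions), not defaults.
cat > jwt-properties-fragment.json <<'EOF'
{
  "salesforce.username": "<Required>",
  "salesforce.consumer.key": "<Required>",
  "salesforce.jwt.keystore.path": "/path/to/keystore",
  "salesforce.jwt.keystore.password": "<Required>"
}
EOF
```
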
  1. Create a configuration file named salesforce-pushtopic-lead-source-config.json with the following contents. Make sure to enter a real username, password, security token, consumer key, and consumer secret. See Salesforce PushTopic Source Connector Configuration Properties for more information on these and the other configuration properties.

    {
         "name": "lead-pushtopic",
         "config": {
             "connector.class" : "io.confluent.salesforce.SalesforcePushTopicSourceConnector",
             "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
             "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
             "tasks.max" : "1",
             "kafka.topic" : "sfdc-pushtopic-leads",
             "salesforce.object" : "Lead",
             "salesforce.push.topic.name" : "LeadsPushTopic",
             "salesforce.username" : "<Required>",
             "salesforce.password" : "<Required>",
             "salesforce.password.token" : "<Required>",
             "salesforce.consumer.key" : "<Required>",
             "salesforce.consumer.secret" : "<Required>",
             "salesforce.initial.start" : "all",
             "confluent.topic.bootstrap.servers": "localhost:9092",
             "confluent.topic.replication.factor": "1",
             "confluent.license": " Omit to enable trial mode "
         }
     }
    
  2. Enter the Confluent CLI confluent local services connect connector load command to start the Salesforce source connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services connect connector load lead-pushtopic --config salesforce-pushtopic-lead-source-config.json
    

    Your output should resemble:

    {
       "name": "lead-pushtopic",
        "config": {
            "connector.class" : "io.confluent.salesforce.SalesforcePushTopicSourceConnector",
            "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
            "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
            "tasks.max" : "1",
            "kafka.topic" : "sfdc-pushtopic-leads",
            "salesforce.object" : "Lead",
            "salesforce.push.topic.name" : "LeadsPushTopic",
            "salesforce.username" : "<Required>",
            "salesforce.password" : "<Required>",
            "salesforce.password.token" : "<Required>",
            "salesforce.consumer.key" : "<Required>",
            "salesforce.consumer.secret" : "<Required>",
            "confluent.topic.bootstrap.servers": "localhost:9092",
            "confluent.topic.replication.factor": "1",
            "confluent.license": " Omit to enable trial mode "
        },
        "tasks": [
            ...
        ],
        "type": null
    }
    

    Tip

    The tasks field may include information about the one started task.
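
Two small checks can bracket the load step. The following is a minimal sketch with hypothetical function names. The first helper scans a configuration file for placeholders that were never replaced. The second extracts the connector state from a Kafka Connect REST API status response (GET /connectors/lead-pushtopic/status), assuming the worker listens on localhost:8083 as in the earlier curl command and that the connector's own state appears before the task states in the response.

```shell
#!/bin/sh
# Hypothetical helpers; neither is part of the Confluent CLI.

# has_placeholders FILE: succeeds if FILE still contains a "<Required>" placeholder.
has_placeholders() {
  grep -q '<Required>' "$1"
}

# connector_state: reads a /connectors/<name>/status response on stdin and prints
# the first "state" value found (assumed to be the connector's own state).
connector_state() {
  grep -o '"state": *"[A-Z_]*"' | head -n 1 | cut -d '"' -f 4
}

# Before loading:
#   has_placeholders salesforce-pushtopic-lead-source-config.json \
#     && echo "fill in credentials first"
# After loading (assumes the worker listens on localhost:8083):
#   curl -sS localhost:8083/connectors/lead-pushtopic/status | connector_state
```

A state other than RUNNING after loading usually means the full status response is worth reading for the error trace.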

View Leads in Kafka Topic

Navigate to your Confluent Platform installation directory and enter the following command to read the Lead records that the connector wrote to the sfdc-pushtopic-leads topic.

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic sfdc-pushtopic-leads

Add More Leads to Salesforce

Add and change leads as necessary. The connector captures your changes and writes them to the same topic.