Use this quick start to get up and running with the Confluent Cloud MongoDB Atlas sink
connector. The quick start provides the basics of selecting the connector and
configuring it to consume data from Kafka and persist the data to a MongoDB
database.
Adding an IP Whitelist Entry
By default, MongoDB Atlas does not allow external network connections from the Internet. To allow external connections, add a specific IP address or a CIDR IP range using the IP Whitelist entry dialog box under the Network Access menu in MongoDB Atlas.
For Confluent Cloud to connect to MongoDB Atlas, you would normally specify the public IP address of your Confluent Cloud cluster. Because Confluent Cloud assigns IP addresses dynamically, you need to add 0.0.0.0/0 as the whitelist entry for your MongoDB Atlas cluster.
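To illustrate why 0.0.0.0/0 admits a dynamically assigned address, here is a minimal Python sketch using the standard-library ipaddress module. The sample addresses and the is_allowed helper are hypothetical and are not real Confluent Cloud addresses or part of any MongoDB tool.

```python
import ipaddress

# A prefix length of 0 means no bits are fixed, so every IPv4 address matches.
ALLOW_ALL = ipaddress.ip_network("0.0.0.0/0")


def is_allowed(address: str, entry=ALLOW_ALL) -> bool:
    """Return True if the address falls inside the whitelist entry."""
    return ipaddress.ip_address(address) in entry


# Hypothetical sample addresses, for illustration only.
for sample in ("203.0.113.10", "198.51.100.77"):
    print(sample, is_allowed(sample))

# A narrower entry, by contrast, admits only its own range.
narrow = ipaddress.ip_network("203.0.113.0/24")
print(is_allowed("198.51.100.77", narrow))
```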
Using the Confluent Cloud GUI
Step 2: Add a connector.
Click Connectors. If you already have connectors in your cluster, click Add connector.
Step 3: Select your connector.
Click the MongoDB Atlas Sink connector icon.
Step 4: Set up the connection.
Complete the following and click Continue.
Note
- Make sure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
Select one or more topics.
Enter a connector name.
Select an Input message format (data coming from the Kafka topic): AVRO, JSON_SR (JSON Schema), PROTOBUF, JSON (schemaless), String, or BSON. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret.
Enter the MongoDB database details. For the Connection host, use only the hostname and not a full URL. For example: cluster4-r5q3r7.gcp.mongodb.net.
Enter your MongoDB collection name. For multiple topics, this is the default collection the topics are mapped to.
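If you only have a full connection string to hand, the hostname can be extracted before you paste it into the Connection host field. The following is a minimal sketch; the hostname_only helper is hypothetical, and the mongodb+srv:// form with its user name and password is an assumed example of what such a string might look like.

```python
from urllib.parse import urlsplit


def hostname_only(value: str) -> str:
    """Reduce a connection string or URL to its bare hostname."""
    value = value.strip()
    if "://" not in value:
        # Already a bare host; drop any trailing path or port.
        return value.split("/", 1)[0].split(":", 1)[0]
    host = urlsplit(value).hostname
    if host is None:
        raise ValueError(f"no hostname found in {value!r}")
    return host


# Assumed example connection string; the credentials are placeholders.
print(hostname_only("mongodb+srv://user:secret@cluster4-r5q3r7.gcp.mongodb.net/test"))
print(hostname_only("cluster4-r5q3r7.gcp.mongodb.net"))
```

Both calls print the same bare hostname, which is the form the connector expects.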
The following are optional (with the exception of the number of tasks).
Enter the maximum number of retries on a write error. The default value is 3 retries.
Enter the time in milliseconds (ms) that a retry is deferred. The default is 5000 ms (5 seconds).
Enter the maximum number of records to batch together for processing. The default is 0.
Select whether or not the connector deletes documents with matching key values when the value is null. The default is false.
Select the strategy to generate a unique document ID (_id). To delete documents when the value is null, this must be set to FullKeyStrategy, PartialKeyStrategy, or ProvidedInKeyStrategy. The default is BsonOidStrategy. For more information, see DocumentIdAdder.
Depending on the selected strategy, complete the appropriate Document ID strategy projection list fields:
- If you selected PartialKeyStrategy, allow or block the custom key fields to be projected for the ID strategy.
- If you selected PartialValueStrategy, allow or block the custom value fields to be projected for the ID strategy.
Select the write model strategy for bulk write operations. The default is ReplaceOneDefaultStrategy.
Enter the number of tasks for the connector. Refer to Confluent Cloud connector limitations for additional information.
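As a rough worked example of how the two retry settings above interact, the sketch below multiplies them to estimate the longest time a failing write could spend waiting. It assumes each retry waits the full defer time before it runs, which this quick start does not confirm, and it ignores the time each attempt itself takes. The max_deferred_ms helper is hypothetical.

```python
def max_deferred_ms(max_retries: int = 3, defer_timeout_ms: int = 5000) -> int:
    """Upper bound on time spent waiting between retries, in milliseconds."""
    if max_retries < 0 or defer_timeout_ms < 0:
        raise ValueError("both settings must be non-negative")
    return max_retries * defer_timeout_ms


# With the documented defaults: 3 retries x 5000 ms.
print(max_deferred_ms())
# A hypothetical tuning: more retries, shorter deferral.
print(max_deferred_ms(5, 2000))
```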
Step 5: Launch the connector.
Verify the connection details and click Launch.
Step 6: Check the connector status.
The status for the connector should go from Provisioning to Running. It may take a few minutes.
Step 7: Check MongoDB
After the connector is running, verify that messages are populating your MongoDB database.
Tip
When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.
For additional information about this connector, see the MongoDB Kafka Connector documentation. Note that not all connector features are provided in the Confluent Cloud connector.
See also
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.
Using the Confluent Cloud CLI
Complete the following steps to set up and run the connector using the Confluent Cloud CLI.
Step 1: List the available connectors.
Enter the following command to list available connectors:
ccloud connector-catalog list
Step 2: Show the required connector configuration properties.
Enter the following command to show the required connector properties:
ccloud connector-catalog describe <connector-catalog-name>
For example:
ccloud connector-catalog describe MongoDbAtlasSink
Example output:
Following are the required configs:
connector.class: MongoDbAtlasSink
name
kafka.api.key
kafka.api.secret
input.data.format
connection.host
connection.user
connection.password
database
tasks.max
topics
Step 3: Create the connector configuration file.
Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties along with several optional ones.
{
  "connector.class": "MongoDbAtlasSink",
  "name": "confluent-mongodb-sink",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "input.data.format": "JSON",
  "connection.host": "<database-host-address>",
  "connection.user": "<my-username>",
  "connection.password": "<my-password>",
  "topics": "<kafka-topic-name>",
  "max.num.retries": "3",
  "retries.defer.timeout": "5000",
  "max.batch.size": "0",
  "database": "<database-name>",
  "collection": "<collection-name>",
  "tasks.max": "1"
}
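If you prefer to generate the file rather than write it by hand, a small script can check that every required property from Step 2 is present before serializing. This is only a sketch: the missing_required and render_config helpers are hypothetical, and the placeholder values must be replaced with your own.

```python
import json

# Required properties, copied from the connector-catalog describe output in Step 2.
REQUIRED = (
    "connector.class", "name", "kafka.api.key", "kafka.api.secret",
    "input.data.format", "connection.host", "connection.user",
    "connection.password", "database", "tasks.max", "topics",
)


def missing_required(config: dict) -> list:
    """Return the required property names that are absent or blank."""
    return [key for key in REQUIRED if not str(config.get(key, "")).strip()]


def render_config(config: dict) -> str:
    """Serialize the configuration, refusing to emit an incomplete one."""
    missing = missing_required(config)
    if missing:
        raise ValueError("missing required properties: " + ", ".join(missing))
    return json.dumps(config, indent=2)


config = {
    "connector.class": "MongoDbAtlasSink",
    "name": "confluent-mongodb-sink",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "input.data.format": "JSON",
    "connection.host": "<database-host-address>",
    "connection.user": "<my-username>",
    "connection.password": "<my-password>",
    "topics": "<kafka-topic-name>",
    "database": "<database-name>",
    "collection": "<collection-name>",
    "tasks.max": "1",
}

# Print the JSON; redirect the output to a file such as mongo-db-sink.json.
print(render_config(config))
```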
Note the following property definitions:
"connector.class": Identifies the connector plugin name.
"name": Sets a name for your new connector.
"input.data.format": Sets the input message format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, STRING, or BSON. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
"connection.host": The MongoDB host. Use a hostname address and not a full URL. For example: cluster4-r5q3r7.gcp.mongodb.net.
"collection": The MongoDB collection name. For multiple topics, this is the default collection the topics are mapped to.
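The input.data.format rule can be expressed as a small pre-flight check. The sketch below is illustrative only: check_input_format and its schema_registry_enabled flag are hypothetical names, and upper-casing the value is a convenience of this sketch rather than documented connector behavior.

```python
# Valid entries and the schema-based subset, as listed in the property definition.
VALID_FORMATS = {"AVRO", "JSON_SR", "PROTOBUF", "JSON", "STRING", "BSON"}
SCHEMA_BASED = {"AVRO", "JSON_SR", "PROTOBUF"}


def check_input_format(fmt: str, schema_registry_enabled: bool) -> str:
    """Validate an input.data.format value and return it in upper case."""
    normalized = fmt.strip().upper()
    if normalized not in VALID_FORMATS:
        raise ValueError(f"unsupported input.data.format: {fmt!r}")
    if normalized in SCHEMA_BASED and not schema_registry_enabled:
        raise ValueError(f"{normalized} requires Schema Registry to be configured")
    return normalized


# Schemaless JSON passes without Schema Registry.
print(check_input_format("JSON", schema_registry_enabled=False))
# Avro passes only when Schema Registry is configured.
print(check_input_format("avro", schema_registry_enabled=True))
```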
The following properties are optional (with the exception of the number of tasks).
"max.num.retries": The maximum number of retries to attempt on write errors. If not used, this property defaults to 3.
"retries.defer.timeout": How long (in milliseconds) a retry is deferred. If not used, the default is 5000 ms.
"max.batch.size": The maximum number of sink records to batch together for processing. If not used, this property defaults to 0.
"delete.on.null.values": Whether the connector should delete documents with matching key values when the value is null. If not used, this property defaults to false.
"doc.id.strategy": Sets the strategy to generate a unique document ID (_id). To delete documents when the value is null, this must be set to FullKeyStrategy, PartialKeyStrategy, or ProvidedInKeyStrategy. The default is BsonOidStrategy. For more information, see DocumentIdAdder.
Depending on the selected strategy, add the appropriate Document ID strategy projection list:
"key.projection.type": For use with PartialKeyStrategy. Use either allowlist or blocklist to allow or block the custom key fields to be projected for ID strategy. If not used, this property defaults to none.
"key.projection.list": For use with PartialKeyStrategy. A comma-separated list of key fields to be projected for ID strategy.
"value.projection.type": For use with PartialValueStrategy. Use either allowlist or blocklist to allow or block the custom value fields to be projected for ID strategy. If not used, this property defaults to none.
"value.projection.list": For use with PartialValueStrategy. A comma-separated list of value fields to be projected for ID strategy.
"write.strategy": Sets the write model for bulk write operations. If not used, this property defaults to ReplaceOneDefaultStrategy.
"tasks.max": The number of tasks for the connector. Refer to Confluent Cloud connector limitations for additional information.
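The dependencies between delete.on.null.values, doc.id.strategy, and the projection properties are easy to get wrong, so a quick consistency check before creating the connector may help. The following is a sketch under stated assumptions: check_id_strategy is a hypothetical helper, the order_id field name is invented for illustration, and treating a partial strategy without a projection as a problem is an assumption of this sketch, not a documented connector rule.

```python
# Strategies that allow deletes on null values, per the doc.id.strategy definition.
KEY_BASED = {"FullKeyStrategy", "PartialKeyStrategy", "ProvidedInKeyStrategy"}

# Projection properties that accompany each partial strategy.
PROJECTION_KEYS = {
    "PartialKeyStrategy": ("key.projection.type", "key.projection.list"),
    "PartialValueStrategy": ("value.projection.type", "value.projection.list"),
}


def check_id_strategy(config: dict) -> list:
    """Return a list of readable problems; an empty list means consistent."""
    problems = []
    strategy = config.get("doc.id.strategy", "BsonOidStrategy")
    deletes = str(config.get("delete.on.null.values", "false")).lower() == "true"
    if deletes and strategy not in KEY_BASED:
        problems.append(
            f"delete.on.null.values is true but doc.id.strategy is {strategy}"
        )
    if strategy in PROJECTION_KEYS:
        type_key, list_key = PROJECTION_KEYS[strategy]
        # Assumption: a partial strategy is only useful with a projection set.
        if config.get(type_key, "none") not in ("allowlist", "blocklist"):
            problems.append(f"{type_key} should be allowlist or blocklist")
        elif not config.get(list_key, "").strip():
            problems.append(f"{list_key} is empty")
    return problems


# Hypothetical fragment: delete on null, keyed on a single invented field.
fragment = {
    "delete.on.null.values": "true",
    "doc.id.strategy": "PartialKeyStrategy",
    "key.projection.type": "allowlist",
    "key.projection.list": "order_id",
}
print(check_id_strategy(fragment))
print(check_id_strategy({"delete.on.null.values": "true"}))
```

The first call prints an empty list; the second reports that the default BsonOidStrategy cannot be combined with deletes.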
Step 4: Load the properties file and create the connector.
Enter the following command to load the configuration and start the connector:
ccloud connector create --config <file-name>.json
For example:
ccloud connector create --config mongo-db-sink.json
Example output:
Created connector confluent-mongodb-sink lcc-ix4dl
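When scripting this step, you may want to capture the connector ID for later commands. The sketch below parses a line shaped like the example output above; it assumes the CLI keeps that exact wording and that neither the name nor the ID contains spaces. The parse_created helper is hypothetical.

```python
import re

# Pattern modeled on the example output; the real format may differ by CLI version.
CREATED = re.compile(r"^Created connector (?P<name>\S+) (?P<id>\S+)$")


def parse_created(line: str) -> tuple:
    """Return (connector name, connector ID) from a 'Created connector' line."""
    match = CREATED.match(line.strip())
    if match is None:
        raise ValueError(f"unexpected output: {line!r}")
    return match["name"], match["id"]


name, connector_id = parse_created("Created connector confluent-mongodb-sink lcc-ix4dl")
print(name, connector_id)
```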
Step 5: Check the connector status.
Enter the following command to check the connector status:
ccloud connector list
Example output:
ID | Name | Status | Type
+-----------+-------------------------+---------+------+
lcc-ix4dl | confluent-mongodb-sink | RUNNING | sink
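To check the status from a script, the pipe-delimited table shown above can be parsed into rows. This is a minimal sketch that assumes the column layout of the example output; parse_status_table and is_running are hypothetical helpers, and the sample text is copied from the example rather than produced by a live CLI call.

```python
def parse_status_table(text: str) -> list:
    """Turn the pipe-delimited status table into a list of row dictionaries."""
    header = None
    rows = []
    for line in text.splitlines():
        if "|" not in line:
            continue  # skips blank lines and the +----+ separator row
        cells = [cell.strip() for cell in line.split("|")]
        if header is None:
            header = [cell.lower() for cell in cells]
        else:
            rows.append(dict(zip(header, cells)))
    return rows


def is_running(rows: list, name: str) -> bool:
    """Return True if the named connector reports a RUNNING status."""
    return any(r["name"] == name and r["status"] == "RUNNING" for r in rows)


sample = """\
ID | Name | Status | Type
+-----------+-------------------------+---------+------+
lcc-ix4dl | confluent-mongodb-sink | RUNNING | sink
"""
rows = parse_status_table(sample)
print(rows)
print(is_running(rows, "confluent-mongodb-sink"))
```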
Step 6: Check MongoDB
After the connector is running, verify that records are populating your MongoDB database.
Tip
When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.
For additional information about this connector, see the MongoDB Kafka Connector documentation. Note that not all connector features are provided in the Confluent Cloud connector.