Azure Cognitive Search Sink Connector for Confluent Platform

The Kafka Connect Azure Cognitive Search Sink Connector moves data from Apache Kafka® to Azure Cognitive Search. It writes each event from a Kafka topic as a document to an index in Azure Cognitive Search.

The connector leverages Azure Cognitive Search’s REST API to send records as documents.
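
As an illustration of what sending records as documents over the REST API can look like, the following Python sketch builds (but does not send) a document indexing request. The endpoint shape, the api-key header, and the "@search.action" field follow the public Azure Cognitive Search REST API. The function name build_index_request, the chosen api-version, and the mergeOrUpload action are assumptions made for this example; this document does not state which action or API version the connector itself uses.

```python
import json

# Assumption: a generally available api-version; the connector's own choice is not documented here.
API_VERSION = "2020-06-30"


def build_index_request(service_name, index_name, api_key, documents):
    """Return (url, headers, body) for one Azure Cognitive Search indexing call.

    Hypothetical helper for illustration only; it performs no network I/O.
    """
    url = (
        f"https://{service_name}.search.windows.net"
        f"/indexes/{index_name}/docs/index?api-version={API_VERSION}"
    )
    headers = {"Content-Type": "application/json", "api-key": api_key}
    # Every document in the batch carries an action. "mergeOrUpload" behaves
    # like an upsert; using it here is an assumption, not the connector's
    # documented behavior.
    actions = [{"@search.action": "mergeOrUpload", **doc} for doc in documents]
    body = json.dumps({"value": actions})
    return url, headers, body
```

For example, build_index_request("my-service", "hotels-sample-index", "<admin key>", [{"HotelId": "marriotId", "HotelName": "Marriot"}]) yields a URL, headers, and JSON body that any HTTP client could POST to the service.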

Features

The Azure Cognitive Search Sink Connector offers the following features:

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once to Azure Cognitive Search. If the connector restarts, some records may be written to the index more than once.

Ordered writes

The connector writes records in exactly the same order that it receives them. To keep document keys unique, the connector can use the Kafka coordinates (topic, partition, and offset) as the document key. Otherwise, it can use the record key as the document key.
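
To show why Kafka coordinates make a unique key, the following Python sketch derives a document key from a topic, partition, and offset. The function name coordinates_key and the dash-separated format are hypothetical; the connector's actual key format is not described in this document. The character filtering reflects the Azure Cognitive Search rule that document keys may contain only letters, digits, underscores, dashes, and equal signs.

```python
import re

# Characters outside this set are not allowed in Azure Cognitive Search document keys.
_DISALLOWED = re.compile(r"[^A-Za-z0-9_=-]")


def coordinates_key(topic, partition, offset):
    """Build a document key from Kafka coordinates (hypothetical format).

    Kafka topic names may contain dots, which are not valid in a document
    key, so they are replaced with underscores. Note that this replacement
    could make two topics such as "a.b" and "a_b" map to the same prefix.
    """
    safe_topic = _DISALLOWED.sub("_", topic)
    return f"{safe_topic}-{partition}-{offset}"
```

Because a record's coordinates never change, a record that is delivered again after a restart maps to the same key. If the write is an upsert, the repeated delivery overwrites the existing document instead of creating a second one.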

Reporting

The connector writes the HTTP responses from Azure Cognitive Search to success and error topics for each individual record.

Automatic retries

Writes to the Azure Cognitive Search service can occasionally fail. The connector retries all retriable requests, and the max.retry.ms configuration property sets the maximum amount of time that the connector spends retrying.
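
The idea of retrying within a time budget can be sketched in Python as follows. This is a minimal illustration, not the connector's implementation: the function send_with_retries, the exponential backoff, and the set of retriable status codes are all assumptions, and only the max_retry_ms parameter corresponds to a setting named in this document.

```python
import time

# Assumption: this document does not list which responses are retriable;
# these HTTP status codes are a common choice for transient failures.
RETRIABLE_STATUS = {429, 500, 502, 503, 504}


def send_with_retries(send, max_retry_ms, base_backoff_ms=100,
                      sleep=time.sleep, clock=time.monotonic):
    """Call send() until it returns a non-retriable status or the budget runs out.

    send is a hypothetical zero-argument callable that performs one request
    and returns its HTTP status code.
    """
    deadline = clock() + max_retry_ms / 1000.0
    attempt = 0
    while True:
        status = send()
        if status not in RETRIABLE_STATUS:
            return status
        # Exponential backoff: 100 ms, 200 ms, 400 ms, ... by default.
        backoff_s = base_backoff_ms * (2 ** attempt) / 1000.0
        if clock() + backoff_s > deadline:
            return status  # retry budget exhausted; the last failure is returned
        sleep(backoff_s)
        attempt += 1
```

The sleep and clock parameters exist only so that the loop can be exercised in tests without waiting. A larger max_retry_ms tolerates longer outages at the cost of holding up the records behind the failing request.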

Prerequisites

The following are required to run the Kafka Connect Azure Cognitive Search Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above
  • Connect: Confluent Platform 4.1.0 or above
  • Java 1.8

Install the Azure Cognitive Search Sink Connector

You can install this connector by following the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Note

You must install the connector on every machine where Connect will run.

  • An installation of the Confluent Hub Client.

    Note

    This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-azure-search:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-azure-search:1.0.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent license properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Azure Cognitive Search Sink Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Quick Start

This quick start uses the Azure Cognitive Search Sink Connector to consume records and write them as documents to an Azure Cognitive Search service.

Prerequisites
  1. Before starting the connector, create and deploy an Azure Cognitive Search service.

    Note

    Ensure the index has the default name hotels-sample-index and only has the fields HotelId, HotelName, Description. All others should be deleted.

  2. Install the connector through the Confluent Hub Client.

    # run from your CP installation directory
    confluent-hub install confluentinc/kafka-connect-azure-search:latest
    
  3. Start Confluent Platform using the Confluent CLI commands.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    
  4. Produce test data to the hotels-sample topic in Kafka.

    Start the Avro console producer to import a few records to Kafka:

    <path-to-confluent>/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic hotels-sample \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"HotelName","type":"string"},{"name":"Description","type":"string"}]}' \
    --property key.schema='{"type":"string"}' \
    --property "parse.key=true" \
    --property "key.separator=,"
    

    Then in the console producer, enter:

    "marriotId",{"HotelName": "Marriot", "Description": "Marriot description"}
    "holidayinnId",{"HotelName": "HolidayInn", "Description": "HolidayInn description"}
    "motel8Id",{"HotelName": "Motel8", "Description": "motel8 description"}
    

    The three records entered are published to the Kafka topic hotels-sample in Avro format.

  5. Create an azure-search.json file with the following contents:

    {
      "name": "azure-search",
      "config": {
        "topics": "hotels-sample",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.azure.search.AzureSearchSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "azure.search.service.name": "<the created Search service name>",
        "azure.search.api.key": "<the copied api key>",
        "index.name": "${topic}-index",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.error.topic.name": "test-error",
        "reporter.error.topic.replication.factor": 1,
        "reporter.error.topic.key.format": "string",
        "reporter.error.topic.value.format": "string",
        "reporter.result.topic.name": "test-result",
        "reporter.result.topic.key.format": "string",
        "reporter.result.topic.value.format": "string",
        "reporter.result.topic.replication.factor": 1
      }
    }
    

    Caution

    Replace the azure.search.service.name and azure.search.api.key values in the JSON file with the name of your Search service and its admin API key, respectively. You can find the service name in the Azure portal.

    Note

    For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

  6. Load the Azure Cognitive Search Sink Connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load azure-search --config path/to/azure-search.json
    

    Important

    Don’t use the Confluent CLI commands in production environments.

  7. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status azure-search
    
  8. Confirm that the messages were delivered to the result topic in Kafka.

    confluent local services kafka consume test-result --from-beginning
    
  9. Confirm that the messages were delivered to Azure Cognitive Search.

  10. Log in to the service and check that the index hotels-sample-index contains the three records written earlier.

  11. Clean up resources:

    1. Delete the connector

      confluent local services connect connector unload azure-search
      
    2. Stop Confluent Platform

      confluent local services stop
      
    3. Delete the created Azure Cognitive Search service and its resource group in the Azure portal.
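
Because the Confluent CLI commands in this quick start are not meant for production, the load and status steps can also be expressed against the Kafka Connect REST API. The following Python sketch builds the POST /connectors request from the contents of azure-search.json, interprets a GET /connectors/azure-search/status response, and shows how the "${topic}-index" setting resolves for this quick start. The helper names and the localhost:8083 address are assumptions for a local Connect worker; nothing is sent over the network.

```python
import json
import urllib.request

# Assumption: a local Connect worker listening on the default REST port.
CONNECT_URL = "http://localhost:8083"


def build_create_request(connector_json, connect_url=CONNECT_URL):
    """Build (not send) the POST /connectors request for a {"name", "config"} payload."""
    payload = json.loads(connector_json)  # fails fast on malformed JSON
    missing = {"name", "config"} - set(payload)
    if missing:
        raise ValueError(f"missing top-level keys: {sorted(missing)}")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def is_running(status):
    """True when the connector and all of its tasks report RUNNING.

    status is the parsed JSON body of GET /connectors/<name>/status.
    """
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def resolve_index_name(template, topic):
    """Substitute the topic into an index.name template such as "${topic}-index"."""
    return template.replace("${topic}", topic)
```

Passing the returned request to urllib.request.urlopen would submit the connector. With the quick start values, resolve_index_name("${topic}-index", "hotels-sample") gives hotels-sample-index, which is the index name that the Azure Cognitive Search service must already contain.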

Additional Documentation