Google Cloud Functions Sink Connector for Confluent Cloud

Note

If you are installing the connector locally for Confluent Platform, see Google Functions Sink Connector for Confluent Platform.

The Kafka Connect Google Cloud Functions Sink Connector for Confluent Cloud integrates Apache Kafka® with Google Cloud Functions. For basic information about functions, see the Google Cloud Functions Documentation.

The connector consumes records from Kafka topic(s) and executes a Google Cloud Function. Each request sent to Google Cloud Functions can contain up to max.batch.size records.
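
For illustration, the following is a minimal sketch of an HTTP-triggered Cloud Function (Python runtime) that could receive these requests. Treating the request body as a JSON array of records is an assumption made for this sketch; see the Google Cloud Functions documentation and your function's actual request contract for the exact payload shape.

# Minimal, illustrative HTTP-triggered Cloud Function (Python runtime).
# Assumption for this sketch: the connector POSTs each batch as a JSON array.
import functions_framework


@functions_framework.http
def handle_records(request):
    records = request.get_json(silent=True) or []
    # Each request carries at most max.batch.size records.
    for record in records:
        print(record)  # replace with real processing
    # The function's response is recorded by the connector in the
    # success-<connector-id> or error-<connector-id> topic (see Features).
    return {"processed": len(records)}, 200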

Important

If you are still on Confluent Cloud Enterprise, please contact your Confluent Account Executive for more information about using this connector.

Features

The Google Cloud Functions sink connector provides the following features:

  • Results from Google Cloud Functions are stored in the following topics (a consumer sketch for inspecting them follows this list):
    • success-<connector-id>
    • error-<connector-id>
  • Supported input data formats are AVRO, JSON_SR (JSON Schema), PROTOBUF, JSON (schemaless), and BYTES. If no schema is defined, values are encoded as plain strings. For example, "name": "Kimberley Human" is encoded as name=Kimberley Human.
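
For example, the invocation results can be inspected with a short consumer such as the sketch below, which assumes the confluent-kafka Python client; the connector ID and all connection details are placeholders.

# Illustrative check of the result topics written by the connector.
# The connector ID (<connector-id>) and connection details are placeholders.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "<bootstrap-server>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<kafka-api-key>",
    "sasl.password": "<kafka-api-secret>",
    "group.id": "gcf-results-check",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["success-<connector-id>", "error-<connector-id>"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(msg.topic(), msg.value().decode("utf-8"))
finally:
    consumer.close()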

You can manage your fully-managed connector using the Confluent Cloud API. For details, see the Confluent Cloud API documentation.

Refer to Cloud connector limitations for additional information.

Quick Start

Use this quick start to get up and running with the Confluent Cloud Google Cloud Functions sink connector. The quick start provides the basics of selecting the connector and configuring it to stream events to a target Google Cloud Function.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on GCP.

  • Access to a Google Cloud function. For basic information about functions, see the Google Cloud Functions Documentation.

  • A GCP service account. You download the service account credentials as a JSON file and upload this file when you set up the connector configuration properties.

  • The Confluent Cloud CLI installed and configured for the cluster. See Install and Configure the Confluent Cloud CLI.

  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  • The target Google Cloud function and the Kafka cluster should be in the same region.

  • The following role must be enabled for the connector service account in your project:

    Google Project Connector Role
  • The Trigger type must be set to HTTP. Select Require authentication. (A sketch for verifying authenticated access from the service account follows this list.)

    HTTP Trigger Type
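
Before moving on, it can help to confirm that the service account from the credentials file can invoke the authenticated HTTP trigger. The following is a minimal, illustrative Python sketch (not part of the connector setup); the function URL, credentials path, and test payload are placeholders, and it assumes the google-auth and requests packages are installed.

# Illustrative smoke test: invoke the HTTP-triggered function with an ID token
# minted from the downloaded service account key. All values are placeholders.
# The service account needs invoker permission on the function (see the role
# prerequisite above).
import os

import google.auth.transport.requests
import google.oauth2.id_token
import requests

FUNCTION_URL = "https://<region>-<project-id>.cloudfunctions.net/<function-name>"

# Point google-auth at the downloaded service account key file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/credentials.json"

auth_request = google.auth.transport.requests.Request()
token = google.oauth2.id_token.fetch_id_token(auth_request, FUNCTION_URL)

response = requests.post(
    FUNCTION_URL,
    headers={"Authorization": f"Bearer {token}"},
    json=[{"key": "test", "value": "hello"}],  # illustrative payload
)
print(response.status_code, response.text)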

Using the Confluent Cloud GUI

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

Click Connectors. If you already have connectors in your cluster, click Add connector.

Step 3: Select your connector.

Click the Google Cloud Functions Sink connector icon.


Step 4: Set up the connection.

Complete the following and click Continue.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.
  1. Select one or more topics.

  2. Enter a connector Name or choose the default name.

  3. Select an Input message format (data coming from the Kafka topic): AVRO, JSON_SR (JSON Schema), PROTOBUF, JSON (schemaless), or BYTES. A valid schema must be available in Schema Registry to use a schema-based message format.

    Note

    If no schema is defined, values are encoded as plain strings. For example, "name": "Kimberley Human" is encoded as name=Kimberley Human.

  4. Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret.

  5. Enter your function name and project ID. For basic information about Google Cloud Functions, see Your First Function.

  6. Upload your GCP credentials JSON file created as part of the prerequisites.

  7. Enter the Function Details.

    • Batch Size: The maximum number of records to combine when invoking a single Google Cloud function. Defaults to 1 (batching disabled). Accepts values from 1 to 1000.

    • Max pending requests: The maximum number of pending requests that can be made to Google Cloud functions concurrently. Defaults to 1.

    • Request timeout (ms): The maximum time in milliseconds (ms) that the connector attempts to request Google Cloud function access before timing out (socket timeout). The default is 300000 ms (5 minutes).

    • Retry timeout (ms): The total amount of time, in milliseconds (ms), that the connector exponentially backs off and retries failed requests (for example, when requests are throttled). Response codes that are retried are HTTP 429 Too Many Requests and HTTP 502 Bad Gateway. Defaults to 300000 ms (5 minutes). Enter -1 for indefinite retries.

  8. Enter the number of tasks to use with the connector.

Configuration properties that are not provided in the GUI use the default values. See Google Functions Sink Connector Configuration Properties for default values and property definitions.

Step 5: Launch the connector.

Verify the connection details and click Launch.

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running.

Step 7: Check for records.

Verify that records are being produced.
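
If the source topic is empty, you can produce a few test records first. The following is an illustrative sketch using the confluent-kafka Python client; the topic name, bootstrap server, and API credentials are placeholders, and a schemaless JSON value format is assumed (a schema-based format would need the matching Schema Registry serializer instead).

# Illustrative helper: produce a few schemaless JSON records to the connector's
# source topic so the function invocations can be observed. All connection
# details are placeholders.
import json

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "<bootstrap-server>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<kafka-api-key>",
    "sasl.password": "<kafka-api-secret>",
})

for i in range(5):
    producer.produce(
        "<topic-name>",  # the topic the connector is configured to consume
        key=str(i),
        value=json.dumps({"name": "Kimberley Human", "pageid": i}),
    )

producer.flush()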

You can manage your fully-managed connector using the Confluent Cloud API. For details, see the Confluent Cloud API documentation.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

For additional information about this connector, see Google Functions Sink Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.


Using the Confluent Cloud CLI

Complete the following steps to set up and run the connector using the Confluent Cloud CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe GoogleCloudFunctionsSink

Example output:

Following are the required configs:
connector.class
name
topics
input.data.format
kafka.api.key
kafka.api.secret
function.name
project.id
gcf.credentials.json
tasks.max

Configuration properties that are not listed use the default values. See Google Functions Sink Connector Configuration Properties for default values and property definitions.

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "topics": "pageviews_proto",
  "input.data.format": "PROTOBUF",
  "connector.class": "GoogleCloudFunctionsSink",
  "name": "GoogleCloudFunctionsSinkConnector_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "****************************************************************",
  "function.name": "dev-test",
  "project.id": "connect-2021",
  "gcf.credentials.json": "*",
  "tasks.max": "1"
}

Note the following property definitions:

  • "topics": Identifies the topic name or a comma-separated list of topic names.
  • "input.data.format": Sets the input message format. Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, or BYTES. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • "connector.class": Identifies the connector plugin name.
  • "name": Sets a name for your new connector.
  • "function.name": The name of your predefined Google Cloud function.
  • "project.id": Your GCP project ID.
  • "gcf.credentials.json": This contains the contents of the downloaded JSON file. See Formatting GCP credentials for details about how to format and use the contents of the downloaded credentials file.

Optional (a configuration sketch that includes these properties follows this list):

  • "max.batch.size": The maximum number of records to combine when invoking a single Google Cloud function. Defaults to 1 (batching disabled). Accepts values from 1 to 1000.
  • "max.pending.requests": The maximum number of pending requests that can be made to Google Cloud functions concurrently. Defaults to 1.
  • "request.timeout": The maximum time in milliseconds that the connector will attempt a request to Google Cloud Functions before timing out (i.e., socket timeout). Defaults to 300000 ms (5 minutes).
  • "retry.timeout": The total amount of time, in milliseconds (ms), that the connector will exponentially backoff and retry failed requests (i.e., throttling). Response codes that are retried are HTTP 429 Too Busy and HTTP 502 Bad Gateway. Defaults to 300000 ms (5 minutes). Enter -1 to configure this property for indefinite retries.
Formatting GCP credentials

The contents of the downloaded credentials file must be converted to string format before it can be used in the connector configuration.

  1. Convert the JSON file contents into string format. You can use an online converter tool to do this. For example: JSON to String Online Converter.

  2. Add the escape character \ before each \n entry in the Private Key section so that every entry becomes \\n (see the example configuration below). The example has been formatted so that the \\n entries are easier to see, and most of the credentials key has been omitted.

    Tip

    A script is available that converts the credentials to a string and adds the additional \ escape characters where needed. See Stringify GCP Credentials. A minimal Python sketch of this conversion also follows these steps.

      {
          "connector.class": "GoogleCloudFunctionsSink",
          "name": "GoogleCloudFunctionsSinkConnector_0",
          "kafka.api.key": "<my-kafka-api-key>",
          "kafka.api.secret": "<my-kafka-api-secret>",
          "topics": "<topic-name>",
          "data.format": "AVRO",
          "function.name": "dev-test",
          "project.id": "connect-2021",
          "gcf.credentials.json": "{\"type\":\"service_account\",\"project_id\":\"connect-
          1234567\",\"private_key_id\":\"omitted\",
          \"private_key\":\"-----BEGIN PRIVATE KEY-----
          \\nMIIEvAIBADANBgkqhkiG9w0BA
          \\n6MhBA9TIXB4dPiYYNOYwbfy0Lki8zGn7T6wovGS5pzsIh
          \\nOAQ8oRolFp\rdwc2cC5wyZ2+E+bhwn
          \\nPdCTW+oZoodY\\nOGB18cCKn5mJRzpiYsb5eGv2fN\/J
          \\n...rest of key omitted...
          \\n-----END PRIVATE KEY-----\\n\",
          \"client_email\":\"pub-sub@connect-123456789.iam.gserviceaccount.com\",
          \"client_id\":\"123456789\",\"auth_uri\":\"https:\/\/accounts.google.com\/o\/oauth2\/
          auth\",\"token_uri\":\"https:\/\/oauth2.googleapis.com\/
          token\",\"auth_provider_x509_cert_url\":\"https:\/\/
          www.googleapis.com\/oauth2\/v1\/
          certs\",\"client_x509_cert_url\":\"https:\/\/www.googleapis.com\/
          robot\/v1\/metadata\/x509\/pub-sub%40connect-
          123456789.iam.gserviceaccount.com\"}",
          "tasks.max": "1"
      }
    
  3. Add all the converted string content to the "gcf.credentials.json" section of your configuration file as shown in the example above.
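
The Stringify GCP Credentials script mentioned in the Tip is the documented way to do this conversion. Purely as an illustration of what the conversion produces, the following Python sketch reads the downloaded file as raw text and lets json.dump apply the quote and backslash escaping (which turns the \n entries in the private key into \\n) when it writes the connector configuration file; the file names are placeholders.

# Illustrative only: embed the raw credentials file contents as a single
# escaped string in the connector configuration. File paths are placeholders.
import json

with open("/path/to/downloaded-credentials.json") as f:
    credentials_text = f.read()

with open("google-functions-sink-config.json") as f:
    config = json.load(f)

config["gcf.credentials.json"] = credentials_text

# json.dump escapes the quotes and backslashes, producing the \\n form shown above.
with open("google-functions-sink-config.json", "w") as f:
    json.dump(config, f, indent=2)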

Step 4: Load the properties file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config google-functions-sink-config.json

Example output:

Created connector GoogleCloudFunctionsSinkConnector_0 lcc-ix4dl

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID          |       Name                          | Status  | Type
+-----------+-------------------------------------+---------+------+
lcc-ix4dl   | GoogleCloudFunctionsSinkConnector_0 | RUNNING | sink

Step 6: Check for records.

Verify that records are being produced.

You can manage your fully-managed connector using the Confluent Cloud API. For details, see the Confluent Cloud API documentation.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

For additional information about this connector, see Google Functions Sink Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.
