PAGERDUTY SINK CONNECTOR
The Kafka Connect PagerDuty Sink connector is used to read records from an Apache Kafka® topic and create Pagerduty incidents.
The PagerDuty Sink Connector offers the following features:
behavior.on.error
fail
log
ignore
pagerduty.max.retry.time.seconds
The following are required to run the Kafka Connect Pagerduty Sink Connector:
You can install this connector by using the instructions or you can manually download the ZIP file.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
latest
confluent-hub install confluentinc/kafka-connect-pagerduty:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-pagerduty:1.0.0-preview
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
For a complete list of configuration properties for this connector, see PagerDuty Sink Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.
The quick start guide uses PagerDuty Sink Connector to consume records from a Kafka topic and create incidents in PagerDuty.
If you have a service already created in PagerDuty, you can find the Service ID by navigating to Configuration > Services. Select your service. The Service ID is in the URL bar. For example: https://connector-test.pagerduty.com/service-directory/<serviceId>.
https://connector-test.pagerduty.com/service-directory/<serviceId>
You can add a service in PagerDuty. When you add a service, select Use our API directly as the Integration type. After the service is added, you will be redirected to the URL where you can copy the Service ID.
PagerDuty integration setting
Start the Confluent services using the following Confluent CLI command:
confluent local services start
Important
Do not use the Confluent CLI in production environments.
Create a configuration file pagerduty-sink.properties with the following content. This file should be placed inside the Confluent Platform installation directory. This configuration is used typically along with standalone workers.
pagerduty-sink.properties
name=pagerduty-sink-connector topics=incidents connector.class=io.confluent.connect.pagerduty.PagerDutySinkConnector tasks.max=1 pagerduty.api.key=**** behavior.on.error=fail key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081 confluent.topic.bootstrap.servers=localhost:9092 confluent.topic.replication.factor=1 confluent.license= reporter.bootstrap.servers=localhost:9092 reporter.result.topic.replication.factor=1 reporter.error.topic.replication.factor=1
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
Run the connector with this configuration.
confluent local services connect connector load pagerduty-sink-connector --config pagerduty-sink.properties
The output should resemble:
{ "name":"pagerduty-sink-connector", "config":{ "topics":"incidents", "tasks.max":"1", "connector.class":"io.confluent.connect.pagerduty.PagerDutySinkConnector", "pagerduty.api.key":"****", "behavior.on.error":"fail", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url":"http://localhost:8081", "confluent.topic.bootstrap.servers":"localhost:9092", "confluent.topic.replication.factor":"1", "reporter.bootstrap.servers": "localhost:9092", "reporter.result.topic.replication.factor":"1", "reporter.error.topic.replication.factor":"1" "name":"pagerduty-sink-connector" }, "tasks":[ { "connector":"pagerduty-sink-connector", "task":0 } ], "type":"sink" }
Confirm that the connector is in a RUNNING state.
RUNNING
confluent local services connect connector status pagerduty-sink-connector
{ "name":"pagerduty-sink-connector", "connector":{ "state":"RUNNING", "worker_id":"127.0.1.1:8083" }, "tasks":[ { "id":0, "state":"RUNNING", "worker_id":"127.0.1.1:8083" } ], "type":"sink" }
Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the following command to post the configuration to one of the distributed connect workers. Check here for more information about the Kafka Connect Kafka Connect REST Interface
config.json
{ "name" : "pagerduty-sink-connector", "config" : { "topics":"incidents", "connector.class":"io.confluent.connect.pagerduty.PagerDutySinkConnector", "tasks.max" : "1", "pagerduty.api.key":"****", "behavior.on.error":"fail", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url":"http://localhost:8081", "confluent.topic.bootstrap.servers":"localhost:9092", "confluent.topic.replication.factor":"1", "confluent.license":" Omit to enable trial mode ", "reporter.bootstrap.servers": "localhost:9092", "reporter.result.topic.replication.factor":"1", "reporter.error.topic.replication.factor":"1" } }
Change the confluent.topic.bootstrap.servers property to include your broker address(es) and change the confluent.topic.replication.factor to 3 for staging or production use.
confluent.topic.bootstrap.servers
confluent.topic.replication.factor
3
Use curl to post a configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).
http://localhost:8083/
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of existing connector.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/pagerduty-sink-connector/config
Confirm that the connector is in a RUNNING state by running the following command:
curl http://localhost:8083/connectors/pagerduty-sink-connector/status | jq
To produce Avro data to Kafka topic: incidents, use the following command.
incidents
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic incidents --property value.schema='{"type":"record","name":"details","fields":[{"name":"fromEmail","type":"string"}, {"name":"serviceId","type":"string"},{"name":"incidentTitle","type":"string"}]}'
While the console is waiting for the input, use the following three records and paste each of them on the console.
{"fromEmail":"user1@abc.com", "serviceId":"<your-service-id>", "incidentTitle":"Incident Title x 0"} {"fromEmail":"user2@abc.com", "serviceId":"<your-service-id>", "incidentTitle":"Incident Title x 1"} {"fromEmail":"user3@abc.com", "serviceId":"<your-service-id>", "incidentTitle":"Incident Title x 2"}
The fromEmail in the records should be of registered PagerDuty user to allow creation of incidents
fromEmail
Finally, check the PagerDuty incident dashboard to see the newly created incidents.
The PagerDuty Sink Connector expects the value of records in Kafka topic to be either of type JSON String or Avro. In either case, the value should adhere to the following conditions:
serviceId
incidentTitle
body
urgency
escalationPolicy
priority
Following is the value schema for Pagerduty incident:
{ "name":"PagerdutyValueSchema", "type":"record", "fields":[ { "name":"fromEmail", "type":"string" }, { "name":"serviceId", "type":"string" }, { "name":"incidentTitle", "type":"string" }, { "name":"priorityId", "type":"string", "isOptional":true }, { "name":"urgency", "type":"string", "isOptional":true }, { "name":"bodyDetails", "type":"string", "isOptional":true }, { "name":"escalationPolicyId", "type":"string", "isOptional":true } ] }