ELASTICSEARCH SERVICE SINK
Complete the following instructions to configure the Kafka Connect Elasticsearch connector with security. These instructions are based on the Elasticsearch document Encrypting HTTP Client communications.
Download and extract Elasticsearch 6.6.0, then change to its directory:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.tar.gz
tar xzvf elasticsearch-6.6.0.tar.gz
cd elasticsearch-6.6.0
Elasticsearch provides a certificate-generation utility named bin/elasticsearch-certutil. By default, this utility creates self-signed certificates. The steps below use openssl instead to generate your own CA-signed certificates, which lets you simulate a production-like environment.
Make a certificate working directory in the Elasticsearch config directory:
mkdir config/certs
cd config/certs
Generate the certificate authority (CA). When prompted for the Common Name (FQDN), enter localhost:
openssl req -new -x509 -keyout cacert.key -out cacert.pem -days 666
Generate a private key for the first client:
openssl genrsa -out client1.key 2048
Generate a certificate signing request:
openssl req -new -key client1.key -out client1.csr
Sign the request with the CA:
openssl x509 -req -in client1.csr -CA cacert.pem -CAkey cacert.key \
  -CAcreateserial -out client1.crt -days 1825 -sha256
Repeat the previous steps for the next client:
openssl genrsa -out client2.key 2048
openssl req -new -key client2.key -out client2.csr
openssl x509 -req -in client2.csr -CA cacert.pem -CAkey cacert.key \
  -CAcreateserial -out client2.crt -days 1825 -sha256
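The prompts in the steps above can be avoided by passing the subject on the command line, and the signed certificate can then be checked against the CA with openssl verify. The following is a minimal sketch under assumptions that are not in the original: it runs in a throwaway temporary directory, uses hypothetical subject names (Test CA for the CA, localhost for the client), and leaves the CA key unencrypted with -nodes, which is only reasonable for a local test.

```shell
set -eu

# Work in a scratch directory so nothing in config/certs is touched.
workdir="$(mktemp -d)"
cd "$workdir"

# Self-signed CA. -subj answers the prompts; "Test CA" is a made-up name.
# -nodes leaves the CA key without a passphrase (test use only).
openssl req -new -x509 -nodes -newkey rsa:2048 \
  -keyout cacert.key -out cacert.pem -days 30 -subj "/CN=Test CA"

# Client key, signing request, and CA-signed certificate.
openssl genrsa -out client1.key 2048
openssl req -new -key client1.key -out client1.csr -subj "/CN=localhost"
openssl x509 -req -in client1.csr -CA cacert.pem -CAkey cacert.key \
  -CAcreateserial -out client1.crt -days 30 -sha256

# Confirm that the client certificate chains to the CA.
verify_result="$(openssl verify -CAfile cacert.pem client1.crt)"
echo "$verify_result"
```

On success, openssl verify prints client1.crt: OK. The same check can be run against the certificates in config/certs.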
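Package the connector's key and certificate (client2) and the CA certificate as JKS stores. Note the passwords you enter at the prompts, because the connector configuration needs them: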
openssl pkcs12 -export -out bundle.p12 -in client2.crt -inkey client2.key
keytool -keystore truststore.jks -import -file cacert.pem -alias cacert
keytool -destkeystore keystore.jks -importkeystore -srckeystore bundle.p12 -srcstoretype PKCS12
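Before importing bundle.p12 into a keystore, it can help to confirm that the bundle opens with the chosen password and holds the expected certificate. The following is a minimal sketch, not part of the original procedure: it uses a throwaway self-signed certificate (demo.key and demo.crt) and the example password changeit, both hypothetical stand-ins for client2.key, client2.crt, and your own password.

```shell
set -eu

# Work in a scratch directory so nothing in config/certs is touched.
workdir="$(mktemp -d)"
cd "$workdir"

# Stand-in key and certificate for this sketch only.
openssl req -new -x509 -nodes -newkey rsa:2048 \
  -keyout demo.key -out demo.crt -days 30 -subj "/CN=localhost"

# -passout supplies the export password without a prompt.
openssl pkcs12 -export -out bundle.p12 -in demo.crt -inkey demo.key \
  -passout pass:changeit

# Read the certificate back out of the bundle and print its subject.
bundle_subject="$(openssl pkcs12 -in bundle.p12 -nokeys -passin pass:changeit \
  | openssl x509 -noout -subject)"
echo "$bundle_subject"
```

The printed subject should name localhost; the exact formatting of the subject line differs between OpenSSL versions.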
Return to the main Elasticsearch directory:
cd ../..
Update the Elasticsearch configuration file:
cat <<EOF >> config/elasticsearch.yml
xpack.security.enabled: true
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.client_authentication: required
xpack.security.http.ssl.key: certs/client1.key
xpack.security.http.ssl.certificate: certs/client1.crt
xpack.security.http.ssl.certificate_authorities: [ "certs/cacert.pem" ]
EOF
Add the passphrase for the HTTP SSL private key to the Elasticsearch keystore:
bin/elasticsearch-keystore add xpack.security.http.ssl.secure_key_passphrase
Run Elasticsearch:
bin/elasticsearch
Test the connection:
curl --key config/certs/client2.key --cert config/certs/client2.crt \
  --cacert config/certs/cacert.pem https://localhost:9200
Open a new terminal and change your current directory to <path-to-confluent>.
Save the connector configuration as elastic-secure.properties in etc/kafka-connect-elasticsearch, replacing the keystore and truststore paths and passwords with your own:
cat <<EOF > etc/kafka-connect-elasticsearch/elastic-secure.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-elasticsearch-sink
key.ignore=true
connection.url=https://localhost:9200
type.name=kafka-connect
elastic.security.protocol=SSL
elastic.https.ssl.keystore.location=/home/directory/elasticsearch-6.6.0/config/certs/keystore.jks
elastic.https.ssl.keystore.password=asdfasdf
elastic.https.ssl.key.password=asdfasdf
elastic.https.ssl.keystore.type=JKS
elastic.https.ssl.truststore.location=/home/directory/elasticsearch-6.6.0/config/certs/truststore.jks
elastic.https.ssl.truststore.password=asdfasdf
elastic.https.ssl.truststore.type=JKS
elastic.https.ssl.protocol=TLS
EOF
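A wrong keystore or truststore path is an easy mistake in this file. The sketch below is not part of the connector tooling: it defines a hypothetical helper, check_store_paths, that reads every property whose key ends in .location from a properties file and reports whether the referenced file exists.

```shell
# Hypothetical helper: prints "ok <path>" or "missing <path>" for each
# property whose key ends in ".location"; returns 1 if any file is missing.
check_store_paths() {
  csp_file="$1"
  csp_status=0
  while IFS= read -r csp_path; do
    # Skip the empty line produced when no property matches.
    [ -n "$csp_path" ] || continue
    if [ -f "$csp_path" ]; then
      echo "ok $csp_path"
    else
      echo "missing $csp_path"
      csp_status=1
    fi
  done <<EOF
$(grep -E '^[^#=]*\.location=' "$csp_file" | cut -d= -f2-)
EOF
  return "$csp_status"
}
```

For example, check_store_paths etc/kafka-connect-elasticsearch/elastic-secure.properties lists both store paths and returns a non-zero status if either file is missing.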
Start Connect and load the connector:
bin/confluent local services connect start
bin/confluent local services connect connector load elasticssl --config etc/kafka-connect-elasticsearch/elastic-secure.properties
Start the Avro console producer, then enter two test records, one per line:
bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test-elasticsearch-sink \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1": "secret1"}
{"f1": "secret2"}
Query Elasticsearch from the elasticsearch-6.6.0 directory, where the relative certificate paths resolve:
curl --key config/certs/client2.key --cert config/certs/client2.crt \
  --cacert config/certs/cacert.pem 'https://localhost:9200/test-elasticsearch-sink/_search?pretty'