To get started with JSON Schema, you can use the command line producer and consumer for JSON Schema.
The command line producer and consumer are useful for understanding how the built-in JSON Schema support works on Confluent Platform.
When you incorporate the serializer and deserializer into the code for your own
producers and consumers, messages and associated schemas are processed the same
way as they are on the console producers and consumers.
Start Confluent Platform using the following command:
confluent local services start
Tip
- Alternatively, you can simply run
confluent local schema-registry
which also starts kafka
and zookeeper
as dependencies.
This demo does not directly reference the other services, such as Connect and Control Center. That said, you may want to run the full
stack anyway to further explore, for example, how the topics and messages display on Control Center. To learn more about confluent local
,
see Quick Start for Apache Kafka using Confluent Platform (Local) and confluent local in the Confluent CLI command reference.
- The
confluent local
commands run in the background so you can re-use this command window. Separate sessions are required for the producer and consumer.
Verify registered schema types.
Starting with Confluent Platform 5.5.0, Schema Registry now supports arbitrary schema types. You should verify which schema types are currently registered with Schema Registry.
To do so, type the following command (assuming you use the default URL and port for Schema Registry, localhost:8081
):
curl http://localhost:8081/schemas/types
The response will be one or more of the following. If additional schema format plugins are installed, these will also be available.
["JSON", "PROTOBUF", "AVRO"]
Alternatively, use the curl --silent
flag, and pipe the command through jq (curl --silent http://localhost:8081/schemas/types | jq
) to get nicely formatted output:
"JSON",
"PROTOBUF",
"AVRO"
Use the producer to send JSON Schema records in JSON as the message value.
The new topic, t1-j
, will be created as a part of this producer command if it does not already exist.
kafka-json-schema-console-producer --broker-list localhost:9092 --property schema.registry.url=http://localhost:8081 --topic t1-j \
--property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'
Tip
The current JSON Schema specific producer does not show a >
prompt, just a blank line at which to type producer messages.
Type the following command in the shell, and press return.
Keep this session of the producer running.
Use the command line JSON Schema consumer to read the value just produced to this topic.
kafka-json-schema-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic t1-j --property schema.registry.url=http://localhost:8081
You should see following in the console.
Keep this session of the consumer running.
Use the producer to send another record as the message value, which includes a new property not explicitly declared in the schema.
JSON Schema has an open content model, which allows any number of additional properties to appear in a JSON document without being specified in the JSON schema.
This is achieved with additionalProperties
set to true
, which is the default. If you do not explicitly disable additionalProperties
(by setting it to false
),
undeclared properties are allowed in records. These next few steps demonstrate this unique aspect of JSON Schema.
Return to the producer session that is already running and send the following message, which includes a new property "f2"
that is not declared in the schema
with which we started this producer.
Use the command line JSON Schema consumer to read the value just produced.
Return to the consumer that is already running, and reading from topic t1-j
. You should see following new message in the console.
The message with the new property (f2
) is successfully produced and read. If you try this with the other schema formats (Avro, Protobuf),
it will fail at the producer command because those specifications require that all properties be explicitly declared in the schemas.
Keep this consumer running.
Start a producer and pass a JSON Schema with additionalProperties
explicitly set to false
.
Return to the producer command window, and stop the producer with Ctl+C.
Type the following in the shell, and press return. This is the same producer and topic (t1-j
) used in the previous steps. The schema is almost the same as the previous one, but in this example additionalProperties
is explicitly set to false, as a part of the schema.
kafka-json-schema-console-producer --broker-list localhost:9092 --property schema.registry.url=http://localhost:8081 --topic t1-j \
--property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}, "additionalProperties": false}'
Get the top-level compatibility configuration.
curl -X GET http://localhost:8081/config
Example result (this is the default):
{"compatibility":"BACKWARD"}
Update the compatibility requirements globally.
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://localhost:8081/config
Example result (this is the default):
Tip
If you do not update the compatibility requirements, the following step will fail on a different error than the one being demonstrated here, due to the BACKWARD
compatibility setting.
For more examples of using curl
against the APIs to test and set configurations, see Schema Registry API Usage Examples.
Attempt to use the producer to send another record as the message value, which includes a new property not explicitly declared in the schema.
Enter the following at the prompt for the producer that you just started, and press return.
{"f2": "value3-j-this-will-break"}
With additionalProperties
set to false
, this will fail upon attempt to send and crash the producer, with the following error message.
org.apache.kafka.common.errors.SerializationException: Error serializing JSON message
Caused by: org.apache.kafka.common.errors.SerializationException: JSON {"f2":"value3-j-this-will-break"} does not match schema {"type":"object","properties":{"f1":{"type":"string"}},"additionalProperties":false}
Caused by: org.everit.json.schema.ValidationException: #: extraneous key [f2] is not permitted
...
The consumer will continue running, but no new messages will be displayed.
This is the same behavior you would see by default if using Avro or Protobuf in this scenario.
Tip
If you want to incorporate this behavior into JSON Schema producer code for your applications, include "additionalProperties": false
into the schemas.
Examples of this are shown in the discussion about properties in Understanding JSON Schema.
Rerun the producer in default mode as before and send a follow-on message with an undeclared property.
In the producer command window, stop the producer with Ctl+C.
Run the original producer command. There is no need to explicitly declare additionalProperties
as true (although you could), as this is the default.
kafka-json-schema-console-producer --broker-list localhost:9092 --property schema.registry.url=http://localhost:8081 --topic t1-j \
--property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'
Use the producer to send another record as the message value, which includes a new property not explicitly declared in the schema.
{"f2": "value3-j-this-will-work-again"}
Return to the consumer session to read the new message.
The consumer should still be running and reading from topic t1-j
. You will see following new message in the console.
{"f2": "value3-j-this-will-work-again"}
More specifically, if you followed all steps in order and started the consumer with the --from-beginning
flag
as mentioned earlier, the consumer shows a history of all messages sent:
{"f1":"value1-j"}
{"f2":"value2-j"}
{"f2":"value3-j-this-will-work-again"}
In another shell, use this curl command (piped through jq
for readability) to query the schemas that were registered with Schema Registry as versions 1 and 2.
To query version 1 of the schema, type:
curl --silent -X GET http://localhost:8081/subjects/t1-j-value/versions/1/schema | jq
Here is the expected output for version 1:
"type": "object",
"properties": {
"f1": {
"type": "string"
}
Use the same command to query version 2 of the schema.
curl --silent -X GET http://localhost:8081/subjects/t1-j-value/versions/2/schema | jq
Here is the expected output for version 2:
"type": "object",
"properties": {
"f1": {
"type": "string"
}
},
"additionalProperties": false
View the latest version of the schema in more detail by running this command.
curl --silent -X GET http://localhost:8081/subjects/t1-j-value/versions/latest | jq
Here is the expected output of the above command:
"subject": "t1-j-value",
"version": 2,
"id": 2,
"schemaType": "JSON",
"schema": "{\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}},\"additionalProperties\":false}"
Use Confluent Control Center to examine schemas and messages.
Messages that were successfully produced also show on Control Center (http://localhost:9021/)
in Topics > <topicName> > Messages. You may have to select a partition or jump to a timestamp to see messages sent earlier.
(For timestamp, type in a number, which will default to partition 1/Partition: 0
, and press return. To get the message view shown here,
select the cards icon on the upper right.)
Schemas you create are available on the Schemas tab for the selected topic.
Run shutdown and cleanup tasks.
- You can stop the consumer and producer with Ctl-C in their respective command windows.
- To stop Confluent Platform, type
confluent local stop
.
- If you would like to clear out existing data (topics, schemas, and messages) before starting again with another test, type
confluent local destroy
.