geoffwilliams@home:~$

Confluent JSON Schema Evolution

Confluent JSON Schema evolution behaves differently from “normal” schema evolution. For the nuances, take a look at the official Confluent docs and Robert Yokota's blog.

  • This post is a howto guide for developers in a hurry who want to add or remove fields
  • To list the schema ID used by each message, we use a one-liner that reads each record's Confluent wire-format header
  • Assumed technologies: Java, Kafka, Confluent, Schema Registry, Jackson JSON
  • Assumed history: Some message(s) have already been produced
  • Using default compatibility: BACKWARD
  • Note: The schema IDs in this example start at 3 because some other non-related schemas were already registered
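The one-liner itself is not reproduced here, but the underlying trick is simple: Confluent-framed record values start with a magic byte 0x0 followed by the schema ID as a 4-byte big-endian integer, then the payload. A minimal Java sketch (the class and method names are ours, not from any library) that extracts the ID from raw record bytes:

```java
import java.nio.ByteBuffer;

public class SchemaIdPeek {
    // Confluent wire format: magic byte 0x0, then a 4-byte big-endian
    // schema ID, then the serialized payload.
    public static int schemaId(byte[] recordValue) {
        ByteBuffer buf = ByteBuffer.wrap(recordValue);
        byte magic = buf.get();
        if (magic != 0x0) {
            throw new IllegalArgumentException(
                "not Confluent wire format; magic byte was " + magic);
        }
        return buf.getInt(); // ByteBuffer reads big-endian by default
    }

    public static void main(String[] args) {
        // a record value written against schema ID 3:
        // 0x0, then 00 00 00 03, then the JSON payload bytes
        byte[] framed = {0x0, 0, 0, 0, 3, '{', '}'};
        System.out.println("schema id: " + schemaId(framed)); // prints "schema id: 3"
    }
}
```

Applying this to each record value in a topic (e.g. from a consumer configured with the ByteArrayDeserializer) produces the partition/offset/schema-id listings shown below.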

Initial POJO

/src/main/java/example/Message.java

package example;
import lombok.Data;

@Data
public class Message {
  private String tranID;
  private String sourceApplication;
  private String targetApplication;
  private String messageType;
  private String timestamp;
  private String messageCreateTime;
}
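For reference, the JSON schema the Confluent serializer derives from this POJO looks roughly like the following (a sketch; the exact output depends on the serializer version and configuration). Note the closed content model, "additionalProperties": false, which is generated by default and becomes important later:

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Message",
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "tranID": { "type": "string" },
    "sourceApplication": { "type": "string" },
    "targetApplication": { "type": "string" },
    "messageType": { "type": "string" },
    "timestamp": { "type": "string" },
    "messageCreateTime": { "type": "string" }
  }
}
```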

Adding a field

If you want to add a field, just add it to your Jackson POJO class:

/src/main/java/example/Message.java

+  // a new field that will be mandatory in all new messages
+  private String aMandatoryNewField;
+

And of course set the value in your producer:

/src/main/java/example/Producer.java

+   demoRecord.setAMandatoryNewField("somevalue");

If we already have a message in the topic, then after producing a message with this new schema, inspecting the schema IDs for our topic shows something like this:

partition: 0 offset: 0 schema id: 3
partition: 0 offset: 1 schema id: 4

The schema ID was incremented as expected: a new, backward-compatible schema was registered.
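For context, the producer in this walkthrough is assumed to be configured with the Confluent JSON Schema serializer, which derives the schema from the POJO and registers new versions automatically. The property names are the standard Confluent ones; the URLs are illustrative:

```properties
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
schema.registry.url=http://localhost:8081
# the serializer registers new schema versions on first use (default: true)
auto.register.schemas=true
```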

Removing a field

Let's remove our new field by:

  • commenting it out in the POJO and producer
  • producing a new message

It looks like the schema has evolved to remove the field, but this is not actually the case. If we inspect the schema IDs for the topic again, we see something that might be surprising:

partition: 0 offset: 0 schema id: 3
partition: 0 offset: 1 schema id: 4
partition: 0 offset: 2 schema id: 3

The schema ID has reverted to 3 rather than incrementing to 5. By commenting out the new field, Jackson generated a schema identical to the previously registered schema 3, so Schema Registry simply reused it; no new schema was registered.

Removing an original field

Let's try to remove a field that was part of the original schema 3 by commenting it out:

/src/main/java/example/Message.java

-  private String timestamp;
+  //private String timestamp;

/src/main/java/example/Producer.java

-   demoRecord.setTimestamp("now");
+   //demoRecord.setTimestamp("now");

Trying to produce now gives the error: Schema being registered is incompatible with an earlier schema.

We can work around the error by setting the global schema compatibility mode to NONE:

curl --request PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "NONE"}' --url http://localhost:8081/config/

This can also be done on a per-subject basis.
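For example, to relax compatibility only for our topic's value subject (assuming the default TopicNameStrategy, under which the subject for topic mytopic is mytopic-value):

```shell
curl --request PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "NONE"}' --url http://localhost:8081/config/mytopic-value
```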

The schema IDs for the produced messages are now:

partition: 0 offset: 0 schema id: 3
partition: 0 offset: 1 schema id: 4
partition: 0 offset: 2 schema id: 3
partition: 0 offset: 3 schema id: 5

On the surface, this seems to have solved the problem, but look what happens when we try to use the updated POJO to read messages that were written against the earlier schemas: Error deserializing key/value for partition mytopic-0 at offset 0. If needed, please seek past the record to continue consumption.

We get the error because messages written with schema ID 3 contain data that doesn't have a “slot” in our POJO. Under the hood, Jackson is trying to call demoRecord.setTimestamp(), which we removed earlier.
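The failure can be reproduced with plain Jackson, outside Kafka entirely: deserializing old-schema JSON into the trimmed POJO throws an UnrecognizedPropertyException by default. A sketch (requires jackson-databind on the classpath, plus the Message class above):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;

public class DeserDemo {
    public static void main(String[] args) throws Exception {
        // JSON written against the old schema still carries "timestamp"
        String oldJson = "{\"tranID\":\"t1\",\"timestamp\":\"now\"}";
        ObjectMapper mapper = new ObjectMapper();
        try {
            // the trimmed POJO no longer has a timestamp field
            mapper.readValue(oldJson, Message.class);
        } catch (UnrecognizedPropertyException e) {
            System.out.println("failed on field: " + e.getPropertyName());
        }
    }
}
```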

Now Robert Yokota's point becomes clear: we are using schemas in a way that violates the open/closed content model. There are a few ways we can work around this.

Ignore fields that don't map to the POJO

If we really don't care about the deleted field any more, even in our old messages, we can update our POJO with the annotation @JsonIgnoreProperties(ignoreUnknown = true). This tells Jackson to silently skip JSON fields that have no matching field in the POJO. Our POJO would now look like this:

package example;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Message {
  private String tranID;
  private String sourceApplication;
  private String targetApplication;
  private String messageType;
  //private String timestamp;
  private String messageCreateTime;
}

If you're not worried about schema evolution, this might be a workable solution. Otherwise, if you want more guardrails around what data is and isn't valid, read on and let's go back to the drawing board.

Back to the drawing board

Since we're in a lab environment, we can destroy the entire cluster to purge the old messages and schemas, and restore the default compatibility mode, BACKWARD.

To allow fields to be removed, the JSON schema needs an open content model, as determined by Confluent Schema Registry. The determination is based on the additionalProperties JSON Schema keyword:

  • "additionalProperties": true -> open content model
  • "additionalProperties": false -> closed content model

Generating an open content model with Jackson

To generate an open content model with Jackson, use the annotation:

@JsonSchemaInject(bools = {@JsonSchemaBool(path = "additionalProperties", value = true)})

What happens if we attempt to remove a field from a closed content model?

If we attempt to remove a field from a closed content model schema, registration of the new schema is blocked with the error PROPERTY_REMOVED_FROM_CLOSED_CONTENT_MODEL.
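Putting it together, an open-content-model POJO might look like the following. This assumes the @JsonSchemaInject/@JsonSchemaBool annotations from the mbknor-jackson-jsonSchema library, which the Confluent JSON Schema serializer uses to generate schemas:

```java
package example;

import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaBool;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject;
import lombok.Data;

@Data
// inject "additionalProperties": true into the generated schema,
// giving an open content model
@JsonSchemaInject(bools = {@JsonSchemaBool(path = "additionalProperties", value = true)})
public class Message {
  private String tranID;
  private String sourceApplication;
  private String targetApplication;
  private String messageType;
  private String messageCreateTime;
}
```

With the schema registered as an open content model from the start, fields can later be removed without tripping the PROPERTY_REMOVED_FROM_CLOSED_CONTENT_MODEL check.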
