内容
1. 引言
在本章中,我们将了解什么是Schema Registry以及为什么我们应该将它与Apache Kafka一起使用。而且,我们将学习Avro Schema演变的概念,并使用Kafka Avro Serializers设置和使用Schema Registry。另外,我们将学习使用Schema Registry的REST接口管理Avro Schemas。
2. 什么是Schema Registry?
Schema Registry是在Confluent公司开发的产品Confluent Open Source中加入的功能,它对Kafka一个比较大的增强,它使得Kafka的数据流必须符合注册的Schema,从而增强了可用性。
Schema Registry为你的元数据提供服务层。它提供了一个RESTful接口,用于存储和检索Avro Schema,它存储所有Schema的版本化历史记录,提供多种兼容性设置,并允许根据配置的兼容性设置演变模式。它提供了插入Kafka客户端的序列化程序,这些客户端处理以Avro格式发送的Kafka消息的Schema存储和检索。
pic
使用Schema Registry时生产者不需要发送整个Schema,只需要发送唯一的Schema ID即可。因此,消费者可以使用这个Schema ID从Schema Registry中查找完整Schema(如果尚未缓存)。这意味着我们不必为每组记录都发送Schema,这样就可以节省时间。
同时,Kafka生产者也创建了一条记录/消息,即Avro记录,这条记录包含一个Schema ID和数据。Schema ID对应的模式需要注册,然后使用Kafka Avro Serializer序列化数据和Schema ID。
3. 为什么需要Schema Registry?
因为消费者的模式(Schema)可能与生产者的模式不同。在定义消费者模式时,消费者期望记录/消息符合模式的要求。那么在执行检查时,如果两个模式不匹配但兼容,则可以通过Avma Schema Evolution和Schema Registry进行有效负载转换。此外,Kafka记录可以有一个键和一个值,两者都可以有一个模式。
4. Kafka Schema Registry的操作
Schema Registry可以为记录的键和值存储Schema。Schema Registry按照Schema的主题(subject)列出Schema,并且能列出一个Schema的所有版本。它还能通过版本号和ID来检索Schema,并获得一个Schema的最新版本。
Schema Registry的兼容级别有向前(Forward),向后(Backword),完全(Full),无(None)。我们可以使用Schema Registry提供的REST API管理来Schema。
5. Schema兼容设置
我们来了解一下所有兼容性级别。向后兼容性(backward compatiblity)是指使用较新的Schema可以读取旧的Schema编写的数据。向前兼容性(forward compatibility)是指使用旧的Schema可以读取使用较新的Schema编写的数据。完全兼容性(full compatibility)是指新版本的Schema同时满足向后兼容和向前兼容。“无(none)”状态,意味着它禁用模式验证,但是不建议这样做。因此,Schema Registry只存储模式,如果我们将级别设置为“none”,它将不会验证兼容性。
Schema Registry配置
我们可以对全局或者每个Schema设置以下列出的兼容性的值:
a. None
这意味着不检查Schema的兼容性。
b. Forward
确保最后一个Schema版本与新Schema向前兼容。
c. Backward
确保新Schema向后兼容最后一个Schema版本。
d. Full
确保新Schema同时满足向前和向后兼容。
6. Schema的演变(Evolution)
如果使用Arvo的旧Schema将数据写入存储后,Avro Schema会变化,那么当我们尝试读取该数据时,Avro可能会进行Schema演变。
从Kafka的角度来看,Schema演变只发生在消费者的反序列化过程中(读取操作)。并且,如果消费者的Schema与生产者的Schema不同,在反序列化期间值或键会自动被修改以便符合消费者的读取Schema。
简而言之,Schema演变是生产者放入Kafka日志的Schema与消费者Schema版本之间的Avro Schema的自动转换。当消费者Schema与用于序列化Kafka记录的生产者Schema不同时,Kafka记录的键或值就被进行数据转换。当然,如果模式匹配,就无需进行转换。
a. Schema演变过程中允许的修改
可以将具有默认值的字段添加到Schema,可以删除具有默认值的字段,还可以更改字段的排序属性。此外,我们可以将字段的默认值更改为其他值,或者给一个没有默认值的字段添加默认值。
可以删除或添加字段别名,但这可能会导致某些依赖别名的消费者无法工作。此外,我们可以将类型(type)更改为包含原始类型的联合(union)。上述更改将导致在使用旧Schema读取时可以使用Avro的Schema演变。
b. 修改Schema的规则
如果想让Schema可以进化,我们必须遵循一些准则。首先,我们需要为Schema中的字段提供默认值,因为这允许我们以后删除这样的字段。然后,永远不要更改字段的数据类型。另外,在向Schema添加新字段时,我们必须为字段提供默认值。最后,请确保不要重命名现有字段(而是使用别名)。
例如:
{"namespace": "com.qidi.phonebook", "type": "record", "name": "Employee", "doc" : "Represents an Employee at a company", "fields": [ {"name": "firstName", "type": "string", "doc": "The persons given name"}, {"name": "nickName", "type": ["null", "string"], "default" : null}, {"name": "lastName", "type": "string"}, {"name": "age", "type": "int", "default": -1}, {"name": "emails", "default":[], "type":{"type": "array", "items": "string"}}, {"name": "phoneNumber", "type": [ "null", { "type": "record", "name": "PhoneNumber", "fields": [ {"name": "areaCode", "type": "string"}, {"name": "countryCode", "type": "string", "default" : ""}, {"name": "prefix", "type": "string"}, {"name": "number", "type": "string"} ] } ] }, {"name":"status", "default" :"SALARY", "type": { "type": "enum", "name": "Status", "symbols" : ["RETIRED", "SALARY", "HOURLY", "PART_TIME"]} } ] }
7. Avro Schema演变的场景
假设在Schema的版本1中,员工记录没有年龄属性,但是现在我们想要添加一个默认值为-1的年龄字段。那么,假设我们有一个消费者使用没有年龄的版本1,和一个生产者使用有年龄的Schema的版本2。
现在,生产者通过使用Employee Schema的版本2,创建了一个Employee记录 并将其中的年龄字段设置为42,然后把这个记录发送到Kafka主题new-Employees中。之后,消费者使用Employee Schema的版本1从new-Employyes主题中读取了新的Employee记录,但是,在反序列化期间年龄字段被删除了因为消费者使用的Schema的版本1 中没有年龄这个字段。
同一个消费者修改一些记录,然后将记录写入一个NoSQL存储,因此,写入NoSQL存储的记录中都会缺少年龄字段。现在,另一个客户端使用有年龄的Schema的版本2,从NoSQL存储中读取记录,结果就是:因为消费者使用版本1写入记录,所以记录中缺少年龄字段,那么客户端读取记录时会将年龄设置为默认值-1。
如果我们添加了年龄并且它不是可选的,即年龄字段没有默认值,Schema Registry可以拒绝这个Schema,并且生产者永远不会将其添加到Kafka日志中。
8. 使用Schema Registry REST API
通过以下操作,Schema Registry运行我们管理Schema:
1. 存储Kafka记录的键和值的Schema
2. 根据主题列出Schema
3. 列出一个Schema的所有版本
4. 根据版本号查找Schema
5. 根据ID查找Schema
6. 获取一个Schema的最新版本
7. 进行兼容性检查
8. 设置全局兼容性级别
所有这些操作都可以通过REST API与Kafka中的Schema Registry一起使用。
例如,我们可以执行以下操作,以便创建新Schema:
a. 创建一个新的Schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \ --data '{"schema": "{\"type\": …}’ \ http://localhost:8081/subjects/Employee/versions
b. 列出所有的Schema
curl -X GET http://localhost:8081/subjects
我们可以通过Schema Registry的REST接口执行上述所有操作,只要你有一个好的HTTP客户端。例如,使用Square的OkHttp客户端(com.squareup.okhttp3:okhttp:3.7.0+):
package com.qidi.test; import okhttp3.*; import java.io.IOException; public class SchemaMain { private final static MediaType SCHEMA_CONTENT = MediaType.parse("application/vnd.schemaregistry.v1+json"); private final static String Employee_SCHEMA = "{\n" + " \"schema\": \"" + " {" + " \\\"namespace\\\": \\\"com.qidi.phonebook\\\"," + " \\\"type\\\": \\\"record\\\"," + " \\\"name\\\": \\\"Employee\\\"," + " \\\"fields\\\": [" + " {\\\"name\\\": \\\"fName\\\", \\\"type\\\": \\\"string\\\"}," + " {\\\"name\\\": \\\"lName\\\", \\\"type\\\": \\\"string\\\"}," + " {\\\"name\\\": \\\"age\\\", \\\"type\\\": \\\"int\\\"}," + " {\\\"name\\\": \\\"phoneNumber\\\", \\\"type\\\": \\\"string\\\"}" + " ]" + " }\"" + "}"; public static void main(String... args) throws IOException { System.out.println(Employee_SCHEMA); final OkHttpClient client = new OkHttpClient(); //POST A NEW SCHEMA Request request = new Request.Builder() .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA)) .url("http://localhost:8081/subjects/Employee/versions") .build(); String output = client.newCall(request).execute().body().string(); System.out.println(output); //LIST ALL SCHEMAS request = new Request.Builder() .url("http://localhost:8081/subjects") .build(); output = client.newCall(request).execute().body().string(); System.out.println(output); //SHOW ALL VERSIONS OF Employee request = new Request.Builder() .url("http://localhost:8081/subjects/Employee/versions/") .build(); output = client.newCall(request).execute().body().string(); System.out.println(output); //SHOW VERSION 2 OF Employee request = new Request.Builder() .url("http://localhost:8081/subjects/Employee/versions/2") .build(); output = client.newCall(request).execute().body().string(); System.out.println(output); //SHOW THE SCHEMA WITH ID 3 request = new Request.Builder() .url("http://localhost:8081/schemas/ids/3") .build(); output = client.newCall(request).execute().body().string(); System.out.println(output); //SHOW THE LATEST VERSION OF Employee 2 request = new Request.Builder() .url("http://localhost:8081/subjects/Employee/versions/latest") .build(); output = client.newCall(request).execute().body().string(); System.out.println(output); //CHECK IF SCHEMA IS REGISTERED request = new Request.Builder() .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA)) .url("http://localhost:8081/subjects/Employee") .build(); output = client.newCall(request).execute().body().string(); System.out.println(output); //TEST COMPATIBILITY request = new Request.Builder() .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA)) .url("http://localhost:8081/compatibility/subjects/Employee/versions/latest") .build(); output = client.newCall(request).execute().body().string(); System.out.println(output); // TOP LEVEL CONFIG request = new Request.Builder() .url("http://localhost:8081/config") .build(); output = client.newCall(request).execute().body().string(); System.out.println(output); // SET TOP LEVEL CONFIG // VALUES are none, backward, forward and full request = new Request.Builder() .put(RequestBody.create(SCHEMA_CONTENT, "{\"compatibility\": \"none\"}")) .url("http://localhost:8081/config") .build(); output = client.newCall(request).execute().body().string(); System.out.println(output); // SET CONFIG FOR Employee // VALUES are none, backward, forward and full request = new Request.Builder() .put(RequestBody.create(SCHEMA_CONTENT, "{\"compatibility\": \"backward\"}")) .url("http://localhost:8081/config/Employee") .build(); output = client.newCall(request).execute().body().string(); System.out.println(output); } }
我们建议运行该示例以尝试强制把不兼容的Schema注册到Schema Registry,并且注意各种兼容性设置的行为。
c. 运行Schema Registry
$ cat ~/tools/confluent-3.2.1/etc/schema-registry/schema-registry.properties listeners=http://0.0.0.0:8081 kafkastore.connection.url=localhost:2181 kafkastore.topic=_schemas debug=false ~/tools/confluent-3.2.1/bin/schema-registry-start ~/tools/confluent-3.2.1/etc/schema-registry/schema-registry.properties
9. 编写消费者和生产者
我们需要启动指向ZooKeeper集群的Schema Registry服务器,然后,我们要将Kafka Avro Serializer和Avro JAR文件导入我们的Gradle项目。之后,我们要配置生产者使用Schema Registry和KafkaAvroSerializer,而且我们要编写消费者并配置它使用Schema Registry和KafkaAvroDeserializer。
以下Gradle构建文件显示了我们需要的Avro JAR文件和其他配置。
plugins { id "com.commercehub.gradle.plugin.avro" version "0.9.0" } group 'qidi' version '1.0-SNAPSHOT' apply plugin: 'java' sourceCompatibility = 1.8 dependencies { compile "org.apache.avro:avro:1.8.1" compile 'com.squareup.okhttp3:okhttp:3.7.0' testCompile 'junit:junit:4.11' compile 'org.apache.kafka:kafka-clients:0.10.2.0' compile 'io.confluent:kafka-avro-serializer:3.2.1' } repositories { jcenter() mavenCentral() maven { url "http://packages.confluent.io/maven/" } } avro { createSetters = false fieldVisibility = "PRIVATE" }
请记住要引用Kafka Avro Serializer lib(io.confluent:kafka-avro-serializer:3.2.1)和Avro lib(org.apache.avro:avro:1.8.1).
a. 生产者
使用Kafka Avro Serialization and Schema Registry的生产者:
package com.qidi.test; import com.qidi.phonebook.Employee; import com.qidi.phonebook.PhoneNumber; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.util.Properties; import java.util.stream.IntStream; public class AvroProducer { private static Producer<Long, Employee> createProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); // Configure the KafkaAvroSerializer. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); // Schema Registry location. props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); return new KafkaProducer<>(props); } private final static String TOPIC = "new-Employees"; public static void main(String... args) { Producer<Long, Employee> producer = createProducer(); Employee bob = Employee.newBuilder().setAge(35) .setFirstName("Bob") .setLastName("Jones") .setPhoneNumber( PhoneNumber.newBuilder() .setAreaCode("301") .setCountryCode("1") .setPrefix("555") .setNumber("1234") .build()) .build(); IntStream.range(1, 100).forEach(index->{ producer.send(new ProducerRecord<>(TOPIC, 1L * index, bob)); }); producer.flush(); producer.close(); } }
确保将Schema Registry和KafkaAvroSerializer配置为生产者设置的一部分。
/
/ Configure the KafkaAvroSerializer. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); // Schema Registry location. props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
b. 消费者
使用Kafka Avro Serialization and Schema Registry的消费者:
package com.qidi.test; import com.qidi.phonebook.Employee; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.LongDeserializer; import java.util.Collections; import java.util.Properties; import java.util.stream.IntStream; public class AvroConsumer { private final static String BOOTSTRAP_SERVERS = "localhost:9092"; private final static String TOPIC = "new-Employee"; private static Consumer<Long, Employee> createConsumer() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); //Use Kafka Avro Deserializer. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); //<---------------------- //Use Specific Record or else you get Avro GenericRecord. props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true"); //Schema registry location. props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); //<----- Run Schema Registry on 8081 return new KafkaConsumer<>(props); } public static void main(String... args) { final Consumer<Long, Employee> consumer = createConsumer(); consumer.subscribe(Collections.singletonList(TOPIC)); IntStream.range(1, 100).forEach(index -> { final ConsumerRecords<Long, Employee> records = consumer.poll(100); if (records.count() == 0) { System.out.println("None found"); } else records.forEach(record -> { Employee EmployeeRecord = record.value(); System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), EmployeeRecord); }); }); } }
与生产者一样,我们必须告诉消费者在哪里找到Schema Registry,我们必须配置Kafka Avro反序列化类。
//Use Kafka Avro Deserializer. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); //Use Specific Record or else you get Avro GenericRecord. props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true"); //Schema registry location. props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); //<----- Run Schema Registry on 8081
我们需要启动Kafka和ZooKeeper,运行上面的例子:
kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties & kafka/bin/kafka-server-start.sh kafka/config/server.properties