Apache Kafka Schema Registry

1. 引言

在本章中,我们将了解什么是Schema Registry以及为什么我们应该将它与Apache Kafka一起使用。而且,我们将学习Avro Schema演变的概念,并使用Kafka Avro Serializers设置和使用Schema Registry。另外,我们将学习使用Schema RegistryREST接口管理Avro Schemas

2. 什么是Schema Registry

Schema Registry是在Confluent公司开发的产品Confluent Open Source中加入的功能,它Kafka一个比较大的增强,它使得Kafka的数据流必须符合注册的Schema,从而增强了可用性。

Schema Registry为你的元数据提供服务层。它提供了一个RESTful接口,用于存储和检索Avro Schema,它存储所有Schema的版本化历史记录,提供多种兼容性设置,并允许根据配置的兼容性设置演变模式。它提供了插入Kafka客户端的序列化程序,这些客户端处理以Avro格式发送的Kafka消息的Schema存储和检索。

pic

使用Schema Registry时生产者不需要发送整个Schema,只需要发送唯一的Schema ID即可。因此,消费者可以使用这个Schema ID从Schema Registry中查找完整Schema(如果尚未缓存)。这意味着我们不必为每组记录都发送Schema,这样就可以节省时间。

同时,Kafka生产者也创建了一条记录/消息,即Avro记录,这条记录包含一个Schema  ID和数据。Schema ID对应的模式需要注册,然后使用Kafka Avro Serializer序列化数据和Schema ID

3. 为什么需要Schema Registry?

因为消费者的模式(Schema)可能与生产者的模式不同。在定义消费者模式时,消费者期望记录/消息符合模式的要求。那么在执行检查时,如果两个模式不匹配但兼容,则可以通过Avma Schema EvolutionSchema Registry进行有效负载转换。此外,Kafka记录可以有一个键和一个值,两者都可以有一个模式。

4. Kafka Schema Registry的操作

Schema Registry可以为记录的键和值存储SchemaSchema Registry按照Schema的主题(subject)列出Schema,并且能列出一个Schema的所有版本。它还能通过版本号和ID来检索Schema,并获得一个Schema的最新版本。

Schema Registry的兼容级别有向前(Forward),向后(Backword),完全(Full),无(None)。我们可以使用Schema Registry提供的REST API管理来Schema

5. Schema兼容设置

我们来了解一下所有兼容性级别。向后兼容性(backward compatiblity)是指使用较新的Schema可以读取旧的Schema编写的数据。向前兼容性(forward compatibility)是指使用旧的Schema可以读取使用较新的Schema编写的数据。完全兼容性(full compatibility)是指新版本的Schema同时满足向后兼容和向前兼容。“无(none)”状态,意味着它禁用模式验证,但是不建议这样做。因此,Schema Registry只存储模式,如果我们将级别设置为“none”,它将不会验证兼容性。

Schema Registry配置

我们可以对全局或者每个Schema设置以下列出的兼容性的值:

a. None

这意味着不检查Schema的兼容性。

b. Forward

确保最后一个Schema版本与新Schema向前兼容。

c. Backward

确保新Schema向后兼容最后一个Schema版本。

d. Full

确保新Schema同时满足向前和向后兼容。

6. Schema的演变(Evolution

如果使用Arvo的旧Schema将数据写入存储后,Avro Schema会变化,那么当我们尝试读取该数据时,Avro可能会进行Schema演变。

Kafka的角度来看,Schema演变只发生在消费者的反序列化过程中(读取操作)。并且,如果消费者的Schema与生产者的Schema不同,在反序列化期间值或键会自动被修改以便符合消费者的读取Schema

简而言之,Schema演变是生产者放入Kafka日志的Schema与消费者Schema版本之间的Avro Schema的自动转换。当消费者Schema与用于序列化Kafka记录的生产者Schema不同时,Kafka记录的键或值就被进行数据转换。当然,如果模式匹配,就无需进行转换。

a. Schema演变过程中允许的修改

可以将具有默认值的字段添加到Schema,可以删除具有默认值的字段,还可以更改字段的排序属性。此外,我们可以将字段的默认值更改为其他值,或者给一个没有默认值的字段添加默认值。

可以删除或添加字段别名,但这可能会导致某些依赖别名的消费者无法工作。此外,我们可以将类型(type)更改为包含原始类型的联合(union)。上述更改将导致在使用旧Schema读取时可以使用AvroSchema演变。

b. 修改Schema的规则

如果想让Schema可以进化,我们必须遵循一些准则。首先,我们需要为Schema中的字段提供默认值,因为这允许我们以后删除这样的字段。然后,永远不要更改字段的数据类型。另外,在向Schema添加新字段时,我们必须为字段提供默认值。最后,请确保不要重命名现有字段(而是使用别名)。

例如:

{"namespace": "com.qidi.phonebook",
 "type": "record",
 "name": "Employee",
 "doc" : "Represents an Employee at a company",
 "fields": [
   {"name": "firstName", "type": "string", "doc": "The persons given name"},
   {"name": "nickName", "type": ["null", "string"], "default" : null},
   {"name": "lastName", "type": "string"},
   {"name": "age",  "type": "int", "default": -1},
   {"name": "emails", "default":[], "type":{"type": "array", "items": "string"}},
   {"name": "phoneNumber",  "type":
   [ "null",
     { "type": "record",   "name": "PhoneNumber",
       "fields": [
         {"name": "areaCode", "type": "string"},
         {"name": "countryCode", "type": "string", "default" : ""},
         {"name": "prefix", "type": "string"},
         {"name": "number", "type": "string"}
       ]
     }
   ]
   },
   {"name":"status", "default" :"SALARY", "type": { "type": "enum", "name": "Status",
     "symbols" : ["RETIRED", "SALARY", "HOURLY", "PART_TIME"]}
   }
 ]
}

7. Avro Schema演变的场景

假设在Schema的版本1中,员工记录没有年龄属性,但是现在我们想要添加一个默认值为-1的年龄字段。那么,假设我们有一个消费者使用没有年龄的版本1,和一个生产者使用有年龄的Schema的版本2

现在,生产者通过使用Employee Schema的版本2,创建了一个Employee记录 并将其中的年龄字段设置为42,然后把这个记录发送到Kafka主题new-Employees中。之后,消费者使用Employee Schema的版本1new-Employyes主题中读取了新的Employee记录,但是,在反序列化期间年龄字段被删除了因为消费者使用的Schema的版本1 中没有年龄这个字段。

同一个消费者修改一些记录,然后将记录写入一个NoSQL存储,因此,写入NoSQL存储的记录中都会缺少年龄字段。现在,另一个客户端使用有年龄的Schema的版本2,从NoSQL存储中读取记录,结果就是:因为消费者使用版本1写入记录,所以记录中缺少年龄字段,那么客户端读取记录时会将年龄设置为默认值-1

如果我们添加了年龄并且它不是可选的,即年龄字段没有默认值,Schema Registry可以拒绝这个Schema,并且生产者永远不会将其添加到Kafka日志中。

8. 使用Schema Registry REST API

通过以下操作,Schema Registry运行我们管理Schema

1. 存储Kafka记录的键和值的Schema

2. 根据主题列出Schema

3. 列出一个Schema的所有版本

4. 根据版本号查找Schema

5. 根据ID查找Schema

6. 获取一个Schema的最新版本

7. 进行兼容性检查

8. 设置全局兼容性级别

所有这些操作都可以通过REST APIKafka中的Schema Registry一起使用。

例如,我们可以执行以下操作,以便创建新Schema

a. 创建一个新的Schema 

curl -X POST -H "Content-Type:
application/vnd.schemaregistry.v1+json" \
   --data '{"schema": "{\"type\": …}’ \
   http://localhost:8081/subjects/Employee/versions

b. 列出所有的Schema

curl -X GET http://localhost:8081/subjects

我们可以通过Schema RegistryREST接口执行上述所有操作,只要你有一个好的HTTP客户端。例如,使用SquareOkHttp客户端(com.squareup.okhttp3:okhttp:3.7.0+:

package com.qidi.test;
import okhttp3.*;
import java.io.IOException;
public class SchemaMain {
   private final static MediaType SCHEMA_CONTENT =
           MediaType.parse("application/vnd.schemaregistry.v1+json");
   private final static String Employee_SCHEMA = "{\n" +
           " \"schema\": \"" +
           " {" +
           " \\\"namespace\\\": \\\"com.qidi.phonebook\\\"," +
           " \\\"type\\\": \\\"record\\\"," +
           " \\\"name\\\": \\\"Employee\\\"," +
           " \\\"fields\\\": [" +
           " {\\\"name\\\": \\\"fName\\\", \\\"type\\\": \\\"string\\\"}," +
           " {\\\"name\\\": \\\"lName\\\", \\\"type\\\": \\\"string\\\"}," +
           " {\\\"name\\\": \\\"age\\\",  \\\"type\\\": \\\"int\\\"}," +
           " {\\\"name\\\": \\\"phoneNumber\\\",  \\\"type\\\": \\\"string\\\"}" +
           " ]" +
           " }\"" +
           "}";
   public static void main(String... args) throws IOException {
       System.out.println(Employee_SCHEMA);
       final OkHttpClient client = new OkHttpClient();
       //POST A NEW SCHEMA
       Request request = new Request.Builder()
               .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA))
               .url("http://localhost:8081/subjects/Employee/versions")
               .build();
       String output = client.newCall(request).execute().body().string();
       System.out.println(output);
       //LIST ALL SCHEMAS
       request = new Request.Builder()
               .url("http://localhost:8081/subjects")
               .build();
       output = client.newCall(request).execute().body().string();
       System.out.println(output);
       //SHOW ALL VERSIONS OF Employee
       request = new Request.Builder()
               .url("http://localhost:8081/subjects/Employee/versions/")
               .build();
       output = client.newCall(request).execute().body().string();
       System.out.println(output);
       //SHOW VERSION 2 OF Employee
       request = new Request.Builder()
               .url("http://localhost:8081/subjects/Employee/versions/2")
               .build();
       output = client.newCall(request).execute().body().string();
       System.out.println(output);
       //SHOW THE SCHEMA WITH ID 3
       request = new Request.Builder()
               .url("http://localhost:8081/schemas/ids/3")
               .build();
       output = client.newCall(request).execute().body().string();
       System.out.println(output);
       //SHOW THE LATEST VERSION OF Employee 2
       request = new Request.Builder()
               .url("http://localhost:8081/subjects/Employee/versions/latest")
               .build();
       output = client.newCall(request).execute().body().string();
       System.out.println(output);
       //CHECK IF SCHEMA IS REGISTERED
       request = new Request.Builder()
               .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA))
               .url("http://localhost:8081/subjects/Employee")
               .build();
       output = client.newCall(request).execute().body().string();
       System.out.println(output);
       //TEST COMPATIBILITY
       request = new Request.Builder()
               .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA))
               .url("http://localhost:8081/compatibility/subjects/Employee/versions/latest")
               .build();
       output = client.newCall(request).execute().body().string();
       System.out.println(output);
       // TOP LEVEL CONFIG
       request = new Request.Builder()
               .url("http://localhost:8081/config")
               .build();
       output = client.newCall(request).execute().body().string();
       System.out.println(output);
       // SET TOP LEVEL CONFIG
       // VALUES are none, backward, forward and full
       request = new Request.Builder()
               .put(RequestBody.create(SCHEMA_CONTENT, "{\"compatibility\": \"none\"}"))
               .url("http://localhost:8081/config")
               .build();
       output = client.newCall(request).execute().body().string();
       System.out.println(output);
       // SET CONFIG FOR Employee
       // VALUES are none, backward, forward and full
       request = new Request.Builder()
               .put(RequestBody.create(SCHEMA_CONTENT, "{\"compatibility\": \"backward\"}"))
               .url("http://localhost:8081/config/Employee")
              .build();
       output = client.newCall(request).execute().body().string();
      System.out.println(output);
   }
}

 我们建议运行该示例以尝试强制把不兼容的Schema注册到Schema Registry,并且注意各种兼容性设置的行为。

 c. 运行Schema Registry

$ cat ~/tools/confluent-3.2.1/etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.connection.url=localhost:2181
kafkastore.topic=_schemas
debug=false
~/tools/confluent-3.2.1/bin/schema-registry-start  ~/tools/confluent-3.2.1/etc/schema-registry/schema-registry.properties

9. 编写消费者和生产者

我们需要启动指向ZooKeeper集群的Schema Registry服务器,然后,我们要将Kafka Avro SerializerAvro JAR文件导入我们的Gradle项目。之后,我们要配置生产者使用Schema RegistryKafkaAvroSerializer,而且我们要编写消费者并配置它使用Schema RegistryKafkaAvroDeserializer

以下Gradle构建文件显示了我们需要的Avro JAR文件和其他配置。

plugins {
   id "com.commercehub.gradle.plugin.avro" version "0.9.0"
}
group 'qidi'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8
dependencies {
   compile "org.apache.avro:avro:1.8.1"
   compile 'com.squareup.okhttp3:okhttp:3.7.0'
   testCompile 'junit:junit:4.11'
   compile 'org.apache.kafka:kafka-clients:0.10.2.0'
   compile 'io.confluent:kafka-avro-serializer:3.2.1'
}
repositories {
   jcenter()
   mavenCentral()
   maven {
       url "http://packages.confluent.io/maven/"
   }
}
avro {
   createSetters = false
   fieldVisibility = "PRIVATE"
}

请记住要引用Kafka Avro Serializer libio.confluent:kafka-avro-serializer:3.2.1)和Avro liborg.apache.avro:avro:1.8.1.

a. 生产者

使用Kafka Avro Serialization and Schema Registry的生产者:

package com.qidi.test;
import com.qidi.phonebook.Employee;
import com.qidi.phonebook.PhoneNumber;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import java.util.stream.IntStream;
public class AvroProducer {
   private static Producer<Long, Employee> createProducer() {
       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
               LongSerializer.class.getName());
       // Configure the KafkaAvroSerializer.
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               KafkaAvroSerializer.class.getName());
       // Schema Registry location.
       props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
               "http://localhost:8081");
       return new KafkaProducer<>(props);
   }
   private final static String TOPIC = "new-Employees";
   public static void main(String... args) {
       Producer<Long, Employee> producer = createProducer();
        Employee bob = Employee.newBuilder().setAge(35)
               .setFirstName("Bob")
               .setLastName("Jones")
               .setPhoneNumber(
                       PhoneNumber.newBuilder()
                               .setAreaCode("301")
                               .setCountryCode("1")
                               .setPrefix("555")
                               .setNumber("1234")
                              .build())
               .build();
       IntStream.range(1, 100).forEach(index->{
           producer.send(new ProducerRecord<>(TOPIC, 1L * index, bob));
       });
       producer.flush();
       producer.close();
   }
}

确保将Schema RegistryKafkaAvroSerializer配置为生产者设置的一部分。

/

/ Configure the KafkaAvroSerializer.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               KafkaAvroSerializer.class.getName());
// Schema Registry location.
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
               "http://localhost:8081");

b. 消费者

使用Kafka Avro Serialization and Schema Registry的消费者:

package com.qidi.test;
import com.qidi.phonebook.Employee;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;
public class AvroConsumer {
   private final static String BOOTSTRAP_SERVERS = "localhost:9092";
   private final static String TOPIC = "new-Employee";
   private static Consumer<Long, Employee> createConsumer() {
       Properties props = new Properties();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
       props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
               LongDeserializer.class.getName());
       //Use Kafka Avro Deserializer.
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
               KafkaAvroDeserializer.class.getName());  //<----------------------
       //Use Specific Record or else you get Avro GenericRecord.
       props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
       //Schema registry location.
       props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
               "http://localhost:8081"); //<----- Run Schema Registry on 8081
       return new KafkaConsumer<>(props);
   }
   public static void main(String... args) {
       final Consumer<Long, Employee> consumer = createConsumer();
       consumer.subscribe(Collections.singletonList(TOPIC));
       IntStream.range(1, 100).forEach(index -> {
           final ConsumerRecords<Long, Employee> records =
                   consumer.poll(100);
           if (records.count() == 0) {
               System.out.println("None found");
           } else records.forEach(record -> {
               Employee EmployeeRecord = record.value();
               System.out.printf("%s %d %d %s \n", record.topic(),
                       record.partition(), record.offset(), EmployeeRecord);
           });
       });
   }
}

与生产者一样,我们必须告诉消费者在哪里找到Schema Registry,我们必须配置Kafka Avro反序列化类。

//Use Kafka Avro Deserializer.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
               KafkaAvroDeserializer.class.getName());  
//Use Specific Record or else you get Avro GenericRecord.
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
//Schema registry location.        
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
               "http://localhost:8081"); //<----- Run Schema Registry on 8081

我们需要启动KafkaZooKeeper,运行上面的例子:

kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties &
kafka/bin/kafka-server-start.sh kafka/config/server.properties
Apache Kafka Schema Registry

发表评论

邮箱地址不会被公开。 必填项已用*标注

四 × = 16

滚动到顶部