Apache Kafka序列化和反序列化

1. 引言

在本章中,我们将学习使用Kafka创建自定义的序列化和反序列化类。此外,我们将了解序列化在Kafka中的工作原理以及为什么需要序列化。同时,我们将用示例来展示Kafka序列化和Kafka反序列化。此外,这个Kafka序列化和反序列化教程为我们提供了Kafka字符串序列化类和Kafka对象序列化类的知识。

Apache Kafka提供了可以轻松发布以及订阅记录流的功能,因此,我们可以灵活地创建自己的自定义的序列化类以及反序列化类,这将有助于使用Kafka传输不同的数据类型。

2. Kafka序列化和反序列化

为了传输而将对象转换为字节流的过程就是我们所说的序列化。Apache Kafka在队列中存储以及传输这些字节数组。

与序列化相反的是反序列化,即将数组的字节转换为我们想要的数据类型。但是Kafka仅为少数数据类型提供序列化和反序列化功能,例如:

  • String

  • Long

  • Double

  • Integer

  • Bytes

3. 为什么需要自定义的序列化和反序列化类?

为了准备从生产者传递到代理的消息,我们使用序列化类,换句话说,在将整个消息传输到代理之前,我们使用序列化类让生产者知道如何将消息转换为字节数组。类似地,为了将字节数组转换回对象,我们使用消费者的反序列化类。

4. Kafka序列化和反序列化的实现

要创建自定义的序列化类,我们需要实现org.apache.kafka.common.serialization.Serializer接口。对于反序列化类,需要实现org.apache.kafka.common.serialization.Deserializer接口。

Kafka的序列化和反序列化接口都有3种方法:

  • configure
    在配置启动时,我们调用Configure方法。

  • serialize/deserialize

    调用该方法来进行序列化或反序列化。

  • close
    用来关闭Kafka会话

5. Serializer接口

public interface Serializer extends Closeable {
 void configure(Map<String, ?> var1, boolean var2);
 byte[] serialize(String var1, T var2);
 void close();
}

6. Deserializer接口

public interface Deserializer extends Closeable {
 void configure(Map<String, ?> var1, boolean var2);
 T deserialize(String var1, byte[] var2);
 void close();
}

7. 示例

依赖的库包括:

  • Kafka (0.10.1.1)

  • FasterXML Jackson (2.8.6)

user.java:
public class User {
 private String firstname;
 private int age;
 public User() {
 }
 public User(String firstname, int age) {
   this.firstname = firstname;
   this.age = age;
 }
 public String getfirstName() {
   return this.firstname;
 }
 public int getAge() {
   return this.age;
 }
 @Override public String toString() {
   return "User(" + firstname + ", " + age + ")";
 }
}
 
userserializer.java:
public class UserSerializer implements Serializer {
 @Override public void configure(Map<String, ?> map, boolean b) {
 }
 @Override public byte[] serialize(String arg0, User arg1) {
   byte[] retVal = null;
   ObjectMapper objectMapper = new ObjectMapper();
   try {
     retVal = objectMapper.writeValueAsString(arg1).getBytes();
   } catch (Exception e) {
     e.printStackTrace();
   }
   return retVal;
 }
 @Override public void close() {
 }
}
 
Userdeserializer.java:
public class UserDeserializer implements Deserializer {
 @Override public void close() {
 }
 @Override public void configure(Map<String, ?> arg0, boolean arg1) {
 }
 @Override
 public User deserialize(String arg0, byte[] arg1) {
   ObjectMapper mapper = new ObjectMapper();
   User user = null;
   try {
     user = mapper.readValue(arg1, User.class);
   } catch (Exception e) {
     e.printStackTrace();
   }
   return user;
 }
}

为了使用上面的序列化类,我们必须使用以下属性来注册它:

props.put("value.serializer", "com.knoldus.serializers.UserSerializer");

那么,生产者的类将会是以下实现:

try (Producer<String, User> producer = new KafkaProducer<>(props)) {
  producer.send(new ProducerRecord<String, User>("MyTopic", user));
  System.out.println("Message " + user.toString() + " sent !!");
} catch (Exception e) {
  e.printStackTrace();
}

同样,用以下属性来注册反序列化类:

props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");

消费者的实现如下:

try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
   consumer.subscribe(Collections.singletonList(topic));
   while (true) {
       ConsumerRecords<String, User> messages = consumer.poll(100);
       for (ConsumerRecord<String, User> message : messages) {
         System.out.println("Message received " + message.value().toString());
       }
   }
} catch (Exception e) {
   e.printStackTrace();
}

Apache Kafka序列化和反序列化

发表评论

邮箱地址不会被公开。 必填项已用*标注

− 8 = 一

滚动到顶部