内容
1. 引言
在本章中,我们将学习使用Kafka创建自定义的序列化和反序列化类。此外,我们将了解序列化在Kafka中的工作原理以及为什么需要序列化。同时,我们将用示例来展示Kafka序列化和Kafka反序列化。此外,这个Kafka序列化和反序列化教程为我们提供了Kafka字符串序列化类和Kafka对象序列化类的知识。
Apache Kafka提供了可以轻松发布以及订阅记录流的功能,因此,我们可以灵活地创建自己的自定义的序列化类以及反序列化类,这将有助于使用Kafka传输不同的数据类型。
2. Kafka序列化和反序列化
为了传输而将对象转换为字节流的过程就是我们所说的序列化。Apache Kafka在队列中存储以及传输这些字节数组。
与序列化相反的是反序列化,即将数组的字节转换为我们想要的数据类型。但是Kafka仅为少数数据类型提供序列化和反序列化功能,例如:
-
String
-
Long
-
Double
-
Integer
-
Bytes
3. 为什么需要自定义的序列化和反序列化类?
为了准备从生产者传递到代理的消息,我们使用序列化类,换句话说,在将整个消息传输到代理之前,我们使用序列化类让生产者知道如何将消息转换为字节数组。类似地,为了将字节数组转换回对象,我们使用消费者的反序列化类。
4. Kafka序列化和反序列化的实现
要创建自定义的序列化类,我们需要实现org.apache.kafka.common.serialization.Serializer接口。对于反序列化类,需要实现org.apache.kafka.common.serialization.Deserializer接口。
Kafka的序列化和反序列化接口都有3种方法:
-
configure
在配置启动时,我们调用Configure方法。 -
serialize/deserialize
调用该方法来进行序列化或反序列化。
-
close
用来关闭Kafka会话
5. Serializer接口
public interface Serializer extends Closeable { void configure(Map<String, ?> var1, boolean var2); byte[] serialize(String var1, T var2); void close(); }
6. Deserializer接口
public interface Deserializer extends Closeable { void configure(Map<String, ?> var1, boolean var2); T deserialize(String var1, byte[] var2); void close(); }
7. 示例
依赖的库包括:
-
Kafka (0.10.1.1)
-
FasterXML Jackson (2.8.6)
user.java: public class User { private String firstname; private int age; public User() { } public User(String firstname, int age) { this.firstname = firstname; this.age = age; } public String getfirstName() { return this.firstname; } public int getAge() { return this.age; } @Override public String toString() { return "User(" + firstname + ", " + age + ")"; } } userserializer.java: public class UserSerializer implements Serializer { @Override public void configure(Map<String, ?> map, boolean b) { } @Override public byte[] serialize(String arg0, User arg1) { byte[] retVal = null; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(arg1).getBytes(); } catch (Exception e) { e.printStackTrace(); } return retVal; } @Override public void close() { } } Userdeserializer.java: public class UserDeserializer implements Deserializer { @Override public void close() { } @Override public void configure(Map<String, ?> arg0, boolean arg1) { } @Override public User deserialize(String arg0, byte[] arg1) { ObjectMapper mapper = new ObjectMapper(); User user = null; try { user = mapper.readValue(arg1, User.class); } catch (Exception e) { e.printStackTrace(); } return user; } }
为了使用上面的序列化类,我们必须使用以下属性来注册它:
props.put("value.serializer", "com.knoldus.serializers.UserSerializer");
那么,生产者的类将会是以下实现:
try (Producer<String, User> producer = new KafkaProducer<>(props)) { producer.send(new ProducerRecord<String, User>("MyTopic", user)); System.out.println("Message " + user.toString() + " sent !!"); } catch (Exception e) { e.printStackTrace(); }
同样,用以下属性来注册反序列化类:
props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");
消费者的实现如下:
try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, User> messages = consumer.poll(100); for (ConsumerRecord<String, User> message : messages) { System.out.println("Message received " + message.value().toString()); } } } catch (Exception e) { e.printStackTrace(); }