# kafka-protobuf-image-test **Repository Path**: jiabochao/kafka-protobuf-image-test ## Basic Information - **Project Name**: kafka-protobuf-image-test - **Description**: Kafka Protobuf序列化 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2019-04-13 - **Last Updated**: 2024-02-25 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Kafka Protobuf序列化 ## 安装Google Protobuf - 下载 [https://github.com/protocolbuffers/protobuf/releases](https://github.com/protocolbuffers/protobuf/releases) - 解压 ## 写.proto文件 *Image.proto* ```protobuf syntax = "proto3"; package com.jthinking.test.proto; option java_package = "com.jthinking.test.proto"; option java_outer_classname = "ImageProto"; message Image { string name = 1; int64 size = 2; bytes file_bytes = 3; } ``` > Google Protobuf 官方文档:[https://developers.google.com/protocol-buffers/docs/proto3](https://developers.google.com/protocol-buffers/docs/proto3) ## 生成Java源文件 ```shell protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/Image.proto ``` ## 自定义Kafka序列化类 *ImageProtobufSerializer.java* ```java import com.jthinking.test.proto.ImageProto; import org.apache.kafka.common.serialization.Serializer; import java.util.Map; public class ImageProtobufSerializer implements Serializer { @Override public void configure(Map configs, boolean isKey) { } @Override public byte[] serialize(String topic, ImageProto.Image image) { return image.toByteArray(); } @Override public void close() { } } ``` ## 自定义Kafka反序列化类 *ImageProtobufDeserializer.java* ```shell import com.google.protobuf.InvalidProtocolBufferException; import com.jthinking.test.proto.ImageProto; import org.apache.kafka.common.serialization.Deserializer; import java.util.Map; public class ImageProtobufDeserializer implements Deserializer { @Override public void configure(Map configs, boolean isKey) { } @Override public ImageProto.Image deserialize(String topic, byte[] data) { ImageProto.Image image = null; try { image = ImageProto.Image.parseFrom(data); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } return image; } @Override public void close() { } } ``` ## 测试 ```java import com.google.protobuf.ByteString; import com.jthinking.test.proto.ImageProto; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.WakeupException; import org.junit.Test; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.time.Duration; import java.util.Collections; import java.util.Properties; /** * Unit test for simple App. */ public class AppTest { @Test public void testProducer() { Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "192.168.1.125:9092"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer", "com.jthinking.test.serializer.ImageProtobufSerializer"); KafkaProducer kafkaProducer = new KafkaProducer<>(kafkaProps); ImageProto.Image.Builder builder = ImageProto.Image.newBuilder(); builder.setName("1555133736931.jpg"); builder.setSize(1000L); try { builder.setFileBytes(ByteString.readFrom(new FileInputStream("C:\\Software\\1555133736931.jpg"))); } catch (IOException e) { e.printStackTrace(); } ProducerRecord producerRecord = new ProducerRecord<>("user-image", "key1", builder.build()); for (int i = 0; i < 1000; i++) { kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { e.printStackTrace(); } } }); } } @Test public void testConsumer() { Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.1.125:9092"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "com.jthinking.test.deserializer.ImageProtobufDeserializer"); properties.put("group.id", "kafka-proto-test"); KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Collections.singleton("user-image")); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { kafkaConsumer.wakeup(); } }); try { while (true) { ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord record : records) { System.out.println(record.topic() + " " + record.partition() + " " + record.offset() + " " + record.key()); ImageProto.Image value = record.value(); FileOutputStream outputStream = new FileOutputStream("C:\\Software\\out.jpg"); value.getFileBytes().writeTo(outputStream); outputStream.close(); System.out.println(value.getName() + " " + value.getSize()); } } } catch (WakeupException e) { } catch (IOException e) { e.printStackTrace(); } finally { kafkaConsumer.close(); } } } ```