【消息队列】 一文搞懂 Kafka
一、为什么要使用消息队列1、生活中收快递 2、下单功能:同步 ①问题1:耦合度高②问题2:响应时间长③问题3:并发压力传递④问题4:系统结构弹性不足3、下单功能:异步 ①好处1:功能解耦②好处2:快速响应③好处3:异步削峰限流削峰填谷:
④好处4:系统结构弹性大,易于扩展二、什么是消息队列1、概念 消息队列是实现应用程序和应用程序之间通信的中间件产品
2、消息队列底层实现的两大主流方式 由于消息队列执行的是跨应用的信息传递,所以制定底层通信标准非常必要目前主流的
消息队列通信协议标准包括:
AMQP (Advanced Message Queuing Protocol):通用协议,IBM公司研发JMS (Java Message Service):专门为Java语言服务,SUN公司研发,一组由Java接口组成的Java标准对比:
3、主流消息队列产品
RabbitMQActiveMQRocketMQKafka研发团队Rabbit(公司)Apache(社区)阿里(公司)Apache(社区)开发语言ErlangJavaJavaScala&Java核心机制基于AMQP的消息队列模型使用生产者-消费者模式,将消息发布到队列中,然后被消费者订阅和处理基于JMS的消息传递模型支持点对点模型和发布-订阅模型分布式的消息队列模型采用主题(Topic)和标签(Tag)的方式进行消息的分类和过滤分布式流平台,通过发布-订阅模型进行高吞吐量的消息处理协议支持XMPP
STOMP
SMTPXMPP
STOMP
OpenWireREST自定义协议自定义协议社区封装了HTTP协议支持客户端支持语言官方支持Erlang、Java、Ruby等社区产出多种API,几乎支持所有语言Java
C/C++
Python
PHP
Perl.NET等Java
C++不成熟官方支持Java社区产出多种API,如PHP、Python等可用性镜像队列主从复制主从复制分区和副本单机吞吐量每秒十万左右级别每秒数万级每秒十万+级(双十一)每秒百万级消息延迟微秒级毫秒级毫秒级毫秒以内消息确认完整的消息确认机制
内置消息表,消息保存到数据库实现持久化
功能特性并发能力强,性能极好,延时低,社区活跃,管理界面丰富老牌产品成熟度高文档丰富MQ功能比较完备扩展性佳只支持主要的MQ功能毕竟是专门为大数据领域服务的三、Kafka介绍1、Kafka是什么 Kafka是Apache开源的一款基于zookeeper协调的分布式消息系统,具有高吞吐率、高性能、实时、高可靠等特点,可实时处理流式数据。它最初由LinkedIn公司开发,使用Scala语言编写。
Kafka历经数年的发展,从最初纯粹的消息引擎,到近几年开始在流处理平台生态圈发力,多个组织或公司发布了各种不同特性的产品。常见产品如下:
Apache Kafka :最“正统”的Kafka也是开源版,它是后面其他所有发行版的基础。Cloudera/Hortonworks Kafka :集成了目前主流的大数据框架,能够帮助用户实现从分布式存储、集群调度、流处理到机器学习、实时数据库等全方位的数据处理。Confluent Kafka :主要提供基于Kafka的企业级流处理解决方案。Apache Kafka,它现在依然是开发人数最多、版本迭代速度最快的Kafka。我们使用此产品学习。Apache 目前为止总共演进了8个大版本,分别是0.7、0.8、0.9、0.11、1.0、2.0和3.0,我们选择3.5.1版本讲解(截止2023.8)。
2、Kafka的特点 高吞吐量、低延迟:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息,它的延迟最低只有几毫秒
持久性:支持消息持久化,即使数TB级别的消息也能够保持长时间的稳定性能。
可靠性:支持数据备份防止丢失
容错性:支持通过Kafka服务器和消费机集群来分区消息,允许集群中的节点失败(若分区副本数量为n,则允许n-1个节点失败)
高并发:单机可支持数千个客户端同时读写,支持在线水平扩展。可无缝对接hadoop、strom、spark等,支持Hadoop并行数据加载,
3、Kafka官网地址 kafka官网https://kafka.apache.org/kafka下载https://kafka.apache.org/downloads4、Kafka应用场景 ID设计目标功能1日志收集一个公司用Kafka可以收集各种服务的Log,通过Kafka以统一接口服务的方式开放给各种Consumer2消息系统解耦生产者和消费者、缓存消息等3用户活动跟踪用来记录Web用户或者APP用户的各种活动,如网页搜索、搜索、点击,用户数据收集然后进行用户行为分析。4运营指标Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告5流式处理比如Spark Streaming和Storm四、Kafka内部结构一、Producer 生产者:消息发送端
二、Consumer 消费者:消息接收端
三、broker 一个Kafka服务器实例,在Kafka集群中会有多个broker实例
image-20231120164659376.png四、Topic Topic中文意思是主题,在Kafka中只是一个逻辑概念,代表某一类消息。
结合具体项目中的业务功能,我们可以为每一个具体功能创建一个Topic。
image-20231120171117940.png五、Partition Partition就是分区,为什么要分区?
有了分区就可以把消息数据分散到不同broker上保存。
image-20231120173244763.png六、Replication 数据分区之后有一个问题:每个broker上保存一部分数据,如果某个broker宕机,那么数据就会不完整。
所以Kafka允许分区创建副本
image-20231121094903473.png七、主从 当分区存在副本时,就会区分Leader、Follower:
Leader:主分片,负责接收生产者端发送过来的消息,对接消费者端消费消息Follower:不和生产者、消费者交互,仅负责和Leader同步数据image-20231121095223337.pngimage-20231121095514411.png创建Topic时通过“分区数”指定Partition的数量,通过“复制因子”指定副本数量分区数和复制因子都不能为0分区数为1,复制因子为1表示:1个Partition内有1个Leader(此时数据只有一份,没有冗余的副本,生产环境不建议)复制因子为2表示每个Partition中包含1个Leader和1个Follower八、注册 Kafka工作过程中,broker、Partition……信息都需要在Zookeeper中注册
image-20231121095754757.png五、图形化界面软件Eagle一、创建Docker容器 #搜索镜像
dockersearchefak
#创建容器
dockerrun-d--namekafka-eagle\
-p8048:8048\
-eEFAK_CLUSTER_ZK_LIST="192.168.200.100:2181"\
nickzurich/efak:latest
提示:如果无法启动,往往是因为Zookeeper所需内存不足,可以试着把Zookeeper内存调整为更大的值再试。
二、使用 1、访问地址http://192.168.200.100:8048
默认登录信息:
账号:admin密码:1234562、查看broker列表image-20231121144103735.png3、主题相关操作①新建image-20231121144410912.pngimage-20231121144448590.png注意:Kafka集群中broker实例的数量需要大于等于复制因子(Replication factor),如果复制因子大于broker实例数量,那么就会看到下面保存信息——
image-20231121144724363.png②查看主题列表image-20231121150053400.png③查看主题详情点击主题名称查看详情:
image-20231121150129425.png4、查看分区中的消息image-20231121150926746.pngimage-20231121151133422.png六、客户端原生API一、生产者 1、创建主题kafka-topics.sh--bootstrap-server192.168.200.100:9092--create--topictopic-java-client
2、启动消费者监听主题kafka-console-consumer.sh--bootstrap-server192.168.200.100:9092--topictopic-java-client
3、引入依赖!--kafka-clients2023.10--
dependency
groupIdorg.apache.kafka/groupId
artifactIdkafka-clients/artifactId
version3.6.0/version
/dependency
4、Java程序importorg.apache.kafka.clients.producer.KafkaProducer;
importorg.apache.kafka.clients.producer.ProducerConfig;
importorg.apache.kafka.clients.producer.ProducerRecord;
importjava.util.Properties;
publicclassMyProducerDemo
{
publicstaticfinalStringTOPIC_NAME="topic-java-client";
publicstaticvoidmain(String[]args)
{
//1.创建Kafka生产者的配置对象
Propertiesproperties=newProperties();
//2.给Kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.100:9092");
//key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//3.创建Kafka生产者对象
KafkaProducerkafkaProducer=newKafkaProducer(properties);
//4.调用send方法,发送消息
for(inti=0;i5;i++){
kafkaProducer.send(newProducerRecord(TOPIC_NAME,"hello-kafka-from-java-client~"+
}
System.out.println("----MyProducerDemo发送完毕");
//5.关闭资源
kafkaProducer.close();
}
}
ProducerRecord参数说明:
publicclassProducerRecordK,V{
//主题名称,必选参数
privatefinalStringtopic;
//分区号,大于等于0的整数,可选参数。
privatefinalIntegerpartition;
//消息的头信息,类型是RecordHeaders,可选属性。
privatefinalHeadersheaders;
//键,可选参数。
privatefinalK
//消息内容,必选参数。
privatefinalVvalue;
//每条消息都有一个时间戳,可选参数
privatefinalLongtimest
}
5、send()方法返回值KafkaProducer的send()方法返回Future类型的对象,可以调用Future的get()方法同步获取任务执行结果。
此时程序就成了前一个消息发送完成再发送后一个的同步模式。
也就是说不调用get()方法就是异步模式。
//同步
for(inti=0;i5;i++){
//发送消息的任务交给子线程去做
Futurefuture=kafkaProducer.send(newProducerRecord(TOPIC_NAME,"hello-kafka-from-java-client~~~"+
TimeUnit.SECONDS.sleep(1);
//但是因为调用了get()方法,就变成子线程必须执行完发送消息的任务
//for循环的本次循环体才算执行完,才能继续执行下一次循环
//下一次循环就是发送下一条消息
future.get();
}
6、获取消息发送结果给KafkaProducer的send()方法再传入一个CallBack类型的参数,以异步回调的方式获取消息发送结果,从而得知消息发送是成功还是失败。
①Java代码kafkaProducer.send(newProducerRecord(TOPIC_NAME,"hello-kafka-from-java-client*******"),newCallback(){
//onCompletion()方法在发送消息操作完成时被调用
//参数RecordMetadatarecordMetadata:发送消息相关的元数据
//参数Exceptione:发送消息失败时,失败原因封装的异常信息
@Override
publicvoidonCompletion(RecordMetadatarecordMetadata,Exceptione){
if(e==null){
longoffset=recordMetadata.offset();
System.out.println("offset="+offset);
intpartition=recordMetadata.partition();
System.out.println("partition="+partition);
longtimestamp=recordMetadata.timestamp();
System.out.println("timestamp="+timestamp);
Stringtopic=recordMetadata.topic();
System.out.println("topic="+topic);
}else{
System.out.println("e="+
}
}
});
②失败情况举例把broker地址改成错的:
e = org.apache.kafka.common.errors.TimeoutException: Topic topic-java-client not present in metadata after 60000 ms.
二、消费者 importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.apache.kafka.clients.consumer.ConsumerRecords;
importorg.apache.kafka.clients.consumer.KafkaConsumer;
importjava.time.Duration;
importjava.util.Arrays;
importjava.util.Properties;
importjava.util.concurrent.TimeUnit;
publicclassMyConsumerDemo
{
publicstaticfinalStringTOPIC_NAME="topic-java-client";
publicstaticvoidmain(String[]args)throwsInterruptedException
{
//1、创建Kafka消费者的配置对象
Propertiesproperties=newProperties();
//2、给Kafka配置对象添加配置信息:bootstrap.servers
properties.put("bootstrap.servers","192.168.200.100:9092");
properties.setProperty("group.id","test");
properties.setProperty("enable.auto.commit","true");
properties.setProperty("auto.commit.interval.ms","1000");
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//3、创建消费者对象
KafkaConsumerString,Stringconsumer=newKafkaConsumer(properties);
//4、订阅指定主题
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while(true){
//5、从broker拉取信息
ConsumerRecordsString,Stringrecords=consumer.poll(Duration.ofMillis(100));
for(ConsumerRecordString,Stringrecord:records)
System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());
//6、每隔1秒做一次打印,让消费端程序持续运行
TimeUnit.SECONDS.sleep(1);
System.out.println("....进行中");
}
}
}
七、Kafka集群一、集群搭建 1、重要原则Kafka节点只要注册到同一个Zookeeper上就代表它们是同一个集群的Kafka通过broker.id来区分集群中的不同节点2、规划简单起见,我们只使用一个VMWare虚拟机,所以各个broker实例需要设定不同端口号Kafka程序不需要复制,对应各自不同的配置文件启动多个进程就能组成集群Zookeeper还是使用原来的2181即可
端口号配置文件日志目录实例017000/opt/k-cluster/server7000.properties/opt/k-cluster/log7000实例028000/opt/k-cluster/server8000.properties/opt/k-cluster/log8000实例039000/opt/k-cluster/server9000.properties/opt/k-cluster/log90003、具体操作①创建目录mkdir-p/opt/k-cluster/log7000
mkdir-p/opt/k-cluster/log8000
mkdir-p/opt/k-cluster/log9000
②复制配置文件cp/opt/kafka_2.13-3.6.0/config/server.properties/opt/k-cluster/server7000.properties
cp/opt/kafka_2.13-3.6.0/config/server.properties/opt/k-cluster/server8000.properties
cp/opt/kafka_2.13-3.6.0/config/server.properties/opt/k-cluster/server9000.properties
③修改配置文件[1]7000broker.id=1
listeners=PLAINTEXT://192.168.200.100:7000
advertised.listeners=PLAINTEXT://192.168.200.100:7000
log.dirs=/opt/k-cluster/log7000
[2]8000broker.id=2
listeners=PLAINTEXT://192.168.200.100:8000
advertised.listeners=PLAINTEXT://192.168.200.100:8000
log.dirs=/opt/k-cluster/log8000
[3]9000broker.id=3
listeners=PLAINTEXT://192.168.200.100:9000
advertised.listeners=PLAINTEXT://192.168.200.100:9000
log.dirs=/opt/k-cluster/log9000
4、启动集群各实例注意:此前需要先启动Zookeeper
kafka-server-start.sh-daemon/opt/k-cluster/server7000.properties
kafka-server-start.sh-daemon/opt/k-cluster/server8000.properties
kafka-server-start.sh-daemon/opt/k-cluster/server9000.properties
验证各个端口号:
lsof-i:2181
lsof-i:7000
lsof-i:8000
lsof-i:9000
如果因为内存不足而启动失败,可以修改对应启动脚本程序中的内存大小:
Zookeeper启动脚本程序:zookeeper-server-start.shZookeeper中Kafka堆内存大小变量名称:KAFKA_HEAP_OPTSKafka启动脚本程序:kafka-server-start.shKafka堆内存大小变量名称:KAFKA_HEAP_OPTS5、停止集群#停止Kafka,无需指定端口号就能停止各个实例:
kafka-server-stop.sh
#停止zk
zookeeper-server-stop.sh
二、使用集群 1、在集群上创建主题kafka-topics.sh\
--bootstrap-server192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000\
--create\
--partitions3\
--replication-factor3\
--topicmy-cluster-topic
2、查看集群主题kafka-topics.sh\
--bootstrap-server192.168.200.100:7000\
--describe--topicmy-cluster-topic
image-20231121164358865.png3、集群消息发送kafka-console-producer.sh\
--bootstrap-server192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000\
--topicmy-cluster-topic
4、集群消息消费kafka-console-consumer.sh\
--bootstrap-server192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000\
--from-beginning\
--topicmy-cluster-topic
5、集群消息消费相关问题①问题描述通过集群接收消息时,接收不到
②问题产生原因多个broker实例部署在同一个虚拟机上
192.168.200.100:7000192.168.200.100:8000192.168.200.100:9000这只是我们在测试环境下,非正式的这么安排,实际开发中不会把集群的所有实例放在一个机器上
③问题解决方案一消费端接收消息时指定分区
kafka-console-consumer.sh\
--bootstrap-server192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000\
--from-beginning\
--partition0\
--topicmy-cluster-topic
kafka-console-consumer.sh\
--bootstrap-server192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000\
--from-beginning\
--partition1\
--topicmy-cluster-topic
kafka-console-consumer.sh\
--bootstrap-server192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000\
--from-beginning\
--partition2\
--topicmy-cluster-topic
④问题解决方案二第一步:把apache-zookeeper-3.9.1-bin.tar.gz上传到Linux系统/opt目录下第二步:解压apache-zookeeper-3.9.1-bin.tar.gz文件cd/opt
tar-zxvfapache-zookeeper-3.9.1-bin.tar.gz
第三步:运行zkCli.sh脚本文件,登录到Zookeeper服务器/opt/apache-zookeeper-3.9.1-bin/bin/zkCli.sh
第四步:删除__consumer_offsets主题deleteall/brokers/topics/__consumer_offsets
第五步:退出Zookeeperquit
第六步:重启先关闭然后重新启动Zookeeper先关闭然后重新启动集群各实例八 客户端SpringBoot一、生产者 1、配置POMparent
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-parent/artifactId
version3.1.3/version
relativePath/
/parent
dependencies
dependency
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-web/artifactId
/dependency
dependency
groupIdorg.projectlombok/groupId
artifactIdlombok/artifactId
optionaltrue/optional
/dependency
dependency
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-test/artifactId
scopetest/scope
/dependency
!--spring-kafka--
dependency
groupIdorg.springframework.kafka/groupId
artifactIdspring-kafka/artifactId
/dependency
!--hutool--
dependency
groupIdcn.hutool/groupId
artifactIdhutool-all/artifactId
version5.8.19/version
/dependency
/dependencies
build
plugins
plugin
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-maven-plugin/artifactId
configuration
excludes
exclude
groupIdorg.projectlombok/groupId
artifactIdlombok/artifactId
/exclude
/excludes
/configuration
/plugin
/plugins
/build
2、配置YAMLspring:
kafka:
bootstrap-servers:192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000
producer:
key-serializer:org.apache.kafka.common.serialization.StringSerializer
value-serializer:org.apache.kafka.common.serialization.StringSerializer
3、主启动类
importorg.springframework.boot.SpringApplication;
importorg.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
publicclassKafkaMainType{
publicstaticvoidmain(String[]args){
SpringApplication.run(KafkaMainType.class,args);
}
}
4、配置类创建主题
importorg.apache.kafka.clients.admin.NewTopic;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.kafka.config.TopicBuilder;
@Configuration
publicclassKafkaConfig{
@Bean
publicNewTopicspringTestTopic(){
returnTopicBuilder.name("topic-spring-boot")//主题名称
.partitions(3)//分区数量
.replicas(3)//副本数量
.build();
}
}
到这里我们可以运行主启动类,看看主题是否创建成功
kafka-topics.sh--bootstrap-server192.168.200.100:7000--list
5、发送消息①命令行监听消息kafka-console-consumer.sh--bootstrap-server192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000--topictopic-spring-boot--partition0
kafka-console-consumer.sh--bootstrap-server192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000--topictopic-spring-boot--partition1
kafka-console-consumer.sh--bootstrap-server192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000--topictopic-spring-boot--partition2
②Java代码importjakarta.annotation.Resource;
importorg.junit.jupiter.api.Test;
importorg.springframework.boot.test.context.SpringBootTest;
importorg.springframework.kafka.core.KafkaTemplate;
@SpringBootTest
publicclassKafkaTest{
@Resource
privateKafkaTemplatekafkaTemplate;
@Test
publicvoidtestSendMessage(){
StringtopicName="topic-spring-boot";
Stringmessage="hellospringbootmessage";
kafkaTemplate.send(topicName,message);
}
}
二、消费者 1、配置POMparent
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-parent/artifactId
version3.1.3/version
relativePath/
/parent
dependencies
dependency
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-web/artifactId
/dependency
dependency
groupIdorg.projectlombok/groupId
artifactIdlombok/artifactId
optionaltrue/optional
/dependency
!--spring-kafka--
dependency
groupIdorg.springframework.kafka/groupId
artifactIdspring-kafka/artifactId
/dependency
!--hutool--
dependency
groupIdcn.hutool/groupId
artifactIdhutool-all/artifactId
version5.8.19/version
/dependency
/dependencies
build
plugins
plugin
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-maven-plugin/artifactId
configuration
excludes
exclude
groupIdorg.projectlombok/groupId
artifactIdlombok/artifactId
/exclude
/excludes
/configuration
/plugin
/plugins
/build
2、配置YAMLspring:
Kafka:
bootstrap-servers:192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000
consumer:
key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
value-deserializer:org.apache.kafka.common.serialization.StringDeserializer
group-id:consumer-group
3、主启动类
importorg.springframework.boot.SpringApplication;
importorg.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
publicclassKafkaMainTypeConsumer{
publicstaticvoidmain(String[]args){
SpringApplication.run(KafkaMainTypeConsumer.class,args);
}
}
4、接收消息的监听器
importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.springframework.kafka.annotation.KafkaListener;
importorg.springframework.stereotype.Component;
@Component
publicclassKafkaMessageListener{
@KafkaListener(topics={"topic-spring-boot"})
publicvoidsimpleConsumerPartition(ConsumerRecordString,Stringrecord){
System.out.println("进入simpleConsumer方法");
System.out.printf(
"分区=%d,偏移量=%d,key=%s,内容=%s,时间戳=%d%n",
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
}
}
注意:这里我们没有指定具体接收哪个分区的消息,所以如果接收不到消息,那么就需要登录Zookeeper删除__consumer_offsets
deleteall/brokers/topics/__consumer_offsets
三、实体类对象类型的消息 1、创建实体类
importlombok.AllArgsConstructor;
importlombok.Data;
@Data
@AllArgsConstructor
publicclassUserDTO{
privateStringname;
privateInteger
privateStringmobile;
}
2、发送消息的方法@Test
publicvoidtestSendEntity(){
StringtopicName="topic-spring-boot230628";
UserDTOuserDTO=newUserDTO("tom",25,"12345343");
kafkaTemplate.send(topicName,userDTO);
}
3、异常异常全类名:java.lang.ClassCastException异常信息:class com.atguigu.kafka.entity.UserDTO cannot be cast to class java.lang.String (com.atguigu.kafka.entity.UserDTO is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')异常原因:目前使用的序列化器是StringSerializer,不支持非字符串序列化解决办法:把序列化器换成支持复杂类型的4、修改YAML配置spring:
kafka:
bootstrap-servers:192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000
producer:
key-serializer:org.apache.kafka.common.serialization.StringSerializer
#value-serializer:org.apache.kafka.common.serialization.StringSerializer
value-serializer:org.springframework.kafka.support.serializer.JsonSerializer
阅读原文
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设、网站改版、域名注册、主机空间、手机网站建设、网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。 项目经理在线