消息队列与Kafka的基本整合使用

  1. 1. 前言
    1. 1.1 消息队列基本介绍
      1. 1.1.1 消息队列的特性及使用原因
      2. 1.1.2 消息队列应用场景
    2. 1.2 RabbitMQ基本介绍
      1. 1.2.1 RabbitMQ简介
      2. 1.2.2 RabbitMQ整体架构
    3. 1.3 ZooKeeper与Kafka基本介绍
      1. 1.3.1 ZooKeeper简介
      2. 1.3.2 Kafka简介
      3. 1.3.3 Kafka基本概念
      4. 1.3.4 Kafka的特性及使用场景
  2. 2. 消息队列服务的搭建
    1. 2.1 准备Docker及Docker Compose环境
      1. 2.2.1 搭建Docker环境
      2. 2.2.2 搭建Docker Compose环境
    2. 2.2 搭建RabbitMQ服务
      1. 2.2.1 拉取镜像并运行容器
      2. 2.2.2 RabbitMQ创建用户并可视化查看
    3. 2.3 搭建ZooKeeper与Kafka服务
      1. 2.3.1 本机部署单机版Kafka
      2. 2.3.2 Docker部署单机版Kafka
      3. 2.3.3 Docker部署集群版Kafka
      4. 2.3.4 搭建Kafka管理平台
  3. 3. 使用Springboot操作Kafka
    1. 3.1 项目依赖及配置文件
    2. 3.2 生产消费过程的测试代码
    3. 3.3 启动测试类查看效果
  4. 4. 使用Python操作Kafka
    1. 4.1 kafka-python库简介
    2. 4.2 生产者与消费者示例
    3. 4.3 Kafka生产者性能调优
  5. 5. 参考资料

1. 前言

1.1 消息队列基本介绍

消息队列中间件是分布式系统中重要的组件,主要解决异步处理,应用解耦,流量削峰、日志处理和消息通讯等问题。实现高性能、高可用、可伸缩和最终一致性架构。比较流行的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

消息队列中间件对比:

消息队列中间件的对比

如何技术选型:

  • 中小型软件公司,建议选RabbitMQ。一方面,Erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。他的弊端也在这里,虽然RabbitMQ是开源的,然而国内能定制化开发Erlang的程序员太少,所幸RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。不考虑RocketMQ和kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。不考虑RocketMQ的原因是,RocketMQ是阿里出品,如果阿里放弃维护RocketMQ,中小型公司一般抽不出人来进行RocketMQ的定制化开发,因此不推荐。
  • 大型软件公司,根据具体使用在RocketMQ和Kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对RocketMQ,大型软件公司也可以抽出人手对RocketMQ进行定制化开发,毕竟国内有能力改Java源码的人,还是相当多的。至于Kafka,根据业务场景选择,如果有日志采集功能,肯定是首选Kafka了。具体该选哪个,看使用场景。

1.1.1 消息队列的特性及使用原因

消息队列的特性

  • 业务无关,一个具有普适性质的消息队列组件不需要考虑上层的业务模型,只做好消息的分发就可以了,上层业务的不同模块反而需要依赖消息队列所定义的规范进行通信。
  • FIFO,先投递先到达的保证是一个消息队列和一个buffer的本质区别。
  • 容灾,对于普适的消息队列组件来说,节点的动态增删和消息的持久化,都是支持其容灾能力的重要基本特性。当然,这个特性对于游戏服务器中大部分应用中的消息队列来说不是必须的,这个也是跟应用情景有关的,很多时候没有这种持久化的需求。
  • 性能,消息队列的吞吐量上去了,整个系统的内部通信效率也会有提高。

为什么需要消息队列

  • 当系统中出现“生产”和“消费”的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“消息队列 ”是在消息的传输过程中保存消息的容器。

1.1.2 消息队列应用场景

以下介绍消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削峰、日志处理和消息通讯五个场景。

[1] 异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种:串行的方式和并行方式。

串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户。

异步处理-1

并行方式:将注册信息写入数据库成功后,发送注册邮件的同时发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

异步处理-2

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间是100毫秒。传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈,引入消息队列异步处理,改造后的架构如下:

异步处理-3

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是55毫秒。因此架构改变后,系统的吞吐量比串行提高了3倍,比并行提高了两倍。

[2] 应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:

应用解耦-1

传统模式的缺点是:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。

引入应用消息队列后的方案,如下图:

应用解耦-2

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。

假如在下单时库存系统不能正常使用,也不影响正常下单。因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

[3] 流量削峰

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。可以控制活动的人数,可以缓解短时间内高流量压垮应用。

流量削峰

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。

[4] 日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:

日志处理-1

日志采集客户端,负责日志数据采集,定时写受写入Kafka队列。Kafka消息队列,负责日志数据的接收,存储和转发。日志处理应用,订阅并消费kafka队列中的日志数据。

以下是新浪kafka日志处理应用案例:

日志处理-2
  • Kafka:接收用户日志的消息队列
  • Logstash:做日志解析,统一成JSON输出给Elasticsearch
  • Elasticsearch:实时日志分析服务的核心技术,实时的数据存储服务,兼具强大的搜索和统计功能
  • Kibana:基于Elasticsearch的数据可视化组件

[5] 消息通讯

消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列、聊天室等。

点对点通讯:客户端A和客户端B使用同一队列,进行消息通讯。

消息通讯-1

聊天室通讯:客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。

消息通讯-2

1.2 RabbitMQ基本介绍

1.2.1 RabbitMQ简介

RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现。它作为一个消息代理,主要负责接收并转发消息。它提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

1.2.2 RabbitMQ整体架构

RabbitMQ整体架构如下图所示:

RabbitMQ整体架构

基本概念的解释如下:

  • Producer(生产者):消息生产者。
  • Consumer(消费者):消息消费者。
  • Connection:生产者/消费者 和 broker 之间的 TCP连接。
  • Channel:Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel 之间是完全隔离的。
  • Brokder:接收和分发消息的应用,RabbitMQ Server就是 Message Broker。
  • Virtual host:出于多租户和安全因素设计的,把AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQserver 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建exchange/queue 等。
  • Exchange(交换机):message 到达broker的第一站,根据分发规则,匹配查询表中的routingkey,分发消息到queue 中去。
  • Queue(队列):消息最终被送到这里等待 consumer 取走。
  • Binding:exchange和queue之间的虚拟连接,Binding 中可以包含routingkey。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

1.3 ZooKeeper与Kafka基本介绍

1.3.1 ZooKeeper简介

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby的一个开源实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。它是由多个节点共同组成一个分布式集群,挂掉任意一个节点,仍然可以正常工作,客户端无感知故障,客户端向任意一个节点写入数据,其他节点可以立即看到最新的数据。

1.3.2 Kafka简介

Kafka 最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于ZooKeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

Kafka架构

1.3.3 Kafka基本概念

  • Topic:一组消息数据的标记符;
  • Producer:生产者,用于生产数据,可将生产后的消息送入指定的 Topic;
  • Consumer:消费者,获取数据,可消费指定的 Topic 里面的数据
  • Group:消费者组,同一个 group 可以有多个消费者,一条消息在一个 group 中,只会被一个消费者 获取;
  • Partition:分区,为了保证 kafka 的吞吐量,一个 Topic 可以设置多个分区。同一分区只能被一个消费者订阅。

1.3.4 Kafka的特性及使用场景

Kafka特性

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;
  • 可扩展性:kafka集群支持热扩展;
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
  • 高并发:支持数千个客户端同时读写;
  • 支持实时在线处理和离线处理:可以使用Storm这种实时流处理系统对消息进行实时进行处理,同时还可以使用Hadoop这种批处理系统进行离线处理。

Kafka使用场景

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;
  • 消息系统:解耦和生产者和消费者、缓存消息等;
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

2. 消息队列服务的搭建

以下我将采用Docker的方式进行搭建,VPS系统用的是Debian 11 x86_64。VPS的购买及配置、Docker及Docker Compose的概念及使用…这些基本的就不再赘述了,如果不会的话见我的另一篇博客:VPS基本部署环境的搭建与配置

2.1 准备Docker及Docker Compose环境

2.2.1 搭建Docker环境

1
2
3
4
$ apt-get update -y && apt-get install curl -y  # 安装curl
$ curl https://get.docker.com | sh - # 安装docker
$ sudo systemctl start docker # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)

2.2.2 搭建Docker Compose环境

1
2
3
4
5
6
// 下载安装docker-compose,最新版见:https://github.com/docker/compose/releases
$ sudo curl -L https://github.com/docker/compose/releases/download/1.29.2/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
// 赋予docker-compose执行权限
$ sudo chmod +x /usr/local/bin/docker-compose
// 查看docker-compose版本号,验证是否安装成功
$ docker-compose --version

docker-compose

2.2 搭建RabbitMQ服务

2.2.1 拉取镜像并运行容器

1
2
$ docker pull rabbitmq:3.8-management
$ docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:3.8-management

注:默认RabbitMQ镜像是不带web端管理插件的,所以指定了镜像tag为3.8-management,表示下载包含web管理插件版本镜像。

2.2.2 RabbitMQ创建用户并可视化查看

用Chrome访问http://ip:15672即可访问RabbitMQ的Web端管理界面,默认用户名和密码都是guest,出现如下界面代表已经成功了。

RabbitMQ

默认的 guest 账户有访问限制,只能通过本地网络访问,远程网络访问受限,所以在使用时我们一般另外添加用户。

1
2
3
4
5
6
$ docker exec -i -t rabbitmq  bin/bash  
$ rabbitmqctl add_user root 123456 // 添加用户(实际密码设置复杂一些)
$ rabbitmqctl set_permissions -p / root ".*" ".*" ".*" // 赋予root用户所有权限
$ rabbitmqctl set_user_tags root administrator // 赋予root用户administrator角色
$ rabbitmqctl list_users // 查看所有用户即可看到root用户已经添加成功
$ exit

2.3 搭建ZooKeeper与Kafka服务

经实测,本机部署的单机版与Docker部署的单机版Kafka,使用同样的配置参数,实际性能大致相同。

2.3.1 本机部署单机版Kafka

本机部署单机版Kafka可以参照官方教程:https://kafka.apache.org/quickstart

Step1:从 Kafka官网 下载最新版安装包,并解压

1
2
3
$ wget https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
$ tar -xzf kafka_2.13-3.3.1.tgz
$ cd kafka_2.13-3.3.1

Step2:后台启动ZooKeeper服务

1
$ nohup /root/kafka/kafka_2.13-3.3.1/bin/zookeeper-server-start.sh /root/kafka/kafka_2.13-3.3.1/config/zookeeper.properties 1>/dev/null 2>&1 &

注:如果2181端口没被占用,可以使用默认配置启动,当然也可以根据实际情况对 zookeeper.properties 的配置进行修改。

Step3:后台启动Kafka服务

1
$ nohup /root/kafka/kafka_2.13-3.3.1/bin/kafka-server-start.sh /root/kafka/kafka_2.13-3.3.1/config/server.properties 1>/dev/null 2>&1 &

注:如果9092端口没被占用,可以使用默认配置启动,当然也可以根据实际情况对 server.properties 的配置进行修改,性能调优会用到这个。

2.3.2 Docker部署单机版Kafka

[1] 部署单机版 Kafka 服务

kafka的运行依赖于zookeeper,因而编写zookeeper与kafka的编排文件docker-compose.yml内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
version: '3.2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
restart: always
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092
- KAFKA_LISTENERS=PLAINTEXT://:9092
volumes:
- ./docker.sock:/var/run/docker.sock
restart: always

注:KAFKA_ADVERTISED_LISTENERS 填写为 PLAINTEXT://IP:9092,这里的 IP 填写成你的公网 IP,如果没带上这个的话,PC是无法连接到服务器上的 kafka 服务的。这里搭建的 kafka 服务仅用于测试,没有设置用户名及密码,勿用于公网生产环境。

编写完毕后,在该文件下的目录下依次执行下面两条命令即可构建好zookeeper和kafka容器:

1
2
$ docker-compose build     // 构建镜像
$ docker-compose up -d // 运行容器

配置文件目录:/opt/kafka_2.13-2.8.1/config

[2] 验证Kafka是否搭建成功

进入到kafka容器中 并创建topic生产者,执行如下命令:

1
2
3
4
$ docker exec -it kafka /bin/bash
$ cd /opt/kafka_2.13-2.8.1/bin/
$ ./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 8 --topic test
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

执行上述命令后,另起一个窗口,执行如下命令,创建kafka消费者消费消息。

1
2
3
$ docker exec -it kafka /bin/bash
$ cd /opt/kafka_2.13-2.8.1/bin/
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

执行完上诉命令后,在生产者窗口中输入任意内容回车,即可在消费者的窗口查看到消息。

注:kafka_2.13-2.8.1的含义为,2.13是Scala版本,2.8.1是Kafka版本。

2.3.3 Docker部署集群版Kafka

把编排文件docker-compose.yml修改成如下内容,即可部署集群版Kafka(如下是3个节点,如果需要更多可以在后面继续追加)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
version: '3.3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- 2181:2181
volumes:
- ./data/zookeeper/data:/data
- ./data/zookeeper/datalog:/datalog
- ./data/zookeeper/logs:/logs
restart: always
kafka1:
image: wurstmeister/kafka
depends_on:
- zookeeper
container_name: kafka1
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://IP:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_LOG_DIRS: /data/kafka-data
KAFKA_LOG_RETENTION_HOURS: 168
volumes:
- ./data/kafka1/data:/data/kafka-data
restart: unless-stopped
kafka2:
image: wurstmeister/kafka
depends_on:
- zookeeper
container_name: kafka2
ports:
- 9093:9093
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://IP:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
KAFKA_LOG_DIRS: /data/kafka-data
KAFKA_LOG_RETENTION_HOURS: 168
volumes:
- ./data/kafka2/data:/data/kafka-data
restart: unless-stopped
kafka3:
image: wurstmeister/kafka
depends_on:
- zookeeper
container_name: kafka3
ports:
- 9094:9094
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://IP:9094
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
KAFKA_LOG_DIRS: /data/kafka-data
KAFKA_LOG_RETENTION_HOURS: 168
volumes:
- ./data/kafka3/data:/data/kafka-data
restart: unless-stopped

2.3.4 搭建Kafka管理平台

[1] kafka-map

kafka-map是一个美观简洁且强大的kafka web管理工具。

项目地址:https://github.com/dushixiang/kafka-map

1
2
3
4
5
6
7
docker run -d \
-p 8080:8080 \
-v /root/kafka-map/data:/usr/local/kafka-map/data \
-e DEFAULT_USERNAME=your_user \
-e DEFAULT_PASSWORD=your_password \
--name kafka-map \
--restart always dushixiang/kafka-map:latest

用Chrome访问http://ip:8080即可访问 kafka-map 管理界面

kafka-map

[2] kafka-manager

kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作。

1
2
$ docker pull sheepkiller/kafka-manager
$ docker run --name kafka-manager -itd -p 9000:9000 -e ZK_HOSTS="IP:2181" --net=host sheepkiller/kafka-manager // 把IP处换成你的服务器IP地址

用Chrome访问http://ip:9000即可访问 kafka-manager 管理界面

kafka管理面板-1

连接kafka:点击Cluster,选择Add Cluster,填写Cluster Name(随便起)、Cluster Zookeeper Hosts(zookeeper地址)保存即可。

kafka管理面板-2

3. 使用Springboot操作Kafka

以下用一个简单的项目示例,演示一下Springboot整合Kafka完成生产消费过程,示例项目结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
.
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │   └── yoyo
│   │   │   └── springbootkafka
│   │   │   ├── SpringbootKafkaApplication.java
│   │   │   ├── consumer
│   │   │   │   └── KafkaConsumer.java
│   │   │   ├── domain
│   │   │   │   └── User.java
│   │   │   └── producer
│   │   │   └── KafkaProducer.java
│   │   └── resources
│   │   └── application.yml
│   └── test
│   └── java
│   └── com
│   └── yoyo
│   └── springbootkafka
│   └── SpringbootKafkaApplicationTests.java
└── pom.xml

3.1 项目依赖及配置文件

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.sxw</groupId>
<artifactId>springboot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-kafka</name>
<description>springboot kafka demo</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.2.2.RELEASE</version>
</plugin>
</plugins>
</build>

</project>

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
spring:
application:
name: springboot-kafka
kafka:
# 指定kafka代理地址,可以多个
bootstrap-servers: ip:9092
producer:
retries: 0
# 每次批量发送消息的数量
batch-size: 16384
# 缓存容量
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 指定默认消费者group id
group-id: consumer-tutorial
auto-commit-interval: 100
auto-offset-reset: earliest
enable-auto-commit: true
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 指定listener 容器中的线程数,用于提高并发量
listener:
concurrency: 3

3.2 生产消费过程的测试代码

KafkaProducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.yoyo.springbootkafka.producer;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
* 消息生产者
*/
@Component
public class KafkaProducer<T> {

private Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

/**
* kafka 发送消息
*
* @param obj 消息对象
*/
public void send(T obj) {
String jsonObj = JSON.toJSONString(obj);
logger.info(">>> message = {}", jsonObj);

//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("kafka.tut", jsonObj);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
logger.info("Produce: The message failed to be sent:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
//TODO 业务处理
logger.info("Produce: The message was sent successfully:");
logger.info("Produce: >>> result: " + stringObjectSendResult.toString());
}
});
}
}

KafkaConsumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.yoyo.springbootkafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
* 消息消费者
*/
@Component
public class KafkaConsumer<T> {

private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

/**
* 监听kafka.tut 的 topic
*
* @param record
* @param topic topic
*/
@KafkaListener(id = "tut", topics = "kafka.tut")
public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
//判断是否NULL
Optional<?> kafkaMessage = Optional.ofNullable(record.value());

if (kafkaMessage.isPresent()) {
//获取消息
Object message = kafkaMessage.get();

logger.info("Receive: +++++++++++++++ Topic:" + topic);
logger.info("Receive: +++++++++++++++ Record:" + record);
logger.info("Receive: +++++++++++++++ Message:" + message);
}
}

}

User.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.yoyo.springbootkafka.domain;

import lombok.Data;

import java.util.Date;

@Data
public class User {

private long id;
private String msg;
private Date sendTime;

}

SpringbootKafkaApplication.java

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.yoyo.springbootkafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootKafkaApplication {

public static void main(String[] args) {
SpringApplication.run(SpringbootKafkaApplication.class, args);
}

}

SpringbootKafkaApplicationTests.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.yoyo.springbootkafka;

import com.yoyo.springbootkafka.domain.User;
import com.yoyo.springbootkafka.producer.KafkaProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Date;
import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootKafkaApplicationTests {

@Autowired
private KafkaProducer<User> kafkaSender;

@Test
public void kafkaSend() throws InterruptedException {
// 模拟发送消息
for (int i = 0; i < 5; i++) {
User user = new User();
user.setId(System.currentTimeMillis());
user.setMsg(UUID.randomUUID().toString());
user.setSendTime(new Date());
kafkaSender.send(user);
Thread.sleep(3000);
}
}

}

3.3 启动测试类查看效果

配置好之后,在 SpringbootKafkaApplicationTests.java 文件处启动项目,我们可以看到如下输出,成功完成了生产消费过程。

Springboot与Kafka整合的测试

4. 使用Python操作Kafka

4.1 kafka-python库简介

项目简介:Apache Kafka 分布式流处理系统的 Python 客户端。kafka-python 的设计功能与官方 java 客户端非常相似,带有一些 pythonic 接口。

项目地址:https://github.com/dpkp/kafka-python

官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/modules.html

1
$ pip install kafka-python

4.2 生产者与消费者示例

生产者示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# -*- coding: utf-8 -*-

import json
from kafka import KafkaProducer


class KProducer:
def __init__(self, bootstrap_servers, topic):
"""
kafka_deploy 生产者
:param bootstrap_servers: 地址
:param topic: topic
"""
# 参数说明详见:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda m: json.dumps(m).encode('ascii'), # json 格式化发送的内容
acks=1, # 在考虑请求完成之前,生产者要求leader收到的确认数量。默认值:1
buffer_memory=33554432, # 用于缓冲等待发送到服务器的记录的内存总字节数,默认值:33554432(32MB)
max_block_ms=60000, # 阻塞的毫秒数,默认值:60000
retries=3, # 重试次数,默认值:0
batch_size=16384, # 一个批次可以使用的内存大小,按照字节数计算,默认值:16384(16KB)
linger_ms=10, # 生产者在发送批次之前等待更多消息加入批次的时间,默认值0
max_request_size=1048576) # 请求的最大大小,默认值:1048576(1MB)
self.topic = topic

def sync_producer(self, data_li: list):
"""
同步发送 数据
:param data_li:发送数据
:return:
"""
for data in data_li:
future = self.producer.send(self.topic, data)
record_metadata = future.get(timeout=10) # 同步确认消费
partition = record_metadata.partition # 数据所在的分区
offset = record_metadata.offset # 数据所在分区的位置
print('save success, partition: {}, offset: {}'.format(partition, offset))

def asyn_producer(self, data_li: list):
"""
异步发送数据
:param data_li:发送数据
:return:
"""
for data in data_li:
self.producer.send(self.topic, data)
self.producer.flush() # 批量提交

def asyn_producer_callback(self, data_li: list):
"""
异步发送数据 + 发送状态处理
:param data_li:发送数据
:return:
"""
for data in data_li:
self.producer.send(self.topic, data).add_callback(self.send_success).add_errback(self.send_error)
self.producer.flush() # 批量提交

def send_success(self, *args, **kwargs):
"""异步发送成功回调函数"""
print('save success')
return

def send_error(self, *args, **kwargs):
"""异步发送错误回调函数"""
print('save error')
return

def close_producer(self):
try:
self.producer.close()
except:
pass


if __name__ == '__main__':
send_data_li = [{"test": 1}, {"test": 2}]
kp = KProducer(topic='topic', bootstrap_servers='127.0.0.1:9092')

# 同步发送
# kp.sync_producer(send_data_li)

# 异步发送
# kp.asyn_producer(send_data_li)

# 异步+回调
kp.asyn_producer_callback(send_data_li)

kp.close_producer()

消费者示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# -*- coding: utf-8 -*-

from kafka import KafkaConsumer


if __name__ == '__main__':

# 创建一个消费者,指定了topic,bootstrap_servers,这种方式只会获取新产生的数据

bootstrap_server_list = [
'127.0.0.1:9092'
]

consumer = KafkaConsumer(
# kafka 集群地址
bootstrap_servers=','.join(bootstrap_server_list),
enable_auto_commit=True, # 每过一段时间自动提交所有已消费的消息(在迭代时提交)
auto_commit_interval_ms=5000, # 自动提交的周期(毫秒)
)

consumer.subscribe(["topic"]) # 消息的主题,可以指定多个

for msg in consumer: # 迭代器,等待下一条消息
print(msg) # 打印消息

4.3 Kafka生产者性能调优

[1] 设置数据压缩类型

在代码里添加 compression_type 的配置

1
compression_type='lz4'     # 生产者生成的所有数据压缩类型。有效值为“gzip”、“snappy”、“lz4”或“none”

注意事项:

1)需要单独安装库包用于压缩

1
2
$ pip install python-snappy==0.6.1
$ pip install lz4==4.0.2

2)压缩算法的选择

查阅资料:吞吐量要求比较高的建议使用 lz4,网络带宽和磁盘资源不充足的建议使用 snappy

对比维度 结果
压缩比 zstd > lz4 > gzip > snappy
压缩解压缩吞吐量 lz4> snappy > zstd > gzip

实际测试:compression_type 支持的3种压缩算法中,lz4 的压缩率最高,压缩后的吞吐量也最高。

[2] 调整缓冲池及批次大小等参数

调整成合适的 buffer_memory、batch_size、max_request_size,以下只是示例,需要根据实际业务情况及服务器配置而定。

1
2
3
buffer_memory=3355443200,    # 用于缓冲等待发送到服务器的记录的内存总字节数,默认值:33554432(32MB)
batch_size=20971520, # 一个批次可以使用的内存大小,按照字节数计算,默认值:16384(16KB)
max_request_size=20971520 # 请求的最大大小,默认值:1048576(1MB)

[3] 调整消息发送延时

linger.ms,这个值默认是 0,意思就是消息必须立即被发送。一般设置个 5-100 毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长, 会导致一条消息需要等待很久才能被发送出去,增加网络延时。

1
linger_ms=50                 # 生产者在发送批次之前等待更多消息加入批次的时间,默认值0

注:这个值设置成0的话,会导致频繁请求,实际执行速度不如主动设置几十毫秒延时。

5. 参考资料

[1] 为什么需要消息队列?使用消息队列有什么好处?from 51CTO

[2] 消息队列常见的 5 个应用场景 from segmentfault

[3] 也许你真的不懂RabbitMQ和Kafka的区别 from 腾讯云

[4] 什么时候用RabbitMQ 什么时候用Apache Kafka?from CSDN

[5] 常用消息队列总结和区别 from 智能后端和架构

[6] Kafka的简介 from cnblogs

[7] 入门 Kafka 你所需要了解的基本概念和开发模式 from segmentfault

[8] 全网最详细RabbitMQ基本概念 from 稀土掘金

[9] Spring Boot2.0 整合 Kafka from 博客园

[10] Spring Boot2.0 整合 Kafka from Github

[11] Python操作Kafka的通俗总结(kafka-python)from 知乎

[12] Python 操作 Kafka — kafka-python from CSDN

[13] kafka的文件清除策略 from CSDN

[14] Kafka消息超过最大值限制max.request.size from CSDN

[15] Kafka producer的几个重要配置参数 from Jmx’s Blog

[16] 彻底搞懂 Kafka 消息大小相关参数设置的规则 from 知乎

[17] kafka生产调优手册 from Jeff的技术栈

[18] 调优Kafka,你做到了吗 from 技术文章摘抄

[19] KAFKA参数调优实战,看这篇文章就够了 from CSDN

[20] Kafka参数调优 from CSDN

[21] docker-compose 搭建 kafka 集群 from 博客园