Kafka消息队列概述及封装使用

8/13/2021 消息队列RabbitMQKafkaZooKeeperkafka管理面板

# 1. 前言

# 1.1 消息队列概述

# 1.1.1 消息队列技术选型

消息队列中间件是分布式系统中重要的组件,主要解决异步处理,应用解耦,流量削峰、日志处理和消息通讯等问题。实现高性能、高可用、可伸缩和最终一致性架构。比较流行的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

消息队列中间件对比:

消息队列中间件的对比

如何技术选型:

  • 中小型软件公司,建议选RabbitMQ。一方面,Erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。他的弊端也在这里,虽然RabbitMQ是开源的,然而国内能定制化开发Erlang的程序员太少,所幸RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。不考虑RocketMQ和kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。不考虑RocketMQ的原因是,RocketMQ是阿里出品,如果阿里放弃维护RocketMQ,中小型公司一般抽不出人来进行RocketMQ的定制化开发,因此不推荐。
  • 大型软件公司,根据具体使用在RocketMQ和Kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对RocketMQ,大型软件公司也可以抽出人手对RocketMQ进行定制化开发,毕竟国内有能力改Java源码的人,还是相当多的。至于Kafka,根据业务场景选择,如果有日志采集功能,肯定是首选Kafka了。具体该选哪个,看使用场景。

# 1.1.2 消息队列的特性及使用原因

消息队列的特性

  • 业务无关,一个具有普适性质的消息队列组件不需要考虑上层的业务模型,只做好消息的分发就可以了,上层业务的不同模块反而需要依赖消息队列所定义的规范进行通信。
  • FIFO,先投递先到达的保证是一个消息队列和一个buffer的本质区别。
  • 容灾,对于普适的消息队列组件来说,节点的动态增删和消息的持久化,都是支持其容灾能力的重要基本特性。当然,这个特性对于游戏服务器中大部分应用中的消息队列来说不是必须的,这个也是跟应用情景有关的,很多时候没有这种持久化的需求。
  • 性能,消息队列的吞吐量上去了,整个系统的内部通信效率也会有提高。

为什么需要消息队列

  • 当系统中出现“生产”和“消费”的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“消息队列 ”是在消息的传输过程中保存消息的容器。

# 1.1.3 消息队列应用场景

以下介绍消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削峰、日志处理和消息通讯五个场景。

[1] 异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种:串行的方式和并行方式。

串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户。

异步处理-1

并行方式:将注册信息写入数据库成功后,发送注册邮件的同时发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

异步处理-2

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间是100毫秒。传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈,引入消息队列异步处理,改造后的架构如下:

异步处理-3

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是55毫秒。因此架构改变后,系统的吞吐量比串行提高了3倍,比并行提高了两倍。

[2] 应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:

应用解耦-1

传统模式的缺点是:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。

引入应用消息队列后的方案,如下图:

应用解耦-2

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。

假如在下单时库存系统不能正常使用,也不影响正常下单。因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

[3] 流量削峰

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。可以控制活动的人数,可以缓解短时间内高流量压垮应用。

流量削峰

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。

[4] 日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:

日志处理-1

日志采集客户端,负责日志数据采集,定时写受写入Kafka队列。Kafka消息队列,负责日志数据的接收,存储和转发。日志处理应用,订阅并消费kafka队列中的日志数据。

以下是新浪kafka日志处理应用案例:

日志处理-2
  • Kafka:接收用户日志的消息队列
  • Logstash:做日志解析,统一成JSON输出给Elasticsearch
  • Elasticsearch:实时日志分析服务的核心技术,实时的数据存储服务,兼具强大的搜索和统计功能
  • Kibana:基于Elasticsearch的数据可视化组件

[5] 消息通讯

消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列、聊天室等。

点对点通讯:客户端A和客户端B使用同一队列,进行消息通讯。

消息通讯-1

聊天室通讯:客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。

消息通讯-2

# 1.2 Kafka概述

# 1.2.1 Kafka简介及基本概念

Kafka (opens new window) 最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于ZooKeeper协调的分布式日志系统(也可以当做MQ系统),Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

Kafka架构

Kafka基本概念如下:

  • Topic:一组消息数据的标记符;
  • Producer:生产者,用于生产数据,可将生产后的消息送入指定的 Topic;
  • Consumer:消费者,获取数据,可消费指定的 Topic 里面的数据
  • Group:消费者组,同一个 group 可以有多个消费者,一条消息在一个 group 中,只会被一个消费者 获取;
  • Partition:分区,为了保证 kafka 的吞吐量,一个 Topic 可以设置多个分区。同一分区只能被一个消费者订阅。

# 1.2.2 Kafka的特性及使用场景

Kafka特性

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;
  • 可扩展性:kafka集群支持热扩展;
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
  • 高并发:支持数千个客户端同时读写;
  • 支持实时在线处理和离线处理:可以使用Storm这种实时流处理系统对消息进行实时进行处理,同时还可以使用Hadoop这种批处理系统进行离线处理。

Kafka使用场景

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;
  • 消息系统:解耦和生产者和消费者、缓存消息等;
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

# 1.2.3 Kafka历史消息的删除及压缩策略

[1] 删除策略

  • Kafka日志管理器允许定制删除策略。目前的策略是删除修改时间在N天之前的日志(按时间删除),也可以使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。
  • Kafka消费日志删除思想:Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。

[2] 压缩策略

  • 将数据压缩,只保留每个key最后一个版本的数据。首先在broker的配置中设置log.cleaner.enable=true启用cleaner,这个默认是关闭的。在Topic的配置中设置log.cleanup.policy=compact启用压缩策略。

  • 压缩策略的细节如下:在整个数据流中,每个Key都有可能出现多次,压缩时将根据Key将消息聚合,只保留最后一次出现时的数据。这样,无论什么时候消费消息,都能拿到每个Key的最新版本的数据。压缩后的offset可能是不连续的,比如下图中没有5和7,因为这些offset的消息被merge了,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,比如,当试图获取offset为5的消息时,实际上会拿到offset为6的消息,并从这个位置开始消费。

    Kafka压缩策略

# 1.2.4 Kafka消息持久化策略及副本机制

[1] Kafka消息持久化策略

首先简要说明一下Kafka持久化消息的优点:

  • Kafka通过消息持久化解耦了消息的生产者和消费者,这也是采用队列的优势,使得生产者和消费者只需要关心自己的逻辑,而不需要直接感知彼此的存在。
  • Kafka支持对消费过的消息进行“消息重演(Message Replay)”,而重演的基础就是实现消息持久化。

Kafka为了保证消息落盘的实时,确保消息不丢失,没有采用直接写内存,等写满后一次性刷盘的策略,而是将数据立即写入文件系统的日志中,写入成功后才将结果返回给客户端告知客户端–消息已经成功写入。这么做的目的是,一方面实时地保存了数据,另一方面又减少了对内存的消耗,将内存空间尽量留给页缓存使用,从而提升了整体的性能。

[2] Kafka副本机制

Kafka为了实现服务高可靠,采用了多副本机制。这么做的目的是为了解决分布式系统都会面临的高可靠保证的问题。如果只保留一份数据,如果该节点宕机,则节点中保存的消息日志就会丢失,从而造成难以估计的问题。因此Kafka采用目前应用较为广泛的一种解决策略:即采用冗余机制,通俗的说就是 “不要把鸡蛋放在一个篮子里。”

Kafka的冗余机制简单地说就是将消息日志备份多份,而这些备份的日志在Kafka的概念中成为“副本(replica)”,之所以采用副本机制,核心的目的就是为了防止数据丢失。对于副本,又有两个角色,分别为:leader replica(领导者副本)、 follower replica(跟随者副本),Kafka的Leader选举依赖Zookeeper进行。

  • 只有leader角色的副本才对外提供服务,即只有leader副本才能够响应客户端发送的消息写入即消息消费的请求。而follower副本不对客户端提供服务,它只能被动获取leader副本的数据,保持与leader的消息同步。
  • 当出现leader副本所在的broker发生宕机,Kafka会在剩余的副本中选举新的leader继续提供服务,这也是Kafka与其他消息队列,如:RocketMQ的不同之处,也是它设计的精要及难点。

# 1.2.5 Kafka如何保证消息不丢失

Kafka提供的Producer和Consumer之间的消息传递保证语义(Message Delivery Guarantee Semantics)有以下三种

  • At least once(至少一次)——消息绝不会丢,但可能会重复传递;
  • At most once(至多一次)——消息可能会丢,但绝不会重复传递;
  • Exactly once(正好一次)——每条消息只会被精确地传递一次:既不会多,也不会少。

可以分为两个问题来看:发送消息的保证和消费消息的保证。

Kafka消息传递语义

Producer端

  • 从Producer的角度来看,At least once意味着Producer发送完一条消息后,会确认消息是否发送成功。如果Producer没有收到Broker的ack确认消息,那么会不断重试发送消息。这样就意味着消息可能被发送不止一次,也就存在这消息重复的可能性。
  • 从Producer的角度来看,At most once意味着Producer发送完一条消息后,不会确认消息是否成功送达。这样从Producer的角度来看,消息仅仅被发送一次,也就存在者丢失的可能性。
  • 从Producer的角度来看,Exactly once意味着Producer消息的发送是幂等的。这意味着不论消息重发多少遍,最终Broker上记录的只有一条不重复的数据。

[1] Producer At least once配置

Kafka默认的Producer消息送达语义就是At least once,这意味着我们不用做任何配置就能够实现At least once消息语义。

  • 原因是Kafka中默认acks=1并且retries=2147483647

[2] Producer At most once配置

  • acks=0。acks配置项表示Producer期望的Broker的确认数,默认值为1。可选项:[0,1,all]。如果设置为0,表示Producer发送完消息后不会等待任何Broker的确认;设置为1表示Producer会等待Broker集群中的leader的确认写入消息;设置为all表示Producer需要等待Broker集群中leader和其所有follower的确认写入消息。
  • retries=0。retires配置项表示当消息发送失败时,Producer重发消息的次数。默认值为2147483647。当配置了acks=0时,retries配置项就失去了作用,因此这儿可以不用配置。

[3] Exactly once配置

是Kafka从版本0.11之后提供的高级特性,可以通过配置Producer的以下配置项来实现Exactly once语义。

  • enable.idempotence=true。enable.idempotence配置项表示是否使用幂等性。当enable.idempotence配置为true时,acks必须配置为all。并且建议max.in.flight.requests.per.connection的值小于5。
  • acks=all。

Consumer端

  • 从Consumer的角度来看,At most once意味着Consumer对一条消息最多消费一次,因此有可能存在消息消费失败依旧提交offset的情况。考虑下面的情况:Consumer首先读取消息,然后提交offset,最后处理这条消息。在处理消息时,Consumer宕机了,此时offset已经提交,下一次读取消息时读到的是下一条消息了,这就是At most once消费。
  • 从Consumer的角度来看,At least once意味着Consumer对一条消息可能消费多次。考虑下面的情况:Consumer首先读取消息,然后处理这条消息,最后提交offset。在处理消息时成功后,Consumer宕机了,此时offset还未提交,下一次读取消息时依旧是这条消息,那么处理消息的逻辑又将被执行一遍,这就是At least once消费。
  • 从Consumer的角度来看,Exactly once意味着消息的消费处理逻辑和offset的提交是原子性的,即消息消费成功后offset改变,消息消费失败offset也能回滚。

[1] Consumer At least once配置

  • enable.auto.commit=false。禁止后台自动提交offset。
  • 手动调用consumer.commitSync()来提交offset。手动调用保证了offset即时更新。

通过手动提交offset,就可以实现Consumer At least once语义。

[2] Consumer At most once配置

  • enable.auto.commit=true。后台定时提交offset。
  • auto.commit.interval.ms配置为一个很小的数值。auto.commit.interval.ms表示后台提交offset的时间间隔。

通过自动提交offset,并且将定时提交时间间隔设置的很小,就可以实现Consumer At most once语义。

[3] Consumer Exactly once配置

  • isolation.level=read_committed。isolation.level表示何种类型的message对Consumer可见,也即事务机制。

# 1.2.6 Kafka如何保证消息顺序性

要想实现消息有序,需要从Producer和Consumer两方面来考虑。针对消息有序的业务需求,还分为全局有序和局部有序。

  • 全局有序:一个Topic下的所有消息都需要按照生产顺序消费。
  • 局部有序:一个Topic下的消息,只需要满足同一业务字段的要按照生产顺序消费。例如:Topic消息是订单的流水表,包含订单orderId,业务要求同一个orderId的消息需要按照生产顺序进行消费。

[1] 全局有序的实现

由于Kafka的一个Topic可以分为了多个Partition,Producer发送消息的时候,是分散在不同 Partition的。当Producer按顺序发消息给Broker,但进入Kafka之后,这些消息就不一定进到哪个Partition,会导致顺序是乱的。因此要满足全局有序,需要1个Topic只能对应1个Partition,而且对应的consumer也要使用单线程或者保证消费顺序的线程模型,否则消费端造成的消费乱序。

[2] 局部有序的实现

要满足局部有序,只需要在发消息的时候指定Partition Key,Kafka对其进行Hash计算,根据计算结果决定放入哪个Partition。这样Partition Key相同的消息会放在同一个Partition。此时,Partition的数量仍然可以设置多个,提升Topic的整体吞吐量。

[3] 消息重试对顺序消息的影响

对于一个有着先后顺序的消息A、B,正常情况下应该是A先发送完成后再发送B,但是在异常情况下,在A发送失败的情况下,B发送成功,而A由于重试机制在B发送完成之后重试发送成功了。这时对于本身顺序为AB的消息顺序变成了BA。

针对这种问题,严格的顺序消费还需要max.in.flight.requests.per.connection参数的支持。该参数指定了生产者在收到服务器响应之前可以发送多少个消,。把它设为1就可以保证消息是按照发送的顺序写入服务器的。此外,对于某些业务场景,设置max.in.flight.requests.per.connection=1会严重降低吞吐量,如果放弃使用这种同步重试机制,则可以考虑在消费端增加失败标记的记录,然后用定时任务轮询去重试这些失败的消息并做好监控报警。

# 1.3 ZooKeeper概述

# 1.3.1 ZooKeeper简介

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby的一个开源实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。它是由多个节点共同组成一个分布式集群,挂掉任意一个节点,仍然可以正常工作,客户端无感知故障,客户端向任意一个节点写入数据,其他节点可以立即看到最新的数据。

ZooKeeper架构

# 1.3.2 ZooKeeper在Kafka中的作用

Zookeeper在Kafka中的作用非常重要,主要包括以下几个方面:

  • 配置管理:Kafka的配置信息和元数据都存储在Zookeeper中,包括Kafka集群的配置信息、主题和分区信息、Broker节点的状态信息等。Kafka启动时会从Zookeeper中获取相应的配置信息和元数据。
  • Broker选举:Kafka的集群中每个Broker节点都有一个唯一的ID,Zookeeper可以通过Master选举算法帮助Kafka集群选举出一个新的Broker节点来代替已经失效的节点,以保证集群的高可用性。
  • 生产者和消费者组管理:Zookeeper可以协助Kafka管理生产者和消费者组的注册和分配。生产者在向Kafka发送消息之前需要向Zookeeper注册,消费者组需要向Zookeeper注册并分配分区。
  • 分布式锁:Kafka中的分布式锁机制也是通过Zookeeper实现的,通过Zookeeper的分布式锁机制,可以保证多个Kafka实例之间的操作不会相互冲突,从而保证数据的一致性和可靠性。

# 1.4 RabbitMQ基本介绍

# 1.4.1 RabbitMQ简介

RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现。它作为一个消息代理,主要负责接收并转发消息。它提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

# 1.4.2 RabbitMQ整体架构

RabbitMQ整体架构如下图所示:

RabbitMQ整体架构

基本概念的解释如下:

  • Producer(生产者):消息生产者。
  • Consumer(消费者):消息消费者。
  • Connection:生产者/消费者 和 broker 之间的 TCP连接。
  • Channel:Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel 之间是完全隔离的。
  • Brokder:接收和分发消息的应用,RabbitMQ Server就是 Message Broker。
  • Virtual host:出于多租户和安全因素设计的,把AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQserver 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建exchange/queue 等。
  • Exchange(交换机):message 到达broker的第一站,根据分发规则,匹配查询表中的routingkey,分发消息到queue 中去。
  • Queue(队列):消息最终被送到这里等待 consumer 取走。
  • Binding:exchange和queue之间的虚拟连接,Binding 中可以包含routingkey。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

# 2. 消息队列服务的搭建

以下我将采用Docker的方式进行搭建,VPS系统用的是Debian 11 x86_64。VPS的购买及配置、Docker及Docker Compose的概念及使用...这些基本的就不再赘述了,如果不会的话见我的其他博客:VPS基本部署环境的搭建与配置 (opens new window)Docker容器化及项目环境管理 (opens new window)

# 2.1 准备Docker及Docker Compose环境

# 2.2.1 搭建Docker环境

$ apt-get update -y && apt-get install curl -y  # 安装curl
$ curl https://get.docker.com | sh -   # 安装docker
$ sudo systemctl start docker  # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)
1
2
3
4

# 2.2.2 搭建Docker Compose环境

// 下载安装docker-compose,最新版见:https://github.com/docker/compose/releases
$ sudo curl -L https://github.com/docker/compose/releases/download/1.29.2/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose       
// 赋予docker-compose执行权限
$ sudo chmod +x /usr/local/bin/docker-compose
// 查看docker-compose版本号,验证是否安装成功
$ docker-compose --version
1
2
3
4
5
6

docker-compose

# 2.2 搭建RabbitMQ服务

# 2.2.1 拉取镜像并运行容器

$ docker pull rabbitmq:3.8-management
$ docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:3.8-management
1
2

注:默认RabbitMQ镜像是不带web端管理插件的,所以指定了镜像tag为3.8-management,表示下载包含web管理插件版本镜像。

# 2.2.2 RabbitMQ创建用户并可视化查看

用Chrome访问http://ip:15672即可访问RabbitMQ的Web端管理界面,默认用户名和密码都是guest,出现如下界面代表已经成功了。

RabbitMQ

默认的 guest 账户有访问限制,只能通过本地网络访问,远程网络访问受限,所以在使用时我们一般另外添加用户。

$ docker exec -i -t rabbitmq  bin/bash  
$ rabbitmqctl add_user root 123456   // 添加用户(实际密码设置复杂一些)
$ rabbitmqctl set_permissions -p / root ".*" ".*" ".*"   // 赋予root用户所有权限
$ rabbitmqctl set_user_tags root administrator           // 赋予root用户administrator角色
$ rabbitmqctl list_users  // 查看所有用户即可看到root用户已经添加成功
$ exit 
1
2
3
4
5
6

# 2.3 搭建ZooKeeper与Kafka服务

经实测,本机部署的单机版与Docker部署的单机版Kafka,使用同样的配置参数,实际性能大致相同。

# 2.3.1 本机部署单机版Kafka

本机部署单机版Kafka可以参照官方教程:https://kafka.apache.org/quickstart (opens new window)

Step1:从 Kafka官网 (opens new window)下载最新版安装包,并解压

$ wget https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
$ tar -xzf kafka_2.13-3.3.1.tgz
$ cd kafka_2.13-3.3.1
1
2
3

Step2:后台启动ZooKeeper服务

$ nohup /root/kafka/kafka_2.13-3.3.1/bin/zookeeper-server-start.sh /root/kafka/kafka_2.13-3.3.1/config/zookeeper.properties 1>/dev/null 2>&1 &
1

注:如果2181端口没被占用,可以使用默认配置启动,当然也可以根据实际情况对 zookeeper.properties 的配置进行修改。

Step3:后台启动Kafka服务

$ nohup /root/kafka/kafka_2.13-3.3.1/bin/kafka-server-start.sh /root/kafka/kafka_2.13-3.3.1/config/server.properties 1>/dev/null 2>&1 &
1

注:如果9092端口没被占用,可以使用默认配置启动,当然也可以根据实际情况对 server.properties 的配置进行修改,性能调优会用到这个。

# 2.3.2 Docker部署单机版Kafka

[1] 部署单机版 Kafka 服务

kafka的运行依赖于zookeeper,因而编写zookeeper与kafka的编排文件docker-compose.yml内容如下:

version: '3.2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    restart: always
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092
      - KAFKA_LISTENERS=PLAINTEXT://:9092
    volumes:
      - ./docker.sock:/var/run/docker.sock
    restart: always
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

注:KAFKA_ADVERTISED_LISTENERS 填写为 PLAINTEXT://IP:9092,这里的 IP 填写成你的公网 IP,如果没带上这个的话,PC是无法连接到服务器上的 kafka 服务的。这里搭建的 kafka 服务仅用于测试,没有设置用户名及密码,勿用于公网生产环境。

编写完毕后,在该文件下的目录下依次执行下面两条命令即可构建好zookeeper和kafka容器:

$ docker-compose build     // 构建镜像
$ docker-compose up -d     // 运行容器
1
2

配置文件目录:/opt/kafka_2.13-2.8.1/config

[2] 验证Kafka是否搭建成功

进入到kafka容器中 并创建topic生产者,执行如下命令:

$ docker exec -it kafka /bin/bash
$ cd /opt/kafka_2.13-2.8.1/bin/
$ ./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 8 --topic test
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
1
2
3
4

执行上述命令后,另起一个窗口,执行如下命令,创建kafka消费者消费消息。

$ docker exec -it kafka /bin/bash
$ cd /opt/kafka_2.13-2.8.1/bin/
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
1
2
3

执行完上诉命令后,在生产者窗口中输入任意内容回车,即可在消费者的窗口查看到消息。

注:kafka_2.13-2.8.1的含义为,2.13是Scala版本,2.8.1是Kafka版本。

# 2.3.3 Docker部署集群版Kafka

把编排文件docker-compose.yml修改成如下内容,即可部署集群版Kafka(如下是3个节点,如果需要更多可以在后面继续追加)

version: '3.3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
      - ./data/zookeeper/logs:/logs
    restart: always
  kafka1:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    container_name: kafka1
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://IP:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_LOG_DIRS: /data/kafka-data
      KAFKA_LOG_RETENTION_HOURS: 168
    volumes:
      - ./data/kafka1/data:/data/kafka-data
    restart: unless-stopped  
  kafka2:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    container_name: kafka2
    ports:
      - 9093:9093
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://IP:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_LOG_DIRS: /data/kafka-data
      KAFKA_LOG_RETENTION_HOURS: 168
    volumes:
      - ./data/kafka2/data:/data/kafka-data
    restart: unless-stopped
  kafka3:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    container_name: kafka3
    ports:
      - 9094:9094
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://IP:9094
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_LOG_DIRS: /data/kafka-data
      KAFKA_LOG_RETENTION_HOURS: 168
    volumes:
      - ./data/kafka3/data:/data/kafka-data
    restart: unless-stopped
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# 2.3.4 搭建SASL账号密码验证的Kafka

自0.9.0.0版本开始Kafka社区添加了许多功能用于提高Kafka的安全性,Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现,此处主要介绍SASL方式。

新建一个目录,放置以下4个文件(需要改动的只有server_jaas.conf)

$ mkdir -p ./kafka-sasl/conf
1

zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/opt/zookeeper-3.4.13/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

server_jaas.conf

Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="your_password";
};

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="your_password"
    user_super="your_password"
    user_admin="your_password";
};

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="your_password"
    user_admin="your_password";
};

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="your_password";
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

log4j.properties

# Define some default values that can be overridden by system properties
zookeeper.root.logger=INFO, CONSOLE
zookeeper.console.threshold=INFO
zookeeper.log.dir=.
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log

#
# ZooKeeper Logging Configuration
#

# Format is "<default threshold> (, <appender>)+

# DEFAULT: console appender only
log4j.rootLogger=${zookeeper.root.logger}

# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE

# Example with rolling log file and tracing
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE

#
# Log INFO level and above messages to the console
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n

#
# Add ROLLINGFILE to rootLogger to get log file output
#    Log DEBUG level and above messages to a log file
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}

# Max log file size of 10MB
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
# uncomment the next line to limit number of backup files
log4j.appender.ROLLINGFILE.MaxBackupIndex=10

log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n


#
# Add TRACEFILE to rootLogger to get log file output
#    Log DEBUG level and above messages to a log file
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
log4j.appender.TRACEFILE.Threshold=TRACE
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}

log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
### Notice we are including log4j's NDC here (%x)
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

configuration.xsl

<?xml version="1.0"?>
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
<xsl:output method="html"/>
<xsl:template match="configuration">
<html>
<body>
<table border="1">
<tr>
 <td>name</td>
 <td>value</td>
 <td>description</td>
</tr>
<xsl:for-each select="property">
<tr>
  <td><a name="{name}"><xsl:value-of select="name"/></a></td>
  <td><xsl:value-of select="value"/></td>
  <td><xsl:value-of select="description"/></td>
</tr>
</xsl:for-each>
</table>
</body>
</html>
</xsl:template>
</xsl:stylesheet>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

然后再创建一个 Docker Compose 编排文件。

docker-compose.yml

version: "3"

services:

  zookeeper:
    image: wurstmeister/zookeeper
    hostname: zookeeper_sasl
    container_name: zookeeper_sasl
    restart: always
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      SERVER_JVMFLAGS: -Djava.security.auth.login.config=/opt/zookeeper-3.4.13/secrets/server_jaas.conf
    volumes:
      - ./kafka-sasl/conf:/opt/zookeeper-3.4.13/conf
      - ./kafka-sasl/conf/:/opt/zookeeper-3.4.13/secrets/ 

  kafka:
    image: wurstmeister/kafka:2.11-0.11.0.3
    restart: always
    hostname: broker
    container_name: kafka_sasl
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://IP:9092
      KAFKA_ADVERTISED_PORT: 9092 
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_PORT: 9092 
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_SUPER_USERS: User:admin
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" #设置为true,ACL机制为黑名单机制,只有黑名单中的用户无法访问,默认为false,ACL机制为白名单机制,只有白名单中的用户可以访问
      KAFKA_ZOOKEEPER_CONNECT: zookeeper_sasl:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf
    volumes:
      - ./kafka-sasl/conf/:/opt/kafka/secrets/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

编写完毕后,在该文件下的目录下依次执行下面两条命令即可构建好zookeeper和kafka容器:

$ docker-compose build     // 构建镜像
$ docker-compose up -d     // 运行容器
1
2

代码请求测试:

# -*- coding: utf-8 -*-

import time
import json
from datetime import datetime
from kafka import KafkaProducer


def producer_event(server_info):
    producer = KafkaProducer(bootstrap_servers=server_info,
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username='admin',
                             sasl_plain_password='your_password')
    topic = "test_kafka_topic"
    print("kafka连接成功")
    for i in range(7200):
        data = {
            "name": "hello world"
        }
        data_json = json.dumps(data)
        producer.send(topic, data_json.encode()).get(timeout=30)
        print("数据推送成功,当前时间为:{},数据为:{}".format(datetime.now(), data_json))
        time.sleep(1)
    producer.close()


server = "IP:9092"
producer_event(server)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 2.3.5 搭建Kafka管理平台

[1] kafka-map

kafka-map是一个美观简洁且强大的kafka web管理工具。

项目地址:https://github.com/dushixiang/kafka-map (opens new window)

docker run -d \
    -p 8080:8080 \
    -v /root/kafka-map/data:/usr/local/kafka-map/data \
    -e DEFAULT_USERNAME=your_user \
    -e DEFAULT_PASSWORD=your_password \
    --name kafka-map \
    --restart always dushixiang/kafka-map:latest
1
2
3
4
5
6
7

用Chrome访问http://ip:8080即可访问 kafka-map 管理界面

kafka-map

注:如果配置了2.3.4节的SASL账号密码验证,这里安全验证选择“SASL_PLAINTEXT”,协议机制选择“PLAIN”(虽然连上了,但一些功能不好使了)

[2] kafka-manager

kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作。

$ docker pull sheepkiller/kafka-manager
$ docker run --name kafka-manager -itd -p 9000:9000 -e ZK_HOSTS="IP:2181" sheepkiller/kafka-manager  // 把IP处换成你的服务器IP地址
1
2

用Chrome访问http://ip:9000即可访问 kafka-manager 管理界面

kafka管理面板-1

连接kafka:点击Cluster,选择Add Cluster,填写Cluster Name(随便起)、Cluster Zookeeper Hosts(zookeeper地址)保存即可。

kafka管理面板-2

[3] KnowStreaming

Know Streaming是一套云原生的Kafka管控平台,脱胎于众多互联网内部多年的Kafka运营实践经验,专注于Kafka运维管控、监控告警、资源治理、多活容灾等核心场景。在用户体验、监控、运维管控上进行了平台化、可视化、智能化的建设,提供一系列特色的功能,极大地方便了用户和运维人员的日常使用。

项目地址:https://github.com/didi/KnowStreaming (opens new window)

官方的一键脚本会将所部署机器上的 MySQL、JDK、ES 等进行删除重装。因此不建议使用它进行部署,下面采用手动部署的方式。

Step0:准备MySQL、ElasticSearch、JDK等基础环境

软件名 版本要求
MySQL v5.7 或 v8.0
ElasticSearch v7.6+
JDK v8+

注:这些环境我之前都用Docker搭建过了,我的版本是MySQL5.7、ElasticSearch7.16.2(KnowStreaming目前不支持使用设置了密码的ES,如果设置了就另外再搭一个吧)、JDK8(官方推荐JDK11,但是JDK8也可以用)

Step1:下载安装包并解压

// 下载安装包
$ wget https://s3-gzpu.didistatic.com/pub/knowstreaming/KnowStreaming-3.0.0-beta.1.tar.gz
// 解压安装包到指定目录
$ tar -zxf KnowStreaming-3.0.0-beta.1.tar.gz -C /data/
1
2
3
4

Step2:导入MySQL数据和ES索引结构

$ cd /data/KnowStreaming

用Navicat创建数据库,create database know_streaming;
打开./init/sql目录,然后执行里面的这5个sql文件,ddl-ks-km.sql、ddl-logi-job.sql、ddl-logi-security.sql、dml-ks-km.sql、dml-logi.sql

打开 ./bin目录,修改一下init_es_template.sh文件里的ES连接信息,执行该脚本。
1
2
3
4
5
6

Step3:修改配置文件

$ cd /data/KnowStreaming
$ vim ./conf/application.yml

修改监听端口、MySQL及ES连接信息
1
2
3
4

Step4:启动项目

在bin目录有官方提供的启动脚本,但我这里因为没用它的那个方式进行搭建JDK,执行该脚本时报错,这里就不用它了。该项目就是个很常规的Java项目,自己启动就行了。

我这里把conf目录的配置文件都剪切到了libs目录,将其与jar包放置在一起,在bin目录写了个start.sh脚本用于启动程序。

#!/bin/bash

#define default variable
app_path="/data/KnowStreaming/libs"
app_log="/data/KnowStreaming/app.log"

if [ -e $app_log ]; then
	touch ${app_log}
fi

#goto directory
cd ${app_path}

#start app
nohup java -jar *.jar  1>${app_log} &
tail -fn 100 ${app_log}
exit 0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

启动后,访问http://ip:port地址访问即可,默认账号及密码:admin / admin2022_ 进行登录(另注:v3.0.0-beta.2版本开始,默认账号密码为admin / admin)。若要停止该项目,lsof -i:port搭配 kill -9 PID使用即可。

KnowStreaming

# 2.3.6 不停机查看及修改消息保留时长

需求情景:生产者程序将处理后的数据存入Kafka,但消费者的处理能力不行,数据有大量积压。磁盘还有大量空间,为了防止丢数据,需要在不停机的情况下修改kafka的消息保留时长。

基于时间保留:通过保留期属性,消息就有了TTL(time to live 生存时间)。到期后,消息被标记为删除,从而释放磁盘空间。对于kafka主题中所有消息具有相同的生存时间,但可以在创建主题之前设置属性,或对已存在的主题在运行时修改属性。Kafka支持配置保留策略,可以通过以下三个时间配置属性中的一个来进行调整:log.retention.hourslog.retention.minuteslog.retention.ms,Kafka用更高精度值覆盖低精度值,所以log.retention.ms具有最高的优先级。

以2.3.4节搭建的kafka为例,演示如何查看及不停机修改消息保留时长。

[1] 查看全局的消息保留时长

$ docker exec -it kafka_sasl /bin/bash
$ cd  /opt/kafka_2.11-0.11.0.3
$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168
1
2
3
4

[2] 不停机修改某个Topic的消息保留时长并查看

$ docker exec -it kafka_sasl /bin/bash
$ cd  /opt/kafka_2.11-0.11.0.3/bin
$ ./kafka-configs.sh --zookeeper zookeeper_sasl:2181 --alter --entity-name yoyo_admin_topic --entity-type topics --add-config retention.ms=60000
Completed Updating config for entity: topic 'yoyo_admin_topic'.
$ ./kafka-topics.sh --describe --zookeeper zookeeper_sasl:2181 --topic yoyo_admin_topic
Topic:yoyo_admin_topic  PartitionCount:1        ReplicationFactor:1     Configs:retention.ms=60000
        Topic: yoyo_admin_topic Partition: 0    Leader: 0       Replicas: 0     Isr: 0
1
2
3
4
5
6
7

注意事项:

  • kafka集群的情况:如果是kafka集群,改一个节点的即可,其他的都会跟着一同改变。
  • kafka用更高精度值覆盖低精度值:默认就是log.retention.hours=168,所以修改时不能使用retention.hours属性,需要换算成更高精度的minutes、ms。常用的ms值:86400000(1天)、259200000(3天)、604800000(7天)、25920000000(30天)
  • 需要修改的地方:将zookeeper_sasl:2181换成实际的zookeeper地址,将yoyo_admin_topic换成实际的topic,为了快速看到效果,保留时长仅设置了60000ms,正式修改按照实际的来。
  • 测试流程:提前在topic里写入数据,然后修改topic的消息保留时长并查看,1分钟后去查看该topic的消息是否还存在,发现消息已经被删除了。

# 2.3.7 Kafka分区数应设置多少及默认配置

kafka的每个topic都可以创建多个partition,理论上partition的数量无上限。通常情况下,越多的partition会带来越高的吞吐量,但是同时也会给broker节点带来相应的性能损耗和潜在风险,虽然这些影响很小,但不可忽略,所以确定partition的数量需要权衡一些因素。

[1] 越多的partition可以提供更高的吞吐量

  • 单个partition是kafka并行操作的最小单元。每个partition可以独立接收推送的消息以及被consumer消费,相当于topic的一个子通道,partition和topic的关系就像高速公路的车道和高速公路的关系一样,起始点和终点相同,每个车道都可以独立实现运输,不同的是kafka中不存在车辆变道的说法,入口时选择的车道需要从一而终。
  • kafka的吞吐量显而易见,在资源足够的情况下,partition越多速度越快。这里提到的资源充足解释一下,假设我现在一个partition的最大传输速度为p,目前kafka集群共有三个broker,每个broker的资源足够支撑三个partition最大速度传输,那我的集群最大传输速度为3*3*p=9p。

[2] 越多的分区需要打开更多的文件句柄

  • 在kafka的broker中,每个分区都会对照着文件系统的一个目录。
  • 在kafka的数据日志文件目录中,每个日志数据段都会分配两个文件,一个索引文件和一个数据文件。因此,随着partition的增多,需要的文件句柄数急剧增加,必要时需要调整操作系统允许打开的文件句柄数。

[3] 更多的分区会导致端对端的延迟

  • kafka端对端的延迟为producer端发布消息到consumer端消费消息所需的时间,即consumer接收消息的时间减去produce发布消息的时间。
  • kafka在消息正确接收后才会暴露给消费者,即在保证in-sync副本复制成功之后才会暴露,瓶颈则来自于此。
  • leader broker上的副本从其他broker的leader上复制数据的时候只会开启一个线程,假设partition数量为n,每个副本同步的时间为1ms,那in-sync操作完成所需的时间即n * 1ms,若n为10000,则需要10秒才能返回同步状态,数据才能暴露给消费者,这就导致了较大的端对端的延迟。

[4] 越多的partition意味着需要更多的内存

  • 在新版本的kafka中可以支持批量提交和批量消费,而设置了批量提交和批量消费后,每个partition都会需要一定的内存空间。
  • 无限的partition数量很快就会占据大量的内存,造成性能瓶颈。假设每个partition占用的内存为100k,当partition为100时,producer端和consumer端都需要10M的内存;当partition为100000时,producer端和consumer端则都需要10G内存。

[5] 越多的partition会导致更长时间的恢复期

  • kafka通过多副本复制技术,实现kafka的高可用性和稳定性。每个partition都会有多个副本存在于多个broker中,其中一个副本为leader,其余的为follower。
  • kafka集群其中一个broker出现故障时,在这个broker上的leader会需要在其他broker上重新选择一个副本启动为leader,这个过程由kafka controller来完成,主要是从Zookeeper读取和修改受影响partition的一些元数据信息。
  • 通常情况下,当一个broker有计划的停机,该broker上的partition leader会在broker停机前有次序的一一移走,假设移走一个需要1ms,10个partition leader则需要10ms,这影响很小,并且在移动其中一个leader的时候,其他九个leader是可用的。因此实际上每个partition leader的不可用时间为1ms。但是在宕机情况下,所有的10个partition
  • leader同时无法使用,需要依次移走,最长的leader则需要10ms的不可用时间窗口,平均不可用时间窗口为5.5ms,假设有10000个leader在此宕机的broker上,平均的不可用时间窗口则为5.5s。
  • 更极端的情况是,当时的broker是kafka controller所在的节点,那需要等待新的kafka leader节点在投票中产生并启用,之后新启动的kafka leader还需要从zookeeper中读取每一个partition的元数据信息用于初始化数据。在这之前partition leader的迁移一直处于等待状态。

可以在/config/sever.properties配置文件中,设置默认分区数,以后每次创建topic默认都是分区数。

以2.3.4节搭建的kafka为例,演示如何修改该配置:

$ docker exec -it kafka_sasl /bin/bash
$ cd /opt/kafka_2.13-2.8.1/bin/config
$ vi server.properties
1
2
3

sever.properties里有如下配置,默认分区数为1,我们可以根据自己需要进行修改

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=10
1
2
3
4

之后退出容器并重启容器

$ exit
$ docker restart kafka_sasl
1
2

# 3. 使用Springboot操作Kafka

# 3.1 基本使用示例

以下用一个简单的使用示例,演示一下Springboot整合Kafka完成生产消费过程,示例项目结构如下:

.
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── yoyo
│   │   │           └── springbootkafka
│   │   │               ├── SpringbootKafkaApplication.java
│   │   │               ├── consumer
│   │   │               │   └── KafkaConsumer.java
│   │   │               ├── domain
│   │   │               │   └── User.java
│   │   │               └── producer
│   │   │                   └── KafkaProducer.java
│   │   └── resources
│   │       └── application.yml
│   └── test
│       └── java
│           └── com
│               └── yoyo
│                   └── springbootkafka
│                       └── SpringbootKafkaApplicationTests.java
└── pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 3.1.1 项目依赖及配置文件

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.sxw</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-kafka</name>
    <description>springboot kafka demo</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.2.2.RELEASE</version>
            </plugin>
        </plugins>
    </build>

</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

application.yml

spring:
  application:
    name: springboot-kafka
  kafka:
    # 指定kafka代理地址,可以多个
    bootstrap-servers: ip:9092
    producer:
      retries: 0
      # 每次批量发送消息的数量
      batch-size: 16384
      # 缓存容量
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 指定默认消费者group id
      group-id: consumer-tutorial
      auto-commit-interval: 100
      auto-offset-reset: earliest
      enable-auto-commit: true
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 指定listener 容器中的线程数,用于提高并发量
    listener:
      concurrency: 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 3.1.2 生产消费过程的测试代码

KafkaProducer.java

package com.yoyo.springbootkafka.producer;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 消息生产者
 */
@Component
public class KafkaProducer<T> {

    private Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * kafka 发送消息
     *
     * @param obj 消息对象
     */
    public void send(T obj) {
        String jsonObj = JSON.toJSONString(obj);
        logger.info(">>> message = {}", jsonObj);

        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("kafka.tut", jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.info("Produce: The message failed to be sent:" + throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //TODO 业务处理
                logger.info("Produce: The message was sent successfully:");
                logger.info("Produce: >>> result: " + stringObjectSendResult.toString());
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

KafkaConsumer.java

package com.yoyo.springbootkafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * 消息消费者
 */
@Component
public class KafkaConsumer<T> {

    private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    /**
     * 监听kafka.tut 的 topic
     *
     * @param record
     * @param topic  topic
     */
    @KafkaListener(id = "tut", topics = "kafka.tut")
    public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        //判断是否NULL
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {
            //获取消息
            Object message = kafkaMessage.get();

            logger.info("Receive: +++++++++++++++ Topic:" + topic);
            logger.info("Receive: +++++++++++++++ Record:" + record);
            logger.info("Receive: +++++++++++++++ Message:" + message);
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

User.java

package com.yoyo.springbootkafka.domain;

import lombok.Data;

import java.util.Date;

@Data
public class User {

    private long id;
    private String msg;
    private Date sendTime;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

SpringbootKafkaApplication.java

package com.yoyo.springbootkafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootKafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringbootKafkaApplication.class, args);
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13

SpringbootKafkaApplicationTests.java

package com.yoyo.springbootkafka;

import com.yoyo.springbootkafka.domain.User;
import com.yoyo.springbootkafka.producer.KafkaProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Date;
import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootKafkaApplicationTests {

	@Autowired
	private KafkaProducer<User> kafkaSender;

	@Test
	public void kafkaSend() throws InterruptedException {
		// 模拟发送消息
		for (int i = 0; i < 5; i++) {
			User user = new User();
			user.setId(System.currentTimeMillis());
			user.setMsg(UUID.randomUUID().toString());
			user.setSendTime(new Date());
			kafkaSender.send(user);
			Thread.sleep(3000);
		}
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 3.1.3 启动测试类查看效果

配置好之后,在 SpringbootKafkaApplicationTests.java 文件处启动项目,我们可以看到如下输出,成功完成了生产消费过程。

Springboot与Kafka整合的测试

# 3.2 生产者与消费者的封装

为了方便在项目中调用kafka实现生产消费过程,这里对kafka进行模块封装,封装模块的项目结构如下:

.
├── src
│   └──main.java.com.yoyo.admin.kafka_common
│       ├── consumer
│       │   └── KafkaConsumer.java
│       └── producer
│           └── KafkaAnalyzeProducer.java
|           └── SendCallBack.java
└── pom.xml
1
2
3
4
5
6
7
8
9

# 3.2.1 项目依赖及配置文件

pom.xml

		<!-- kafka依赖 -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<!-- lombok依赖 让代码更简洁 https://www.projectlombok.org/-->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
1
2
3
4
5
6
7
8
9
10

application.properties

## kafka config
spring.kafka.bootstrap-servers=127.0.0.1:9092
# encryption config
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='your_username' password='your_password';
# default topic config
spring.kafka.template.default-topic=your_topic
# producer config
# number of retries, set a value greater than 0, then the client will resend the records that failed to be sent
spring.kafka.producer.retries=3
# batch size, 16k
spring.kafka.producer.batch-size=16384
# buffer storage, 32m
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
# specify the message key and decoding way of the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# consumer config
spring.kafka.consumer.group-id=spring_customer
# whether autocommit (manual submitted to shut down, otherwise an error)
spring.kafka.consumer.enable-auto-commit=false
# consumption offset configuration
# none: If the value of the previous offset is not found for the consumer, that is, the offset is not automatically maintained or the offset is manually maintained, an exception is thrown
# earliest: When there is a submitted offset under each partition: start consumption from the offset; when there is no submitted offset under each partition: start consumption from the beginning
# latest: When there is a submitted offset under each partition: start consumption from the offset; when there is no submitted offset under each partition: start consumption from the latest data
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# listener config
# record: submitted after each record is processed by the consumer listener (ListenerConsumer)
# batch: submitted after each batch of poll() data is processed by ListenerConsumer
# time: Submit when the data of each batch of poll() is processed by ListenerConsumer and the time since the last submission is greater than TIME
# count: Submit when the number of processed records is greater than or equal to COUNT after each batch of poll() data is processed by ListenerConsumer
# count_time: Submit when one of the conditions in TIME or COUNT is met
# manual: After each batch of poll() data is processed by ListenerConsumer, manually call Acknowledgment.acknowledge() and submit
# manual_immediate: Submit immediately after manually calling Acknowledgment.acknowledge(), which is generally recommended
spring.kafka.listener.ack-mode=manual_immediate
# the number of threads running in the listener container.
spring.kafka.listener.concurrency=1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 3.2.2 kafka模块封装

./consumer/KafkaConsumer.java

package com.yoyo.admin.kafka_common.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * kafka消费者
 */
@Component
@Slf4j
public class KafkaConsumer {

    /**
     * kafka的监听器 消费消息
     * @param record
     * @param item
     */
    @KafkaListener(topics = "yoyo_admin_topic", groupId = "spring_customer")
    public void topicListener(ConsumerRecord<String, String> record, Acknowledgment item) {
        log.info("开始消费:{}==,{}==,{};",record.topic(),record.partition(),record.value());
        item.acknowledge(); // 手动提交
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

./producer/KafkaAnalyzeProducer.java

package com.yoyo.admin.kafka_common.producer;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * kafka生产者
 */
@Slf4j
@Component
public class KafkaAnalyzeProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 发送数据到kafka
     *
     * @param topic        topic名称
     * @param message      发送信息字符串
     * @param sendCallBack 发送回调
     */
    public void send(String topic, String message, SendCallBack sendCallBack) {

        ListenableFuture listenableFuture = kafkaTemplate.send(topic, message);
        // 发送成功后回调
        SuccessCallback<String> successCallback = new SuccessCallback() {
            @SneakyThrows
            @Override
            public void onSuccess(Object result) {
                sendCallBack.sendSuccessCallBack(topic, message);
            }
        };
        // 发送失败回调
        FailureCallback failureCallback = new FailureCallback() {
            @SneakyThrows
            @Override
            public void onFailure(Throwable ex) {
                sendCallBack.sendFailCallBack(topic, message, ex);
            }
        };

        listenableFuture.addCallback(successCallback, failureCallback);
    }

    /**
     * producer 同步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendSynchronize(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("----事件kafka记录解析完成放入topic:{},发送失败{}", topic, message, throwable);
            }

            @Override
            public void onSuccess(Object o) {
                log.info("----事件kafka记录解析完成放入topic:{},发送成功:{}", topic, message);
            }
        });
    }

    /**
     * producer 异步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendAsynchronize(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

./producer/SendCallBack.java

package com.yoyo.admin.kafka_common.producer;

import java.text.ParseException;

public interface SendCallBack {

    /**
     * 生产成功回调
     * @param topic topic
     * @param msg 信息字符串
     */
    void sendSuccessCallBack(String topic,String msg) throws ParseException;

    /**
     * 生产失败回调
     * @param topic topic
     * @param msg 信息字符串
     * @param ex 异常
     */
    void sendFailCallBack(String topic,String msg,Throwable ex) throws ParseException;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 3.2.3 kafka生产消费过程

KafkaController.java

package com.yoyo.admin.web_manage.controller;

import com.alibaba.fastjson.JSON;
import com.yoyo.admin.common.utils.ResultDataUtils;
import com.yoyo.admin.kafka_common.producer.KafkaAnalyzeProducer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;


@Api(tags = "Kafka数据管理")
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    @Autowired
    private KafkaAnalyzeProducer kafkaAnalyzeProducer;

    @ApiOperation("向Kafka的指定Topic同步发送消息")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "topic", value = "Topic路径", dataType = "String", required = true, paramType = "body"),
            @ApiImplicitParam(name = "message", value = "消息数据", dataType = "Map", required = true, paramType = "body"),
    })
    @PostMapping("/sendKafkaSyncMessage")
    public ResponseEntity<?> sendKafkaSyncMessage(@RequestBody Map<String, Object> data) {
        String topic = data.get("topic").toString();
        String message = JSON.toJSONString(data.get("message"));
        try {
            // 向Kafka的指定Topic同步发送消息
            kafkaAnalyzeProducer.sendSynchronize(topic, message);
            return ResultDataUtils.success();
        } catch (Exception ex) {
            return ResultDataUtils.error(ex.getMessage());
        }
    }

    @ApiOperation("向Kafka的指定Topic异步发送消息")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "topic", value = "Topic路径", dataType = "String", required = true, paramType = "body"),
            @ApiImplicitParam(name = "message", value = "消息数据", dataType = "Map", required = true, paramType = "body"),
    })
    @PostMapping("/sendKafkaAsyncMessage")
    public ResponseEntity<?> sendKafkaAsyncMessage(@RequestBody Map<String, Object> data) {
        String topic = data.get("topic").toString();
        String message = JSON.toJSONString(data.get("message"));
        try {
            // 向Kafka的指定Topic异步发送消息
            kafkaAnalyzeProducer.sendAsynchronize(topic, message);
            return ResultDataUtils.success();
        } catch (Exception ex) {
            return ResultDataUtils.error(ex.getMessage());
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# 4. 使用Python操作Kafka

# 4.1 kafka-python库简介

项目简介:Apache Kafka 分布式流处理系统的 Python 客户端。kafka-python 的设计功能与官方 java 客户端非常相似,带有一些 pythonic 接口。

项目地址:https://github.com/dpkp/kafka-python (opens new window)

官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/modules.html (opens new window)

$ pip install kafka-python
1

# 4.2 生产者与消费者示例

# 4.2.1 生产者示例

# -*- coding: utf-8 -*-

import json
from kafka import KafkaProducer


class KProducer:
    def __init__(self, bootstrap_servers, topic):
        """
        kafka_deploy 生产者
        :param bootstrap_servers: 地址
        :param topic:  topic
        """
        # 参数说明详见:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda m: json.dumps(m).encode('ascii'),   # json 格式化发送的内容
            acks=1,                     # 在考虑请求完成之前,生产者要求leader收到的确认数量。默认值:1
            buffer_memory=33554432,    # 用于缓冲等待发送到服务器的记录的内存总字节数,默认值:33554432(32MB)
            max_block_ms=60000,        # 阻塞的毫秒数,默认值:60000
            retries=3,                 # 重试次数,默认值:0
            batch_size=16384,          # 一个批次可以使用的内存大小,按照字节数计算,默认值:16384(16KB)
            linger_ms=10,              # 生产者在发送批次之前等待更多消息加入批次的时间,默认值0
            max_request_size=1048576)  # 请求的最大大小,默认值:1048576(1MB)
        self.topic = topic

    def sync_producer(self, data_li: list, partition):
        """
        同步发送 数据
        :param data_li:  发送数据
        :return:
        """
        for data in data_li:
            future = self.producer.send(self.topic, value=data, partition=partition)
            record_metadata = future.get(timeout=10)  # 同步确认消费
            partition = record_metadata.partition  # 数据所在的分区
            offset = record_metadata.offset  # 数据所在分区的位置
            print('save success, partition: {}, offset: {}'.format(partition, offset))

    def asyn_producer(self, data_li: list, partition):
        """
        异步发送数据
        :param data_li:发送数据
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, value=data, partition=partition)
        self.producer.flush()  # 批量提交

    def asyn_producer_callback(self, data_li: list, partition):
        """
        异步发送数据 + 发送状态处理
        :param data_li:发送数据
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, value=data, partition=partition).add_callback(self.send_success).add_errback(self.send_error)
        self.producer.flush()  # 批量提交

    def send_success(self, *args, **kwargs):
        """异步发送成功回调函数"""
        print('save success')
        return

    def send_error(self, *args, **kwargs):
        """异步发送错误回调函数"""
        print('save error')
        return

    def close_producer(self):
        try:
            self.producer.close()
        except:
            pass


if __name__ == '__main__':
  	partition = 0
    send_data_li = [{"test": 1}, {"test": 2}]
    kp = KProducer(topic='topic', bootstrap_servers='127.0.0.1:9092')

    # 同步发送
    # kp.sync_producer(send_data_li, partition)

    # 异步发送
    # kp.asyn_producer(send_data_li, partition)

    # 异步+回调
    kp.asyn_producer_callback(send_data_li, partition)

    kp.close_producer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91

# 4.2.2 消费者示例

基本消费过程:

# -*- coding: utf-8 -*-

from kafka import KafkaConsumer


if __name__ == '__main__':

    # 创建一个消费者,指定了topic,bootstrap_servers,这种方式只会获取新产生的数据

    bootstrap_server_list = [
        '127.0.0.1:9092'
    ]

    consumer = KafkaConsumer(
        # kafka 集群地址
        bootstrap_servers=','.join(bootstrap_server_list),
        enable_auto_commit=True,  # 每过一段时间自动提交所有已消费的消息(在迭代时提交)
        auto_commit_interval_ms=5000,  # 自动提交的周期(毫秒)
    )

    consumer.subscribe(["topic"])  # 消息的主题,可以指定多个

    for msg in consumer:  # 迭代器,等待下一条消息
        print(msg)  # 打印消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

在指定topic指定partition上从指定offset开始读取数据:

# -*- coding: utf-8 -*-

import time
from kafka import KafkaConsumer, TopicPartition

if __name__ == '__main__':

    # 创建一个消费者
    bootstrap_server_list = [
        '127.0.0.1:9092', '127.0.0.1:9093', '127.0.0.1:9094', '127.0.0.1:9095', '127.0.0.1:9096'
    ]
    consumer = KafkaConsumer(
        # kafka 集群地址
        bootstrap_servers=','.join(bootstrap_server_list),
        enable_auto_commit=True,  # 每过一段时间自动提交所有已消费的消息(在迭代时提交)
        auto_commit_interval_ms=5000,  # 自动提交的周期(毫秒)
    )

    # 在指定topic指定partition上从指定offset开始读取数据
    consumer.assign([TopicPartition(topic='dog_gene_data_topic_1', partition=0)])
    print(consumer.assignment())  # 获取指定的要消费的分区
    # 获取给定分区的第一个偏移量
    print(consumer.beginning_offsets(consumer.assignment()))
    # 针对分区,指定抓取的偏移量
    offset = 10000
    consumer.seek(TopicPartition(topic='topic', partition=0), offset)
    for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        print(recv)
        time.sleep(0.1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 4.3 请求SASL账号密码验证的kafka

# -*- coding: utf-8 -*-

import time
import json
from datetime import datetime
from kafka import KafkaProducer


def producer_event(server_info):
    producer = KafkaProducer(bootstrap_servers=server_info,
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username='admin',
                             sasl_plain_password='your_password')
    topic = "test_kafka_topic"
    print("kafka连接成功")
    for i in range(7200):
        data = {
            "name": "hello world"
        }
        data_json = json.dumps(data)
        producer.send(topic, data_json.encode()).get(timeout=30)
        print("数据推送成功,当前时间为:{},数据为:{}".format(datetime.now(), data_json))
        time.sleep(1)
    producer.close()


server = "IP:9092"
producer_event(server)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 4.4 Kafka生产者性能调优

# 4.4.1 设置数据压缩类型

在代码里添加 compression_type 的配置

compression_type='lz4'     # 生产者生成的所有数据压缩类型。有效值为“gzip”、“snappy”、“lz4”或“none”
1

注意事项:

1)需要单独安装库包用于压缩

$ pip install python-snappy==0.6.1
$ pip install lz4==4.0.2
1
2

2)压缩算法的选择

查阅资料:吞吐量要求比较高的建议使用 lz4,网络带宽和磁盘资源不充足的建议使用 snappy

对比维度 结果
压缩比 zstd > lz4 > gzip > snappy
压缩解压缩吞吐量 lz4> snappy > zstd > gzip

实际测试:compression_type 支持的3种压缩算法中,lz4 的压缩率最高,压缩后的吞吐量也最高。

# 4.4.2 调整缓冲池及批次大小等参数

调整成合适的 buffer_memory、batch_size、max_request_size,以下只是示例,需要根据实际业务情况及服务器配置而定。

buffer_memory=3355443200,    # 用于缓冲等待发送到服务器的记录的内存总字节数,默认值:33554432(32MB)
batch_size=20971520,         # 一个批次可以使用的内存大小,按照字节数计算,默认值:16384(16KB)
max_request_size=20971520    # 请求的最大大小,默认值:1048576(1MB)
1
2
3

# 4.4.3 调整消息发送延时

linger.ms,这个值默认是 0,意思就是消息必须立即被发送。一般设置个 5-100 毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长, 会导致一条消息需要等待很久才能被发送出去,增加网络延时。

linger_ms=50                 # 生产者在发送批次之前等待更多消息加入批次的时间,默认值0
1

注:这个值设置成0的话,会导致频繁请求,实际执行速度不如主动设置几十毫秒延时。

# 5. 参考资料

[1] 为什么需要消息队列?使用消息队列有什么好处?from 51CTO (opens new window)

[2] 消息队列常见的 5 个应用场景 from segmentfault (opens new window)

[3] 也许你真的不懂RabbitMQ和Kafka的区别 from 腾讯云 (opens new window)

[4] 什么时候用RabbitMQ 什么时候用Apache Kafka?from CSDN (opens new window)

[5] 常用消息队列总结和区别 from 智能后端和架构 (opens new window)

[6] Kafka的简介 from cnblogs (opens new window)

[7] 入门 Kafka 你所需要了解的基本概念和开发模式 from segmentfault (opens new window)

[8] 全网最详细RabbitMQ基本概念 from 稀土掘金 (opens new window)

[9] Spring Boot2.0 整合 Kafka from 博客园 (opens new window)

[10] Spring Boot2.0 整合 Kafka from Github (opens new window)

[11] Python操作Kafka的通俗总结(kafka-python)from 知乎 (opens new window)

[12] Python 操作 Kafka --- kafka-python from CSDN (opens new window)

[13] kafka的文件清除策略 from CSDN (opens new window)

[14] Kafka消息超过最大值限制max.request.size from CSDN (opens new window)

[15] Kafka producer的几个重要配置参数 from Jmx's Blog (opens new window)

[16] 彻底搞懂 Kafka 消息大小相关参数设置的规则 from 知乎 (opens new window)

[17] kafka生产调优手册 from Jeff的技术栈 (opens new window)

[18] 调优Kafka,你做到了吗 from 技术文章摘抄 (opens new window)

[19] KAFKA参数调优实战,看这篇文章就够了 from CSDN (opens new window)

[20] Kafka参数调优 from CSDN (opens new window)

[21] docker-compose 搭建 kafka 集群 from 博客园 (opens new window)

[22] Docker搭建带SASL用户密码验证的Kafka from 博客园 (opens new window)

[23] docker-compose配置带密码验证的kafka from 博客园 (opens new window)

[24] docker创建带SASL认证的kafka from CSDN (opens new window)

[25] kafka密码SASL认证配置及SpringBoot配置加密后的Kafka参数 from CSDN (opens new window)

[26] SpringBoot整合Kafka from 博客园 (opens new window)

[27] 从未如此简单:10分钟带你逆袭Kafka from 51CTO (opens new window)

[28] 配置Kafka消息保留时间 from CSDN (opens new window)

[29] Kafka 不停机修改某一个topic数据保存时间 from CSDN (opens new window)

[30] kafka-9-python操作kafka及偏移量的处理 from CSDN (opens new window)

[31] Kafka新手入门 from KnowStreaming官网 (opens new window)

[32] Kafka删除历史消息的策略 from CSDN (opens new window)

[33] kafka如何保证消息不丢失 from 稀土掘金 (opens new window)

[34] Zookeeper 在 Kafka 中的作用知道吗?from 开发者客栈 (opens new window)

[35] kafka分区数设置多少合适 from 稀土掘金 (opens new window)

[36] 研磨消息中间件kafka之消息持久化及副本 from 朝闻道 (opens new window)

[37] 一文理解Kafka如何保证消息顺序性 from 腾讯云 (opens new window)

Last Updated: 6/1/2024, 9:45:58 PM