千家信息网

如何进行Kafka学习

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,如何进行Kafka学习,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Apache Kafka一、消息队列分类1.1 点对
千家信息网最后更新 2025年12月03日如何进行Kafka学习

如何进行Kafka学习,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

Apache Kafka

一、消息队列分类

1.1 点对点

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并消费消息

注意:

1.消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息

2.Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费

1.2 发布/订阅

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费

二、消息队列对比

2.1 RabbitMQ

支持的协议多,非常重量级消息队列,对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持

2.2 ZeroMQ

号称最快的消息队列系统,尤其针对大吞吐量的需求场景,擅长高级/复杂的队列,但是技术也复杂,并且只提供非持久性队列

2.3 ActiveMQ

Apache下的一个子项,类似ZeroMQ,能够以代理人和点对点的技术实现队列

2.4 Redis

一个key-value的NoSQL数据库,但也支持MQ功能,数据量较小,性能优于RabbitMQ,数据超过10K就慢的无法忍受

三、Kafka简介

3.1 Kafka简介

Kafka是分布式发布-订阅消息系统。它最初由Linkedln公司开发,使用Scala语言编写,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的(分区处理),多订阅者,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据

3.2 Kafka特点

1.同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50M),每秒处理55万消息(110M)

2.可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘以及replication防止数据丢失

3.分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式。无需停机即可扩展机器

4.消息被处理的状态是在consumer端维护,而不是由server端维护

5.支持online(上线)和offline(下线)的场景

四、Kafka架构


重要说明:

1.在Kafka的体系中不存在单读的Conmuser,它会存在一个Conmuser Group,Conmuser Group里面会有多个Conmuser

2.可以把Consumer Group看成一个虚拟的Consumer,它消费的是一个具体的Topic的数据,但具体执行是由Consumer Group中的Consumer去执行的,Consumer是一个逻辑上的概念,是不存在的,而存在的是Consumer Group当中的Consumer, 一个Consumer Group对应的是Topic,Consumer Group中的Consumer对应的是Topic中的partition

3.一个消费者组里面的多个消费者对应的是什么呢?

Topic组里面不同Partition的数据,一个Partition里面的数据交给一个Consumer来处理,另一个Partition里面的数据交给另一个Consumer来处理,当然它们必须是同一个Consumer Group里面的Consumer,这就达到了并行的消费(每一个Consumer对应的是一个Partition里面的数据)

4.Kafka为什么会有Partition的概念?

带来的好处就是处理的速度更快,不同的Conmuser去消费不同Partition的消息,数据的消费就变成了并行的

五、Kafka的核心概念

5.1 Producer

特指消息的生产者

5.2 Consumer

特指消息的消费者

5.3 Consumer Group

消费者组,可以并行消费Topic中Partition的消息

5.4 Broker

缓存代理,Kafka集群中的一台或多台服务器统称为broker

1.message在broker中通过log追加的方式进行持久化存储。并进行分区(patitions)

2.为了减少磁盘写入的次数,broker会将消息展示buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数

3.Broker没有副本机制,一旦broker宕机,该broker的消息将不可用(但是消息是有副本的,可以把消息的副本同步到其它的broker中)

4.Broker不保存订阅者的状态,由订阅者自己保存

5.无状态导致消息的删除成为难题(可能删除的消息正在被订阅)Kafka采用基于时间的SLA(服务水平保证),消息保存一定的时间(通常为7天)后会被删除

6.消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息

5.5 Topic

特指Kafka处理的消息源(feeds of messages)的不同分类

5.6 Partition

1.Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)

2.Kafka的Partitions分区的目的

2.1 kafka基于文件存储,通过分区,可以将日志内容分线到多个server上,来避免文件尺寸达到单击磁盘的上线,每个partition都会被当前server(kafka实例)保存

2.2 可以将一个topic切分任意多个partitions来提高消息保存/消费的效率

2.3 越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力

5.7 Message

1.消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息

2.Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic有可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的)每个partition存储一部分Message

3.partition中的每条Message包含了一下三个属性

属性名称 数据类型

offset long

MessageSize int32

data mssage的具体内容

5.8 Producers

消息和数据生产者,向kafka的一个topic发布消息的过程叫做producers

1.producer将消息发布到指定的topic中,同时producer也能决定将此消息归属于哪个partition。比如基于"round-robin"方式或者通过其他的一些算法等

2.异步发送:批量发送可以很有效的提高发送效率。kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去

5.9 Consumers

1.消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers

2.在Kafka中,我们可以认为一个group是一个"订阅者",一个Topic中的每个partition只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partition中的消息(消费者数据小于Partition的数量时)

3.注意:kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息

六、Kafka的持久化

6.1 数据持久化

1.发现线性的访问磁盘,很多时候比随机的内存访问快的多

2.传统的使用内存作为磁盘的缓存

3.Kafka直接将数据写入到日志文件

6.2 日志数据持久化特性

1.写操作:通过将数据追加到文件中实现

2.读操作:读的时候从文件读就好了

6.3 优势

1.读操作不会阻塞写稻作和其它操作,数据大小不对性能产生影响

2.没有容量限制(相对于内存来说)的硬盘空间建立消息系统

3.线性访问磁盘,速度快,可以保存任意一段时间

6.4 持久化具体实现

1.一个Tipic可以认为是一个类消息,每个topic将被分成多个partition,每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),partition是以文件的形式存储在文件系统中

2.Logs文件根据broker中的配置要求,保留一定时间后删除来释放磁盘空间(默认是7天)


说明:Partition是Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。Partition中每条消息都会被分配一个有序的id(offset)

6.5 索引

为数据文件建立索引:稀疏存储,每隔一定字节的数据建立一条索引。下图是一个partition的索引示意图


注意:

1. 现在对1、3、6、8 建立了索引,如果要查找7,则会先查找到8然后,再找到8后的一个索引6,然后两个索引之间做二分法,找到7的位置

2. 日志文件也会进行segement(分割),分而治之

七、Kafka的分布式实现

7.1 Kafka分布式架构图



注意:

1.当生产者将消息发送到Kafka后,就会去立刻通知ZooKeeper,zookeeper中会watch到相关的动作,当watch到相关的数据变化后,会通知消费者去消费消息

2.消费者是主动去Pull(拉)kafka中的消息,这样可以降低Broker的压力,因为Broker中的消息是无状态的,Broker也不知道哪个消息是可以消费的

3.当消费者消费了一条消息后,也必须要去通知ZooKeeper。zookeeper会记录下消费的数据,这样但系统出现问题后就可以还原,可以知道哪些消息已经被消费了

7.2生产环境部署架构图


说明:

1.Name Server集群指的是Zookeeper集群

七、Kafka的通讯协议

8.1 通讯协议简介

1.Kafka的通讯协议主要说的是,consumer去拉数据使用的通讯协议

2.Kafka的Producer、Broker和Consumer采用的是一套自行设计基于TCP层的协议,根据业务需求定制,而非实现一套类似于Protocol Buffer的通讯协议

3.基本数据类型

3.1定长数据类型:int8,int16,int32和int64,对应到Java中就是byte,short,int和long

3.2变长数据类型:bytes和string。变长的数据类型由两部分组成,分别是一个有符号整数N(标识内容的长度)和N个字节的内容。其中N为-1标识内容为null。Bytes的长度由int32标识,string的长度由int16表示

3.3数组:数组由两个部分组成,分别是一个有int32类型的数字标识的数组长度N和N个元素

8.2通讯协议详细说明

1.Kafka通讯的基本单位是Request/Response

2.基本结构:

RequestOrResponse ---> MessageSize(RequestMessage | ResponseMessage)

名称类型描述ApiKeyInt16标识这次请求的API编号ApiVersionInt16标识请求的API版本,有了版本后就可以做到向后兼容CorrelationIdInt32由客户端指定的一个数字唯一标识这次请求的id,服务器端在处理请求后也会把同样的CorrelationId写到Response中,这样客户端就能把某个请求和响应对应起来了ClientIdstring客户端指定的用来描述客户端的字符串,会被用来记录日志和监控,它唯一标识一个客户端Request-Request的具体内容

3.通讯过程:

3.1客户端打开与服务端的Socket

3.2往Socket写入一个int32的数字(数字标识这次发送的Request有多少字节)

3.3服务器端先读出一个int32的整数从而获取这次Request的大小

3.4然后读取对应字节数的数据从而得到Request的具体内容

3.5服务器端处理了请求之后也用同样的发送发誓来发送响应

4.RequestMessage结构

4.1RequestMessage ---> ApiKey ApiVersion CorrelationId ClientId Request

名称类型描述MessageSizeint32标识RequestMessage或者ResponseMessage的长度RequestMessageResponseMessage--标识Request或者Response的内容

5.ResponseMessage

5.1ResponseMessage ---> CorrelationId Response

名称类型描述CorrelationIdint32对应Request的CorrelationIdResponse--对应Request的Response,不同的Request的Response的字段是不一样的

Kafka采用是经典的Reactor(同步IO)模式,也就是1个Acceptor响应客户端的连接请求,N个Processor来读取数据,这种模式可以构建出高性能的服务器

6.Message:Producer生产的消息,键-值对

6.1Message --- > Crc MagicByte Attributes Key Value

名称类型描述CRCInt32标识这条消息(不包括CRC字段本身)的校验码MagicByteInt8标识消息格式的版本,用来做向后兼容,目前值为0AttributesInt8标识这条消息的元数据,目前最低两位用来标识压缩格式Keybytes标识这条消息的Key,可以为nullValuebytes标识这条消息的Value。Kafka支持消息嵌套,也就是把一条消息作为Value放到另外一个消息里面

说明:

CRC是一种消息检验方式,在Consumer拿到数据以后,CRC会获取MessageSize和MessageData的大小做比较,如果不一致则,那么这个操作的数据Consumer就不接收了,如果一直则才做处理。防止消息在传输过程中损坏,丢失的一种校验方式

7.MessageSet:用来组合多条Message,它在每条Message的基础上加上offset和MessageSize

7.1MessageSet --> [offset MessageSize Message]

名称类型描述OffsetInt64它用来作为log中的序列号,Producer在生产消息的时候还不知道具体的值是什么,可以随便填个数字进去MessageSizeInt32标识这条Message的大小Message-标识这条Message的具体内容,其格式见上一小结

8.Request/Response和Message/messageSet的关系

8.1 Request/Response是通讯层的结构,和网络的7层模型对比的话,它类似于TCP

8.2 Message/MessageSet定义的是业务层的结构,类似于网络7层模型中的HTTP层。Message/MessageSet只是Request/Response的payload中的一种数据结构

备注:Kafka的通讯协议中不包含Schema,格式也比较简单,这样设计的好处是协议自身的Overhead小,再加上把多条Message放到一期做压缩,提高压缩比率,从而在网络上传输的数据量会少一些

九、数据传输的事务定义

1.at most once:最多一次,这个和JMS中"非持久化"消息类似,发送一次,无论成败,将不会重发

消费者fetch(得到)消息,然后保存offset,然后处理消息; 当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理,那么伺候"未处理"的消息将不能被fetch到,这就是"at most once"

2.at least once:消息至少发送一次,如果消息未能接收成功,可能会重发,知道接收成功

消费者fetch消息,然后处理消息,然后保存offset,如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once",原因offset没有即使的提交给zookeeper,zookeeper恢复正常还是之前offset状态。

注:通常情况下"at least once"是我们的首选(相比at most once而言,重复接收数据总比丢失数据要好)

3.exactly once:消息只会发送一次

Kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的

十、Kafka安装

1.下载并上传kafka到服务器

2.解压缩并移动到/usr/local目录下

3.启动服务

3.1启动zookeeper服务 # ./zookeeper-server-start.sh ../config/zookeeper.properties > /dev/null 2>&1 &

3.2启动kafka服务 # ./kafka-server-start.sh ../config/server.properties > /dev/null 2>&1 &

3.3创建topic:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

3.4查看主题

./kafka-topics.sh --list --zookeeper localhost:2181

3.5查看主题详情

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

3.6删除主题

./kafka-run-class.sh kafka.admin.TopicCommand --delete --topic test --zookeeper 192.168.31.220:2181

十一、Kafka客户端操作

11.1创建Producer

./kafka-console-producer.sh --broker-list localhost:9092 --topic test1

11.2创建Consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning

11.3参数使用帮组信息查看

生产者参数查看:./kafka-console-producer.sh

消费者参数查看:./kafka-console-consumer.sh

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

消息 消费 数据 标识 处理 消费者 订阅 文件 服务 多个 类型 生产 内容 通讯 磁盘 队列 不同 客户 订阅者 索引 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 计算机网络技术单招笔试 为什么国家要宣传网络安全 医疗数据库安全 关于网络安全的新闻评论 软件开发行业的对人的优劣势 静安区网络技术品牌 青浦区电子网络技术质量服务 云服务器修改管理员权限代码 大学好用软件开发 黄陂靠谱的软件开发 linux网络安全有哪些 如何查看一梦江湖角色所在服务器 大学软件开发专业书籍 第二次网络安全攻防演练 中国网络安全百强企业 漳州财务软件开发费用 华为服务器型号2288HV5 河南服务器电源哪家强 sw数据库遗失怎么解决 北京拉勾网络技术有限公司天眼查 无线网络技术农业领域 软件开发单一来源 数据库技术发展经历了几代 数据库pool 中国电子云网络安全新闻 广域网加速属于网络安全吗 河南服务器电源哪家强 数据库某个数据更新为空 丁祥龙杭州湖畔网络技术有限公司 青少年注意网络安全有什么
0