千家信息网

如何进行kafka的安装和使用

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章将为大家详细讲解有关如何进行kafka的安装和使用,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。1. kafka介绍1.1. 主要功能根据官网
千家信息网最后更新 2025年12月03日如何进行kafka的安装和使用

这篇文章将为大家详细讲解有关如何进行kafka的安装和使用,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

1. kafka介绍

1.1. 主要功能

根据官网的介绍,ApacheKafka?是一个分布式流媒体平台,它主要有3种功能:

  1:It lets you publish and subscribe to streams of records.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因

  2:It lets you store streams of records in a fault-tolerant way.以容错的方式记录消息流,kafka以文件的方式来存储消息流

  3:It lets you process streams of records as they occur.可以再消息发布的时候进行处理

1.2. 使用场景

1:Building real-time streaming data pipelines that reliably get data between systems or applications.在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能

2:Building real-time streaming applications that transform or react to the streams of data。构建实时的流数据处理程序来变换或处理数据流,数据处理功能

1.3. 详细介绍

Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制

  1.3.1 消息传输流程

    Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

    Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息

    Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

    从上图中就可以看出同一个Topic下的消费者和生产者的数量并不是对应的。

  1.3.2 kafka服务器消息存储策略

    谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。

  在每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费。

  1.3.3 与生产者的交互

    生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中

    也可以通过指定均衡策略来将消息发送到不同的分区中

    如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中

  1.3.4 与消费者的交互

    在消费者消费消息时,kafka使用offset来记录当前消费的位置

    在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不项目,不互相干扰。

    对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费

    因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。

2. Kafka安装与使用

2.1. 下载

  你可以在kafka官网 http://kafka.apache.org/downloads下载到最新的kafka安装包,选择下载二进制版本的tgz文件,根据网络状态可能需要fq,这里我们选择的版本是0.11.0.1,目前的最新版

2.2. 安装

  Kafka是使用scala编写的运行与jvm虚拟机上的程序,虽然也可以在windows上使用,但是kafka基本上是运行在linux服务器上,因此我们这里也使用linux来开始今天的实战。

  首先确保你的机器上安装了jdk,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用

  说是安装,如果只需要进行最简单的尝试的话我们只需要解压到任意目录即可,这里我们将kafka压缩包解压到/home目录

2.3. 配置

  在kafka解压目录下下有一个config的文件夹,里面放置的是我们的配置文件

  consumer.properites 消费者配置,这个配置文件用于配置于2.5节中开启的消费者,此处我们使用默认的即可

  producer.properties 生产者配置,这个配置文件用于配置于2.5节中开启的生产者,此处我们使用默认的即可

  server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置


    1. broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可

    2. listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,例如:

          listeners=PLAINTEXT:// 192.168.180.128:9092。并确保服务器的9092端口能够访问

    3.zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,使用默认配置即可

          zookeeper.connect=localhost:2181

2.4. 运行

  1. 启动zookeeper

cd进入kafka解压目录,输入

bin/zookeeper-server-start.sh config/zookeeper.properties &

启动zookeeper成功后会看到如下的输出

    
2.启动kafka

cd进入kafka解压目录,输入

bin/kafka-server-start.sh config/server.properties

启动kafka成功后会看到如下的输出

2.5. 第一个消息

   2.5.1 创建一个topic

    Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷

    在kafka解压目录打开终端,输入

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    创建一个名为test的topic

 在创建topic后可以通过输入

bin/kafka-topics.sh --list --zookeeper localhost:2181

来查看已经创建的topic

  2.4.2 创建一个消息消费者

   在kafka解压目录打开终端,输入(from-beginning每次都是从头消费,不想从头消费可以取消参数)

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

   可以创建一个用于消费topic为test的消费者

消费者创建完成之后,因为还没有发送任何数据,因此这里在执行后没有打印出任何数据

不过别着急,不要关闭这个终端,打开一个新的终端,接下来我们创建第一个消息生产者

  2.4.3 创建一个消息生产者

    在kafka解压目录打开一个新的终端,输入

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    在执行完毕后会进入的编辑器页面

在发送完消息之后,可以回到我们的消息消费者终端中,可以看到,终端中已经打印出了我们刚才发送的消息




Python伪代码版本
消费者

  1. [root@ip-10-1-2-175 sh]# more cus.py

  2. import time, json

  3. from pykafka import KafkaClient

  4. client = KafkaClient(hosts="10.1.2.175:9092") # 可接受多个Client这是重点

  5. topic = client.topics['test'] # 选择一个topic

  6. # 生成一个消费者

  7. balanced_consumer = topic.get_balanced_consumer(consumer_group='goods_group',auto_commit_enable=True,zookeeper_connect='localhost:2181')


  8. for message in balanced_consumer:

  9. print message


生产者

  1. [root@ip-10-1-2-175 sh]# more prod.py

  2. import time, json

  3. from pykafka import KafkaClient

  4. def pro():

  5. client = KafkaClient(hosts="10.1.2.175:9092")

  6. topic = client.topics['test'] # 选择一个topic

  7. producer = topic.get_producer() # 创建一个生产者


  8. goods_dict = {'option_type':'insert','option_obj':{'goods_name':'goods-1'} }

  9. goods_json = json.dumps(goods_dict)

  10. producer.produce(goods_json) # 生产消息

  11. producer.stop()

  12. if __name__ == '__main__':

  13. pro()



启动消费者
[root@ip-10-1-2-175 sh]# python cus.py


启动生产者
[root@ip-10-1-2-175 sh]# python prod.py


查看消费者
[root@ip-10-1-2-175 sh]# python cus.py

关于如何进行kafka的安装和使用就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

消息 消费 消费者 配置 生产 生产者 数据 服务器 服务 目录 文件 终端 集群 处理 存储 输入 运行 不同 功能 数量 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 金山区网络技术转让咨询热线 网络安全考研哪个学校 沈阳有学软件开发的地方么 工控主板 服务器 架设ssh服务器 安卓怎么清理软件登录数据库 英语基础不好学软件开发好嘛 淅川软件开发设计服务至上 华大智造服务器管理如何 数据库技术与应用教学用书 服务器eos管理 警校网络安全与执法前景怎么样 ftview 报警服务器 数据库2014的安装 数据库原理及应用实例 项目软件开发课程 车载软件开发基本知识 清除单元格定义的数据库 国外有没有网络安全法律法规 银行软件开发公司工资排行 金山区网络技术转让咨询热线 金融股票软件开发怎么样 未转变者怎么找到自己的服务器 一般哪些行业用到数据库 陕西戴尔服务器虚拟化解决方案 转转下载软件开发 虹口区本地网络技术厂家价格 数据库2014的安装 设备点检软件开发计划表 滨湖区项目软件开发代理品牌
0