kafka集群安装及管理(一)
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,一、环境配置1.系统环境[root@date ~]# cat /etc/centos-releaseCentOS Linux release 7.4.1708 (Core)2..安装JAVA环境yum
千家信息网最后更新 2025年12月01日kafka集群安装及管理(一)
一、环境配置
1.系统环境
[root@date ~]# cat /etc/centos-releaseCentOS Linux release 7.4.1708 (Core)
2..安装JAVA环境
yum -y install java java-1.8.0-openjdk-devel#jps需要jdk-devel支持
3.下载kafka
[root@slave1 ~]# ls kafka_2.11-1.0.0.tgzkafka_2.11-1.0.0.tgz
二、配置zookeeper
1.修改kafka中zookeeper的配置文件
[root@slave1 ~]# cat /opt/kafka/config/zookeeper.properties | grep -v "^$" | grep -v "^#"tickTime=2000initLimit=10syncLimit=5maxClientCnxns=300dataDir=/opt/zookeeper/datadataLogDir=/opt/zookeeper/logclientPort=2181server.1=20.0.5.11:2888:3888server.2=20.0.5.12:2888:3888server.3=20.0.5.13:2888:3888
2.复制配置文件到其他节点
[root@slave1 ~]# pscp.pssh -h zlist /opt/kafka/config/zookeeper.properties /opt/kafka/config/[root@slave1 ~]# cat zlist20.0.5.1120.0.5.1220.0.5.13
3.在各个节点创建数据和日志目录
[root@slave1 ~]# pssh -h zlist 'mkdir /opt/zookeeper/data'[root@slave1 ~]# pssh -h zlist 'mkdir /opt/zookeeper/log'
4.创建myid文件
[root@slave1 ~]# pssh -H slave1 -i 'echo 1 > /opt/zookeeper/data/myid'[root@slave1 ~]# pssh -H slave2 -i 'echo 2 > /opt/zookeeper/data/myid'[root@slave1 ~]# pssh -H slave3 -i 'echo 3 > /opt/zookeeper/data/myid'
5.启动zookeeper
[root@slave1 ~]# pssh -h zlist 'nohup /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties &'[root@slave1 ~]# pssh -h zlist -i 'jps'[1] 23:29:36 [SUCCESS] 20.0.5.123492 QuorumPeerMain3898 Jps[2] 23:29:36 [SUCCESS] 20.0.5.117369 QuorumPeerMain9884 Jps[3] 23:29:36 [SUCCESS] 20.0.5.133490 QuorumPeerMain3898 Jps
三、配置kafka
1.修改server.properties
[root@slave1 ~]# cat /opt/kafka/config/server.propertiesbroker.id=1#机器唯一标识host.name=20.0.5.11#当前broker机器ipport=9092#broker监听端口num.network.threads=3#服务器接受请求和响应请求的线程数num.io.threads=8#io线程数socket.send.buffer.bytes=102400#发送缓冲区大小,数据先存储到缓冲区了到达一定的大小后在发送socket.receive.buffer.bytes=102400#接收缓冲区大小,达到一定大小后序列化到磁盘socket.request.max.bytes=104857600#向kafka请求消息或者向kafka发送消息的请求的最大数log.dirs=/opt/kafkalog#消息存放目录delete.topic.enable=true#能够通过命令删除topicnum.partitions=1#默认的分区数,一个topic默认1个分区数num.recovery.threads.per.data.dir=1#设置恢复和清理超时数据的线程数量offsets.topic.replication.factor=3#用于配置offset记录的topic的partition的副本个数transaction.state.log.replication.factor=3transaction.state.log.min.isr=3log.retention.hours=168#消息保存时间log.segment.bytes=1073741824#日志文件最大值,当日志文件的大于最大值,则创建一个新的log.retention.check.interval.ms=300000#日志保留检查间隔zookeeper.connect=20.0.5.11:2181,20.0.5.12:2181,20.0.5.13:2181#zookeeper地址zookeeper.connection.timeout.ms=6000#连接zookeeper超时时间
2.复制broke配置文件到其他服务节点上(修改broker.id和host.name)
3.启动kafka broke
[root@slave1 ~]# pssh -h zlist '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /root/kafka.log 2>1&'[1] 02:13:05 [SUCCESS] 20.0.5.11[2] 02:13:05 [SUCCESS] 20.0.5.12[3] 02:13:05 [SUCCESS] 20.0.5.13[root@slave1 ~]# pssh -h zlist -i 'jps'[1] 02:14:51 [SUCCESS] 20.0.5.123492 QuorumPeerMain6740 Jps6414 Kafka[2] 02:14:51 [SUCCESS] 20.0.5.133490 QuorumPeerMain4972 Kafka5293 Jps[3] 02:14:51 [SUCCESS] 20.0.5.117369 QuorumPeerMain11534 Kafka11870 Jps
4.创建topic
[root@slave1 ~]# /opt/kafka/bin/kafka-topics.sh --create --zookeeper 20.0.5.11:2181,20.0.5.12:2181,20.0.5.13:2181 --replication-factor 3 --partitions 3 --topic test1Created topic "test1".[root@slave1 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 20.0.5.11:2181,20.0.5.12:2181,20.0.5.13:2181 --topic test1Topic:test1 PartitionCount:3 ReplicationFactor:3 Configs: Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Topic: test1 Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: test1 Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
5.启动producer和consumer
[root@slave4 ~]# /opt/kafka/bin/kafka-console-producer.sh --broker-list 20.0.5.12:9092 --topic test1[root@slave5 ~]# /opt/kafka/bin/kafka-console-consumer.sh --zookeeper 20.0.5.13:2181 --topic test1

6.删除topic
[root@slave1 ~]# /opt/kafka/bin/kafka-topics.sh --delete --zookeeper 20.0.5.11:2181 --topic test1Topic test1 is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.
配置
文件
大小
日志
消息
最大
数据
环境
线程
缓冲区
节点
缓冲
时间
最大值
机器
目录
服务
个数
副本
命令
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
个人简介计算机网络技术
公司网络安全管理制度体系情况
可靠的软件开发培训班
互联网 网络安全论文
信阳师范网络技术学费
authors+数据库
高职专业目录 网络安全
成都网络技术专业学校
广州网络安全运维有哪些
高级查询数据库技术
redis同步数据库数据
高科技互联网片头
成吉思汗2最新服务器
上海揆安 网络安全检查
长宁区正规软件开发销售公司
郑州千觉互联网科技有限公司
虎丘区正规服务器代理厂家
网络安全赛中职组
怎么租国外的服务器
宜兴加工软件开发规范
滁州机架式服务器哪家好
高淳游戏软件开发
西子奥迪斯服务器杳故障
武汉有哪些软件开发外包
网络安全和技术的区别
软件开发培训学校哌拉西林
全国邮编数据库
软件开发算法思维
公众网络安全素养是什么
科摩多软件开发公司