千家信息网

windows下如何实现kafka+ELK的日志系统

发表于:2025-11-07 作者:千家信息网编辑
千家信息网最后更新 2025年11月07日,这篇文章主要介绍windows下如何实现kafka+ELK的日志系统,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!用到的软件:zookeeper、kafka、logstash(
千家信息网最后更新 2025年11月07日windows下如何实现kafka+ELK的日志系统

这篇文章主要介绍windows下如何实现kafka+ELK的日志系统,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

用到的软件:zookeeper、kafka、logstash(6.3.2版本)、ES(6.3.2版本)、Kibana(6.3.2版本)。具体安装步骤不在此说明,基本都是下载解压,改一下配置文件,即可使用。(以下所述均在Windows下)
1、zookeeper:
kafka中自带zookeeper,可以不用装zookeeper,如果想自己另装,需配置环境变量,如下:
ZOOKEEPER_HOME => D:\nomalAPP\zookeeper-3.4.13
path 里面加入 %ZOOKEEPER_HOME%\bin
如果配置好以后,在cmd里运行zkserver报找不到java错误的话,可能是java环境变量的位置放置有问题,可以将path里面配置的java环境位置移到最前面。

2、kafka:
kafka如果启动报找不到java的错误,原因在于kafka-run-class.bat的179行,找到里面的 %CLASSPATH% ,将其用双引号括起来。即改为:set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
kafka的配置文件server.properties里面要修改的:
日志所放置的目录(可以选择用默认的):log.dirs=D:/nomalAPP/kafka_2.12-2.0.0/kafka-logs,
连接zookeeper的ip和端口:zookeeper.connect=localhost:2181
其他的均可用默认配置。
启动命令:cmd锁定安装目录,然后 .\bin\windows\kafka-server-start.bat .\config\server.properties

3、logstash:
在config目录下,logstash.yml里面,如果想启动多个,在里面可以配置 http.port=9600-9700,如果没有配置这一项,会使用默认的9600端口。
在config目录下,增加一个logstash.conf的配置文件,用来配置数据源,和数据过滤,数据输出的位置,如下:

input {            #   ==>    用kafka的日志作为数据源   kafka{       ····#(kafka的IP和端口)        bootstrap_servers => "10.8.22.15:9092"          # (在有多个相同类型的数据源时,需要配置)        client_id => "test1"          # (消费者分组,可以通过组 ID 去指定,不同的组之间消费是相互不受影响的,相互隔离)        group_id => "test1"           auto_offset_reset => "latest"            # (消费者线程个数)        consumer_threads => 5           # (在输出消息的时候会输出自身的信息包括:消费消息的大小, topic 来源以及 consumer 的 group 信息)        decorate_events => true            # (主题)        topics => ["bas-binding-topic"]            # (用于ES索引)        type => "bas-binding-topic"          # (数据格式)        codec => "json"             tags => ["bas-binding-topic"]      }        kafka{          bootstrap_servers => "10.8.22.15:9092"          client_id => "test2"          group_id => "test2"          auto_offset_reset => "latest"          consumer_threads => 5        #decorate_events => true          topics => ["bas-cus-request-topic"]          type => "bas-cus-request-topic"          codec => "json"        tags => ["bas-cus-request-topic"]      }  }filter {    filter插件负责过滤解析input读取的数据,可以用grok插件正则解析数据,    date插件解析日期,json插件解析json等等}output {    # 输出到ES    if "bas-binding-topic" in [tags] {       elasticsearch{               # ES的IP和端口            hosts => ["localhost:9201"]              # 索引名称:主题+时间            index => "bas-binding-topic-%{+YYYY.MM.dd+HH:mm:ss}"              timeout => 300        }        stdout {            codec => rubydebug         }  }  if "bas-cus-request-topic" in [tags] {       elasticsearch{              hosts => ["localhost:9201"]              index => "bas-cus-request-topic-%{+YYYY.MM.dd+HH:mm:ss}"              timeout => 300        }        stdout {            codec => rubydebug         }  }}

启动logstash的命令: .\bin\logstash.bat -f .\config\logstash.conf

4、ES:
在config目录下,elasticsearch.yml配置文件中,ES的默认端口为9200,可以通过http.port=9201来修改
启动命令: .\bin\elasticsearch.bat
启动后,在浏览器访问: http://localhost:9201,
如果出现了一些信息,表示启动成功。

5、Kibana:
在config目录下,kibana.yml配置文件中,通过elasticsearch.url: "http://localhost:9201" 来配置地址
启动命令: .\bin\kibana.bat
访问 "http://localhost:9201" ,如果出现需要输入用户名密码的界面,表示ES启动失败,如果直接显示Kibana界面,则启动成功。

logstash数据输出到ES时,会选择ES默认的映射来解析数据。如果觉得默认映射不满足使用条件,可以自定模板:
借助postman工具创建ES模板,如下:

收到此响应表示创建成功。然后可以通过GET请求查询你刚才创建的模板,通过DELETE请求删除刚才创建的模板。

向kafka主题中发送日志信息,发送的信息会在cmd窗口显示,如下:

同时,在Kibana界面会看到相应的索引,索引名称就是在logstash.conf的输出中配置的,如下:

Visualize:可以选择自己想要的索引去进行图形分析,效果如下:

Dashboard:将做过的图整合到仪表盘,效果如下:

另外,可以设置自动刷新,即当有新的数据发送到kafka时,所做的图会自动根据主题来进行刷新。
项目中,可以通过logback来讲日志输出到kafka,具体配置如下:

              %msg%n                        bas-binding-topic                        bootstrap.servers=localhost:9092,10.8.22.13:9092                         

然后在记日志的时候,记到对应的主题,日志就会写到kafka相应的位置。可以通过消费者来消费主题,查看日志是否成功写入,如下:

@SuppressWarnings("resource")  public static void main(String[] args) {    Properties properties = new Properties();    properties.put("bootstrap.servers", "127.0.0.1:9092");    properties.put("group.id", "group-1");    properties.put("enable.auto.commit", "false");    properties.put("auto.commit.interval.ms", "1000");    properties.put("auto.offset.reset", "earliest");    properties.put("session.timeout.ms", "30000");    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    properties.put("value.deserializer",         "org.apache.kafka.common.serialization.StringDeserializer");    KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);    kafkaConsumer.subscribe(Arrays.asList("bas-cus-request-topic", "bas-binding-topic"));    while (true) {      ConsumerRecords records = kafkaConsumer.poll(Long.MAX_VALUE);      System.err.println("+++++++++++++++++++++++++++++++++++++++++");      for (ConsumerRecord record : records) {        System.err.println(record.offset() + ">>>>>>" + record.value());      }      System.err.println("+++++++++++++++++++++++++++++++++++++++++++++");      break;    }  }

以上是"windows下如何实现kafka+ELK的日志系统"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!

配置 数据 日志 输出 主题 目录 消费 信息 可以通过 文件 端口 索引 成功 位置 命令 插件 模板 数据源 消费者 版本 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全的系统学习科目 金华云兜兜互联网科技公司 在小米做嵌入式软件开发怎么样 软件开发收费 网络安全原创周知识竞赛 织梦网站数据库怎么进入 互联网科技论坛 服务器的租用费用 运城淘客app软件开发 数据库怎么防止注入 九天服务器如何开终端 无锡智企网络技术有限公司 盐田区网络技术转移包括什么 网络安全攻防需要哪些硬件 农业机具数据库设计 360防火墙和服务器安全狗 调查数据统计分析软件开发 常州综合服务管理软件开发 打开数据库的各种方式有何不同 msql数据库建表 上海索拉软件开发有限责任公司 计算机网络技术女生多么 超算网络技术科技有限公司 福建服务器维修虚拟主机 软件开发模型 迭代 上海企业软件开发服务价格对比 域名服务器网站进不去 数据库婚姻状况40 数据库学科信息怎么填 逃跑吧少年服务器崩溃进不了游戏
0