Nginx + Docker 手动集群方式运行 EMQ
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,EMQ X 在支持客户的过程中,了解到有客户使用 Nginx 做负载均衡,Docker 容器手动加入集群的方式运行 EMQ 集群,现将主要过程记录下来。业务需求使用 Nginx 作为反向代理Nginx
千家信息网最后更新 2025年12月02日Nginx + Docker 手动集群方式运行 EMQ
EMQ X 在支持客户的过程中,了解到有客户使用 Nginx 做负载均衡,Docker 容器手动加入集群的方式运行 EMQ 集群,现将主要过程记录下来。
业务需求
- 使用 Nginx 作为反向代理
- Nginx 需要提前分配好代理 server 的地址
- 使用 Docker 容器运行 EMQ
- EMQ 自动重启
- EMQ 重启后自动集群
配置
Nginx 配置
$ cat /etc/nginx/tcpstream.conf## tcp LB and SSL passthrough for backend ##stream { upstream mqtt_broker { server 127.0.0.1:21871; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21872; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21873; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21874; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21875; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21881; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21891; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21882; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21892; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21883; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21893; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21884; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21894; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21885; #max_fails=5 fail_timeout=30s; server 127.0.0.1:21895; #max_fails=5 fail_timeout=30s; }log_format basic '$proxy_protocol_addr - $remote_addr [$time_local] ' '$protocol $status $bytes_sent $bytes_received ' '$session_time "$upstream_addr" ' '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"'; access_log /var/log/nginx/access.log basic; error_log /var/log/nginx/error.log; server { listen 8884 ssl; # proxy_protocol; proxy_next_upstream on; #proxy_bind $remote_addr transparent; proxy_ssl off; proxy_pass mqtt_broker; proxy_protocol on; #ssl_on; # adding some extra proxy settings proxy_timeout 350s; #proxy_buffer_size 128k; #ssl_certificate /etc/nginx/certs/solace.pem; #ssl_certificate_key /etc/nginx/certs/solace.pem; ssl_certificate /etc/nginx/certs/cert.pem; ssl_certificate_key /etc/nginx/certs/key.pem; #ssl_verify_client off; ssl_protocols TLSv1 TLSv1.1 TLSv1.2; ssl_ciphers HIGH:!aNULL:!MD5; }}Docker 配置
客户自行编译的 Docker image,并非使用 EMQ 提供的官方镜像。
Dockerfile 目录如下:
$ ll /opt/Docker/总用量 28-rw-r--r-- 1 alexeyp emq 620 10月 22 17:26 Dockerfilelrwxrwxrwx 1 alexeyp emq 13 10月 24 13:59 emqttd -> emqttd.2.3.11drwxr-xr-x 10 alexeyp emq 110 10月 24 14:27 emqttd.2.3.11-rwxr-xr-x 1 alexeyp emq 3463 10月 26 05:03 StartEmqInstance.sh-rwxr-xr-x 1 alexeyp alexeyp 270 10月 25 10:46 status.shDockerfile:
$ cat DockerfileFROM centos:latestRUN yum -y updateEXPOSE 60000-65000WORKDIR /opt/emqttdADD ./emqttd /opt/emqttdADD ./vsparc.rpm /tmp/vsparc.rpmADD ./StartEmqInstance.sh /opt/emqttd/StartEmqInstance.shRUN yum install -y epel-releaseRUN yum install -y which less sed net-tools telnet gtest /tmp/vsparc.rpmENV TZ Australia/MelbourneCMD bash /opt/emqttd/StartEmqInstance.sh && bash可以看到 Docker 容器启动后会执行一个 StartEmqInstance.sh 的脚本,查看该脚本:
$ cat StartEmqInstance.sh#!/bin/bashDIR=$(dirname $0)HOSTNAME=$(hostname -s)function adjust_instance(){ local INST=$1 local INST_ROOT=$2 cat $INST_ROOT/etc/emq.conf | \ sed -re "s/^node\.name\s*=.*$/node.name = emq$INST@127.0.0.1/" | \ #sed -re "s/^cluster\.name\s*=.*$/cluster.name = $HOSTNAME/" | \ sed -re "s/^listener\.tcp\.external\s*=.*$/listener.tcp.external = 0.0.0.0:6188$INST/" | \ sed -re "s/^listener\.tcp\.external1\s*=.*$/listener.tcp.external1 = 0.0.0.0:6189$INST/" | \ sed -re "s/^listener\.tcp\.external2\s*=.*$/listener.tcp.external2 = 0.0.0.0:6187$INST/" | \ sed -re "s/^listener\.tcp\.internal\s*=.*$/listener.tcp.internal = 127.0.0.1:6298$INST/" | \ sed -re "s/^listener\.ssl\.external\s*=.*$/listener.ssl.external = 6288$INST/" | \ sed -re "s/^listener\.ws\.external\s*=.*$/listener.ws.external = 6208$INST/" | \ sed -re "s/^listener\.wss\.external\s*=.*$/listener.ws.external = 6308$INST/" | \ sed -re "s/^listener\.api\.mgmt\s*=.*$/listener.api.mgmt = 6408$INST/" | \ sed -re "s/^(##\s)?listener\.tcp\.external\.proxy_protocol\s=.*$/listener.tcp.external.proxy_protocol = on/" | \ sed -re "s/^(##\s)?listener\.tcp\.external1\.proxy_protocol\s=.*$/listener.tcp.external1.proxy_protocol = on/" | \ sed -re "s/^(##\s)?listener\.tcp\.external2\.proxy_protocol\s=.*$/listener.tcp.external2.proxy_protocol = on/" | \ sed -re "s/^(##\s)?listener\.tcp\.external\.proxy_protocol_timeout\s=.*$/listener.tcp.external.proxy_protocol_timeout = 30s/" | \ sed -re "s/^(##\s)?listener\.tcp\.external1\.proxy_protocol_timeout\s=.*$/listener.tcp.external1.proxy_protocol_timeout = 30s/" | \ sed -re "s/^(##\s)?listener\.tcp\.external2\.proxy_protocol_timeout\s=.*$/listener.tcp.external2.proxy_protocol_timeout = 30s/" | \ sed -re "s/^(##\s)?node.dist_listen_min\s*=.*$/node.dist_listen_min = 6000$INST/" | \ sed -re "s/^(##\s)?node.dist_listen_max\s*=.*$/node.dist_listen_max = 6000$INST/" | \ cat - > $INST_ROOT/etc/emq.conf.new mv $INST_ROOT/etc/emq.conf.new $INST_ROOT/etc/emq.conf}function cluster_instance(){ local INST=$1 for DEST in 1 2 3 4 5; do if [ $DEST == $INST ]; then continue; fi DEST_NODE="emq$DEST@127.0.0.1" RESULT=$(/opt/emqttd/bin/emqttd_ctl cluster join $DEST_NODE 2>&1) echo "$RESULT" echo "$RESULT" | grep -E 'successfully|already' > /dev/null RC=$? [ $RC == 0 ] && break done}cd "$DIR"if [ "$EMQ_INSTANCE_NUMBER" == "" ]; then echo "Environment variable EMQ_INSTANCE_NUMBER(1..10) is not set." echo "eMQ instance name is not configured." exit 1else adjust_instance $EMQ_INSTANCE_NUMBER $DIRfifunction run_application(){ local CMD="$1" local RC=1 while [ $RC != 0 ]; do $CMD RC=$? echo "### Exited: $CMD" echo "### rc = $RC" #[ $RC != 0 ] && sleep 3 RC=1 done echo "### Done: $CMD"}function start_node(){ bin/emqttd start STARTED=0 while [ $STARTED == 0 ]; do sleep 1 /opt/emqttd/bin/emqttd_ctl status | grep "is running" [ $? == 0 ] && break done cluster_instance $EMQ_INSTANCE_NUMBER > /tmp/cluster_instance.log}start_nodesleep 5run_application "/usr/local/bin/emqtt-stats-collector" waitIDLE_TIME=0while [[ $IDLE_TIME -lt 5 ]]do IDLE_TIME=$((IDLE_TIME+1)) if [[ ! -z "$( /opt/emqttd/bin/emqttd_ctl status|grep 'is running'|awk '{print $1}')" ]]; then IDLE_TIME=0 else echo "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd not running, waiting for recovery in $((60-IDLE_TIME*5)) seconds" fi sleep 5doneecho "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd exit abnormally"exit 1脚本内容稍多而且有些复杂,需要结合 start.sh 脚本和 etc/emq.conf一起看
$ cat start.sh#!/bin/bashfor INST in 1 2 3 4 5do docker ps | grep -E "\sinstance_$INST$" if [ $? != 0 ]; then #docker run -itd ---ulimit nofile=1048576 -restart=always -v /opt/Docker/emqtt/emq$INST/data/mnesia:/opt/emqttd/data/mnesia -e EMQ_INSTANCE_NUMBER=$INST --name=instance_$INST --network host emq:test & docker run -itd --ulimit nofile=1048576 -e EMQ_INSTANCE_NUMBER=$INST --name=instance_$INST --network host emq:latest & fidonewaitEMQ 配置
etc/emq.conf`的全文就不贴出来了,主要是增加了两个 tcp 监听端口,并且关闭了`listener.tcp.external.tune_buffer$ cat etc/emq.conf......##--------------------------------------------------------------------listener.tcp.external = 0.0.0.0:21881listener.tcp.external.acceptors = 16listener.tcp.external.max_clients = 512000listener.tcp.external.access.1 = allow alllistener.tcp.external.proxy_protocol = onlistener.tcp.external.proxy_protocol_timeout = 30slistener.tcp.external.backlog = 1024listener.tcp.external.send_timeout = 15slistener.tcp.external.send_timeout_close = on## listener.tcp.external.tune_buffer = onlistener.tcp.external.nodelay = truelistener.tcp.external.reuseaddr = true##--------------------------------------------------------------------listener.tcp.external1 = 0.0.0.0:21891listener.tcp.external1.acceptors = 16listener.tcp.external1.max_clients = 512000listener.tcp.external1.access.1 = allow alllistener.tcp.external1.proxy_protocol = onlistener.tcp.external1.proxy_protocol_timeout = 30slistener.tcp.external1.backlog = 1024listener.tcp.external1.send_timeout = 15slistener.tcp.external1.send_timeout_close = on## listener.tcp.external1.tune_buffer = onlistener.tcp.external1.nodelay = truelistener.tcp.external1.reuseaddr = true##--------------------------------------------------------------------listener.tcp.external2 = 0.0.0.0:21871listener.tcp.external2.acceptors = 16listener.tcp.external2.max_clients = 512000listener.tcp.external2.access.1 = allow alllistener.tcp.external2.proxy_protocol = onlistener.tcp.external2.proxy_protocol_timeout = 30slistener.tcp.external2.backlog = 1024listener.tcp.external2.send_timeout = 15slistener.tcp.external2.send_timeout_close = on## listener.tcp.external2.tune_buffer = onlistener.tcp.external2.nodelay = truelistener.tcp.external2.reuseaddr = true......业务分析
Docker 容器初始化
Docker 容器创建之后, StartEmqInstance.sh执行 adjust_instance()将 etc/emq.conf中监听的端口修改为Nginx 的代理 server
sed -re "s/^node\.name\s*=.*$/node.name = emq$INST@127.0.0.1/" | \ sed -re "s/^listener\.tcp\.external\s*=.*$/listener.tcp.external = 0.0.0.0:6188$INST/" sed -re "s/^listener\.tcp\.external1\s*=.*$/listener.tcp.external1 = 0.0.0.0:6189$INST/" sed -re "s/^listener\.tcp\.external2\s*=.*$/listener.tcp.external2 = 0.0.0.0:6187$INST/" sed -re "s/^listener\.tcp\.internal\s*=.*$/listener.tcp.internal = 127.0.0.1:6298$INST/"并通过 join 命令来实现集群功能
function cluster_instance(){ local INST=$1 for DEST in 1 2 3 4 5; do if [ $DEST == $INST ]; then continue; fi DEST_NODE="emq$DEST@127.0.0.1" RESULT=$(/opt/emqttd/bin/emqttd_ctl cluster join $DEST_NODE 2>&1) echo "$RESULT" echo "$RESULT" | grep -E 'successfully|already' > /dev/null RC=$? [ $RC == 0 ] && break done}循环检查 EMQ 的状态,当 EMQ 停止了之后退出容器
IDLE_TIME=0while [[ $IDLE_TIME -lt 5 ]]do IDLE_TIME=$((IDLE_TIME+1)) if [[ ! -z "$( /opt/emqttd/bin/emqttd_ctl status|grep 'is running'|awk '{print $1}')" ]]; then IDLE_TIME=0 else echo "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd not running, waiting for recovery in $((60-IDLE_TIME*5)) seconds" fi sleep 5doneecho "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd exit abnormally"exit 1访问
客户端通过 SSL 方式连接 地址,Nginx 将连接以 TCP 方式负载到 EMQ 节点。
PS:关于 Nginx 如何反向代理 tcp 和 ssl 的设置,可以参考 EMQ X 消息服务器 Nginx 反向代理
自动重启和自动集群
容器启动后通过 StartEmqInstance.sh脚本查询 EMQ 的状态,当 EMQ 停止时退出容器,配合 --restart=always来达到重启容器的目的。
EMQ 将集群信息储存在 data/mnesia中,将容器的中的目录映射到宿主机,当容器重启之后会读取宿主机映射的相关目录,实现重启后自动集群。
存在问题
- Docker 的 host 网络模式使用宿主机的网络,当宿主机有其他业务在执行的时候,容易出现端口冲突
解决方案
- 修改
/proc/sys/net/ipv4/ip_local_port_range指定系统分配的端口为1024 60000,然后将 EMQ 的业务端口分配为 60000 之后的端口
实践案例
建议使用 kubernetes 来编排 docker 容器:
- EMQ 可以通过
kube-apiserver来实现自动集群的功能。 - 该客户目前只是在单机部署docker集群,使用 kubernetes 可以轻易实现多个节点之间部署集群。
- kubernetes 的
deployment可以监控emqx pod的状态,实现自动重启、弹性扩容等功能。 - 每个
emqx pod都有独立的虚拟 IP,不会出现端口冲突的问题。 - kubernetes 的
Service可以实现固定 IP 和负载均衡的需求,在Service创建的请求中,可以通过设置spec.clusterIP字段来指定自己的集群 IP 地址,将 Nginx 的代理 server 设置成clusterIP即可,Service可自行实现负载均衡。
容器
集群
端口
代理
客户
脚本
业务
宿主
宿主机
配置
方式
均衡
功能
地址
状态
目录
分配
运行
可以通过
网络
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
传奇3单机数据库启动不了
完美世界挂箱服务器
台湾软件开发定制
网络安全与管理试卷
您的数据库有异常
网络技术能不能建立中文系统
电脑服务器账号密码忘了怎么办
计算机网络安全漏洞防范措施提要
厦门软件开发学习
数据库中查询索引是否存在
软件开发遵循iso标准
蜀桑源软件开发 电话
尚云客网络技术招聘
怎么设计网站的sql数据库
中国银行软件开发中心加班严重吗
无线同屏服务器
t300 服务器
地理信息数据库的概念
b2b2c数据库设计
app软件开发专业大学排名
杭州的互联网科技有限公司
怎么安装软件到本地服务器上
乡镇网络安全隐患排查报告
wifi的网络安全密钥
arm处理器软件开发员手册
服务器挂载是啥
电子科技大学工业互联网特色专业
做网络安全手抄报怎么做
智能科技互联网公司
物联网网络安全防范措施