socket-mqtt: Netty4.x + MQTT

这是一个基于Netty4.x + MQTT实现的Push推送基础框架。相比于原生Netty,

socket-mqtt:

为C/S模式开发封装简单统一的编程模式

简单高性能的代码

统一的连接管理方案

统一的线程管理方案

网络基础问题的解决与支持:如心跳保持、压缩解压缩、编码与解码、加密与解密等

各种网络参数、连接池实现、监听器实现等可配置可替换

可实现对等集群

提供数据统计/监控组件

支持普通socket、MQTT、MQTT web socket协议

项目结构

codec: 封装编码与解码

compression: 封装压缩与解压缩

count: 封装统计信息

database: 基于hsql的内存数据库

encrypt: 封装加密与解密

future: 封装同步和异步调用

listener: 封装事件监听,包括消息、通道、异常三类事件监听器

service: 封装C/S模型、通道、心跳管理、消息分发等核心模块

Linux内核参数配置

# 允许回收TCP连接

net.ipv4.tcp_tw_reuse = 1

net.ipv4.tcp_tw_recycle = 1

# TCP 缓冲区内存

net.ipv4.tcp_rmem = 4096 87380 8388608

net.ipv4.tcp_wmem = 4096 87380 8388608

net.ipv4.tcp_mem = 94500000 915000000 927000000

# ulimits 优化

fs.file-max = 1065353

kernel.pid_max = 65536

*softnofile655360

*hardnofile655360

压测报告

单Broker8核16G,支持44万连接;1万客户端 单消息1024B 下行tps: 16万+;

4000客户端 Publish 单消息1024B 上行tps: 17万+,千兆网卡流量基本打满。

备注:Mqtt Server启动内存只分配了5G,如果分配到10G,理论上可以支持百万连接。还有,测试开启了心跳上报。

消息下行能力

1万Clients订阅的消息下行能力

对应下行负载情况

消息上行能力

4000Clients订阅消息上行能力

对应上行负载情况

查看连接数情况

查看连接数(telnet 10.43.204.61 8001; get status)

查看连接数(ss -l)

使用说明

各种测试类的源码在src/test/java/com/yb/socket包路径下:

包括:

普通socket Server/Client

MQTT socket Server/Client

带注册中心的普通socket/MQTT socket

基于内存数据库的模拟订阅推送

服务启动配置选项

Server server = new Server();

// 设置Broker端口

server.setPort(8000);

// 设置启动信息统计。默认true

server.setOpenCount(true);

// 设置启用心跳功能。默认true

server.setCheckHeartbeat(true);

// 设置启动服务状态,默认端口8001。通过telnet server_ip 8001; get status查看服务信息

server.setOpenStatus(true);

// 服务状态端口。默认8001

server.setStatusPort(8001);

// 设置服务名称

server.setServiceName("Demo");

// 设置工作线程数量。默认CPU个数+1

server.setWorkerCount(64);

// 是否开户业务处理线程池。默认false

server.setOpenExecutor(true);

// 设置tcp no delay。默认true

server.setTcpNoDelay(true);

// 是否启用keepAlive。默认true

server.setKeepAlive(true);

// 自定义监听器,可处理相关事件

server.addEventListener(new EchoMessageEventListener());

// 设置Broker启动协议。SocketType.MQTT - MQTT协议; SocketType.NORMAL - 普通Socket协议;SocketType.MQTT_WS - MQTT web socket协议;

server.setSocketType(SocketType.MQTT);

// 绑定端口启动服务

server.bind();

MQTT web socket server DEMO

Server server = new Server();

server.setPort(8000);

server.addEventListener(new EchoMessageEventListener());

server.setSocketType(SocketType.MQTT_WS);

server.bind();

//模拟推送

String message = "this is a web socket message!";

MqttRequest mqttRequest = new MqttRequest((message.getBytes()));

while (true) {

if (server.getChannels().size() > 0) {

logger.info("模拟推送消息");

for (WrappedChannel channel : server.getChannels().values()) {

server.send(channel, "yb/notice/", mqttRequest);

}

}

Thread.sleep(1000L);

}

MQTT web socket client(浏览器)

可用在线mqtt测试:http://www.tongxinmao.com/txm/webmqtt.php

TopicPayloadTimeQoS

yb/notice/this is a web socket message!2019-2-27 16:54:540

Normal socket server DEMO

Server server = new Server();

server.setPort(8000);

server.addEventListener(new JsonEchoMessageEventListener());

server.addChannelHandler("decoder", new JsonDecoder());

server.addChannelHandler("encoder", new JsonEncoder());

server.bind();

//模拟推送

JSONObject message = new JSONObject();

message.put("action", "echo");

message.put("message", "this is a normal socket message!");

Request request = new Request();

request.setSequence(0);

request.setMessage(message);

while (true) {

if (server.getChannels().size() > 0) {

logger.info("模拟推送消息");

for (WrappedChannel channel : server.getChannels().values()) {

channel.send(request);

Thread.sleep(5000L);

}

}

}

Normal socket client DEMO

Client client = new Client();

client.setIp("127.0.0.1");

client.setPort(8000);

client.setConnectTimeout(10000);

client.addChannelHandler("decoder", new JsonDecoder());

client.addChannelHandler("encoder", new JsonEncoder());

client.connect();

for (int i = 0; i < 2; i++) {

JSONObject message = new JSONObject();

message.put("action", "echo");

message.put("message", "hello world!");

Request request = new Request();

request.setSequence(i);

request.setMessage(message);

Response response = (Response) client.sendWithSync(request, 3000);

logger.info("成功接收到同步的返回: '{}'.", response);

}

client.shutdown();

带注册中心 center DEMO

Server server = new Server();

server.setPort(9000);

server.setCheckHeartbeat(false);

server.addChannelHandler("decoder", new JsonDecoder());

server.addChannelHandler("encoder", new JsonEncoder());

server.addEventListener(new com.yb.socket.center.CenterMockMessageEventListener());

server.bind();

带注册中心 server DEMO

Server server = new Server();

server.setPort(8000);

server.setCheckHeartbeat(false);

server.setCenterAddr("127.0.0.1:9000,127.0.0.1:9010");

server.addEventListener(new JsonEchoMessageEventListener());

server.bind();

带注册中心 client DEMO

Client client = new Client();

client.setCheckHeartbeat(false);

client.setCenterAddr("127.0.0.1:9000,127.0.0.1:9010");

client.addChannelHandler("decoder", new JsonDecoder());

client.addChannelHandler("encoder", new JsonEncoder());

client.connect();

JSONObject message = new JSONObject();

message.put("action", "echo");

message.put("message", "hello");

for (int i = 0; i < 5; i++) {

Request request = new Request();

request.setSequence(i);

request.setMessage(message);

client.send(request);

Thread.sleep(5000L);

}

后续规划

支持MQTT主题过滤机制

支持SSL连接方式

完整的QoS服务质量等级实现DEMO

遗嘱消息, 保留消息及消息分发重试

压测工具

参考项目

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐