#前言

主要讲下Android如何使用MQTT通讯。用到的软件或者框架有:

如果已经有MQTT相关服务,可以跳过第一项,从第二项开始看。

#一、安装EMQ及启动EMQ

1.安装所需要的依赖包

$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2

复制代码

2.使用以下命令设置稳定存储库,以 CentOS7 为例

$ sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo

复制代码

3.安装最新版本的 EMQ X

$ sudo yum install emqx

复制代码

4.安装特定版本的 EMQ X

查询可用版本

$ yum list emqx --showduplicates | sort -r

emqx.x86_64 3.1.0-1.el7 emqx-stable

emqx.x86_64 3.0.1-1.el7 emqx-stable

emqx.x86_64 3.0.0-1.el7 emqx-stable

复制代码根据第二列中的版本字符串安装特定版本,例如 3.1.0

$ sudo yum install emqx-3.1.0

复制代码

5.启动 EMQ X

直接启动

$ emqx start

emqx 3.1.0 is started successfully!

$ emqx_ctl status

Node 'emqx@127.0.0.1' is started

emqx v3.1.0 is running

复制代码systemctl 启动

$ sudo systemctl start emqx

复制代码service 启动

$ sudo service emqx start

复制代码

EMQ管理后台

地址:xxx.xxx.xxx:18083,地址为服务器ip或者域名,端口为18083端口877e33d582dfdd7d5049aafb559939fd.png

二、Android使用MQTT

1.在Android中导入依赖

implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'

复制代码

项目地址:github.com/eclipse/pah…

2.创建MQTT连接的一个Service

public class MqttMessageService extends Service {

/**

* 订阅的主题

*/

private String subTopic;

@Override

public void onCreate() {

super.onCreate();

//初始化mqtt配置

initMqtt();

//连接mqtt

connectMqtt();

}

/**

* 连接mqtt

*/

private void connectMqtt() {

try {

mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {

@Override

public void onSuccess(IMqttToken asyncActionToken) {

}

@Override

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {

LogUtils.e("MQTT连接失败!!!!" + exception.getCause());

new Thread(new Runnable() {

@Override

public void run() {

LogUtils.e("30s后,尝试重新连接" );

try {

Thread.sleep(1000*30);

} catch (InterruptedException e) {

e.printStackTrace();

}

connectMqtt();

}

}).start();

}

});

} catch (MqttException e) {

e.printStackTrace();

}

}

/**

* mqtt初始化

*/

private void initMqtt() {

try {

//写上自己的url

String uri = "";

subTopic = "topic_test" ;

mqttAndroidClient = new MqttAndroidClient(

getApplicationContext(),

uri, "test");

mqttAndroidClient.setCallback(new MqttCallbackExtended() {

@Override

public void connectComplete(boolean reconnect, String serverURI) {

if (reconnect) {

LogUtils.i("MQTT重新连接成功!serverURI:" + serverURI);

} else {

LogUtils.i("MQTT连接成功!serverURI:" + serverURI);

}

subscribeAllTopics();

}

@Override

public void connectionLost(Throwable cause) {

LogUtils.i("MQTT连接断开!" + cause.getCause());

}

@Override

public void messageArrived(String topic, MqttMessage message) {

LogUtils.i("收到了消息:"+topic+ message.toString()+ message.getQos());

}

@Override

public void deliveryComplete(IMqttDeliveryToken token) {

try {

// MqttMessage mqttMessage = token.getMessage();

// LogUtils.i("消息发送成功:" + mqttMessage.toString());

} catch (MqttException e) {

e.printStackTrace();

}

}

});

// 新建连接设置

mqttConnectOptions = new MqttConnectOptions();

mqttConnectOptions.setUserName(MqttConfig.EMQ_USERNAME);

mqttConnectOptions.setPassword(MqttConfig.EMQ_PASSWORD.toCharArray());

//断开后,是否自动连接

mqttConnectOptions.setAutomaticReconnect(true);

//是否清空客户端的连接记录。若为true,则断开后,broker将自动清除该客户端连接信息

mqttConnectOptions.setCleanSession(true);

//设置超时时间,单位为秒

mqttConnectOptions.setConnectionTimeout(15);

//心跳时间,单位为秒。即多长时间确认一次Client端是否在线

mqttConnectOptions.setKeepAliveInterval(30);

//允许同时发送几条消息(未收到broker确认信息)

mqttConnectOptions.setMaxInflight(30);

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 订阅所有主题

*/

private void subscribeAllTopics() {

//订阅主消息主题和更新消息主题

subscribeToTopic(subTopic , 2);

}

/**

* 订阅一个主主题

*

* @param subTopic 主题名称

*/

private void subscribeToTopic(String subTopic, int qos) {

try {

mqttAndroidClient.subscribe(subTopic, qos, null, new IMqttActionListener() {

@Override

public void onSuccess(IMqttToken asyncActionToken) {

LogUtils.i("MQTT订阅消息成功:" + subTopic);

}

@Override

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {

LogUtils.d("MQTT订阅消息失败!" + subTopic);

}

});

} catch (MqttException ex) {

LogUtils.d("subscribeToTopic: Exception whilst subscribing");

ex.printStackTrace();

}

}

/**

* 发布主题

*

* @param topic 主题

* @param msg 内容

* @param qos qos

*/

public void publishMessage(String topic, String msg, int qos) {

if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {

try {

LogUtils.d("publishMessage: 发送" + msg);

mqttAndroidClient.publish(topic, msg.getBytes(), qos, false);

} catch (Exception e) {

LogUtils.e("publishMessage: Error Publishing: " + e.getMessage());

e.printStackTrace();

}

} else {

LogUtils.e("publishMessage失败,MQTT未连接 ");

}

}

public void publishMessage(String topic, String msg) {

publishMessage(topic, msg, 2);

}

@Override

public int onStartCommand(Intent intent, int flags, int startId) {

return Service.START_NOT_STICKY;

}

@Override

public void onDestroy() {

LogUtils.e("关闭MQTT");

//断开mqtt连接

try {

if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {

mqttAndroidClient.disconnect();

mqttAndroidClient.unregisterResources();

}

} catch (Exception e) {

e.printStackTrace();

}

super.onDestroy();

}

}

复制代码

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐