MQTT库使用简要说明
本项目中使用的是 @ohos/mqtt 库,提供了异步的 MQTT 客户端功能。
1. 客户端创建与连接
this.mqtt = MqttAsync.createMqtt({
url: "mqtt://58.240.18.170:31883", // MQTT服务器地址,支持mqtt和mqtts协议
clientId: "your_client_id", // 客户端ID
persistenceType: 1, // 持久化类型(1表示内存持久化)
});
this.mqtt.connect({ userName: "", password: "", connectTimeout: 30 }, (err: Error, data: MqttResponse) => {
if (data.code === 0) {
this.mqttConnectStatus = true; // 连接成功状态标记
}
});使用
MqttAsync.createMqtt创建客户端实例。调用
connect方法发起连接,传入连接参数(用户名+密码+超时)及回调。通过回调判断连接是否成功。
2. 主题订阅
this.mqtt.subscribe({ topic: "order/service/sync", qos: 2 }, (err: Error, data: MqttResponse) => {
// 订阅结果回调
});使用
subscribe方法订阅指定主题。参数包含主题名和 QoS 等级。
订阅完成后会调用回调,可根据错误或返回数据判断结果。
时序图
CONNECT/CONNACK:客户端(发布者和订阅者)与Broker建立长连接。
SUBSCRIBE/SUBACK:订阅者订阅主题,Broker确认订阅。
PUBLISH:发布者将消息发送到Broker,Broker根据订阅将消息推送给订阅者。
PUBACK:QoS 1模式下,订阅者确认收到消息。
DISCONNECT:客户端断开连接。
3. 取消订阅
this.mqtt.unsubscribe({ topic: "order/service/sync", qos: 2 }, () => {
console.info("取消订阅成功");
});调用
unsubscribe取消指定主题的订阅。取消成功后回调通知。
4. 消息处理回调
this.mqtt.messageArrived((err: Error, data: MqttMessage) => {
// data.topic 是消息主题
// data.payload 是消息内容
if (data.topic === "order/service/sync") {
// 处理同步消息
} else if (data.topic === "order/service/lamps") {
// 处理灯光消息
}
// 其他主题消息处理
});使用
messageArrived方法注册消息到达回调。每当订阅的主题收到消息时,回调被触发。
根据
data.topic判断消息来源,解析data.payload进行业务处理。
示例消息1(主题 order/service/sync)
{ "topic": "order/service/sync", "payload": "{\"sn\":\"12345\",\"device\":[1,0,1,0],\"lamps\":[1,0,1,0,1,0,1,0,1,0]}", "qos": 2, "retained": false}- 这里
payload是字符串,里面包含序列化的JSON,必须先JSON.parse()解析为对象。 - 设备状态(机械臂、机床等)保存在
device数组中。 - 指示灯状态保存在
lamps数组中。
示例消息2(主题 order/service/lamps)
{ "topic": "order/service/lamps", "payload": "{\"sn\":\"12345\",\"index\":3,\"action\":1}", "qos": 2, "retained": false}- 用来控制单个指示灯的开关。
index指灯的编号,action 1代表开,0代表关。
示例消息3(主题 order/service/switch)
{ "topic": "order/service/switch", "payload": "{\"sn\":\"12345\",\"index\":1,\"action\":0}", "qos": 2, "retained": false}- 用来控制设备开关状态。
index是设备编号,action 1为开,0为关。
本项目中的MQTT订阅/取消订阅及消息处理说明
订阅流程(subscribe 方法)
判断 MQTT 是否已连接,未连接则先调用
connect。连接成功后,调用
subscribeData方法订阅三个主题:"order/service/sync""order/service/lamps""order/service/switch"
订阅成功后,向主题
"order/adapter/sync"发布一条消息,通知服务端同步状态。
取消订阅流程(cancellationSubscribe 方法)
判断当前是否已订阅(通过状态值判断)。
调用
unsubscribe方法,分别取消订阅三个主题:"order/service/sync""order/service/lamps""order/service/switch"
取消成功后,更新UI状态(恢复按钮显示等)。
消息处理(messageArrived 回调)
监听所有订阅主题消息。
解析消息内容,验证消息中的
sn是否和本地保存的设备编号匹配,确保处理的是自己的数据。根据主题不同执行不同业务逻辑:
"order/service/sync":更新设备状态及指示灯状态数组。"order/service/lamps":更新单个指示灯状态。"order/service/switch":更新机械臂、机床、传送带和巡检车状态。
总结
| 功能 | 代码方法 | 说明 |
|---|---|---|
| 创建客户端 | MqttAsync.createMqtt() | 创建MQTT客户端实例 |
| 建立连接 | mqtt.connect() | 连接MQTT服务器,触发连接回调 |
| 订阅主题 | mqtt.subscribe() | 订阅一个或多个主题 |
| 取消订阅 | mqtt.unsubscribe() | 取消指定主题订阅 |
| 接收消息 | mqtt.messageArrived() | 注册消息接收回调,处理订阅主题消息 |
| 发布消息 | mqtt.publish() | 发送消息到指定主题 |