Skip to content

MQTT库使用简要说明

本项目中使用的是 @ohos/mqtt 库,提供了异步的 MQTT 客户端功能。

1. 客户端创建与连接

typescript
this.mqtt = MqttAsync.createMqtt({
  url: "mqtt://58.240.18.170:31883",   // MQTT服务器地址,支持mqtt和mqtts协议
  clientId: "your_client_id",           // 客户端ID
  persistenceType: 1,                   // 持久化类型(1表示内存持久化)
});

this.mqtt.connect({ userName: "", password: "", connectTimeout: 30 }, (err: Error, data: MqttResponse) => {
  if (data.code === 0) {
    this.mqttConnectStatus = true;     // 连接成功状态标记
  }
});
  • 使用 MqttAsync.createMqtt 创建客户端实例。

  • 调用 connect 方法发起连接,传入连接参数(用户名+密码+超时)及回调。

  • 通过回调判断连接是否成功。


2. 主题订阅

typescript
this.mqtt.subscribe({ topic: "order/service/sync", qos: 2 }, (err: Error, data: MqttResponse) => {
  // 订阅结果回调
});
  • 使用 subscribe 方法订阅指定主题。

  • 参数包含主题名和 QoS 等级。

  • 订阅完成后会调用回调,可根据错误或返回数据判断结果。

时序图image.png

  • CONNECT/CONNACK:客户端(发布者和订阅者)与Broker建立长连接。

  • SUBSCRIBE/SUBACK:订阅者订阅主题,Broker确认订阅。

  • PUBLISH:发布者将消息发送到Broker,Broker根据订阅将消息推送给订阅者。

  • PUBACK:QoS 1模式下,订阅者确认收到消息。

  • DISCONNECT:客户端断开连接。


3. 取消订阅

typescript
this.mqtt.unsubscribe({ topic: "order/service/sync", qos: 2 }, () => {
  console.info("取消订阅成功");
});
  • 调用 unsubscribe 取消指定主题的订阅。

  • 取消成功后回调通知。


4. 消息处理回调

typescript
this.mqtt.messageArrived((err: Error, data: MqttMessage) => {
  // data.topic 是消息主题
  // data.payload 是消息内容
  if (data.topic === "order/service/sync") {
    // 处理同步消息
  } else if (data.topic === "order/service/lamps") {
    // 处理灯光消息
  }
  // 其他主题消息处理
});
  • 使用 messageArrived 方法注册消息到达回调。

  • 每当订阅的主题收到消息时,回调被触发。

  • 根据 data.topic 判断消息来源,解析 data.payload 进行业务处理。

示例消息1(主题 order/service/sync)

json
{  "topic": "order/service/sync",  "payload": "{\"sn\":\"12345\",\"device\":[1,0,1,0],\"lamps\":[1,0,1,0,1,0,1,0,1,0]}",  "qos": 2,  "retained": false}
  • 这里payload是字符串,里面包含序列化的JSON,必须先JSON.parse()解析为对象。
  • 设备状态(机械臂、机床等)保存在device数组中。
  • 指示灯状态保存在lamps数组中。

示例消息2(主题 order/service/lamps)

json
{  "topic": "order/service/lamps",  "payload": "{\"sn\":\"12345\",\"index\":3,\"action\":1}",  "qos": 2,  "retained": false}
  • 用来控制单个指示灯的开关。
  • index指灯的编号,action 1代表开,0代表关。

示例消息3(主题 order/service/switch)

json
{  "topic": "order/service/switch",  "payload": "{\"sn\":\"12345\",\"index\":1,\"action\":0}",  "qos": 2,  "retained": false}
  • 用来控制设备开关状态。
  • index是设备编号,action 1为开,0为关。

本项目中的MQTT订阅/取消订阅及消息处理说明

订阅流程(subscribe 方法)

  • 判断 MQTT 是否已连接,未连接则先调用 connect

  • 连接成功后,调用 subscribeData 方法订阅三个主题:

    • "order/service/sync"

    • "order/service/lamps"

    • "order/service/switch"

  • 订阅成功后,向主题 "order/adapter/sync" 发布一条消息,通知服务端同步状态。

取消订阅流程(cancellationSubscribe 方法)

  • 判断当前是否已订阅(通过状态值判断)。

  • 调用 unsubscribe 方法,分别取消订阅三个主题:

    • "order/service/sync"

    • "order/service/lamps"

    • "order/service/switch"

  • 取消成功后,更新UI状态(恢复按钮显示等)。

消息处理(messageArrived 回调)

  • 监听所有订阅主题消息。

  • 解析消息内容,验证消息中的 sn 是否和本地保存的设备编号匹配,确保处理的是自己的数据。

  • 根据主题不同执行不同业务逻辑:

    • "order/service/sync":更新设备状态及指示灯状态数组。

    • "order/service/lamps":更新单个指示灯状态。

    • "order/service/switch":更新机械臂、机床、传送带和巡检车状态。


总结

功能代码方法说明
创建客户端MqttAsync.createMqtt()创建MQTT客户端实例
建立连接mqtt.connect()连接MQTT服务器,触发连接回调
订阅主题mqtt.subscribe()订阅一个或多个主题
取消订阅mqtt.unsubscribe()取消指定主题订阅
接收消息mqtt.messageArrived()注册消息接收回调,处理订阅主题消息
发布消息mqtt.publish()发送消息到指定主题