设备端检测自己是否在线

[复制链接]
581|0
 楼主| IoT8888 发表于 2019-9-4 15:13 | 显示全部楼层 |阅读模式
ST, ic, ce, TE, link
基于MQTT接入的设备靠心跳保活,但心跳是周期性的、且自动收发和超时重连,这些特性给主动检测设备端是否在线带来了一定难度。本文提供通过消息收发是否正常判定设备是否在线的原理、流程、实现方式。

原理
如果设备可以发送、接收消息,那么该设备的通信是没问题的,并且一定在线。

消息收发是物联网平台的核心能力。因此,这种判定方法不会因为物联网平台架构升级或业务变动而变化,也不会因为设备使用的客户端不同而不同。是设备端检测自己是否在线最通用的一种原理。

该原理的一种特殊实现就是设备端消息的自发自收。

流程
创建Topic = /yourProductKey/y**iceName/user/checkstatus。
Topic可以自定义,但权限必须为发布和订阅。

设备端订阅上一步创建的Topic 。
设备端发送消息{"id":123,"version":"1.0","time":1234567890123},请一定使用QoS=0 。
消息内容可自定义,但建议使用此格式。

参数说明:


设备端收到消息上一步发送的消息。
离线判定逻辑

严格的:发送消息后,5秒内没有收到消息算失败,出现1次失败,判定为离线
普通的:发送消息后,5秒内没有收到消息算失败,连续2次失败,判定为离线
宽松的:发送消息后,5秒内没有收到消息算失败,连续3次失败,判定为离线
说明 您可以根据自己的情况,自定义离线判定逻辑。
实现
为方便体验,本例基于Java SDK Demo开发,实现设备端检测自己是否在线的严格判定逻辑。

Java SDK开发具体细节,请查看相关文档。
说明 您可以根据自己的喜好,选择不同的设备端SDK进行开发。
首先,下载Demo工程,添加本类,并填写设备证书信息。设备端代码如下:

  1. import java.io.UnsupportedEncodingException;

  2. import com.aliyun.alink.dm.api.DeviceInfo;
  3. import com.aliyun.alink.dm.api.InitResult;
  4. import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;
  5. import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;
  6. import com.aliyun.alink.linkkit.api.LinkKit;
  7. import com.aliyun.alink.linkkit.api.LinkKitInitParams;
  8. import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest;
  9. import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest;
  10. import com.aliyun.alink.linksdk.cmp.core.base.AMessage;
  11. import com.aliyun.alink.linksdk.cmp.core.base.ARequest;
  12. import com.aliyun.alink.linksdk.cmp.core.base.AResponse;
  13. import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;
  14. import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;
  15. import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener;
  16. import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener;
  17. import com.aliyun.alink.linksdk.tools.AError;

  18. public class CheckDeviceStatusOnDevice {

  19.         // ===================需要用户填写的参数,开始===========================
  20.         // 产品productKey,设备证书参数之一
  21.         private static String productKey = "";
  22.         // 设备名字deviceName,设备证书参数之一
  23.         private static String deviceName = "";
  24.         // 设备密钥deviceSecret,设备证书参数之一
  25.         private static String deviceSecret = "";
  26.         // 消息通信的Topic,需要在控制台定义,权限必须为发布和订阅
  27.         private static String checkStatusTopic = "/" + productKey + "/" + deviceName + "/user/checkstatus";
  28.         // ===================需要用户填写的参数结束===========================

  29.         // 接收到的消息
  30.         private static String subInfo = "";

  31.         public static void main(String[] args) throws InterruptedException {

  32.                 CheckDeviceStatusOnDevice device = new CheckDeviceStatusOnDevice();

  33.                 // 初始化
  34.                 device.init(productKey, deviceName, deviceSecret);

  35.                 // 下行数据监听
  36.                 device.registerNotifyListener();

  37.                 // 订阅Topic
  38.                 device.subscribe(checkStatusTopic);

  39.                 // 测试设备状态
  40.                 System.out.println("we will check device online status now.");
  41.                 device.checkStatus();

  42.                 // 准备测试设备离线状态,请拔掉网线
  43.                 System.out.println("pls close network,we will check device offline status after 60 seconds.");
  44.                 for (int i = 0; i < 6; i++) {
  45.                         Thread.sleep(10000);
  46.                 }
  47.                 device.checkStatus();
  48.         }

  49.         /**
  50.          * 测试设备状态
  51.          *
  52.          * @throws InterruptedException
  53.          */
  54.         public void checkStatus() throws InterruptedException {

  55.                 // -------------------------------------------------------------------
  56.                 // 要发送的消息,可以自定义,建议使用当前格式
  57.                 // -------------------------------------------------------------------
  58.                 // Field   | Tyep   | Desc
  59.                 // -------------------------------------------------------------------
  60.                 // id      | Object | 用于验证收发的消息是否是同一个,请自行业务层保证唯一
  61.                 // -------------------------------------------------------------------
  62.                 // version | String | 版本号固定1.0
  63.                 // -------------------------------------------------------------------
  64.                 // time    | Long   | 发送消息的时间戳,可以计算消息来回的延时,评估当前的通信质量
  65.                 // -------------------------------------------------------------------
  66.                 String payload = "{"id":123, "version":"1.0","time":" + System.currentTimeMillis() + "}";

  67.                 // 发送消息
  68.                 publish(checkStatusTopic, payload);

  69.                 // 严格的离线判定逻辑:发送消息后,5秒内没有收到消息算失败,出现1次失败,判定为离线
  70.                 boolean isTimeout = true;
  71.                 for (int i = 0; i < 5; i++) {
  72.                         Thread.sleep(1000);
  73.                         if (!subInfo.isEmpty()) {
  74.                                 isTimeout = false;
  75.                                 break;
  76.                         }
  77.                 }
  78.                 if (!isTimeout && payload.equals(subInfo)) {
  79.                         System.out.println("Device is online !!");
  80.                 } else {
  81.                         System.out.println("Device is offline !!");
  82.                 }

  83.                 // 置空接收到的消息,方便下一次测试
  84.                 subInfo = "";
  85.         }

  86.         /**
  87.          * 初始化
  88.          *
  89.          * @param pk productKey
  90.          * @param dn devcieName
  91.          * @param ds deviceSecret
  92.          * @throws InterruptedException
  93.          */
  94.         public void init(String pk, String dn, String ds) throws InterruptedException {

  95.                 LinkKitInitParams params = new LinkKitInitParams();

  96.                 // 设置 MQTT 初始化参数
  97.                 IoTMqttClientConfig config = new IoTMqttClientConfig();
  98.                 config.productKey = pk;
  99.                 config.deviceName = dn;
  100.                 config.deviceSecret = ds;
  101.                 params.mqttClientConfig = config;

  102.                 // 设置初始化设备证书信息,用户传入
  103.                 DeviceInfo deviceInfo = new DeviceInfo();
  104.                 deviceInfo.productKey = pk;
  105.                 deviceInfo.deviceName = dn;
  106.                 deviceInfo.deviceSecret = ds;

  107.                 params.deviceInfo = deviceInfo;

  108.                 LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
  109.                         @Override
  110.                         public void onInitDone(InitResult initResult) {
  111.                                 System.out.println("init success !!");
  112.                         }

  113.                         @Override
  114.                         public void onError(AError aError) {
  115.                                 System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
  116.                                                 + aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
  117.                         }
  118.                 });

  119.                 // 确保初始化成功后才执行后面的步骤,可以根据实际情况适当延长这里的延时
  120.                 Thread.sleep(2000);
  121.         }

  122.         /**
  123.          * 监听下行数据
  124.          */
  125.         public void registerNotifyListener() {
  126.                 LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
  127.                         @Override
  128.                         public boolean shouldHandle(String connectId, String topic) {
  129.                                 // 只处理特定Topic的消息
  130.                                 if (checkStatusTopic.equals(topic)) {
  131.                                         return true;
  132.                                 } else {
  133.                                         return false;
  134.                                 }
  135.                         }

  136.                         @Override
  137.                         public void onNotify(String connectId, String topic, AMessage aMessage) {
  138.                                 // 接收消息
  139.                                 try {
  140.                                         subInfo = new String((byte[]) aMessage.getData(), "UTF-8");
  141.                                 } catch (UnsupportedEncodingException e) {
  142.                                         e.printStackTrace();
  143.                                 }
  144.                         }

  145.                         @Override
  146.                         public void onConnectStateChange(String connectId, ConnectState connectState) {
  147.                         }
  148.                 });
  149.         }

  150.         /**
  151.          * 发布消息
  152.          *
  153.          * @param topic 发送消息的Topic
  154.          * @param payload 发送的消息内容
  155.          */
  156.         public void publish(String topic, String payload) {
  157.                 MqttPublishRequest request = new MqttPublishRequest();
  158.                 request.topic = topic;
  159.                 request.payloadObj = payload;
  160.                 request.qos = 0;
  161.                 LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
  162.                         @Override
  163.                         public void onResponse(ARequest aRequest, AResponse aResponse) {
  164.                         }

  165.                         @Override
  166.                         public void onFailure(ARequest aRequest, AError aError) {
  167.                         }
  168.                 });
  169.         }

  170.         /**
  171.          * 订阅消息
  172.          *
  173.          * @param topic 订阅消息的Topic
  174.          */
  175.         public void subscribe(String topic) {
  176.                 MqttSubscribeRequest request = new MqttSubscribeRequest();
  177.                 request.topic = topic;
  178.                 LinkKit.getInstance().getMqttClient().subscribe(request, new IConnectSubscribeListener() {
  179.                         @Override
  180.                         public void onSuccess() {
  181.                         }

  182.                         @Override
  183.                         public void onFailure(AError aError) {
  184.                         }
  185.                 });
  186.         }

  187. }

说明 检测到设备离线后,尽量不要选择主动重连。
物联网平台规定一个设备1分钟只允许尝试接入平台5次,超过会触发限流,限制设备接入。此时停止接入,等待1分钟即可解除限制。

设备端应注意退避,切勿触发限流。
您需要登录后才可以回帖 登录 | 注册

本版积分规则

12

主题

14

帖子

0

粉丝
快速回复 在线客服 返回列表 返回顶部