打印

边缘计算框架edgex添加mqtt

[复制链接]
864|0
手机看帖
扫描二维码
随时随地手机跟帖
跳转到指定楼层
楼主
keer_zu|  楼主 | 2020-4-28 10:01 | 只看该作者 |只看大图 回帖奖励 |倒序浏览 |阅读模式

                                            EdgeX & MQTT
                    
                                    
                     
               
                原文:https://docs.edgexfoundry.org/Ch-ExamplesAddingMQTTDevice.html
EdgeX - Edinburgh Release
1.概述下面例子中,我们用模拟器来代替真实设备,这样可以直接的来测试MQTT特性

运行MQTT BrokerEclipse Mosquitto是一个开源(EPL/EDL licensed) 的message Broker,它支持MQTT协议5.0, 3.1.1以及3.1
使用下面docker命令运行Mosquitto
  docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto
2.运行MQTT设备模拟器
模拟器完成下面三件事情:
  • 每15秒发布随机数据
  • 接收读取reading请求,并返回响应
  • 接收Put请求,然后修改设备值
我们创建下面的脚本来模拟MQTT服务
// mock-device.jsfunction getRandomFloat(min, max) {    return Math.random() * (max - min) + min;}const deviceName = "MQ_DEVICE";let message = "test-message";// 1. Publish random number every 15 secondsschedule('*/15 * * * * *', ()=>{    let body = {        "name": deviceName,               "cmd": "randnum",        "randnum": getRandomFloat(25,29).toFixed(1)    };    publish( 'DataTopic', JSON.stringify(body));});// 2. Receive the reading request, then return the response// 3. Receive the put request, then change the device valuesubscribe( "CommandTopic" , (topic, val) => {    var data = val;        if (data.method == "set") {        message = data[data.cmd]    }else{        switch(data.cmd) {            case "ping":              data.ping = "pong";              break;            case "message":              data.message = message;              break;            case "randnum":                data.randnum = 12.123;                break;          }    }    publish( "ResponseTopic", JSON.stringify(data));});
要运行设备模拟器,输入下面命令并根据你的实际情况做出修改
mv mock-device.js /path/to/mqtt-scriptsdocker run -d --restart=always --name=mqtt-scripts \  -v /path/to/mqtt-scripts:/scripts \  dersimn/mqtt-scripts --url mqtt://mqtt-broker-ip --dir /scripts
  • 将/path/to/mqtt-scripts修改为你的正确路径
  • 将mqtt-broker-ip修改为你的正确的broker ip
设置 setup创建文件夹,在里面放入部署需要的文件
- device-service-demo  |- docker-compose.yml  |- mqtt     |- configuration.toml     |- mqtt.test.device.profile.yml设备文件 Device Profile (mqtt.test.device.profile.yml)设备文件定义了设备的取值和操作方法,可以通过它们来进行读写操作
创建设备文件mqtt.test.device.profile.yml,并添加以下内容
# mqtt.test.device.profile.ymlname: "Test.Device.MQTT.Profile"manufacturer: "iot"model: "MQTT-DEVICE"description: "Test device profile"labels:  - "mqtt"  - "test"deviceResources:  -    name: randnum    description: "device random number"    properties:      value:        { type: "Float64", size: "4", readWrite: "R", floatEncoding: "eNotation" }      units:        { type: "String", readWrite: "R", defaultValue: "" }  -    name: ping    description: "device awake"    properties:      value:        { type: "String", size: "0", readWrite: "R", defaultValue: "pong" }      units:        { type: "String", readWrite: "R", defaultValue: "" }  -    name: message    description: "device message"    properties:      value:        { type: "String", size: "0", readWrite: "W" ,scale: "", offset: "", base: "" }      units:        { type: "String", readWrite: "R", defaultValue: "" }deviceCommands:  -    name: testrandnum    get:    - { index: "1", operation: "get", object: "randnum", parameter: "randnum" }  -    name: testping    get:    - { index: "1", operation: "get", object: "ping", parameter: "ping" }  -    name: testmessage    get:    - { index: "1", operation: "get", object: "message", parameter: "message" }    set:    - { index: "1", operation: "set", object: "message", parameter: "message" }coreCommands:  -    name: testrandnum    get:      path: "/api/v1/device/{deviceId}/testrandnum"      responses:      -        code: "200"        description: "get the random value"        expectedValues: ["randnum"]      -        code: "503"        description: "service unavailable"        expectedValues: []  -    name: testping    get:      path: "/api/v1/device/{deviceId}/testping"      responses:      -        code: "200"        description: "ping the device"        expectedValues: ["ping"]      -        code: "503"        description: "service unavailable"        expectedValues: []  -    name: testmessage    get:      path: "/api/v1/device/{deviceId}/testmessage"      responses:      -        code: "200"        description: "get the message"        expectedValues: ["message"]      -        code: "503"        description: "service unavailable"        expectedValues: []    put:      path: "/api/v1/device/{deviceId}/testmessage"      parameterNames: ["message"]      responses:      -        code: "204"        description: "set the message."        expectedValues: []      -        code: "503"        description: "service unavailable"        expectedValues: []
5.设备服务配置(configuration.toml)使用配置文件去定义设备和调度任务。device-mqtt在启动时生成一个相关的实例。
MQTT是订阅/发布模式,所以我们必须在配置文件的 [DeviceList.Protocols]部分定义MQTT连接信息。
创建配置文件,取名configuration.toml,内容如下,修改Host ip为你的主机IP
# configuration.toml[Writable]LogLevel = 'DEBUG'[Service]Host = "edgex-device-mqtt"Port = 49982ConnectRetries = 3Labels = []OpenMsg = "device mqtt started"Timeout = 5000EnableAsyncReadings = trueAsyncBufferSize = 16[Registry]Host = "edgex-core-consul"Port = 8500CheckInterval = "10s"FailLimit = 3FailWaitTime = 10Type = "consul"[Logging]EnableRemote = falseFile = "./device-mqtt.log"[Clients]  [Clients.Data]  Name = "edgex-core-data"  Protocol = "http"  Host = "edgex-core-data"  Port = 48080  Timeout = 50000  [Clients.Metadata]  Name = "edgex-core-metadata"  Protocol = "http"  Host = "edgex-core-metadata"  Port = 48081  Timeout = 50000  [Clients.Logging]  Name = "edgex-support-logging"  Protocol = "http"  Host ="edgex-support-logging"  Port = 48061[Device]  DataTransform = true  InitCmd = ""  InitCmdArgs = ""  MaxCmdOps = 128  MaxCmdValueLen = 256  RemoveCmd = ""  RemoveCmdArgs = ""  ProfilesDir = "/custom-config"# Pre-define Devices[[DeviceList]]  Name = "MQ_DEVICE"  Profile = "Test.Device.MQTT.Profile"  Description = "General MQTT device"  Labels = [ "MQTT"]  [DeviceList.Protocols]    [DeviceList.Protocols.mqtt]       Schema = "tcp"       Host = "192.168.16.68"       Port = "1883"       ClientId = "CommandPublisher"       User = ""       Password = ""       Topic = "CommandTopic"  [[DeviceList.AutoEvents]]    Frequency = "30s"    OnChange = false    Resource = "testrandnum"# Driver configs[Driver]IncomingSchema = "tcp"IncomingHost = "192.168.16.68"IncomingPort = "1883"IncomingUser = ""IncomingPassword = ""IncomingQos = "0"IncomingKeepAlive = "3600"IncomingClientId = "IncomingDataSubscriber"IncomingTopic = "DataTopic"ResponseSchema = "tcp"ResponseHost = "192.168.16.68"ResponsePort = "1883"ResponseUser = ""ResponsePassword = ""ResponseQos = "0"ResponseKeepAlive = "3600"ResponseClientId = "CommandResponseSubscriber"ResponseTopic = "ResponseTopic"In the Driver configs section:
  • IncomingXxx定义了从设备接收的异步值的数据主题DataTopic
  • ResponseXxx定义了从设备接收的命令响应的响应主题ResponseTopic
6.把设备服务添加到docker-compose文件(docker-compose.yml)下载docker-compose文件 https://github.com/edgexfoundry/ ... edinburgh-1.0.0.yml
由于我们使用docker-compose部署EdgeX,我们必须将device-mqtt添加到docker-compose。If you have prepared configuration files, you can mount them using volumes and change the entrypoint for device-mqtt internal use.
下面是docker-compose的代码片段
device-mqtt:  image: edgexfoundry/docker-device-mqtt-go:1.0.0  ports:    - "49982:49982"  container_name: edgex-device-mqtt  hostname: edgex-device-mqtt  networks:    - edgex-network  volumes:    - db-data:/data/db    - log-data:/edgex/logs    - consul-config:/consul/config    - consul-data:/consul/data    - ./mqtt:/custom-config  depends_on:    - data    - command  entrypoint:    - /device-mqtt    - --registry=consul://edgex-core-consul:8500    - --confdir=/custom-config
当使用设备服务时,用户需要在–registry参数提供registry URL
在Docker上启动EdgeX Foundry下面的文件夹放好之后,我们就可以开始部署EdgeX了
- device-service-demo  |- docker-compose.yml  |- mqtt     |- configuration.toml     |- mqtt.test.device.profile.yml使用下面命令进行EdgeX部署
cd path/to/device-service-demodocker-compose pulldocker-compose up -d服务启动之后,查看控制台面板

执行命令接下来就可以执行相关命令了
7.查找可执行命令使用下面命令去查询可执行命令
  $ curl http://your-edgex-server-ip:48082/api/v1/device | json_pp
  % Total % Received % Xferd Average Speed Time Time Time Current                                 Dload Upload Total Spent Left Speed100 1972 100 1972 0 0 64349 0 --:--:-- --:--:-- --:--:-- 65733[   {      "location" : null,      "adminState" : "UNLOCKED",      "commands" : [         {            ...         },         {            ...         },         {            "get" : {               "responses" : [                  {                     "code" : "503",                     "description" : "service unavailable"                  }               ],               "path" : "/api/v1/device/{deviceId}/testmessage",               "url" : "http://edgex-core-command:48082/api/v1/device/ddb2f5cf-eec2-4345-86ee-f0d87e6f77ff/command/0c257a37-2f72-4d23-b2b1-2c08e895060a"            },            "modified" : 1559195042046,            "name" : "testmessage",            "put" : {               "parameterNames" : [                  "message"               ],               "path" : "/api/v1/device/{deviceId}/testmessage",               "url" : "http://edgex-core-command:48082/api/v1/device/ddb2f5cf-eec2-4345-86ee-f0d87e6f77ff/command/0c257a37-2f72-4d23-b2b1-2c08e895060a"            },            "created" : 1559195042046,            "id" : "0c257a37-2f72-4d23-b2b1-2c08e895060a"         }      ],      "lastReported" : 0,      "operatingState" : "ENABLED",      "name" : "MQ_DEVICE",      "lastConnected" : 0,      "id" : "ddb2f5cf-eec2-4345-86ee-f0d87e6f77ff",      "labels" : [         "MQTT"      ]   }]8.执行Put命令根据url和参数名来执行Put命令,当运行edgex-core-command时,将[host]替换为服务器IP,可以有下面两种方法来执行
$ curl http://your-edgex-server-ip:48082/api/v1/device/ddb2f5cf-eec2-4345-86ee-f0d87e6f77ff/command/0c257a37-2f72-4d23-b2b1-2c08e895060a \    -H "Content-Type:application/json" -X PUT \    -d '{"message":"Hello!"}'或者
$ curl “http://your-edgex-server-ip:48082/api/v1/device/name/MQ_DEVICE/command/testmessage”-H “Content-Type:application/json” -X PUT -d ‘{“message”:”Hello!”}’9.执行Get命令执行Get命令,如下
  $ curl "http://your-edgex-server-ip:48082/api/v1/device/name/MQ_DEVICE/command/testmessage" | json_pp
  % Total % Received % Xferd Average Speed Time Time Time Current                                 Dload Upload Total Spent Left Speed100 139 100 139 0 0 132 0 0:00:01 0:00:01 --:--:-- 132{   "readings" : [      {         "name" : "message",         "device" : "MQ_DEVICE",         "value" : "Hello!",         "origin" : 1559196276732      }   ],   "device" : "MQ_DEVICE",   "origin" : 1559196276738}10.调度任务 Schedule Job调度任务定义在TOML配置文件的[[DeviceList.AutoEvents]]部分
# Pre-define Devices[[DeviceList]]  Name = "MQ_DEVICE"  Profile = "Test.Device.MQTT.Profile"  Description = "General MQTT device"  Labels = [ "MQTT"]  [DeviceList.Protocols]    [DeviceList.Protocols.mqtt]       Schema = "tcp"       Host = "192.168.16.68"       Port = "1883"       ClientId = "CommandPublisher"       User = ""       Password = ""       Topic = "CommandTopic"  [[DeviceList.AutoEvents]]    Frequency = "30s"    OnChange = false    Resource = "testrandnum"服务启动之后,查询core-data的读API。结果显示,服务每30s自动执行一次该命令,显示如下
  $ curl http://your-edgex-server-ip:48080/api/v1/reading | json_pp
  % Total % Received % Xferd Average Speed Time Time Time Current                                 Dload Upload Total Spent Left Speed100 1613 100 1613 0 0 372k 0 --:--:-- --:--:-- --:--:-- 393k[   {      "value" : "1.212300e+01",      "origin" : 1559197206092,      "modified" : 1559197206104,      "id" : "59f2a768-ad72-49a1-9df9-700d8599a890",      "created" : 1559197206104,      "device" : "MQ_DEVICE",      "name" : "randnum"   },   {      ...   },   {      "name" : "randnum",      "device" : "MQ_DEVICE",      "modified" : 1559197175109,      "created" : 1559197175109,      "id" : "f9dc39e0-5326-45d0-831d-fd0cd106fe2f",      "origin" : 1559197175098,      "value" : "1.212300e+01"   },]异步数据读取
device-mqtt订购一个DataTopic, 它等待真实设备发送数据给broker,接下来device-mqtt解析这个值,并把它发回给core-data
数据格式包含下面值
  • name = device name
  • cmd = deviceResource name
  • method = get or put
  • cmd = device reading
你必须在设备配置文件里定义连接信息,如下:
[Driver]IncomingSchema = "tcp"IncomingHost = "192.168.16.68"IncomingPort = "1883"IncomingUser = ""IncomingPassword = ""IncomingQos = "0"IncomingKeepAlive = "3600"IncomingClientId = "IncomingDataSubscriber"IncomingTopic = "DataTopic"下面的结果显示,模拟设备每15毫秒发送一次读取reading
  $ curl http://your-edgex-server-ip:48080/api/v1/reading | json_pp
  % Total % Received % Xferd Average Speed Time Time Time Current                                 Dload Upload Total Spent Left Speed100 539 100 539 0 0 169k 0 --:--:-- --:--:-- --:--:-- 175k[   {      ...   },   {      "name" : "randnum",      "created" : 1559197140013,      "origin" : 1559197140006,      "modified" : 1559197140013,      "id" : "286cc305-42f6-4bca-ad41-3af52301c9f7",      "value" : "2.830000e+01",      "device" : "MQ_DEVICE"   },   {      "modified" : 1559197125011,      "name" : "randnum",      "created" : 1559197125011,      "origin" : 1559197125004,      "device" : "MQ_DEVICE",      "value" : "2.690000e+01",      "id" : "c243e8c6-a904-4102-baff-8a5e4829c4f6"   }]        

使用特权

评论回复

相关帖子

发新帖 我要提问
您需要登录后才可以回帖 登录 | 注册

本版积分规则

个人签名:qq群:49734243 Email:zukeqiang@gmail.com

1341

主题

12401

帖子

53

粉丝