[应用相关] RT-Thread 使用MY MQTT软件包连接mqtt

[复制链接]
41|10
Xiashiqi 发表于 2026-3-11 22:01 | 显示全部楼层 |阅读模式
文章使用RT-Thread 标准版,版本为5.0.2 。使用野火霸道主板,主控为STM32F103VET6。该文章介绍在RT-Thread studio上正常连接公网后开始添加MY MQTT软件包,并实现mqtt数据交互功能。

1.前提
在msh页面使用ping www.baidu.com 可以ping通,如下

8043069b0ba80e914e.png

2.添加MY MQTT软件包
搜索mymtqq软件包

763269b0ba7c795bc.png

打开DFS支持

9494769b0ba763a8cb.png

保存编译,报以下错误

../packages/mymqtt-latest/MQTTClient-C/mqtt_client.c:31:23: fatal error: dfs_posix.h: No such file or directory
compilation terminated.
arm-none-eabi-gcc "../libraries/STM32F1xx_HAL_Driver/Src/stm32f1xx_hal_cortex.c"
arm-none-eabi-gcc "../libraries/STM32F1xx_HAL_Driver/Src/stm32f1xx_hal_crc.c"
arm-none-eabi-gcc "../libraries/STM32F1xx_HAL_Driver/Src/stm32f1xx_hal_dac.c"
make: *** [packages/mymqtt-latest/MQTTClient-C/subdir.mk:18: packages/mymqtt-latest/MQTTClient-C/mqtt_client.o] Error 1
make: *** Waiting for unfinished jobs....
"make -j20 all" terminated with exit code 2. Build might be incomplete.


6838769b0ba6fdd183.png

解决方法如下,在mqtt_client.c文件中添加

struct linger {
       int l_onoff;                /* option on/off */
       int l_linger;               /* linger time */
};


6351569b0ba68208ac.png

编译通过

3.编写用户使用文件
以下给定mqtt 用户模板,可直接拷贝使用。在.c文件中主要修改data_cheak()函数中的应用程序。在.h文件中主要修改mqtt服务器的各种参数。

mqtt_app.c文件



#include <mqtt_client.h>
#include <netdev.h>
#include "mqtt_app.h"





#define DBG_TAG "MQTT"
#define DBG_LVL DBG_LOG
#include <rtdbg.h>

/*
* 全局变量
*/
rt_thread_t Net_thread = RT_NULL;//mqtt线程
char MQTT_URI_0[50] = {0};//临时存储mqtt ip地址端口号   tcp
static int is_started_user = 0; //mqtt是否第一次配置
static mqtt_client client;//mqtt结构体
char MQTT_CH0_Successfulworking = 0; //连网成功标志位
char get_mqtt_data[256];//暂存拷贝接收到的数据  全局变量
char get_data_flag = 0;//接收到mqtt信息标志位


//接收到mqtt主机下发的信息   回调函数
static void mqtt_sub_callback(mqtt_client *c, message_data *msg_data)
{
#if 1
    *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
    LOG_D("mqtt sub callback: %.*s %.*s",
               msg_data->topic_name->lenstring.len,
               msg_data->topic_name->lenstring.data,
               msg_data->message->payloadlen,
               (char *)msg_data->message->payload);

    LOG_D("get:%s\n",(char *)msg_data->message->payload);
#endif

#if 1
    //将mqtt服务器下发的信息拷贝到  get_mqtt_data
    get_data_flag = 1;
    strncat((char *) get_mqtt_data, (const char *)msg_data->message->payload, msg_data->message->payloadlen);
#endif
}

static void mqtt_sub_default_callback(mqtt_client *c, message_data *msg_data)
{
#if 0
    *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
    LOG_D("mqtt sub default callback: %.*s %.*s",
               msg_data->topic_name->lenstring.len,
               msg_data->topic_name->lenstring.data,
               msg_data->message->payloadlen,
               (char *)msg_data->message->payload);
#endif
}
//连接回调函数
static void mqtt_connect_callback(mqtt_client *c)
{
    LOG_I("inter mqtt_connect_callback!");
}
//连接成功回调函数
static void mqtt_online_callback(mqtt_client *c)
{
    LOG_I("inter mqtt_online_callback!");
}
//mqtt离线回调函数
static void mqtt_offline_callback(mqtt_client *c)
{
    LOG_I("inter mqtt_offline_callback!");
}


/*
* 主mqtt配置函数
*/
int mqtt_init_user(void)
{
    /* init condata param by using MQTTPacket_connectData_initializer */
    MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
    static char cid[20] = { 0 };

    if (is_started_user)
    {
       LOG_E("mqtt client is already connected.");
       return -1;
    }
    /* config MQTT context param */
    {
        client.isconnected = 0;
        rt_memcpy(MQTT_URI_0, "\0", sizeof(MQTT_URI_0));
        rt_sprintf(MQTT_URI_0, "tcp://%s:%d", MQTT_URL_New,MQTT_PORT_New);//拷贝ip和端口号
        LOG_I("MQTT_URI_0:%s", MQTT_URI_0);
        client.uri = MQTT_URI_0;

        /* generate the random client ID */
        rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());//连接id
        /* config connect param */
        memcpy(&client.condata, &condata, sizeof(condata));
        client.condata.clientID.cstring = cid;
        client.condata.username.cstring = USER_NAME; //mqtt  用户名
        client.condata.password.cstring = PASS_WORD; //mqtt  密码
        client.condata.keepAliveInterval = 30;
        client.condata.cleansession = 1;

        /* config MQTT will param. */
        client.condata.willFlag = 0;
        client.condata.will.qos = 1;
        client.condata.will.retained = 0;
        client.condata.will.topicName.cstring = will_topic;//遗嘱主题 (无用)
        client.condata.will.message.cstring = MQTT_WILLMSG;//遗嘱信息(无用)

        /* malloc buffer. */
        //修改mqtt内存池   建议修改   可以支持大约4k上报信息 (我实际使用最大3.5k为一包),如果RAM太小可修改该变量
        client.buf_size = client.readbuf_size = 4096;
        client.buf = rt_calloc(1, client.buf_size);
        client.readbuf = rt_calloc(1, client.readbuf_size);
        if (!(client.buf && client.readbuf))
        {
            LOG_E("no memory for MQTT client buffer!");
            return -1;
        }

        /* set event callback function */
        client.connect_callback = mqtt_connect_callback;//回调
        client.online_callback = mqtt_online_callback;
        client.offline_callback = mqtt_offline_callback;

        /* set subscribe table and event callback */
        client.message_handlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);//订阅主题
        client.message_handlers[0].callback = mqtt_sub_callback;//订阅回调函数
        client.message_handlers[0].qos = QOS1;

        /* set default subscribe event callback */
        client.default_message_handlers = mqtt_sub_default_callback;
    }

    {
      int value;
      uint16_t u16Value;
      value = 5;
      paho_mqtt_control(&client, MQTT_CTRL_SET_CONN_TIMEO, &value);
      value = 5;
      paho_mqtt_control(&client, MQTT_CTRL_SET_MSG_TIMEO, &value);
      value = 5;
      paho_mqtt_control(&client, MQTT_CTRL_SET_RECONN_INTERVAL, &value);
      value = 30;
      paho_mqtt_control(&client, MQTT_CTRL_SET_KEEPALIVE_INTERVAL, &value);
      u16Value = 3;
      paho_mqtt_control(&client, MQTT_CTRL_SET_KEEPALIVE_COUNT, &u16Value);
    }

    /* run mqtt client */

    if (PAHO_SUCCESS == paho_mqtt_start(&client, 8196, 20))
        is_started_user = 1;
    else
        is_started_user = 0;

    return 0;
}


//需要发布的mqtt信息
char Send_data[1024] = "This is an MQTT instance information";//需要发送的模拟量值4G其他打包使用

//处理mqtt数据函数   用户自定义修改
static void data_cheak(char mode){
    if(get_data_flag)
    {
        get_data_flag = 0;
        LOG_I("get data [%s],",get_mqtt_data);
        rt_memset(get_mqtt_data, 0, sizeof(get_mqtt_data));
    }

    //以下是测试上报实例
    static uint32_t cun = 0;
    if(cun % 1000 == 0)
    {
        paho_mqtt_publish(&client, QOS0, MQTT_PUBTOPIC, Send_data,strlen(Send_data));
    }
    cun++;
}


//MQTT主运行函数
static void MQTTNew_entry(void* paremeter){
    rt_thread_mdelay(5000);
    LOG_I("mqtt run!");
    while(1)
    {
        while (1) //等待网络连接准备好
        {
           struct netdev *netdev_link = netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);
           if (netdev_link)
           {
               netdev_low_level_set_link_status(netdev_link, 1);
               break;
           }
           rt_thread_mdelay(rt_tick_from_millisecond(RT_TICK_PER_SECOND));
        }
        rt_thread_mdelay(3000);
        mqtt_init_user();
        while (!client.isconnected) //等待连接成功
        {
            LOG_I("Waiting for mqtt0 connection...\n");
            rt_thread_delay(1000);
        }
        MQTT_CH0_Successfulworking = 1;//连接成功
        while(client.isconnected)
        {
            data_cheak(1);
            rt_thread_mdelay(100);
        }
        MQTT_CH0_Successfulworking = 0;//连接失败
        rt_thread_mdelay(1000);
    }
}






int Client_Net_init(void){
    uint8_t result = 0;

    Net_thread = rt_thread_create( "LTE_W5",MQTTNew_entry,RT_NULL,LTEW5500_stack,LTE_PRIORITY, 20);
    if (Net_thread != RT_NULL){rt_thread_startup(Net_thread);}
    else{
        LOG_W("MQTT Init err!\n");
        result = RT_ERROR;
    }
    return result;
}
INIT_APP_EXPORT(Client_Net_init);






mqtt_app.h文件,主要包含mqtt服务器各种信息

#ifndef APPLICATIONS_MQTT_APP_H_
#define APPLICATIONS_MQTT_APP_H_


#include <rtthread.h>

//mqtt线程优先级
#define LTE_PRIORITY (RT_MAIN_THREAD_PRIORITY - 4 + 3)
//mqtt堆栈大小
#define LTEW5500_stack  (1024+512)



//mqtt 账号
#define USER_NAME "xxxxx"用户自行修改
//mqtt 密码
#define PASS_WORD "xxxxx"用户自行修改
//mqtt ip
#define MQTT_URL_New "xxxxx"用户自行修改
//mqtt 端口号
#define MQTT_PORT_New "xxxxx"用户自行修改
//mqtt 订阅主题
#define MQTT_SUBTOPIC "xxxxx"用户自行修改
//mqtt 发布主题
#define MQTT_PUBTOPIC "xxxxx"用户自行修改

//遗嘱主题
#define will_topic "xxxxx"用户自行修改
//遗嘱信息
#define MQTT_WILLMSG "xxxxx"用户自行修改


#endif /* APPLICATIONS_MQTT_APP_H_ */



移植文件后运行,发现报以下错误

linking...
./packages/mymqtt-latest/MQTTClient-C/mqtt_client.o: In function `mqtt_connect':
E:\Myself\work\A_code\RT-Thread\STM32F103ZET6\10001\Debug/../packages/mymqtt-latest/MQTTClient-C/mqtt_client.c:689: undefined reference to `select'
./packages/mymqtt-latest/MQTTClient-C/mqtt_client.o: In function `mqtt_subscribe':
E:\Myself\work\A_code\RT-Thread\STM32F103ZET6\10001\Debug/../packages/mymqtt-latest/MQTTClient-C/mqtt_client.c:791: undefined reference to `select'
./packages/mymqtt-latest/MQTTClient-C/mqtt_client.o: In function `paho_mqtt_thread':
E:\Myself\work\A_code\RT-Thread\STM32F103ZET6\10001\Debug/../packages/mymqtt-latest/MQTTClient-C/mqtt_client.c:1176: undefined reference to `select'
collect2.exe: error: ld returned 1 exit status
make: *** [makefile:81: rtthread.elf] Error 1
"make -j20 all" terminated with exit code 2. Build might be incomplete.



这是一个典型的链接错误,表明MQTT客户端代码中使用了select ()系统调用,但在RT-Thread编译环境中没有提供对应的实现。解决方法如下找到libc/posix/io 添加构建poll文件即可。

9081069b0ba4fea2d7.png

重新编译,运行,下载 。

可以看到,设备发布,订阅mqtt成功。

2404069b0ba4b12ea0.png

————————————————
版权声明:本文为CSDN博主「BUG_yechiyu」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/henan_zhang/article/details/157549119

公羊子丹 发表于 2026-3-12 08:51 | 显示全部楼层
楼主遇到的 dfs_posix.h 找不到的问题,我移植时也碰到过,其实是只开了 DFS 总开关没开 posix 接口,你在 menuconfig 里找到 DFS 的 posix-like 函数选项勾上,编译就不会报这个头文件错了。
周半梅 发表于 2026-3-12 08:53 | 显示全部楼层
我发现楼主手动定义 struct linger 的方法虽然能解决编译问题,但其实是因为 RT-Thread 的 socket 头文件没引全,在 mqtt_client.c 开头加 #include <sys/socket.h> 和 #include <netinet/tcp.h > 就不用手动定义了。
帛灿灿 发表于 2026-3-12 08:53 | 显示全部楼层
想问下楼主,你把 client.buf_size 设成 4096,STM32F103VET6 的 SRAM 扛得住吗?我试过设这么大,再加上其他线程的栈,容易内存溢出,你有没有用 rt_memheap 检查过内存使用情况?
童雨竹 发表于 2026-3-12 08:54 | 显示全部楼层
笑了,MY MQTT 这个软件包的坑是真不少,我之前连不上服务器,排查半天发现是端口号定义成字符串了,楼主的 MQTT_PORT_New 是宏定义的字符串,传参时要转成 int,不然连接会直接失败。
万图 发表于 2026-3-12 08:56 | 显示全部楼层
我建议楼主在 mqtt_offline_callback 里加个重连逻辑,现在代码里只有主循环等重连,要是网络闪断,恢复后不会自动连,加个 paho_mqtt_start 重新启动客户端,鲁棒性会好很多。
Wordsworth 发表于 2026-3-12 08:57 | 显示全部楼层
楼主的 data_cheak 里用 cun++ 计数做定时发布,用 rt_tick_get () 会更准吧?毕竟 rt_thread_mdelay 的延时会有误差,而且计数变量没加 volatile,编译器优化后可能会出问题,这点要注意。
Bblythe 发表于 2026-3-12 08:58 | 显示全部楼层
想问下楼主,你用的 W5500 网卡,有没有给 MQTT 线程设更高的优先级?我之前因为 MQTT 线程优先级比网卡收包线程低,导致数据丢包,把优先级调高一档后就正常了。
Pulitzer 发表于 2026-3-12 08:59 | 显示全部楼层
分享个小技巧,楼主的 get_mqtt_data 用 strncat 拼接数据,第一次接收时因为数组初值为 0 没问题,后续如果没清干净容易粘包,换成 memcpy 直接覆盖会更安全,避免数据错乱。
Uriah 发表于 2026-3-12 09:00 | 显示全部楼层
我怀疑楼主编译时的警告里还有未定义的标识符,其实是 MY MQTT 软件包的最新版和 RT-Thread5.0.2 有小兼容问题,把软件包回退到 v1.2.0 版本,不用改代码也能直接编译通过。
Clyde011 发表于 2026-3-12 09:01 | 显示全部楼层
楼主在 Client_Net_init 里创建线程用的栈大小是 1536,MQTT 通信时栈容易不够用,我实测把栈大小调到 2048,就不会出现线程莫名挂掉的情况,F103VET6 的 RAM 足够,放心调就行。
您需要登录后才可以回帖 登录 | 注册

本版积分规则

169

主题

467

帖子

0

粉丝
快速回复 在线客服 返回列表 返回顶部
0