[嵌入式开发模块]MQTT开源库Paho嵌入式C/C++版本的移植及使用(基于W5500 io库)

小鱼儿 2022-01-13 13:23 2836阅读 0赞

文章目录

  • 前言
    • Paho
  • 接口文件
    • mqtt_interface.h
    • mqtt_interface.c
  • 移植
    • 定时器Timer类
    • 网络Network类
  • 使用
    • 初始化相关实例
    • 与MQTT Server创建TCP链接
    • 设置MQTT链接选项并创建MQTT链接
    • 订阅Topic
    • 发布Topic
    • 守护
    • 断连
    • 综合示例

前言

最近成功使用W5500实现了MQTT客户端,进行一个记录。

W5500的io库下载下来后在Internet文件夹下有MQTT文件夹。
watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpbl9zdHJvbmc_size_16_color_FFFFFF_t_70
里头差不多就长这个样子。
其中,mqtt_interface两个文件是移植用的接口文件。而其他的文件实际上就是Paho开源库的Embedded C/C++ 版本的文件。

Paho

Paho是MQTT的官方开源库,其有很多版本,各版本之间的特性比较如下:
watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpbl9zdHJvbmc_size_16_color_FFFFFF_t_70 1
可以看到,我们要讲的嵌入式版本(最后一行)是其中特性最少的;这很正常,受限于设备的能力,肯定要精简掉一些特性。注意,虽然这张图上把非阻塞API那一项勾上了,但实际上嵌入式版本只有阻塞式API,可能这里是个笔误。

在进一步研究之前,请先下载Embedded C的最新版库;即使你用的是W5500的io库,里头已经有相关文件了,也最好更新成最新版的。稍微有点差别。
https://github.com/eclipse/paho.mqtt.embedded-c
点进去上面的链接,下载下来一堆文件,我们只需要MQTTPacket/src这个文件夹以及MQTTClient-C/src里的MQTTClient.h和MQTTClient.c。

记得在MQTTClient.h里略做修改。在
#include “MQTTPacket.h”
后面加上一行:
#include “mqtt_interface.h”

网上对C和C++版本的讲解有很多,也很全面,但是对Embedded版本的讲解却几乎没有。下面由我进行讲解。

接口文件

方便起见,我们这样:先添加进去以下两个文件,这两文件是在我的环境下移植好的,但是在你的环境下会有一点点小错误的;但是不打紧,之后我们对其进行修改以完成移植。

mqtt_interface.h

  1. //*****************************************************************************
  2. //! \file mqtt_interface.h
  3. //! \brief Paho MQTT to WIZnet Chip interface Header file.
  4. //! \details The process of porting an interface to use paho MQTT.
  5. //! \version 1.0.0
  6. //! \date 2016/12/06
  7. //! \par Revision history
  8. //! <2016/12/06> 1st Release
  9. //!
  10. //! \author Peter Bang & Justin Kim
  11. //! \copyright
  12. //!
  13. //! Copyright (c) 2016, WIZnet Co., LTD.
  14. //! All rights reserved.
  15. //!
  16. //! Redistribution and use in source and binary forms, with or without
  17. //! modification, are permitted provided that the following conditions
  18. //! are met:
  19. //!
  20. //! * Redistributions of source code must retain the above copyright
  21. //! notice, this list of conditions and the following disclaimer.
  22. //! * Redistributions in binary form must reproduce the above copyright
  23. //! notice, this list of conditions and the following disclaimer in the
  24. //! documentation and/or other materials provided with the distribution.
  25. //! * Neither the name of the <ORGANIZATION> nor the names of its
  26. //! contributors may be used to endorse or promote products derived
  27. //! from this software without specific prior written permission.
  28. //!
  29. //! THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  30. //! AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  31. //! IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  32. //! ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  33. //! LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  34. //! CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  35. //! SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  36. //! INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  37. //! CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  38. //! ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  39. //! THE POSSIBILITY OF SUCH DAMAGE.
  40. //
  41. //*****************************************************************************
  42. /* MQTT subscribe Example.... W5500 + STM32F103(IoT board)
  43. //Include: Board configuration
  44. #include "IoTEVB.h"
  45. //Include: MCU peripheral Library
  46. #include "stm32f10x_rcc.h"
  47. #include "stm32f10x.h"
  48. //Include: W5500 iolibrary
  49. #include "w5500.h"
  50. #include "wizchip_conf.h"
  51. #include "misc.h"
  52. //Include: Internet iolibrary
  53. #include "MQTTClient.h"
  54. //Include: MCU Specific W5500 driver
  55. #include "W5500HardwareDriver.h"
  56. //Include: Standard IO Library
  57. #include <stdio.h>
  58. //Socket number defines
  59. #define TCP_SOCKET 0
  60. //Receive Buffer Size define
  61. #define BUFFER_SIZE 2048
  62. //Global variables
  63. unsigned char targetIP[4] = {
  64. }; // mqtt server IP
  65. unsigned int targetPort = 1883; // mqtt server port
  66. uint8_t mac_address[6] = {
  67. };
  68. wiz_NetInfo gWIZNETINFO = {
  69. .mac = {
  70. }, //user MAC
  71. .ip = {
  72. }, //user IP
  73. .sn = {
  74. },
  75. .gw = {
  76. },
  77. .dns = {
  78. },
  79. .dhcp = NETINFO_STATIC};
  80. unsigned char tempBuffer[BUFFER_SIZE] = {
  81. };
  82. struct opts_struct
  83. {
  84. char* clientid;
  85. int nodelimiter;
  86. char* delimiter;
  87. enum QoS qos;
  88. char* username;
  89. char* password;
  90. char* host;
  91. int port;
  92. int showtopics;
  93. } opts ={
  94. (char*)"stdout-subscriber", 0, (char*)"\n", QOS0, NULL, NULL, targetIP, targetPort, 0 };
  95. // @brief messageArrived callback function
  96. void messageArrived(MessageData* md)
  97. {
  98. unsigned char testbuffer[100];
  99. MQTTMessage* message = md->message;
  100. if (opts.showtopics)
  101. {
  102. memcpy(testbuffer,(char*)message->payload,(int)message->payloadlen);
  103. *(testbuffer + (int)message->payloadlen + 1) = "\n";
  104. printf("%s\r\n",testbuffer);
  105. }
  106. if (opts.nodelimiter)
  107. printf("%.*s", (int)message->payloadlen, (char*)message->payload);
  108. else
  109. printf("%.*s%s", (int)message->payloadlen, (char*)message->payload, opts.delimiter);
  110. }
  111. // @brief 1 millisecond Tick Timer setting
  112. void NVIC_configuration(void)
  113. {
  114. NVIC_InitTypeDef NVIC_InitStructure;
  115. SysTick_CLKSourceConfig(SysTick_CLKSource_HCLK);
  116. SysTick_Config(72000);
  117. NVIC_InitStructure.NVIC_IRQChannel = SysTick_IRQn;
  118. NVIC_InitStructure.NVIC_IRQChannelPreemptionPriority = 0; // Highest priority
  119. NVIC_InitStructure.NVIC_IRQChannelSubPriority = 0;
  120. NVIC_InitStructure.NVIC_IRQChannelCmd = ENABLE;
  121. NVIC_Init(&NVIC_InitStructure);
  122. }
  123. // @brief 1 millisecond Tick Timer Handler setting
  124. void SysTick_Handler(void)
  125. {
  126. MilliTimer_Handler();
  127. }
  128. int main(void)
  129. {
  130. led_ctrl led1,led2;
  131. int i;
  132. int rc = 0;
  133. unsigned char buf[100];
  134. //Usart initialization for Debug.
  135. USART1Initialze();
  136. printf("USART initialized.\n\r");
  137. I2C1Initialize();
  138. printf("I2C initialized.\n\r");
  139. MACEEP_Read(mac_address,0xfa,6);
  140. printf("Mac address\n\r");
  141. for(i = 0 ; i < 6 ; i++)
  142. {
  143. printf("%02x ",mac_address[i]);
  144. }
  145. printf("\n\r");
  146. //LED initialization.
  147. led_initialize();
  148. led1 = led2 = ON;
  149. led2Ctrl(led2);
  150. led1Ctrl(led1);
  151. //W5500 initialization.
  152. W5500HardwareInitilize();
  153. printf("W5500 hardware interface initialized.\n\r");
  154. W5500Initialze();
  155. printf("W5500 IC initialized.\n\r");
  156. //Set network informations
  157. wizchip_setnetinfo(&gWIZNETINFO);
  158. setSHAR(mac_address);
  159. print_network_information();
  160. Network n;
  161. MQTTClient c;
  162. NewNetwork(&n, TCP_SOCKET);
  163. ConnectNetwork(&n, targetIP, targetPort);
  164. MQTTClientInit(&c,&n,1000,buf,100,tempBuffer,2048);
  165. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  166. data.willFlag = 0;
  167. data.MQTTVersion = 3;
  168. data.clientID.cstring = opts.clientid;
  169. data.username.cstring = opts.username;
  170. data.password.cstring = opts.password;
  171. data.keepAliveInterval = 60;
  172. data.cleansession = 1;
  173. rc = MQTTConnect(&c, &data);
  174. printf("Connected %d\r\n", rc);
  175. opts.showtopics = 1;
  176. printf("Subscribing to %s\r\n", "hello/wiznet");
  177. rc = MQTTSubscribe(&c, "hello/wiznet", opts.qos, messageArrived);
  178. printf("Subscribed %d\r\n", rc);
  179. while(1)
  180. {
  181. MQTTYield(&c, data.keepAliveInterval);
  182. }
  183. }
  184. */
  185. #ifndef __MQTT_INTERFACE_H_
  186. #define __MQTT_INTERFACE_H_
  187. /*
  188. * @brief MQTT MilliTimer handler
  189. * @note MUST BE register to your system 1m Tick timer handler
  190. */
  191. void MilliTimer_Handler(void);
  192. /*
  193. * @brief Timer structure
  194. */
  195. typedef struct Timer Timer;
  196. struct Timer {
  197. unsigned long systick_period;
  198. unsigned long end_time;
  199. };
  200. /*
  201. * @brief Network structure
  202. */
  203. typedef struct Network Network;
  204. struct Network
  205. {
  206. int my_socket;
  207. int (*mqttread) (Network*, unsigned char*, int, int);
  208. int (*mqttwrite) (Network*, unsigned char*, int, int);
  209. void (*disconnect) (Network*);
  210. };
  211. /*
  212. * @brief Timer function
  213. */
  214. void TimerInit(Timer*);
  215. char TimerIsExpired(Timer*);
  216. void TimerCountdownMS(Timer*, unsigned int);
  217. void TimerCountdown(Timer*, unsigned int);
  218. int TimerLeftMS(Timer*);
  219. /*
  220. * @brief Network interface porting
  221. */
  222. int w5x00_read(Network*, unsigned char*, int, int);
  223. int w5x00_write(Network*, unsigned char*, int, int);
  224. void w5x00_disconnect(Network*);
  225. void NewNetwork(Network* n, int sn);
  226. int ConnectNetwork(Network*, char*, int);
  227. #endif //__MQTT_INTERFACE_H_

mqtt_interface.c

  1. //*****************************************************************************
  2. //! \file mqtt_interface.c
  3. //! \brief Paho MQTT to WIZnet Chip interface implement file.
  4. //! \details The process of porting an interface to use paho MQTT.
  5. //! \version 1.0.0
  6. //! \date 2016/12/06
  7. //! \par Revision history
  8. //! <2016/12/06> 1st Release
  9. //!
  10. //! \author Peter Bang & Justin Kim
  11. //! \copyright
  12. //!
  13. //! Copyright (c) 2016, WIZnet Co., LTD.
  14. //! All rights reserved.
  15. //!
  16. //! Redistribution and use in source and binary forms, with or without
  17. //! modification, are permitted provided that the following conditions
  18. //! are met:
  19. //!
  20. //! * Redistributions of source code must retain the above copyright
  21. //! notice, this list of conditions and the following disclaimer.
  22. //! * Redistributions in binary form must reproduce the above copyright
  23. //! notice, this list of conditions and the following disclaimer in the
  24. //! documentation and/or other materials provided with the distribution.
  25. //! * Neither the name of the <ORGANIZATION> nor the names of its
  26. //! contributors may be used to endorse or promote products derived
  27. //! from this software without specific prior written permission.
  28. //!
  29. //! THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  30. //! AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  31. //! IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  32. //! ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  33. //! LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  34. //! CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  35. //! SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  36. //! INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  37. //! CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  38. //! ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  39. //! THE POSSIBILITY OF SUCH DAMAGE.
  40. //
  41. //*****************************************************************************
  42. #include "mqtt_interface.h"
  43. #include "socket.h"
  44. #include "MyOS.h"
  45. unsigned long MilliTimer;
  46. /*
  47. * @brief MQTT MilliTimer handler
  48. * @note MUST BE register to your system 1m Tick timer handler.
  49. */
  50. void MilliTimer_Handler(void) {
  51. MilliTimer++;
  52. }
  53. /*
  54. * @brief Timer Initialize
  55. * @param timer : pointer to a Timer structure
  56. * that contains the configuration information for the Timer.
  57. */
  58. void TimerInit(Timer* timer) {
  59. timer->end_time = 0;
  60. }
  61. /*
  62. * @brief expired Timer
  63. * @param timer : pointer to a Timer structure
  64. * that contains the configuration information for the Timer.
  65. */
  66. char TimerIsExpired(Timer* timer) {
  67. long left = timer->end_time - MilliTimer;
  68. return (left < 0);
  69. }
  70. /*
  71. * @brief Countdown millisecond Timer
  72. * @param timer : pointer to a Timer structure
  73. * that contains the configuration information for the Timer.
  74. * timeout : setting timeout millisecond.
  75. */
  76. void TimerCountdownMS(Timer* timer, unsigned int timeout) {
  77. timer->end_time = MilliTimer + timeout;
  78. }
  79. /*
  80. * @brief Countdown second Timer
  81. * @param timer : pointer to a Timer structure
  82. * that contains the configuration information for the Timer.
  83. * timeout : setting timeout millisecond.
  84. */
  85. void TimerCountdown(Timer* timer, unsigned int timeout) {
  86. timer->end_time = MilliTimer + (timeout * 1000);
  87. }
  88. /*
  89. * @brief left millisecond Timer
  90. * @param timer : pointer to a Timer structure
  91. * that contains the configuration information for the Timer.
  92. */
  93. int TimerLeftMS(Timer* timer) {
  94. long left = timer->end_time - MilliTimer;
  95. return (left < 0) ? 0 : left;
  96. }
  97. /*
  98. * @brief New network setting
  99. * @param n : pointer to a Network structure
  100. * that contains the configuration information for the Network.
  101. * sn : socket number where x can be (0..7).
  102. * @retval None
  103. */
  104. void NewNetwork(Network* n, int sn) {
  105. n->my_socket = sn;
  106. n->mqttread = w5x00_read;
  107. n->mqttwrite = w5x00_write;
  108. n->disconnect = w5x00_disconnect;
  109. }
  110. /*
  111. * @brief read function
  112. * @param n : pointer to a Network structure
  113. * that contains the configuration information for the Network.
  114. * buffer : pointer to a read buffer.
  115. * len : buffer length.
  116. */
  117. int w5x00_read(Network* n, unsigned char* buffer, int len, int timeout_ms){
  118. Timer tmr;
  119. TimerInit(&tmr);
  120. TimerCountdownMS(&tmr, timeout_ms);
  121. while(!TimerIsExpired(&tmr)){
  122. if(getSn_SR(n->my_socket) != SOCK_ESTABLISHED)
  123. return -1;
  124. if(getSn_RX_RSR(n->my_socket)>0)
  125. return recv(n->my_socket, buffer, len);
  126. MyOS_DlyHMSM(0,0,0,30);
  127. }
  128. return 0;
  129. }
  130. /*
  131. * @brief write function
  132. * @param n : pointer to a Network structure
  133. * that contains the configuration information for the Network.
  134. * buffer : pointer to a read buffer.
  135. * len : buffer length.
  136. */
  137. int w5x00_write(Network* n, unsigned char* buffer, int len, int timeout_ms){
  138. return send(n->my_socket, buffer, len);
  139. }
  140. /*
  141. * @brief disconnect function
  142. * @param n : pointer to a Network structure
  143. * that contains the configuration information for the Network.
  144. */
  145. void w5x00_disconnect(Network* n){
  146. disconnect(n->my_socket);
  147. }
  148. /*
  149. * @brief connect network function
  150. * @param n : pointer to a Network structure
  151. * that contains the configuration information for the Network.
  152. * ip : server iP.
  153. * port : server port.
  154. */
  155. int ConnectNetwork(Network* n, char* ip, int port)
  156. {
  157. uint8_t myport = 12345;
  158. socket(n->my_socket,Sn_MR_TCP,myport,0);
  159. connect(n->my_socket,ip,port);
  160. }

以上两个文件mqtt_interface.h和mqtt_interface.c不完全是io库自带的那个,是基于我的环境进行了一定的修改后移植完成后的版本。原来的版本有一些根本性的问题,所以不要嫌麻烦,动动小手指 唱跳rap篮球+cv 一下,先替换掉原来的文件。

移植

下面我们来看看怎么把这个文件移植到你的嵌入式系统中。移植的主要工作就是为MQTT库实现一个定时器Timer类以及一个网络Network类。

定时器Timer类

打开MQTTClient.h可以看到库对Timer类的要求:

  1. /* The Timer structure must be defined in the platform specific header,
  2. * and have the following functions to operate on it. */
  3. extern void TimerInit(Timer*);
  4. extern char TimerIsExpired(Timer*);
  5. extern void TimerCountdownMS(Timer*, unsigned int);
  6. extern void TimerCountdown(Timer*, unsigned int);
  7. extern int TimerLeftMS(Timer*);

这几个接口的作用分别是:
TimerInit:初始化一个Timer实例
TimerIsExpired:返回定时器是否超时
TimerCountdownMS:设定过多少ms超时
TimerCountdown:设定过多少s超时
TimerLeftMS:返回还剩多少ms超时

上面的接口文件已经实现了这些接口,其原理是内部用了一个计时用的全局变量,每1ms应该加1,而Timer内部成员则保存计时的起始时间和超时时间。通过内部变量与Timer内部成员的比较,算出是否超时等。这个实现与操作系统等无关,所以应该可以直接用。

但是要记得实现每1ms调用一次以下接口。比如用一个计时中断。

  1. /*
  2. * @brief MQTT MilliTimer handler
  3. * @note MUST BE register to your system 1m Tick timer handler
  4. */
  5. void MilliTimer_Handler(void);

网络Network类

同样在MQTTClient.h中可以看到库对Network类的要求:

  1. /* The Platform specific header must define the Network and Timer structures and functions
  2. * which operate on them.
  3. *
  4. typedef struct Network
  5. {
  6. int (*mqttread)(Network*, unsigned char* read_buffer, int, int);
  7. int (*mqttwrite)(Network*, unsigned char* send_buffer, int, int);
  8. } Network;*/

简单来说就是MQTT库将网络接口抽象为了Network类。我要好好夸夸这个接口设计,它非常成功地将MQTT实现与网络的具体实现解耦了。网络相关的信息由用户封装到自己定义的Network类之中,库只管在需要的时候回调Network类的几个方法来实现网络功能。如果你用纯C做过面向对象编程的话应该会比较好理解。

我们来看看Network结构体在这个移植版本中的定义:

  1. typedef struct Network Network;
  2. struct Network
  3. {
  4. int my_socket;
  5. int (*mqttread) (Network*, unsigned char*, int, int);
  6. int (*mqttwrite) (Network*, unsigned char*, int, int);
  7. void (*disconnect) (Network*);
  8. };

可以看到它在要求的结构体上还进行了一定的扩展。
第一个成员my_socket用于存储当前TCP链接的socket号,对应于io库中各个接口的第一个参数。

后面三个则定义了网络的读、写以及断连接口。MQTT库在内部会调用Network实例的对应接口,并把Network实例自身作为第一个参数传入,以实现对应的功能,而接口则可以通过实例的第一个成员来区分要操作的是哪一个socket。

于是我们可以看到NewNetwork的实现是这样的:

  1. void NewNetwork(Network* n, int sn) {
  2. n->my_socket = sn;
  3. n->mqttread = w5x00_read;
  4. n->mqttwrite = w5x00_write;
  5. n->disconnect = w5x00_disconnect;
  6. }

只是简单的将实例的成员和函数接口进行赋值。这个应该不需要你动。

于是接下来我们就要进行移植的最主要工作—实现w5x00_read、w5x00_write和(其实可以不实现的)w5x00_disconnect了。
我们来看下它们的实现:

  1. /*
  2. * @brief read function
  3. * @param n : pointer to a Network structure
  4. * that contains the configuration information for the Network.
  5. * buffer : pointer to a read buffer.
  6. * len : buffer length.
  7. */
  8. int w5x00_read(Network* n, unsigned char* buffer, int len, int timeout_ms){
  9. Timer tmr;
  10. TimerInit(&tmr);
  11. TimerCountdownMS(&tmr, timeout_ms);
  12. while(!TimerIsExpired(&tmr)){
  13. if(getSn_SR(n->my_socket) != SOCK_ESTABLISHED)
  14. return -1;
  15. if(getSn_RX_RSR(n->my_socket)>0)
  16. return recv(n->my_socket, buffer, len);
  17. MyOS_DlyHMSM(0,0,0,30);
  18. }
  19. return 0;
  20. }
  21. /*
  22. * @brief write function
  23. * @param n : pointer to a Network structure
  24. * that contains the configuration information for the Network.
  25. * buffer : pointer to a read buffer.
  26. * len : buffer length.
  27. */
  28. int w5x00_write(Network* n, unsigned char* buffer, int len, int timeout_ms){
  29. return send(n->my_socket, buffer, len);
  30. }
  31. /*
  32. * @brief disconnect function
  33. * @param n : pointer to a Network structure
  34. * that contains the configuration information for the Network.
  35. */
  36. void w5x00_disconnect(Network* n){
  37. disconnect(n->my_socket);
  38. }

应该还蛮好理解的,就是要对Network n进行操作,从网络往缓冲区buffer中读入最多len个字符或写buffer中的len个字符到网络中,超时时间timeout_ms微妙。主要就是实现到W5500 io库对应接口的一个映射关系。唯一会出问题的估计就是read函数里头那个MyOS_DlyHMSM,这是我对操作系统的抽象,实现线程sleep的功能,以避免阻塞操作抢占太多系统资源,你应该根据自己的操作系统进行修改。

还有一个ConnectNetwork就不细讲了。

上面的实现默认你用的是W5500的io库,如果不是,可能需要照着进行一定的修改。

使用

使用这个嵌入式版本的MQTT Client有以下几个步骤:

初始化相关实例

  1. static Network _mqttNetwork;
  2. static MQTTClient _mqttClient = DefaultClient;
  3. NewNetwork(&_mqttNetwork, MQTT_SOCKET);
  4. MQTTClientInit(&_mqttClient,&_mqttNetwork,3000,_mqttSendBuf,sizeof(_mqttSendBuf),_mqttRecvBuf,
  5. sizeof(_mqttRecvBuf));

MQTTClientInit的第三个参数是命令的超时时间(ms),剩下几个参数是发送及接收缓冲区。

与MQTT Server创建TCP链接

这一步与MQTT库无关,就是使用自己的网络栈创建一条TCP链接,可以使用Network类的方法:

  1. ConnectNetwork(&_mqttNetwork, _serverIP, _serverPort);

设置MQTT链接选项并创建MQTT链接

创建好TCP链接后就要和MQTT服务器创建MQTT的链接了,不然相关功能都无法使用。
首先我们要设置链接选项:

  1. static MQTTPacket_connectData _mqttConnData = MQTTPacket_connectData_initializer;
  2. _mqttConnData.clientID.cstring = _devName; // 设置客户端的名字
  3. _mqttConnData.cleansession = 1; // 清会话
  4. // 如不使用用户名密码,直接注释下面即可
  5. _mqttConnData.username.cstring = "用户名";
  6. _mqttConnData.password.cstring = "密码";

MQTT有一个遗嘱功能可以在这里设置,如设置:

  1. _mqttConnData.willFlag = TRUE; // 启用遗嘱
  2. _mqttConnData.will.topicName.cstring = "feeldead"; // 设置遗嘱的topic名
  3. _mqttConnData.will.qos = QOS1; // 遗嘱的QOS
  4. _mqttConnData.will.message.cstring = "我断连了!"; // 遗嘱的内容

这样,当服务器检测到客户端异常断连的话,就会发送遗嘱,这样所有订阅这个主题的客户端就都能知道这个设备掉线了。当然,如果是主动发出断连请求后断掉的话就不会发遗嘱了。

设置好连接选项后就可以调用MQTTConnect来创建MQTT连接了

  1. MQTTConnect(&_mqttClient, &_mqttConnData);

成功则返回 SUCCESS。

订阅Topic

建立连接成功后,可以订阅感兴趣的Topic:

  1. MQTTSubscribe(&_mqttClient, "topicFilter", QOS1, messageArrived);

成功则返回 SUCCESS。最后一个选项用于注册对应Topic的处理函数,原型如下:

  1. void messageArrived(MessageData* md);

发布Topic

建立连接成功后,可以发布Topic:

  1. static char _sData[] = "10C";
  2. static MQTTMessage _sensorData;
  3. _sensorData.qos = QOS0;
  4. _sensorData.retained = 0; // 是否retained,如是,新发起订阅的客户端会收到最近的一个消息
  5. _sensorData.id = 30; // 消息id 只需要指定初始的,后面每次会自增
  6. _sensorData.payload = _sData;
  7. _sensorData.payloadlen = sizeof(_sData) - 1;
  8. MQTTPublish(&_mqttClient, "topicName", &_sensorData);

成功则返回 SUCCESS。

守护

建立连接成功后,需要有进程完成例行的通信和接收消息等任务,这是通过调用MQTTYield来阻塞完成的:

  1. MQTTYield(&_mqttClient, 3000);

第二个参数代表这次要守护多久(ms);
如返回SUCCESS, 说明这一次的守护成功;如失败,可能你得考虑重新建立连接。

断连

如需断开连接,先调用MQTTDisconnect:

  1. MQTTDisconnect(&_mqttClient);

然后断开TCP链接。

综合示例

以下示例中的Task主动连接192.168.1.16:1883上的MQTT服务器。

设置遗嘱的TopicName为feeldead,QOS1,消息为自己的设备名。

然后以QOS1订阅主题s/temp,每次收到这个主题的消息时对相关信息进行打印。

通过设置_mqttKeepConn为FALSE来主动断开连接。

如需要发布消息时如上调用MQTTPublish(&_mqttClient, “topicName”, &_sensorData);来发布。

已略去其他关系不是很大的代码。

  1. #include "MQTTClient.h"
  2. ……
  3. static const char _devName[] = "device1";
  4. static Network _mqttNetwork;
  5. static MQTTClient _mqttClient = DefaultClient;
  6. static MQTTPacket_connectData _mqttConnData = MQTTPacket_connectData_initializer;
  7. static uint8_t _mqttSendBuf[100];
  8. static uint8_t _mqttRecvBuf[100];
  9. static uint8_t _mqttHost[4] = {
  10. 192, 168, 1, 16};
  11. static uint16_t _mqttPort = 1883;
  12. static const char _sData[] = "34.12,43,1";
  13. static BOOL _mqttKeepConn = TRUE;
  14. static MQTTMessage _sensorData = {
  15. QOS0,
  16. 0,
  17. 0,
  18. 1,
  19. _sData,
  20. sizeof(_sData) - 1,
  21. };
  22. static void _printfMessage(MessageData* md){
  23. char *buf = (char *)md->message->payload;
  24. size_t i;
  25. int index = 0;
  26. printf("Topic %.*s\n", md->topicName->lenstring.len, md->topicName->lenstring.data);
  27. printf("QoS: %d\r\nRetained: %d\r\nDup: %d\r\nID: %u\r\n",
  28. (int)md->message->qos,
  29. (int)md->message->retained,
  30. (int)md->message->dup,
  31. md->message->id);
  32. printf("Payload: %.*s\n", md->message->payloadlen, (char *)md->message->payload);
  33. }
  34. void messageArrived(MessageData* md){
  35. _printfMessage(md);
  36. }
  37. static void MqttSubTask(void *p_arg){
  38. int rc;
  39. (void)p_arg;
  40. _mqttConnData.clientID.cstring = _devName;
  41. _mqttConnData.cleansession = 1;
  42. _mqttConnData.willFlag = TRUE;
  43. _mqttConnData.will.topicName.cstring = "feeldead";
  44. _mqttConnData.will.qos = QOS1;
  45. _mqttConnData.will.message.cstring = _devName;
  46. NewNetwork(&_mqttNetwork, MQTT_SOCKET);
  47. MQTTClientInit(&_mqttClient,&_mqttNetwork,3000,_mqttSendBuf,sizeof(_mqttSendBuf),_mqttRecvBuf,
  48. sizeof(_mqttRecvBuf));
  49. for(;;){
  50. (void)OSTimeDlyHMSM(0,0,10,0);
  51. if(!_mqttKeepConn || !Network_Ready())
  52. continue;
  53. if(socket(MQTT_SOCKET,Sn_MR_TCP,0 ,0x00) == MQTT_SOCKET){
  54. (void)printf("MQTTclient: socket inited.\r\n");
  55. }else{
  56. (void)printf("MQTTclient: socket init fail.\r\n");
  57. continue;
  58. }
  59. if(connect(MQTT_SOCKET, _mqttHost, _mqttPort) == SOCK_OK){
  60. (void)printf("MQTTclient: tcp connect bulid.\r\n");
  61. }else{
  62. (void)printf("MQTTclient: connect fail.\r\n");
  63. continue;
  64. }
  65. (void)printf("MQTTclient: try connect server.\r\n");
  66. if((rc = MQTTConnect(&_mqttClient, &_mqttConnData)) == SUCCESS){
  67. (void)printf("MQTTclient: connect server success.\r\n");
  68. }else{
  69. (void)printf("MQTTclient: connect server fail,%d.\r\n", rc);
  70. continue;
  71. }
  72. printf("MQTTclient: Subscribing to %s, Qos: %d\r\n", "s/+/temp", QOS1);
  73. rc = MQTTSubscribe(&_mqttClient, "s/temp", QOS1, messageArrived);
  74. if(rc != SUCCESS)
  75. printf("MQTTclient: Subscribed %d\r\n", rc);
  76. while(1){
  77. if((rc = MQTTYield(&_mqttClient, 3000)) != SUCCESS){
  78. printf("MQTTclient: Yield fail %d\r\n", rc);
  79. break;
  80. }
  81. if(!_mqttKeepConn){
  82. MQTTDisconnect(&_mqttClient);
  83. disconnect(MQTT_SOCKET);
  84. close(MQTT_SOCKET);
  85. break;
  86. }
  87. }
  88. }
  89. }

发表评论

表情:
评论列表 (有 0 条评论,2836人围观)

还没有评论,来说两句吧...

相关阅读