Giao thức MQTT là giao thức được sử dụng rất nhiều trong thế giới IOT. MQTT có thể chạy trền nền TCP hoặc Socket, do chúng ta mới làm tới TCP nên đương nhiên bài này sẽ hướng dẫn các bạn giao thức TCP trên nền tảng TCP mà chúng ta đã viết từ các bài trước
MQTT cần có 1 broker là trung tâm của mọi luồng dữ liệu, mình sẽ sử dụng broker của trang hive mqtt broker.hivemq.com
Nhà cung cấp này cho trung ta 1 broker công khai miễn phí và không bảo mật do vậy rất phù hợp để học tập và thử nghiệm. MQTT trên nền TCP sẽ được phục vụ tại cổng 1883To
Topic, Publish, Subscribe
Đây là 3 khái niệm cơ bản của MQTT, topic là 1 chủ đề, hiểu 1 cách đơn giản là đích đến của dữ liệu. Publish là hành động xuất bản 1 tin nhắn tới topic nào đó, và Subscriptions – đăng kí nhận dữ liệu từ topic nào đó
Ví dụ: Máy tính đăng kí với broker tôi muốn nhận dữ liệu từ topic cảm biến, mạch thu thập nhiệt độ sẽ thường xuyên gửi dữ liệu nhiệt độ nó đo được lên topic cảm biến. Do máy tính đã đăng kí nhận dữ liệu ở topic nên máy tính sẽ đc broker gửi tin nhắn này cho. Máy tính cũng có thể đăng kí nhận dữ liệu từ các topic khác nữa
Cấu trúc gói
Để làm việc được với MQTT ta cần ít nhất 4 gói tin cơ bản gồm: Gói connect, gói disconect, gói puclish, gói subscribe
Trường Control Header : 1 byte , nó có tác dụng điều hướng gói tin mqtt này thuộc loài nào, với:
Gói connect: 0x10
Gói publish: 0x30
Gói subscribe : 0x80
Gói unsubscribe : 0xA0
Gói disconect: 0xE0
Gói CONNACK : 0x20 ( khi gửi in connect m sẽ check gói này để biết kết nối ok không)
Gói PUBACK : 0x40 (khi gửi tin nhắn đi mình check gói này để biết bên nhận đã nhận được chưa)
Gói SUBACK : 0x90 (khi gửi tin nhắn đăng kí topic mình check gói này để biết đã đăng kí được chưa)
Gói UNSUBACK : 0xB0 (khi gửi gói tin hủy đăng kí topic mình check gói này để biết hủy thành công chưa)
Ở byte Control Header , 4 byte đầu là số hiệu của gói, 4 byte sau là các option tùy chọn. Ví dụ = 0 thì không cần phản hồi, bằng 2 là có phản hồi. Cụ thể, bạn gửi gói tin connect với Header là 0x10 thì nó sẽ không phản hồi lại gói CONNACK, còn gửi 0x12 thì nó sẽ gửi lại CONNACK để chắc chắn 100% đã kết nối thành công
Trường Packet Length mô tả phía sau nó còn bao nhiêu byte nữa (không tính bản thân nó)
Trường này không cố định số lượng byte, tối đa 4 và tối thiểu 1. Bit cao nhất của byte sẽ xác định xem byte phía sau nó có thuộc Packet Length không ! Do đó chỉ có 7bit dùng để mã hóa dữ liệu
Ví dụ 1: Gói tin phía sau còn 31 byte thì ta chỉ cần 1 byte cho Packet Length
=> Packet Length = 0x1F
Ví dụ 2: Gói tin phía sau còn 321 byte thì ta sẽ cần 2 byte cho Packet Length với byte1 là byte thấp và byte2 là byte cao. Cứ mỗi giá trị của byte cao sẽ = 128 lần byte thấp. Ta tách 321 = 65 + 2*128
=> Packet Length = 0x41 0x02
Bit cao nhất của byte1 phải được set lên 1 để báo vẫn còn 1 byte nữa cho Packet Length nên phải sửa thành Packet Length = 0xC1 0x02
Với 4 byte Packet Length, ta sẽ mã hóa được tối đa 268,435,455 byte dữ liệu
From | To |
0 (0x00) | 127 (0x7F) |
28 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
Trong bài này, để cho đơn giản mình sẽ mặc định chỉ sử dụng 1 byte cho Packet Length
Tiếp tục tới trường thứ 3 là Variable length Header, nó gồm 4 trường nhỏ sau: Protocol Name, Protocol Level, Connect Flags, and Keep Alive
- Protocol Name: (6byte) 0x00 0x04 0x4D 0x51 0x54 0x54
- Protocol Level (version): (1byte) 0x04 = Version: MQTT v3.1.1 (4)
- Connect Flags: (1 byte) 0x??
- Keep Alive: (2 byte) 0x?? 0x??
Các bạn chỉ cần quan tâm tới Connect Flags
Cấu trúc của nó như sau:
Bit 1 chúng ta sẽ mặc định để bằng 1
Các bit khác các bạn xài cái nào thì set nó lên. Ví dụ mình không cần user, password thì chỉ cần 0x02 là ok, nếu set cái nào lên thì phải mô tả nó trong payload nhé
Payload là trường sẽ chứa các thông tin để phục vụ việc connect như ID_Client, User, Password. Thông tin trong trường này phải tuân thủ thứ tự sau:
Client ID -> Will Topic -> Will Message -> User Name -> Password
Chú ý: Ở trên byte Connect Flags cái nào không set thì bỏ qua nó trong Payload nhé !
Còn Keep Alive là 2 byte chứa thời gian được tính bằng giây. Đó là khoảng thời gian tối đa được phép trôi qua giữa điểm mà Client hoàn thành việc truyền một Control Packet và điểm mà nó bắt đầu gửi tiếp theo. Nói chung cứ để từ 10 đến 60s tùy các bạn
Tạo thư viện MQTT
Các bạn tạo thêm 2 file mqtt.c và mqtt.h cũng như thêm các mã khởi tạo thư viện cơ bản mà chúng ta đã quá quen thuộc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
#ifndef MQTT_H_ #define MQTT_H_ //-------------------------------------------------- //Include cac thu vien can thiet #include "stm32f1xx_hal.h" #include <string.h> #include <stdlib.h> #include <stdint.h> #include <stdio.h> #include "uart.h" #include "tcp.h" //-------------------------------------------------- typedef struct { uint8_t control_header; uint8_t length; uint8_t Protocol_name[6]; //MQTT (0x00 0x04 0x4D 0x51 0x54 0x54) uint8_t Protocol_version; uint8_t flag; uint16_t Keep_alive; uint16_t data_length; uint8_t data[]; }__attribute__ ((packed)) MQTT_connect_struct; //struct size = size_data + 14 typedef struct { uint8_t control_header; uint8_t length; uint16_t topic_length; uint8_t data[]; }__attribute__ ((packed)) MQTT_mess_struct; //struct size = size_data + 4 typedef struct { uint8_t control_header; uint8_t length; uint16_t mess_identifier; //0x0001 uint16_t topic_length; uint8_t data[]; }__attribute__ ((packed)) MQTT_sub_struct; //struct size = size_data + 6 + 1 (ket thuc co 1 byte 0x00 nua) typedef struct { uint8_t *topic; uint16_t topic_length; uint8_t *mess; uint16_t mess_length; }__attribute__ ((packed)) MQTT_callback_typedef; typedef void (*mqtt_fun_typedef)(MQTT_callback_typedef *); //-------------------------------------------------- void MQTT_connect(uint8_t *ip_broker,uint16_t port,char *client_id); void MQTT_publish(char *topic,char *mess); void MQTT_subscriber(char *topic); //gui 1 tin nhan toi topic bat ki void MQTT_read(uint8_t *mqtt_data,uint16_t mqtt_length); void MQTT_registerCallback(mqtt_fun_typedef mqtt); //-------------------------------------------------- #endif /* MQTT_H_ */ |
1 2 3 4 5 6 7 8 9 |
#include "mqtt.h" extern const uint8_t macaddr[6]; extern uint8_t ip[4]; extern char debug_string[60]; extern uint8_t eth_buffer[BUFFER_LENGTH]; extern uint8_t gateway_ip[4]; extern uint8_t gateway_mac[6]; //--------------END FILE---------------------------------------------------------// |
Hàm Connect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
void MQTT_connect(uint8_t *ip_broker,uint16_t port,char *client_id) { uint8_t *mqtt_ptr; uint16_t struct_len,data_len; data_len = strlen(client_id); struct_len = data_len+14; //day la kich thuoc cua toan bo goi tin MQTT mqtt_ptr = calloc(struct_len,sizeof(uint8_t)); //su dung cap phat dong MQTT_connect_struct *MQTT = (MQTT_connect_struct *)mqtt_ptr; //khoi tao cau truc mqtt len vung nho dc cap phat MQTT->control_header = 0x10; //connect MQTT->data_length = swap16(data_len); MQTT->length = 12 + data_len; memcpy(MQTT->Protocol_name,"\x00\x04MQTT",6); MQTT->Protocol_version = 0x04; //Version: MQTT v3.1.1 MQTT->flag = 0x02; MQTT->Keep_alive = swap16(60); memcpy(MQTT->data,client_id,data_len); UART_putString("Ket noi toi broker MQTT\r\n"); TCP_request(ip_broker,port,mqtt_ptr,struct_len,3000); free(mqtt_ptr); //giai phong vung nho } |
Chú ý: Các bạn tạo thêm hàm TCP_request trong thư viện tcp.c nhé
1 2 3 4 5 |
void TCP_request(uint8_t *IP_server,uint16_t port,uint8_t *data,uint16_t length,uint32_t timeout) { TCP_Connect(IP_server,port,timeout,0); //ket noi TCP_sendData(data,length,3000); //gửi tin nhắn với timeout là 3s } |
Hàm Publish
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
void MQTT_publish(char *topic,char *mess) //gui 1 tin nhan toi topic bat ki { uint8_t *mqtt_ptr; uint16_t topic_length = strlen(topic); uint16_t mess_length = strlen(mess); uint16_t struct_len = topic_length + mess_length + 4; //tinh toan do dai cua struct mqtt_ptr = calloc(struct_len,sizeof(uint8_t)); //su dung cap phat dong MQTT_mess_struct *MQTT = (MQTT_mess_struct *)mqtt_ptr; //su dung cap phat dong de khoi tao con tro struct mqtt MQTT->control_header = 0x30; //publish MQTT->length = topic_length + mess_length + 2; MQTT->topic_length = swap16(topic_length); memcpy(MQTT->data,topic,topic_length); //copy topic vao struct memcpy(MQTT->data+topic_length,mess,mess_length); //copy tin nhan vao struct TCP_sendData(mqtt_ptr,struct_len,3000); //gửi tin nhắn với timeout là 3s free(mqtt_ptr); //giai phong vung nho } |
2 hàm trên mình sử dụng cấp phát động để khởi tạo bộ nhớ cho struct
Test thử
Chúng ta sẽ kết nối tới broker của hive MQTT và gửi thử 1 vài tin nhắn lên để kiểm tra chương trình hoạt động ok chưa nhé !
Trước tiên trong file main hãy đổi ip server thành 3.127.99.166 vì đó chính là ip của hive mqtt. Mình cũng tạo thêm 1 buff để lưu tin nhắn sẽ gửi và 1 biến i lưu số hiệu tin nhắn
1 2 |
uint8_t ip_server[4]={3,127,99,166}; uint8_t buff[100],i; |
Trong hàm main, trước while(1) mình sẽ gọi hàm MQTT connect với id của client là enc28j60
1 |
MQTT_connect(ip_server,1883,"enc28j60"); |
Và trong main cứ 2s gửi 1 tin nhắn lên topic tên là IOT47
1 2 3 |
sprintf(buff,"Xin chao, toi la ENC28J60, tin nhan so %i",i++); MQTT_publish("IOT47",buff); HAL_Delay(2000); |
OK, mình sẽ show giao diện debug ở ngay đây
Demo gửi nhận MQTT
Tin nhắn gửi tới topic IOT47: ...
Topic:
Tin nhắn:
Mình đã nhận được tin nhắn gửi lên rồi web ! Chúc các bạn thành công 😛
Viết hàm Subscriber
Hàm subscriber
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
void MQTT_subscriber(char *topic) //gui 1 tin nhan toi topic bat ki { uint8_t *mqtt_ptr; uint16_t topic_length = strlen(topic); uint16_t struct_len = topic_length + 6 + 1; //tinh toan do dai cua struct mqtt_ptr = calloc(struct_len,sizeof(uint8_t)); //su dung cap phat dong mqtt_ptr[struct_len-1]=0; //end point = zero MQTT_sub_struct *MQTT = (MQTT_sub_struct *)mqtt_ptr; MQTT->control_header = 0x80; // sub MQTT->length = struct_len-2; MQTT->mess_identifier = swap16(0x0001); MQTT->topic_length = swap16(topic_length); memcpy(MQTT->data,topic,topic_length); //copy topic vao struct UART_putString("Dang ki topic:");UART_putString(topic);UART_putString("\r\n"); TCP_sendData(mqtt_ptr,struct_len,3000); //gửi tin nhắn tcp với timeout là 3s free(mqtt_ptr); //giai phong vung nho } |
Xử lí các gói tin xác nhận (ACK)
Để chắc chắn việc kết nối, đăng kí thành công 100%, chúng ta sẽ bắt thêm các bản tin ACK nữa nhé, cụ thể mình sẽ bắt gói SUBACK, và CONACK, tuy nhiên mình sẽ để các bạn tự viết thêm chương trình bắt các gói xác nhận này nếu muôn, mình sẽ demo việc bắt gói Publish
Bây giờ mình sẽ viết phương thức MQTT_read để bên thư viện TCP gọi nó ra. Nó giống như việc thằng TCP nhận đc 1 gói tin nhìn giống giống gói MQTT nên sẽ gọi hàm MQTT_read và bảo "ê, có gói tin na ná mqtt này, tự đi mà xử lí tau không quan tâm"
Quay lại thư viện TCP, mở tcp.h và add thư viện mqtt.h vô
Ở cuối hàm TCP_readData thêm phần kiểm tra xem có gói tin mqtt nào không rồi gọi hàm MQTT_read cho em nó xử lí
1 2 3 |
//kiem tra goi tin xem co phai la goi MQTT publish khong if(TCP_Struct_Frame->data[data_offset] == 0x30) //bat publish MQTT_read(&TCP_Struct_Frame->data[data_offset],data_len); |
OK ! bây giờ quay lại hàm MQTT_read và chúng ta sẽ xử lí data ở đó
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
void MQTT_read(uint8_t *mqtt_data,uint16_t mqtt_length) { MQTT_callback_typedef mqtt; //kiem tra do dai cua topic mqtt if((mqtt_data[1] & 0x80) == 0) { mqtt.topic_length = swap16(mqtt_data[1]); mqtt.topic = &mqtt.topic[4]; mqtt.mess = &mqtt.topic[4+mqtt_data[1]]; mqtt.mess_length = mqtt_length - 4+mqtt_data[1]; UART_putString("Da nhan goi MQTT\r\nTopic:"); for(int i=0;i<mqtt.topic_length;i++) UART_putChar(mqtt.topic[i]); UART_putString("\r\nMess:"); for(int i=0;i<mqtt.mess_length;i++) UART_putChar(mqtt.mess[i]); UART_putString("\r\n"); } else UART_putString("Tin nhan qua dai ! khong ho tro\r\n"); } |
Trước khi viết tiếp mình sẽ test thử xem có đúng là đã nhận được bản tin publish nào chưa đã
Quay về, hàm main, sau khi connect tới broker xong, gọi hàm MQTT_subscriber
1 |
MQTT_subscriber("iot47_hehe"); |
Và ở trang này các bạn cuộn lên giao diện MQTT debug mình đã viết bên trên, thử gửi 1 tin nhắn tới topic iot47_hehe xem
Đăng kí call back nhận tin nhắn
Các bạn có thể viết code thực thi gì đó ở hàm MQTT read luôn, nhưng để độc lập các thư viện mình sẽ tạo hàm callback để có thể viết hàm xử lí ở file main. Mình sẽ tạo 1 con trỏ hàm để trỏ tới hàm cần cần gọi, các bạn mở file mqtt.c và khởi tạo 1 con trỏ hàm ở đầu file
1 |
mqtt_fun_typedef mqtt_function; |
Và tạo thêm hàm đăng kí callback
1 2 3 4 |
void MQTT_registerCallback(void *function) { mqtt_function = mqtt; //tro vao ham can goi } |
Khi chúng ta gọi hàm này và nạp vào địa chỉ của hàm cần callback, biến toàn cục mqtt_function sẽ lưu lại địa chỉ của hàm đó
Bây giờ, trong hàm MQTT_read, tiến hành gọi hàm callback
1 |
mqtt_function(&mqtt);// goi ham thuc thi |
Hàm MQTT_read lúc này như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
void MQTT_read(uint8_t *mqtt_data,uint16_t mqtt_length) { MQTT_callback_typedef mqtt; //kiem tra do dai cua topic mqtt if((mqtt_data[1] & 0x80) == 0) { mqtt.topic_length = swap16(mqtt_data[1]); mqtt.topic = &mqtt.topic[4]; mqtt.mess = &mqtt.topic[4+mqtt_data[1]]; mqtt.mess_length = mqtt_length - 4+mqtt_data[1]; mqtt_function(&mqtt);// goi ham thuc thi } else UART_putString("Tin nhan qua dai ! khong ho tro\r\n"); } |
Bây giờ các bạn có thể quay về file main, tạo 1 hàm với 1 tên bất kì, ví dụ
1 2 3 4 |
void mqtt_callback(MQTT_callback_typedef *mqtt) { //your code } |
Hàm này nhập vào con trỏ struct chứa các thông tin topic, mess để các bạn so sánh hay xử lí gì thì tùy
Sau đó, sau khi kết nối tới broker, gọi
MQTT_registerCallback(&mqtt_callback);
Lúc này, mỗi khi có tin nhắn gửi tới, hàm mqtt_callback sẽ tự động được gọi
Callback là 1 cách rất hay ho để tăng tính độc lập giữa các thư viện
Gói ping
Chúng ta phải thường xuyên ping tới broker để nó biết client vẫn đang hoạt động, khi gửi ping request thì broker cũng sẽ gửi ping reponse. Vậy là cả 2 thằng đều biết là cả 2 vẫn hoạt động bình thường
Thời gian ping phải nhỏ hớn thời gian keep alive, nếu không broker sẽ ngắt kết nối và cho rằng client đã offlife. Lúc này broker sẽ phát đi 1 tin nhắn tới cho các client khác trong mạng đã đăng kí topic bảo rằng thằng này đã offline (đương nhiên muốn làm thế thì client phải chủ động thỏa thuận với broker topic sẽ nhắn và nội dung sẽ nhắn khi tôi offline), hiểu nôm na như để lại di chúc vậy
Do vậy, MQTT có khái niệm will retain
< mình sẽ cập nhật hàm đăng kí gói tin này sau> trong phần này sẽ tập trung viết gói ping
Chào anh, em có tham khảo bài viết này của anh về MQTT nhưng với thingspeak. Kết quả lại không hiển thị được dữ liệu, địa chỉ IP broker em lấy là địa chỉ IP thingspeak server như với giao thức HTTP không biết có đúng không ?
địa chỉ IP thì chỉ có 1 thôi đương nhiên nó sẽ đúng rồi. nhưng các giao thức phân biệt với nhau bằng cổng ! bạn đã chọn đúng cổng chưa ? thông thường MQTT TCP hoạt động ở công 1883, MQTT socket chạy ở cổng 9000 ! Mà thingspeak có mqtt broker à, lần đầu nghe thấy đó