数采网关程序——Modbus-RTU与MQTT协议转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <modbus.h>

#include <unistd.h>
#include <MQTTClient.h>

/* MQTT broker URI handed to MQTTClient_create(); empty placeholder, must be filled in. */
#define ADDRESS ""
/* MQTT client identifier handed to MQTTClient_create(); empty placeholder, must be filled in. */
#define CLIENTID ""

/* Number of polling rounds main() performs before exiting. */
#define LOOP 10
/* Modbus slave (unit) id passed to modbus_set_slave(). */
#define SERVER_ID 1
/* First Modbus address read in every polling round. */
#define ADDRESS_START 0
/* End of the address range, exclusive: ADDRESS_END - ADDRESS_START items are read. */
#define ADDRESS_END 4
/*
 * Selects the read call used by main():
 *   1 = modbus_read_bits, 2 = modbus_read_input_bits,
 *   3 = modbus_read_registers, 4 = modbus_read_input_registers.
 */
#define CODE 1

/*
 * Publish a NUL-terminated text payload on the given topic with QoS 2
 * (not retained) and wait up to one second for the delivery to complete.
 *
 * client  - MQTT client handle to publish through.
 * topic   - topic name; must not be NULL.
 * payload - NUL-terminated message text; must not be NULL.
 *
 * Failures are reported on stderr; the success line is printed only when
 * both the publish call and the completion wait report MQTTCLIENT_SUCCESS.
 */
void publish(MQTTClient client, char* topic, char* payload) {
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token = 0;
    int rc;

    /* strlen(NULL) is undefined behaviour, so reject bad arguments up front. */
    if (topic == NULL || payload == NULL) {
        fprintf(stderr, "publish: topic and payload must not be NULL\n");
        return;
    }

    pubmsg.payload = payload;
    pubmsg.payloadlen = (int)strlen(payload);
    pubmsg.qos = 2;
    pubmsg.retained = 0;

    rc = MQTTClient_publishMessage(client, topic, &pubmsg, &token);
    if (rc != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Failed to publish message '%s', return code %d\n", payload, rc);
        return;
    }

    rc = MQTTClient_waitForCompletion(client, token, 1000L);
    if (rc != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Delivery of message '%s' (token %d) not confirmed, return code %d\n",
                payload, token, rc);
        return;
    }

    printf("Message '%s' with delivery token %d delivered\n", payload, token);
}

/*
 * Message-arrived callback registered via MQTTClient_setCallbacks().
 * Prints the received payload, then releases the message and the topic name.
 *
 * context   - application context given at registration (unused).
 * topicName - topic the message arrived on; freed here.
 * topicLen  - topic length as supplied by the library (unused).
 * message   - received message; freed here.
 *
 * Returns 1 to tell the library the message has been handled.
 */
int on_message(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
    const char *payload = message->payload;

    (void)context;
    (void)topicLen;

    /*
     * The payload is a raw byte buffer of payloadlen bytes and is not
     * guaranteed to be NUL-terminated, so bound the print by its length.
     */
    printf("Received operation %.*s\n", message->payloadlen, payload);

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}

/*
 * Gateway entry point.
 *
 * 1. Creates an MQTT client, connects it, registers the device and its
 *    hardware information on "s/us" and subscribes to "s/ds".
 * 2. Opens a Modbus RTU connection on /dev/ttyS1 (19200 8N1) to slave
 *    SERVER_ID.
 * 3. For LOOP rounds, reads ADDRESS_END - ADDRESS_START items starting at
 *    ADDRESS_START using the read function selected by CODE, and publishes
 *    each value as a "200,Register_Address_<addr>,ustb_test,<value>" message.
 *
 * Returns 0 after the polling rounds have run (read failures are only
 * counted and printed), or -1 when setup fails. All acquired resources are
 * released on every exit path.
 */
int main(void)
{
    modbus_t *ctx = NULL;
    uint8_t *tab_rp_bits = NULL;
    uint16_t *tab_rp_registers = NULL;
    MQTTClient client = NULL;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    char buffer[100];
    /* CODE 1 and 2 read single bits; every other code uses 16-bit registers. */
    const int use_bits = (CODE == 1 || CODE == 2);
    int mqtt_connected = 0;
    int modbus_connected = 0;
    int exit_code = -1;
    int rc_mqtt;
    int rc;
    int nb_fail = 0;
    int nb_loop = 0;
    int addr;
    int nb;

    /* MQTT initialize */
    rc_mqtt = MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc_mqtt != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Failed to create MQTT client, return code %d\n", rc_mqtt);
        /* Do not hand a possibly invalid handle to MQTTClient_destroy(). */
        client = NULL;
        goto cleanup;
    }
    conn_opts.username = "";
    conn_opts.password = "";

    /* Callbacks are registered before connecting. */
    rc_mqtt = MQTTClient_setCallbacks(client, NULL, NULL, on_message, NULL);
    if (rc_mqtt != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Failed to set MQTT callbacks, return code %d\n", rc_mqtt);
        goto cleanup;
    }

    rc_mqtt = MQTTClient_connect(client, &conn_opts);
    if (rc_mqtt != MQTTCLIENT_SUCCESS) {
        printf("Failed to connect, return code %d\n", rc_mqtt);
        goto cleanup;
    }
    mqtt_connected = 1;

    /* create device */
    publish(client, "s/us", "100,USTB_TEST,c8y_MQTTDevice");
    /* set hardware information */
    publish(client, "s/us", "110,S123456789,MQTT test model,Rev0.1");
    /* listen for operation; a failed subscription only disables operations */
    rc_mqtt = MQTTClient_subscribe(client, "s/ds", 0);
    if (rc_mqtt != MQTTCLIENT_SUCCESS) {
        fprintf(stderr, "Failed to subscribe to s/ds, return code %d\n", rc_mqtt);
    }

    /* RTU */
    ctx = modbus_new_rtu("/dev/ttyS1", 19200, 'N', 8, 1);
    if (ctx == NULL) {
        fprintf(stderr, "Unable to create the Modbus context: %s\n",
                modbus_strerror(errno));
        goto cleanup;
    }
    if (modbus_set_slave(ctx, SERVER_ID) == -1) {
        fprintf(stderr, "Invalid slave id %d: %s\n", SERVER_ID,
                modbus_strerror(errno));
        goto cleanup;
    }

    /* TCP */
    // ctx = modbus_new_tcp("127.0.0.1", 1502);
    // ctx = modbus_new_tcp("192.168.72.61", 502);
    // modbus_set_debug(ctx, TRUE);

    if (modbus_connect(ctx) == -1) {
        fprintf(stderr, "Connection failed: %s\n",
                modbus_strerror(errno));
        goto cleanup;
    }
    modbus_connected = 1;

    nb = ADDRESS_END - ADDRESS_START;
    if (nb <= 0) {
        fprintf(stderr, "Invalid address range [%d, %d)\n", ADDRESS_START, ADDRESS_END);
        goto cleanup;
    }

    /* Only the buffer matching the selected function code is allocated;
     * the other pointer stays NULL so that free() on it is a no-op. */
    if (use_bits) {
        tab_rp_bits = calloc((size_t)nb, sizeof *tab_rp_bits);
        if (tab_rp_bits == NULL) {
            fprintf(stderr, "Out of memory\n");
            goto cleanup;
        }
    } else {
        tab_rp_registers = calloc((size_t)nb, sizeof *tab_rp_registers);
        if (tab_rp_registers == NULL) {
            fprintf(stderr, "Out of memory\n");
            goto cleanup;
        }
    }

    while (nb_loop++ < LOOP) {
        int i;
        addr = ADDRESS_START;

        switch (CODE) {
        case 1: /* Modbus function code 01 */
            rc = modbus_read_bits(ctx, addr, nb, tab_rp_bits);
            break;
        case 2: /* Modbus function code 02 */
            rc = modbus_read_input_bits(ctx, addr, nb, tab_rp_bits);
            break;
        case 3: /* Modbus function code 03 */
            rc = modbus_read_registers(ctx, addr, nb, tab_rp_registers);
            break;
        case 4: /* Modbus function code 04 */
            rc = modbus_read_input_registers(ctx, addr, nb, tab_rp_registers);
            break;
        default: /* unsupported code: nothing is read, counted as a failure below */
            rc = 0;
            break;
        }

        printf("<<\n");
        if (rc != nb) {
            printf("ERROR modbus read, function code %d (%d)\n", CODE, rc);
            printf("Address = %d, nb = %d\n", addr, nb);
            nb_fail++;
        } else {
            for (i = 0; i < nb; i++) {
                int value;

                if (use_bits) {
                    value = tab_rp_bits[i];
                    printf("Address = %d, Status = %X\n", addr + i, (unsigned)value);
                } else {
                    value = tab_rp_registers[i];
                    printf("Address = %d, Value = %d\n", ADDRESS_START + i, value);
                }

                /* Bounded formatting so the message can never overrun buffer. */
                snprintf(buffer, sizeof buffer, "200,Register_Address_%d,ustb_test,%d",
                         ADDRESS_START + i, value);
                publish(client, "s/us", buffer);
                sleep(1);
            }
        }

        printf("LOOP: %d, CODE: %d\n", nb_loop, CODE);
        printf("Read: ");
        if (nb_fail) {
            printf("%d FAILS\n", nb_fail);
        } else {
            printf("SUCCESS\n");
        }

        printf(">>\n");
    }

    exit_code = 0;

cleanup:
    /* Free the memory (free(NULL) is a no-op) */
    free(tab_rp_bits);
    free(tab_rp_registers);

    /* Close the Modbus connection */
    if (ctx != NULL) {
        if (modbus_connected) {
            modbus_close(ctx);
        }
        modbus_free(ctx);
    }

    /* Shut down the MQTT client */
    if (client != NULL) {
        if (mqtt_connected) {
            MQTTClient_disconnect(client, 10000);
        }
        MQTTClient_destroy(&client);
    }

    return exit_code;
}