var mqtt = require('mqtt'); //https://www.npmjs.com/package/mqtt
// ---- MQTT configuration ----

// '#' subscribes to all topics.
var Topic = '#';

// Broker address (the value here looks like a placeholder - replace before use).
var Broker_URL = 'mqtt://MQTT_BROKER_URL';

// Database host (the value here looks like a placeholder - replace before use).
var Database_URL = 'Database_URL';

// Options handed to mqtt.connect() together with the broker URL.
var options = {
    clientId: 'MyMQTT',
    port: 1883,
    keepalive: 60
};

// Open the broker connection, then attach the event handlers defined in this file.
var client = mqtt.connect(Broker_URL, options);

client.on('connect', mqtt_connect);
client.on('reconnect', mqtt_reconnect);
client.on('message', mqtt_messsageReceived);
client.on('close', mqtt_close);
// Handler for the client's 'connect' event.
// Logs the event and subscribes to Topic; the outcome of the subscription
// is reported to mqtt_subscribe.
function mqtt_connect() {
    console.log("Connecting MQTT");
    client.subscribe(Topic, mqtt_subscribe);
}
// Callback passed to client.subscribe(): reports the outcome of the subscription.
//
// err     - truthy when the subscribe request failed; it is logged as-is.
// granted - second callback argument supplied by the MQTT client (unused here).
function mqtt_subscribe(err, granted) {
    // Check the error first so a failed subscription is never reported as a success.
    if (err) {
        console.log(err);
        return;
    }
    console.log("Subscribed to " + Topic);
}
// Handler for the client's 'reconnect' event: logs that a reconnect is under way.
//
// This handler deliberately does NOT call mqtt.connect() again. The event is
// emitted by the existing client while it is already reconnecting by itself.
// Creating a second client here would replace `client` with an instance that
// has none of the connect/message/close handlers attached and that reuses the
// same clientId from `options` as the client that is still reconnecting.
//
// err - logged when truthy; kept so the handler signature is unchanged.
function mqtt_reconnect(err) {
    console.log("Reconnect MQTT");
    if (err) {console.log(err);}
}
// Intentionally empty callback. Presumably meant to be passed to a publish
// call; it is not referenced in the visible part of this file.
function after_publish() {
    // nothing to do
}
// Handler for the client's 'message' event.
//
// topic   - topic the message arrived on; forwarded to insert_message.
// message - payload; converted to text with toString().
// packet  - forwarded to insert_message untouched.
//
// A single trailing newline is stripped from the payload. An empty payload is
// rejected with a log line; anything else is handed to insert_message, which
// splits it into comma-separated fields.
function mqtt_messsageReceived(topic, message, packet) {
    var raw_text = message.toString();
    console.log("message to string", raw_text);

    // Only one trailing "\n" is removed; other whitespace is left alone.
    var payload = raw_text.replace(/\n$/, '');
    console.log("message to params array",payload);

    if (payload.length === 0) {
        console.log("Invalid payload");
        return;
    }

    insert_message(topic, payload, packet);
}
// Handler for the client's 'close' event. Currently a no-op: the
// "Close MQTT" log line that used to live here is disabled.
function mqtt_close() {
    // intentionally empty
}
////////////////////////////////////////////////////
///////////////////// MYSQL ////////////////////////
////////////////////////////////////////////////////
var mysql = require('mysql'); //https://www.npmjs.com/package/mysql
// Create the MySQL connection used by insert_message.
// NOTE(review): credentials are hard-coded here; consider loading them from
// the environment or a config file instead.
var connection = mysql.createConnection({
    host: Database_URL,
    user: "newuser",         // DB username
    password: "mypassword",  // DB password
    database: "mydb"         // DB name
});

// Connect right away; a connection error is rethrown from the callback.
connection.connect(function (connect_error) {
    if (connect_error) {
        throw connect_error;
    }
});
// Inserts one received MQTT message as a row into the tbl_messages table.
//
// topic       - MQTT topic the message arrived on; stored in the `topic` column.
// message_str - payload text of the form "clientID,message". Only the first
//               comma separates the two fields, so commas inside the message
//               text are preserved instead of being silently cut off.
//               A payload without any comma leaves the message undefined,
//               which mysql.format() writes as NULL.
// packet      - unused; kept so existing callers do not have to change.
//
// Errors: a fatal query error (error.fatal is set, i.e. the connection is no
// longer usable) is rethrown as before. Any other error is logged, so a single
// rejected row does not bring the whole subscriber down.
function insert_message(topic, message_str, packet) {
    var message_arr = extract_string(message_str); //split a string into an array
    var clientID = message_arr[0];
    // Re-join everything after the first field: the message itself may contain commas.
    var message = message_arr.length > 1 ? message_arr.slice(1).join(",") : undefined;
    var date = new Date();
    var sql = "INSERT INTO ?? (??,??,??,??) VALUES (?,?,?,?)";
    var params = ['tbl_messages', 'clientID', 'topic', 'message', 'date', clientID, topic, message, date];
    // ?? placeholders are escaped as identifiers, ? placeholders as values.
    sql = mysql.format(sql, params);
    connection.query(sql, function (error, results) {
        if (error) {
            if (error.fatal) {
                throw error;
            }
            console.log("Insert failed", error);
            return;
        }
        console.log("Message added: " + message_str);
    });
}
// Splits message_str on every comma and returns the resulting array of fields.
function extract_string(message_str) {
    return message_str.split(",");
}
// Delimiter whose occurrences countInstances() counts.
var delimiter = ",";

// Returns how many times `delimiter` occurs in message_str
// (the number of pieces produced by splitting, minus one).
function countInstances(message_str) {
    return message_str.split(delimiter).length - 1;
}
2条答案
按热度按时间zwghvu4y1#
是的，您可以使用任何语言将有效负载从 MQTT 发送到 MySQL。基本思路是运行一个小型服务（例如一个 Node.js 脚本），让它订阅所有主题，并把收到的每条负载写入 MySQL 数据库。
这是对应的 JS 脚本：
参考：https://github.com/karan6190/mqtt-db-plugin/blob/master/mqtttomysql.js
ego6inou2#
您可以使用任何语言将消息从mqtt发送到mysql数据库(或任何其他数据库)。
例如,您可以创建一个单独的python服务,该服务使用paho mqtt客户机,订阅所有主题,并在收到消息时将数据添加到数据库中。
下面是代码在python中的样子: