MQTT入门
服务端:
使用阿里云
有点贵
使用RabbitMQ搭建MQTT服务
本次使用的是docker 搭建RabbitMQ 提供 MQTT服务
如需各种配置请参考 dockerhub地址,
docker run -d --hostname my-rabbit --name rabbitmq -p 15672:15672 -p 5672:5672 -p 1883:1883 -p 15675:15675 rabbitmq:3-managementport:15672 用于提供websocket服务
port:1883 用于提供mqtt服务
port:5672 用于提供amqp服务
port:5672 用于提供amqp服务
:3-management 指的是 版本号, 3表示是V3 management 默认开启15672提供web服务,
- mqtt服务 默认是不开启的, 需要手动开启, 先进入docker中的rabbitMQ 容器, 然后执行命令:
docker exec -it $container_id bash
rabbitmq-plugins enable rabbitmq_mqtt- mqtt web (ws://)
rabbitmq-plugins enable rabbitmq_web_mqtt
开启服务后, 打开 web管理界面 http://localhost:15672/
- 使用rabbit 作为MQTT服务的话,需要创建 topic类型的交换机。交换机名作为订阅和发布消息的Topic
localhost:15672/#/exchanges

使用emqx搭建MQTT服务
客户端:
使用mqttx测试
Connections 指的是发布端
Subscription 指的是订阅端
新建Connection:
注意如果服务端是RabbitMQ Advanced中的MQTTVersion 不能选5, 因为RabbitMQ不支持5, 填5的话会一直连接失败
账号密码就是Rabbit的登录账号密码,其余参数默认就可以连接。
使用MQTT(TCP)进行连接

使用MQTT(websocket)进行连接
注意端口号是15675 后缀是 /ws

发布消息
连接成功后就可以发送消息了。 法送消息时记得填写交换机名我这里交换机名为MQTT

接收消息
点击New Subscription 就可以开启一个订阅消息客户端

在golang 中使用 MQTT
本项目使用 paho.mqtt.golang 作为 MQTT 客户端库,安装:
go get github.com/eclipse/paho.mqtt.golang
整体代码如下:
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const (
ADDRESS = "tcp://127.0.0.1:1883"
USER_NAME = "guest"
PASSWORD = "guest"
TOPIC = "MQTT"
)
// initMqtt
/**
* @Description: 初始化MQTT
*/
func newMqtt() mqtt.Client {
opts := mqtt.NewClientOptions()
// 添加代理
opts.AddBroker(ADDRESS)
// 设置用户名
opts.SetUsername(USER_NAME)
// 设置密码
opts.SetPassword(PASSWORD)
// 使用连接信息进行连接
MqttClient := mqtt.NewClient(opts)
if token := MqttClient.Connect(); token.Wait() && token.Error() != nil {
fmt.Println("订阅 MQTT 失败")
panic(token.Error())
}
return MqttClient
}
const (
QoS0 = 0 // 至多一次
QoS1 = 1 // 至少一次
QoS2 = 2 // 确保只有一次
)
func Publish(client mqtt.Client, msg string) {
client.Publish(TOPIC, QoS2, true, msg)
}
func Subscribe(client mqtt.Client) {
client.Subscribe(TOPIC, QoS2, subCallBackFunc)
}
func subCallBackFunc(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("订阅: 当前话题是 [%s]; 信息是 [%s] \n", msg.Topic(), string(msg.Payload()))
}
func main() {
c := newMqtt()
c2 := newMqtt()
Subscribe(c)
Subscribe(c2)
select {}
}打开web管理端


可以看到有3个connections 6个Channels
上面两个是golang 的 还有一个是mqttx的, 每一个connection都可以订阅/发布, 所以是6个通道
在js 中使用 MQTT
首先开通 ws 的服务, 因为是js 是没有办法用mqtt(tcp)的
rabbitmq-plugins enable rabbitmq_web_mqtt
默认端口为:15675
下面使用2种js来通过websockect 进行MQTT通信:
思路都是一样的:
先进行带参数连接, 连接成功后可以订阅/发布消息
订阅的消息通过事件去处理, demo 如下:
mqtt.js
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>MQTTDemo</title>
</head>
<body>
<script src="https://cdn.bootcdn.net/ajax/libs/mqtt/4.1.0/mqtt.min.js"></script>
<script>
options = {
protocolId: 'MQIsdp',
protocolVersion: 3,
username:'guest',
password:'guest',
}
let client = mqtt.connect('mqtt://192.168.56.101:15675/ws', options)
client.on('connect', function () {
client.subscribe('MQTT', { qos: 0 }, function (error, granted) {
console.log(error)
if (error) {
console.log(error)
} else {
console.log('subscrib MQTT')
}
})
})
client.on('message', function (topic, message) {
// message is Buffer
console.log(message.toString())
//client.end()
})
</script>
</body>
</html>paho.mqtt.javascript
应该是好久没有更新了, 导致官方给的例子都跑不起来, 要````Paho.MQTT.Client->Paho.Client```
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>paho-mqtt</title>
</head>
<body>
<div id="count">0</div>
<div id="arrivedDiv"></div>
<script src="paho-mqtt.js"></script>
<script type="text/javascript">
function $(id){ return document.getElementById(id);}
// mqtt协议rabbitmq服务
var brokerIp = '192.168.56.101';
// mqtt协议端口号
var port = 15675;
// 接受推送消息的主题
var topic = "MQTT";
// mqtt连接
client = new Paho.Client(brokerIp, port, "/ws", "clientId_" + parseInt(Math.random() * 100, 10));
var options = {
userName :'guest',
password :'guest',
onSuccess:onConnect
};
client.connect(options);
// called when the client connects
function onConnect() {
// Once a connection has been made, make a subscription and send a message.
console.log(("已经连接到" + brokerIp + ":" + port));
client.subscribe(topic);
console.log(("订阅了topic:"+ topic));
}
// 连接断开事件
client.onConnectionLost = function (responseObject) {
console.log("失去连接 - " + responseObject.errorMessage);
};
// 接收消息事件
client.onMessageArrived = function (message) {
console.log("接受主题: " + message.destinationName + "的消息: " + message.payloadString);
$("arrivedDiv").append(message.payloadString + '\r');
var count = $("count").innerHTML;
count = Number(count) + 1;
$("count").innerHTML = count;
};
// 推送给指定主题
function sendMessage() {
var a = $("#message").val();
if (client.isConnected()) {
var message = new Paho.MQTT.Message(a);
message.destinationName = topic;
client.send(message);
}
}
</script>
</body>
</html>