最近做了个即时通讯的小项目也在这期间接触到了MQTT这个东西,如果你能搜到这篇博客,说明你对MQTT起码有大致了解的,就不在这进行介绍了。今天我们要讲的是MQTT的一个node.js实现—–Mosca,由于本身对MQTT规范了解也不深,在这里就主要以代码为主。

什么是Mosca

Mosca是MQTT在Node.js中的一个Broker的开源实现,通俗讲也就是MQTT中的Server实现。
同时作者也维护着MQTT.js这一模块,这一模块大家可理解为MQTT的Client实现。而纵观整个Node.js的module中比较有分量的也就以上两个module.

如何实现即时聊天功能

没有调研过其他的聊天软件是怎么实现一对一推送的,下面简单介绍下MQTT是怎么实现的。MQTT是简单的发布订阅模式,也就是说当一条消息发出去的时候,谁订阅了谁就会受到,而应用于即时通讯的话,我们就需要进行1对1的进行聊天,这时候我们就需要,每个客户端订阅自己的一个专属channel,然后由Broker来进行推送(Broker做的话,可以实现消息过滤,离线消息等)。

在这过程中也遇到了一些问题,比如对于即时通讯来说,客户端应该以什么作为自己的专属channel,在我接手这个项目之前他们订阅的是设备号,这样其实有很多问题,无法实现多设备同时在线。每次用户换设备登陆,服务器端都需要将相应的离线消息进行转移,耗时耗力而且一些需求无法满足,所以我觉得客户端订阅应该以用户的唯一ID为channel,这样就实现了客户端多设备在线,以及离线消息等功能。

如何实现离线消息功能

这里所说的离线消息是指,当设备离线Mqtt断开了之后,这时候向用户推送的数据,会在用户下次上线的时候推给用户。

这个地方客户端有两个关键点:

  • 客户端cleanSession需要设置为false.则该链接便会认为是持久连接,当链接断开的时候,发送的消息便会进行报错,直到下次链接再次建立,会将这些消息发送给客户端
  • Qos标志,在Mqtt标准钟,对Qos有3种设置,在这里我们需要将Qos设置为1。更多Qos介绍,请点击

上面所说的两个关键是是客户端需要设置的。在在这里我们用到的Mosca也有需要注意的地方便是发布订阅的模型选择。详情点击ascoltatori。这一模块是构建Mosca的核心模块之一。

我之前一同事进行过测试,当采用redis,mongodb作为发布订阅模型的话会出现离线消息数据错乱的情况,经过多次测试之后,我发现ZeroMQ,和RabbitMQ这种消息队列的离线数据是完全按照发送的顺序,离线发送。

示例代码

在这里我们将演示一下离线消息发送,在这里我们采用的是zeroMQ最为订阅发布模型

Client pub

1
var mqtt = require('mqtt');

var client = mqtt.createClient(5112, '182.92.149.22');

//client.subscribe('presence');
var num = 0;
setInterval(function (){
  client.publish('order', 'Hello mqtt ' + (num++),{qos:1, retain: true});
}, 1000);

Client Sub

1
var mqtt = require('mqtt');

var client = mqtt.createClient(5112, 'localhost',{clientId:'1',clean:false});

client.subscribe('test',{qos:1});

client.on('message', function (topic, message) {
  console.log(message);
});

Mosca Server

1
var mosca = require('mosca')
var settings = {
  port: 5112,
  backend:{
  	 type: 'zmq',
      json: false,
      zmq: require("zmq"),  
      port: "tcp://127.0.0.1:33333",
      controlPort: "tcp://127.0.0.1:33334",
      delay: 5
  },
  persistence:{
  	factory: mosca.persistence.Mongo,
    url: "mongodb://localhost:27017/mosca"
  }
};
var server = new mosca.Server(settings);
server.on('ready', function(){
	console.log('Mosca server is up and running');	
});
server.on('published', function(packet, client) {
  console.log('Published', packet.payload);
});

在上面我们看到我们使用zeromq作为发布订阅模型, mongodb作为持久化Db,你可以先将Mosca server以及 Client pub启动,过一段时间你再启动Client sub。这时候你会发现,收到的消息会是从1开始的,,