绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
【五分钟了解MongoDB 1】Change Stream 和MongoDB 4.x
2020-05-22 11:36:42

MongoDB中文社区,一个mongoers都会来的社区(微信公众号:mongoing-mongoing)


充分获知数据库的数据变动是从MongoDB向其他数据服务进行数据同步的关键点。与直接查询collection来获取数据变动相比,通过流式的方式进行监听会有效并及时的多。这是一种非常强大的“响应式编程”模式。随着MongoDB的版本更新,流式的获取方式将变得原来越易用。


让我们来一同回顾一下。在MongoDB3.6之前,如果我们希望对MongoDB数据库中的数据变动进行监听,我们通常是通过 “监听并回放oplog”(“tail the oplog”)的模式(oplog表将会记录复制集中的数据变动)。在生产环境中这种方式(“监听并回放oplog”)通常较为复杂,并且难以保证其稳定与可靠性。


Change Streams and Collections


从MongoDB3.6开始支持的 Change Streams打破了这个僵局。 Change Streams使得数据的变动监听变得简单易用。以下是一个示例,该示例演示了通过Node.js对“movieDetails”表的变动监听。


javascript
const MongoClient = require("mongodb").MongoClient;
const uri = "MONGODBURL";
const client = new MongoClient(uri, { useNewUrlParser: true });
client.connect().then(db => {
 const changeStream = client.db("video").collection("movieDetails").watch();
 changeStream.on("change", next => {
   console.log(next);
 });
});


上述代码首先连接进入了数据实例,并通过watch()函数对“video”库的“movieDetails”表建立了change stream。而后通过.on("change",... 建立了一个事件trigger,该事件将监听该change stream上的所有变动并调用对应的后续函数。在上述示例中,监听到变动后将会将变动事件打印出来。下面是我们在 [MongoDB Compass](MongoDB Compass - MongoDB Compass stable)中进行对应修改后的输出示例:


javascript
{ _id:   
   { _data:      '825C51D03F0000000129295A1004E515B4338C574BA2B9603CB1C7FB3B0446645F696400645C0EC4B74B052F9E2EF0C3810004' },  operationType: 'replace', 
 clusterTime:   
  Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1548865599 }, 
 fullDocument:
   { _id: 5c0ec4b74b052f9e2ef0c381,
     title: 'PS I Love You',
     year: 2007,
     ...
     awards: { wins: 2, nominations: 4, text: '2 wins & 4 nominations.' },
     type: 'movie' },
  ns: { db: 'video', coll: 'movieDetails' },
  documentKey: { _id: 5c0ec4b74b052f9e2ef0c381 }
 }


在上述的返回中可以快速的找到此次change stream的重要信息,即通过operationType了解到变动的类型,有关完整的返回说明请参考 [Change Events documentation]。当监听某一个collection的时候,operationType的值通常是 insert , update , replace , delete 或 invalidate ,前四种的含义通过名字可以清楚的获知,上述返回的replace类型是我们通过Compass同collection进行replace操作的反馈。


当我们监听的collection被drop、改名或者其所属的db被drop的时候,我们将会看到类型为invalidate的operationType。于此同时这也意味着是时候关闭change stream了。上述返回中剩下的部分是变动的详细信息,变动发生在什么namespace,数据是什么样的,何时发生的变更。


以上的示例是在MongoDB4.x版本中生成的,相比3.6版本,4.x版本新增了一个_data字段。该字段是一个恢复token(resume token),应用程序能够在重连后从该点进行继续监听。



Beyond Collections


如果你只需要针对某一个collection进行变动监听,MongoDB3.6就可以满足你的需求,但是对于那些此前通过oplog来进行变动监听的同学,他们的诉求往往是希望监听数据库中的所有变动,以此来将变动应用到其他系统中。MongoDB4.0很好的满足了这个诉求,在4.0版本中我们可以针对若干个数据库或者整个实例(复制集或者sharding)进行变动监听。与`watch()`某一个collection不同,4.0中我们可以`watch()`某个数据库或者整个实例。


javascript
const MongoClient = require("mongodb").MongoClient;
const uri ="MONGODBURL";
undefined
const client = new MongoClient(uri, { useNewUrlParser: true });
client.connect().then(db => {
 const changeStream = client.watch();
 changeStream.on("change", next => {
   console.log(next);
});


上述示例将对于任何数据库、任何表的任何变动进行输出,这些也不是我们所捕获的全部信息。由于我们将监听范围放到了广,我们也将会看到在删除collection时候的删除事件、删除数据库的时间以及重命名collection的事件。



What Next?


我们可以根据实际需要选择监听某一个collection的变动、或者某个数据库中所有collection的变动又或者是整个实例中所有的数据库与collection的变动。需要注意的是创建新collection、数据库的变动将不会被直接监听到,不过我们可以通过变动中的内容间接获知。


当然,这也不是什么大问题,如果我们希望监听数据库或者collection的创建,我们可以通过变动内容中的collection来判断是否该表为此前未创建的新表这一方法进行。另外,索引的创建由于不是为表数据变动也不会被监听捕获。


MongoDB4.0为我们带来了一个全新且强大的数据变动监听方式,尤其是该方式可以实时进行变动捕获。我们十分建议你去尝试下这个功能。Change Stream的详细文档可以参考[Change Streams]。如果你还未安装MongoDB4.0实例,你也可以在MongoDB Atlas中[注册]并获取M0的免费集群节点进行学习和测试。


译者:周李洋

Teambition运维总监

MongoDB中文社区联席主席

分享好友

分享这个小栈给你的朋友们,一起进步吧。

MongoDB资料专区
创建时间:2020-05-08 13:54:47
MongoDB是一个介于关系数据库和非关系数据库之间的产品。MongoDB是一个基于分布式文件存储 [1] 的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 小雨滴
    专家
戳我,来吐槽~