Mongo副本集的数据迁移

方案一

阿里云DTS迁移
支持多种数据源间的数据迁移、支持数据的结构迁移、全量迁移、增量迁移,通过结构、全量、增量迁移可帮助用户将数据实时同步到目标端,实现业务平滑迁移。

image.png

方案二

停机迁移。使用mongodump命令来备份MongoDB数据,使用 mongorestore 命令来恢复备份的数据。

# -h:MongoDB 所在服务器地址
# -d:需要备份的数据库实例
# -o:备份的数据存放位置
mongodump -h dbhost -d dbname -o dbdirectory

# -host <:port>
# -d 需要恢复的数据库实例
# -drop 恢复的时候,先删除当前数据,然后恢复备份的数据。就是说,恢复后,备份后添加修改的数据都会被删除,慎用
# <path> mongorestore 最后的一个参数,设置备份数据所在位置
mongorestore -h <hostname><:port> -d dbname <path>
复制代码

方案三

不停机迁移。在方案二的基础上使用ChangeStream来监听改变的数据库、集合或部署节点。

本篇文章重点说明方案三的实现。

ChangeStream

ChangeStream允许应用程序访问实时的数据更改。应用程序可以使用ChangeStream监听单个集合、数据库或整个部署上的所有数据更改,并立即对它们作出响应。由于ChangeStream使用聚合框架,所以应用程序还可以过滤特定的更改或转换通知。

可用性

ChangeStream在分片集和副本集是可用的

  • 存储引擎 副本集和分片集群必须使用WiredTriger的存储引擎

  • 副本集协议版本 副本集和分片集群必须使用protocol version 1(PV1)

  • 开启“majority”的读关注

    MongoDB 4.0及以前,只有启用了“majority”读关注支持(默认),ChangeStream才可用。

    MongoDB 4.2开始,ChangeStream使用时不需要开启“majority”读关注

监听集合、数据库、部署节点

目标 描述 使用
collection 你可以对单个集合打开一个ChangeStream的游标(除了admin, localconfig数据库中的集合) db.collection.watch()
database 从MongoDB 4.0开始,你可以为单个数据库(除了admin, localconfig数据库)打开一个ChangeStream游标,以观察其所有更改 db.watch()
deployment 从MongoDB 4.0开始,您可以打开一个部署(副本集或分片集群)的ChangeStream游标,以监视所有数据库(除了admin, localconfig)的所有非系统集合的更改。 Mongo.watch()

打开一个ChangeStream

  • 副本集:您可以从任何承载数据的成员发出打开ChangeStream
  • 分片集群:您必须从mongos发出打开ChangeStream

下面使用Python连接Mongo的副本集,打开一个ChangeStream,遍历游标以获取数据更改信息

从MongoDB 4.0开始,您可以指定startAtOperationTime在特定的时间点打开游标。如果指定的起始点在过去,则必须在oplog的时间范围内

def get_client():
    con_url = 'mongodb://192.168.131.130:27000,192.168.131.130:27001,192.168.131.130:27002/?replicaSet=rs0'
	client = MongoClient(con_url)
    return client
def open_change_stream():
    client = get_client()
    db = client.get_database(listen_db)
    cursor = db.watch(start_at_operation_time=Timestamp(1623730169, 1))
    for data in cursor:
        mongo_operator(data)
复制代码

当MongoDB的连接保持打开时,游标也保持打开状态

游标关闭

  • 游标被显式的关闭
  • 一个无效的事件发生。
    • 针对Collection打开的ChangeStream,drop、rename、dropDatabase将导致无效事件
    • 针对database打开的ChangeStream,dropDatabase会导致无效事件。
  • 如果部署的是分片集群,分片删除可能导致打开的ChangeStream游标关闭,并且关闭的ChangeStream游标可能不会完全恢复

修改ChangeStream的输出

可以通过提供管道(Pipeline)来控制ChangeStream的输出。

# pipeline可用stage
$addFields、$match、$project、$replaceRoot、$redact、【$replaceWith、$set、$unset(在Mongodb4.2时可用)】
复制代码
def open_change_stream():
    client = get_client()
    pipeline = [
        {'$match': {'fullDocument.username': 'alice'}},
        {'$addFields': {'newField': 'this is an added field!'}}
    ]
    cursor = db.inventory.watch(pipeline=pipeline)
    document = next(cursor)
复制代码

注意:ChangeStream事件文档的_id字段充当resume标记。不要使用管道来修改或删除ChangeStream事件的_id字段。如果修改_id,ChangeStream会抛出一个异常

pipeline = [ { '$unset': "_id" }]
复制代码

1625131034455.png

更新操作的Full Document

默认情况下,ChangeStream只在更新操作期间返回字段的增量。您可以配置ChangeStream返回最新的多数提交版本的document。具体体现在更新操作的fullDocument字段

cursor = db.inventory.watch(full_document='updateLookup')
document = next(cursor)
复制代码
'fullDocument': {'_id': ObjectId('60c2d488c95f85e0a915dece'), 'name': '张思', 'age': 23.0}
复制代码

如果在更新操作之后、查找之前有一个或多个“majority”提交操作修改了更新后的文档,则返回的fullDocument可能与更新操作时的Document有很大不同。然而,包含在ChangeStream文档中的增量总是正确地描述了应用于该变更流事件的监视集合变更。

恢复ChangeStream

ChangeStream可以通过在打开游标时指定resume token来恢复。

resumeAfter

您可以通过在打开游标时向resumafter传递resume token来恢复特定事件之后的ChangeStream。对于resume token,使用ChangeStream事件文档的_id值。

注意事项

  • oplog必须有足够的历史记录来定位与token或时间戳相关的操作(如果时间戳在过去)。

  • 在无效事件关闭流后,不能使用resumeAfter恢复ChangeStream。 从MongoDB 4.2开始,你可以使用startAfter在无效事件后启动一个新的ChangeStream。

您可以使用resume_after在resume token的操作之后恢复通知。resume_after修饰符必须接收一个resume Token, 对于resume token,使用ChangeStream事件文档的_id值。 例如下面例子中的resume_token。

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)
复制代码

startAfter

您可以在打开游标时向startAfter传递一个resume token,从而在特定事件之后启动一个新的ChangeStream。与resumeAfter不同,startafter可以通过创建一个新的ChangeStream来恢复无效事件后的通知。对于resume token,使用ChangeStream事件文档的_id值。

注意事项

  • oplog必须有足够的历史记录来定位与令牌或时间戳相关的操作(如果时间戳在过去)。

事件通知

ChangeStream只会通知已持久化到副本集中的大多数数据承载成员的数据更改。

例如,考虑一个3个成员的复制集,其中一个ChangeStream游标在主节点上打开。如果客户端发出插入操作,则只有当该插入操作持久化到大多数承载数据的成员时,ChangeStream才会通知应用程序数据更改

Change Event

Change Event事件包括

insert、update、replace、delete、drop、rename、dropDatabase、invalidate
复制代码

ChangeStream响应文档可能具有的所有字段。

{
   _id : { <BSON Object> }, 
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "to" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "documentKey" : { "_id" : <value> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   },
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}
复制代码
key describe
_id 作为ChangeStream事件标识符的BSON对象。当恢复ChangeStream时,此值用作resumeAfter参数的resumeToken。
operationType insert、update、replace、delete、drop、rename、dropDatabase、invalidate
fullDocument 对于insert和replace操作,表示该操作创建的新文档。
对于删除操作,由于文档不再存在,该字段将被省略。
对于更新操作,只有当您将ChangeStream的fullDocument设置为updateLookup时,才会出现此字段。该字段表示由更新操作修改的文档的最新多数提交版本。如果其他多数提交操作在原始更新操作和完整文档查找之间修改了文档,则此文档可能与updateDescription中描述的更改不同。
ns 受事件影响的命名空间(数据库和或集合)
to 当operationType: rename时,该文档将显示ns集合的新名称。对于operationType的所有其他值,省略本文档。
documentKey CRUD操作的文档的_id。
updateDescription 通过更新操作去更新或删除的字段的文档。只有当operationType为update时,才会出现该文档及其字段。
clusterTime 与事件关联的oplog条目的时间戳
txnNumber 事务数量。仅当操作是多文档事务的一部分时才出现。
lsid 与事务关联的会话的标识符。仅当操作是多文档事务的一部分时才出现。

特别说明

replace事件

replace操作使用update命令,它包括两个阶段:

  1. 使用documentKey和删除原始文档
  2. 使用相同的documentkey插入新文档

replace事件的fullDocument表示插入替换文档后的文档。

invalidate事件

  • 针对Collection打开的ChangeStream,drop、rename、dropDatabase将导致无效事件
  • 针对database打开的ChangeStream,dropDatabase会导致无效事件

无效事件关闭ChangeStream游标。

在无效事件关闭流后,不能使用resumeAfter恢复ChangeStream。从MongoDB 4.2开始,你可以使用startAfter在invalidate事件后启动一个新的ChangeStream。

对于mongo数据迁移的脚本

脚本中只关注了对集合的增删改操作。正常情况下是不允许用户对集合、数据库进行操作的,所以没有考虑对应的操作。

这里监听的是change_stream中的某几个集合的数据增量。也可以直接使用管道(pipeline)来代替if coll not in listener_coll: return.

from builtins import print
from bson import ObjectId, Timestamp
from pymongo import MongoClient

listen_db = "change_stream"
target_db = "change_stream_copy"
listener_coll = ['inventory', 'student', 'user', 'school']

def get_client():
    con_url = 'mongodb://192.168.131.130:27000,192.168.131.130:27001,192.168.131.130:27002/?replicaSet=rs0'
    client = MongoClient(con_url)
    return client


def get_target_collection(coll):
    client = get_client()
    db = client.get_database(target_db)
    coll_instance = db.get_collection(coll)
    return coll_instance


def mongo_insert(collection, data):
    collection.insert_one(data)


def mongo_find(collection, condition):
    return collection.find_one(condition)


def mongo_update(collection, conditions, data):
    print(conditions)
    print(data)
    collection.update_one(filter=conditions, update=data)


def mongo_delete(collection, data):
    collection.delete_one(data)


def mongo_operator(event_dict):
    print(event_dict)
    namespace = event_dict['ns']
    coll = namespace['coll']
    if coll not in listener_coll:
        return
    # document_key为{'_id': ObjectId('60c8595a9ffc11572249d198')}
    document_key = event_dict['documentKey']
    operate = event_dict['operationType']
    coll_instance = get_target_collection(coll)
    if operate == 'insert':
        result = mongo_find(coll_instance, document_key)
        if result:
            print(document_key, "is existed")
        else:
            document = event_dict['fullDocument']
            mongo_insert(coll_instance, document)
            print("insert success")
    elif operate == 'update':
        data = event_dict['updateDescription']
        upd = {"$set": data["updatedFields"]}
        mongo_update(coll_instance, document_key, upd)
        print("update success")
    elif operate == 'delete':
        result = mongo_find(coll_instance, document_key)
        if result:
            # 对于删除操作,fullDocument该字段被省略,因为文档不再存在。
            mongo_delete(coll_instance, document_key)
            print("delete success")
        else:
            print(document_key, " is already deleted")
    elif operate == 'replace':
        mongo_delete(coll_instance, document_key)
        document_insert = event_dict['fullDocument']
        mongo_insert(coll_instance, document_insert)
        print("replace success")
    else:
        print("insert/update/delete/replace之外的操作,数据格式为: %s", event_dict)


def open_change_stream():
    client = get_client()
    db = client.get_database(listen_db)
    cursor = db.watch(start_at_operation_time=Timestamp(1623730169, 1))
    for data in cursor:
        mongo_operator(data)


if __name__ == '__main__':
    open_change_stream()
复制代码

#参考文档
1.Mongo官方文档ChangeStream模块:docs.mongodb.com/manual/chan…
2.python入门教程:docs.python.org/zh-cn/3.11/…

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享