使用pymongo的用法请参考:Python连接MongoDB,使用pymongo进行增删改查
迁移的逻辑也很简单:
- 从源主机读取数据库
- 得到数据库的全部集合
- 循环每个集合,根据
_id
迁移 - 然后每一个
_id
都会保存一次 - 每次更换集合的时候都会重新申请一个pymongo的链接,这里是防止数据连接时间过长导致的链接失效报错
示例代码
from pymongo import MongoClient
from loguru import logger
def get_coll_to(host, database, collection):
"""目标数据库"""
mongo_conn = MongoClient(host=host, port=27017)
mongo_db = mongo_conn.get_database(database)
coll = mongo_db.get_collection(collection)
return coll
def _save_or_update_mongodb(coll, dict_value):
"""根据检查_id,如果存在就覆盖,如果不存在就新增"""
record = coll.find_one({"_id": dict_value['_id']})
if not record:
coll.insert_one(dict_value)
else:
coll.update_one(record, {
"$set": dict_value,
})
def main(host_from, host_to):
database_name_list = ["database1", "database2", "database3"] # 数据库列表
for database_name in database_name_list:
logger.debug("开始迁移数据库:{}", database_name)
mongo_conn = MongoClient(host=host_from, port=27017)
mongo_database = mongo_conn.get_database(database_name)
for _collection_name in mongo_database.list_collection_names():
logger.debug("迁移集合:{}", _collection_name)
to_cell = get_coll_to(host_to, database_name, _collection_name)
coll = mongo_database.get_collection(_collection_name)
for _record in coll.find():
logger.debug("数据库:{} 集合:{} 迁移中 id:{}", database_name, _collection_name, _record['_id'])
_save_or_update_mongodb(to_cell, _record)
if __name__ == '__main__':
main("127.0.0.1", "192.168.1.123") # 从本机的地址到远程的地址