安装依赖
pip install pymongo
类封装
以 task_id 为基准的封装测试
功能1:连接数据库,并连接集合(没有数据库则创建数据库,没有集合则创建集合)
功能2:插入数据
功能3:以 task_id 为基准查询数据
功能4:以 task_id 为基准更新数据
功能5:以 task_id 为基准删除数据
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from pymongo import MongoClient
from pymongo.collection import Collection
class Mongodb(object):
def __init__(self):
self.db_name = "crazyaction"
self.collection_name = "taskdata"
self.collection = self.connect(self.db_name)
def connect(self, database_name):
try:
clientdb = MongoClient()
#clientdb = MongoClient(host='localhost', port=27017)
"""
database_list = clientdb.list_database_names()
if not database_name in database_list:
db = clientdb[database_name]
"""
db = clientdb[database_name]
collection: Collection = db[self.collection_name]
except Exception as e:
print("连接MongoDB错误: {}".format(e))
return None
return collection
def insert(self, data):
try:
result = self.collection.insert_one(data)
print(f"插入ID: {result.inserted_id}")
except Exception as e:
print("插入数据错误: {}".format(e))
return False
return True
def select_task_id(self, task_id):
try:
result = self.collection.find_one({"task_id": task_id})
#result = self.collection.find({"task_id": task_id})
except Exception as e:
print("查询任务ID错误: {}".format(e))
return None
return result
def update(self, task_id, new_data):
try:
condition = {"task_id": task_id}
update_data = {"$set": new_data}
#result = self.collection.update_many(condition, update_data) # 更新所有
result = self.collection.update_one(condition, update_data) # 更新一条
except Exception as e:
print("更新数据错误: {}".format(e))
return False
return result.raw_result.get("updatedExisting")
def delete(self, task_id):
try:
condition = {"task_id": task_id}
result = self.collection.delete_one(condition)
#result = self.collection.delete_many(condition) # 删除所有数据
except Exception as e:
print("删除数据错误: {}".format(e))
return False
return result.deleted_count
if __name__ == "__main__":
task_id = "1d8668d6-997f-11ee-8ba4-2500a2496aba"
data = {"task_id": "1d8668d6-997f-11ee-8ba4-2500a2496aba", "a":"b", "c":[1, 2], "d": [{"a":"b"}, {"c":[1, 2]}]}
new_data = {"task_id": "1d8668d6-997f-11ee-8ba4-2500a2496aba", "k": "oooooooooooooooooooooooo"}
db = Mongodb()
if not db.insert(data):
print("插入数据失败")
result = db.select_task_id(task_id)
if not result is None:
print(result)
else:
print("查询没有该任务ID")
if not db.update(task_id, new_data):
print("更新数据失败")
if not db.delete(task_id):
print("删除数据失败")