背景
在做数据推送功能遇到的一个问题。具体来说,通过SQL查询语句,将Impala中100多万条数据写入到MongoDB时报错。大致的报错信息如下:
java.lang.Exception: org.springframework.dao.DuplicateKeyException: Bulk write operation error on server
101.202.303.404:7056. Write errors: [BulkWriteError{index=0, code=11000, message='E11000 duplicate key error collection:
pddai_cbd_report_api.autojobtab_fa42f748674d59091a3f71adf25de2d5 index: _id_ dup key: { _id:ObjectId('628cdb23d04f507321288fa5') }', details={}}]
解决过程
问题出在_id
这个key重复。故而需要先提一点关于ObjectId的理论知识。
_id
的特征:
-
_id
是集合中文档的主键,用于区分文档(记录) -
_id
自动编入索引。指定{ _id: }
的查找将使用_id
索引。
默认情况下,_id
字段的类型为 ObjectID,支持用户自定义。ObjectID 长度为 12 字节,由4部分组成:
- 4 字节的值,表示自 Unix 纪元以来的秒数
- 3 字节的机器标识符
- 2 字节的进程 ID
- 3 字节的计数器,以随机值开始
一般情况下,不建议使用自定义并覆盖已有的_id
生成方式,除非是对全局唯一主键生成算法(如:snowflake等)比较熟悉,或者业务数据量非常大导致MongoDB自带的生成方式不满足要求等。
在我的业务场景中,使用MongoDB字段的默认生成方式。
因此导致_id
重复的原因只可能是:
- 同时插入两条一模一样的数据,即这两条数据的
_id
相同; - 或者插入MongoDB中已存在的数据,后面插入的数据的
_id
是之前计算好的。
第2种情况的一个场景:比如2022年6月3日 13:29:43,根据这个时间戳生成一批_id
数据。这批数据可能放在代码的内存中,在2022年6月3日 13:30:58,再次插入。
调试见真章:
此时才意识到自己犯下一个很愚蠢的错误:业务数据量还算较大,SQL查询百万甚至千万级别,从JDBC中获取结果时,不是一次性组装到List<Document>
,而是分批组装塞到list
里面去,batchNum=500000
。分批insert到MongoDB中,满足batchNum后,List<Document>
没有重新new ArrayList<Document>
初始化。
问题解决
是不是很小白的问题,是不是感觉很傻?
一个数组list
或者集合collection
,进行for
循环,取list
或者collection
里面的数据赋值到另外一个list<实体类>
里面,在for
循环里面需要对实体类进行初始化。又或者遇到批量取数的情况,也是需要对list<实体类>
进行初始化。
借口:之所以出错,是因为他人写的代码,if,else,for,while循环层层嵌套(不下8层),导致自己在维护他人的代码时犯迷糊。
经过简化的错误的代码片段:
List<Document> documentList = new LinkedList<>();
int q = 0;
boolean isLast = false;
long allCount = 0L;
while (!isLast) {
String execSql = this.buildExecSql(driver, sql);
ResultSet rs = ps.executeQuery(execSql);
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
while (rs.next()) {
allCount++;
Document document = new Document();
for (int j = 0; j < columnCount; j++) {
document.append(metaData.getColumnLabel(j + 1), rs.getObject(j + 1));
}
documentList.add(document);
}
if (q == 0 && documentList.size() <= 0) {
Document document = new Document();
for (int j = 0; j < columnCount; j++) {
// 存空值
document.append(metaData.getColumnLabel(j + 1), null);
}
documentList.add(document);
}
totalCount += documentList.size();
if (documentList.size() < batchNum) {
isLast = true;
}
// 如果循环的是第一次先删除再创建
if (q == 0) {
// 10天有效期, createCollectionWithExpire会先检查集合是否存在, 如果存在, 则先删除, 再新建集合
MongodbUtil.createCollectionWithExpire(mongoTemplate, key, 10 * 60 * 60 * 24);
}
// 是否需要缓存到MongoDB数据库
if (cacheApi) {
MongodbUtil.insertCollection(mongoTemplate, documentList, key);
}
q++;
}
结论
其实对于MongoDB稍微有所了解,整明白其中的原理之后,答案和解决思路就很明显。
当然,前提是需要看懂他人写的代码。