01 引言
通过前面的博文,我们对DataX
有了一定的深入了解了:
- 《DataX教程(01)- 入门》
- 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》
- 《DataX教程(03)- 源码解读(超详细版)
- 《DataX教程(04)- 配置完整解读》
- 《DataX教程(05)- DataX Web项目实践》
- 《DataX教程(06)- DataX调优》
- 《DataX教程(07)- 图解DataX任务分配及执行流程》
- 《DataX教程(08)- 监控与汇报》
随着对DataX
深入学习,我提出了一个疑问,究竟DataX
是如何做到限速的?本文来讲解下。
02 逆向定位代码
我们知道是在core.json
文件里面的speed方法里面限速DataX
的,可以通过record
记录数和byte
字节数来限速:
选中常量,右键使用IDEA的Find Usages…可以看到有两个地方调用了这个值:
接下来,看看这两个配置在Channel
类如何实现限速的。
03 Channel类里实现限速
从下图,可以看到在Channel
初始化时,顺带初始化了限速的记录数(recordSpeed
)以及字节数(byteSpeed
) ,接下来Control+F
看看recordSpeed
在哪里调用了。
可以看到在statPush
方法里面用到了:statPush
整个流程的描述:
- 判断
byteSpeed(bps)
和recordSpeed(tps)
是否都大于0?如果不是,则退出; - 根据当前的
byteSpeed
和设定的byteSpeed
对比,求出睡眠时间(公式:currentByteSpeed * interval / this.byteSpeed- interval;
) - 根据当前的
recordSpeed
和设定的recordSpeed
对比,求出睡眠时间(公式:currentRecordSpeed * interval / this.recordSpeed - interval;
) - 取休眠时间最大值;
Thread.sleep(sleepTime)
来休眠
下面贴上statPush
的完整代码:
private void statPush(long recordSize, long byteSize) {
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
recordSize);
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
byteSize);
//在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数
currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
return;
}
long lastTimestamp = lastCommunication.getTimestamp();
long nowTimestamp = System.currentTimeMillis();
long interval = nowTimestamp - lastTimestamp;
if (interval - this.flowControlInterval >= 0) {
long byteLimitSleepTime = 0;
long recordLimitSleepTime = 0;
if (isChannelByteSpeedLimit) {
long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
if (currentByteSpeed > this.byteSpeed) {
// 计算根据byteLimit得到的休眠时间
byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
- interval;
}
}
if (isChannelRecordSpeedLimit) {
long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
if (currentRecordSpeed > this.recordSpeed) {
// 计算根据recordLimit得到的休眠时间
recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
- interval;
}
}
// 休眠时间取较大值
long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
recordLimitSleepTime : byteLimitSleepTime;
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
lastCommunication.setTimestamp(nowTimestamp);
}
}