一般来说datax只需要我们设置reader和writer,但是是什么连接了reader和writer呢?
就是channel! 这个有什么用? 慢慢学习。
core.json
[devuser@cdp-node12 /data/DATA_DIR/share/dataingestion/conf]$ cat core.json
{
"entry": {
"jvm": "-Xms1G -Xmx1G",
"environment": {}
},
"common": {
"column": {
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"timeFormat": "HH:mm:ss",
"dateFormat": "yyyy-MM-dd",
"extraFormats":["yyyyMMdd"],
"timeZone": "GMT+8",
"encoding": "utf-8"
}
},
"core": {
"dataXServer": {
"address": "http://localhost:7001/api",
"timeout": 10000,
"reportDataxLog": false,
"reportPerfLog": false
},
"transport": {
"channel": {
"class": "com.tencent.s2.dataingestion.core.transport.channel.memory.MemoryChannel",
"speed": {
"byte": -1,
"record": -1
},
"flowControlInterval": 20,
"capacity": 512,
"byteCapacity": 67108864
},
"exchanger": {
"class": "com.tencent.s2.dataingestion.core.plugin.BufferedRecordExchanger",
"bufferSize": 32
}
},
"container": {
"job": {
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}},
"statistics": {
"collector": {
"plugin": {
"taskClass": "com.tencent.s2.dataingestion.core.statistics.plugin.task.StdoutPluginCollector",
"maxDirtyNumber": 10
}
}
}
}
}
channel类
public Channel(final Configuration configuration) {
//channel的queue里默认record为1万条。原来为512条
//core.json=512 default=2048
int capacity = configuration.getInt(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY, 2048);
//core.json=-1 最后是1M
long byteSpeed = configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024);
//core.json=01 最后是10000r
long recordSpeed = configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000);
if (capacity <= 0) {
throw new IllegalArgumentException(String.format(
"通道容量[%d]必须大于0.", capacity));
}
synchronized (isFirstPrint) {
if (isFirstPrint) {
("Channel set byte_speed_limit to " + byteSpeed
+ (byteSpeed <= 0 ? ", No bps activated." : "."));
("Channel set record_speed_limit to " + recordSpeed
+ (recordSpeed <= 0 ? ", No tps activated." : "."));
isFirstPrint = false;
}
}
this.taskGroupId = configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
this.capacity = capacity;
this.byteSpeed = byteSpeed;
this.recordSpeed = recordSpeed;
//core.json=20 default=20
this.flowControlInterval = configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_FLOWCONTROLINTERVAL, 1000);
//channel的queue默认大小为8M,原来为64M datax自己说的
//core.json=67108864=64M default=8M
this.byteCapacity = configuration.getInt(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);
this.configuration = configuration;
}
再看memoryChannel
public MemoryChannel(final Configuration configuration) {
super(configuration);
this.queue = new ArrayBlockingQueue<Record>(this.getCapacity());
this.bufferSize = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);
lock = new ReentrantLock();
notInsufficient = lock.newCondition();
notEmpty = lock.newCondition();
}
1.根据capacity 设置queue 大小
2.bufferSize = core.transport.exchanger.bufferSize=32
我们再看 BufferedRecordExchanger.java
这个sendToWriter 是reader每读取一条记录就send一下
@Override
public void sendToWriter(Record record) {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
Validate.notNull(record, "record不能为空.");
//判断单条记录的大小是否超过64M
if (record.getMemorySize() > this.byteCapacity) {
this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
return;
}
//判断是否满了 bufferSize=32 或者 总记录数的字节大小+ 当前的记录>64M
//也就是说32个记录 作为一个buffer 放到channel里去,并不是一个一个的, 32个记录push 或者64M push到channel
boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
if (isFull) {
// flush到channel 然后buffer和memoryBiteSize重置-0
flush();
}
this.buffer.add(record);
this.bufferIndex++;
memoryBytes.addAndGet(record.getMemorySize());
}
可以看到 reader是一条条发的,但是中间搞了一个buffer,buffer里只能装32条数据或者64M的而数据,到了这个界限,就会flush到channel里去,
到了channel就会被writer消费。
这个时候我们就可以考虑 这个buffer=32 是否可以调大?(bufferSize=64M不需要考虑 一般没这么大)
现在我们知道reader是怎么发数据到channel了的。
继续研究writer是怎么从channel里获取数据的?
这里的recordReceiver就是BufferedRecordExchanger。来看getFromReader()方法
//buffer里有数据,就直接从buffer里消费 //buffer没数据,我就从channel里获取32个数据到buffer,然后一个个消费 @Override public Record getFromReader() { if(shutdown){ throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); } //这里的这个主要是给writer用的。 //判断依据是 下标已经读到最后一位了 boolean isEmpty = (this.bufferIndex >= this.buffer.size()); if (isEmpty) { //如果buffer里为空 我就从channel里获取数据 receive(); } //然后我开始从下标0开始慢慢读 Record record = this.buffer.get(this.bufferIndex++); if (record instanceof TerminateRecord) { record = null; } return record; }//之前把数据都存到channel里了,现在来从channel里获取数据 //然后bufferIndex=0开始从下标为0开始消费 private void receive() { this.channel.pullAll(this.buffer); this.bufferIndex = 0; this.bufferSize = this.buffer.size(); }//这个channel.pullAll没啥看的 public void pullAll(final Collection<Record> rs) { Validate.notNull(rs); this.doPullAll(rs); this.statPull(rs.size(), this.getByteSize(rs)); }//memeoryChannel.doPullAll
@Override protected void doPullAll(Collection<Record> rs) { assert rs != null; rs.clear(); try { long startTime = System.nanoTime(); lock.lockInterruptibly(); //从channel的queue里获取32个数据 while (this.queue.drainTo(rs, bufferSize) <= 0) { notEmpty.await(200L, TimeUnit.MILLISECONDS); } waitReaderTime += System.nanoTime() - startTime; int bytes = getRecordBytes(rs); memoryBytes.addAndGet(-bytes); notInsufficient.signalAll(); } catch (InterruptedException e) { throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); } finally { lock.unlock(); } }
我们梳理下。
reader读数据(可能一条条读可能一批批读)读到的数据一条条的写到buffer里,当buffer=32的时候,再放到channel里去
writer从channel一次次获取32条数据,然后一条条的去给writer去执行。
注意channel的限制是512条或64M
buffer的限制是32条 或64M
这里感觉就不对了呀!channel和buffer都是64M,但是代码是ali写的我也没法。
总感觉这个reader和writer过程有点不对劲
1.buffer=32 能不能搞大点,一般数据都是百万千万级别
2.writer一次获取1条是不是也很蠢?
个人觉得32是可以搞大点的
一般来说有如下三种情况
reader比writer快,这个时候我们可以加大channel的size,保存更多数据,但感觉用处也不大
reader比writer慢,这个时候可以减小channel的size,因为根本不需要那么多内存,还不如给其他人用
reader=writer。建议加大buffer的size,因为大家都很快buffer不够用。