一般来说我们的数据都是mysql->hive tpg->hive orcle->hive
之前的文章介绍过优化读的速度。比如创建索引,增加fetchsize 增加并行。
那么怎么增加hdfs写的速度呢?
以orc文件为例。目前我们能控制的无非就是并行写,不能根本的解决问题
举例 1e的数据,mysql分成10channel,51个tasks,那么在hdfswriter上会同时有10个线程去写orc文件,关键问题在于
比如mysql读的速度是2w Records/s, hdfs写的速度是1w Records/s 那么不管你怎么切分任务,写的速度永远是导数的瓶颈?
如何解决?
首先看datax的orc写的源码
/**
* 写orcfile类型文件
*
* @param lineReceiver
* @param config
* @param fileName
* @param taskPluginCollector
*/
public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
TaskPluginCollector taskPluginCollector) {
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
String compress = config.getString(Key.COMPRESS, null);
List<String> columnNames = getColumnNames(columns);
List<ObjectInspector> columnTypeInspectors = getColumnTypeInspectors(columns);
StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory
.getStandardStructObjectInspector(columnNames, columnTypeInspectors);
OrcSerde orcSerde = new OrcSerde();
FileOutputFormat outFormat = new OrcOutputFormat();
if (!"NONE".equalsIgnoreCase(compress) && null != compress) {
Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
if (null != codecClass) {
outFormat.setOutputCompressorClass(conf, codecClass);
}
}
try {
RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
MutablePair<List<Object>, Boolean> transportResult = transportOneRecord(record, columns, taskPluginCollector);
if (!transportResult.getRight()) {
writer.write(NullWritable.get(), orcSerde.serialize(transportResult.getLeft(), inspector));
}
}
writer.close(Reporter.NULL);
} catch (Exception e) {
String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
LOG.error(message);
Path path = new Path(fileName);
deleteDir(path.getParent());
throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
}
}
注意这里它是
FileOutputFormat outFormat = new OrcOutputFormat();// 设置 输出格式orc
RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
writer.write(xx,row)
查看源码outFormat.getRecordWriter() 方法 本质
然后真正开始写的时候writer.write()
注意这个方法当write为null的时候,会new一个writer。上文有一个,这里又能new一个,
是否说明这两个writer都能写orc文件
那这两个有什么区别呢?
根据我上一篇文章写过的
orc文件的读写及整合hive_cclovezbf的博客-CSDN博客_hive读取orc文件
writer = OrcFile.createWriter(path, options);
writer.addRowBatch(batch); 这里是可以批量添加然后写入的,默认是1024条数据写入
而datax的代码里是来一条写一条,这两种效率谁高谁低就不用说了吧。。
但是改造起来比较麻烦。。有点懒。。需要的时候再写把。