一般来说我们的数据都是mysql->hive tpg->hive orcle->hive
之前的文章介绍过优化读的速度。比如创建索引,增加fetchsize 增加并行。
那么怎么增加hdfs写的速度呢?
以orc文件为例。目前我们能控制的无非就是并行写,不能根本的解决问题
举例 1e的数据,mysql分成10channel,51个tasks,那么在hdfswriter上会同时有10个线程去写orc文件,关键问题在于
比如mysql读的速度是2w Records/s, hdfs写的速度是1w Records/s 那么不管你怎么切分任务,写的速度永远是导数的瓶颈?
如何解决?
首先看datax的orc写的源码
/** * 写orcfile类型文件 * * @param lineReceiver * @param config * @param fileName * @param taskPluginCollector */ public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName, TaskPluginCollector taskPluginCollector) { List<Configuration> columns = config.getListConfiguration(Key.COLUMN); String compress = config.getString(Key.COMPRESS, null); List<String> columnNames = getColumnNames(columns); List<ObjectInspector> columnTypeInspectors = getColumnTypeInspectors(columns); StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory .getStandardStructObjectInspector(columnNames, columnTypeInspectors); OrcSerde orcSerde = new OrcSerde(); FileOutputFormat outFormat = new OrcOutputFormat(); if (!"NONE".equalsIgnoreCase(compress) && null != compress) { Class<? extends CompressionCodec> codecClass = getCompressCodec(compress); if (null != codecClass) { outFormat.setOutputCompressorClass(conf, codecClass); } } try { RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL); Record record = null; while ((record = lineReceiver.getFromReader()) != null) { MutablePair<List<Object>, Boolean> transportResult = transportOneRecord(record, columns, taskPluginCollector); if (!transportResult.getRight()) { writer.write(NullWritable.get(), orcSerde.serialize(transportResult.getLeft(), inspector)); } } writer.close(Reporter.NULL); } catch (Exception e) { String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName); LOG.error(message); Path path = new Path(fileName); deleteDir(path.getParent()); throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e); } }
注意这里它是
FileOutputFormat outFormat = new OrcOutputFormat();// 设置 输出格式orc
RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
writer.write(xx,row)
查看源码outFormat.getRecordWriter() 方法 本质
然后真正开始写的时候writer.write()
注意这个方法当write为null的时候,会new一个writer。上文有一个,这里又能new一个,
是否说明这两个writer都能写orc文件
那这两个有什么区别呢?
根据我上一篇文章写过的
orc文件的读写及整合hive_cclovezbf的博客-CSDN博客_hive读取orc文件
writer = OrcFile.createWriter(path, options);
writer.addRowBatch(batch); 这里是可以批量添加然后写入的,默认是1024条数据写入
而datax的代码里是来一条写一条,这两种效率谁高谁低就不用说了吧。。
但是改造起来比较麻烦。。有点懒。。需要的时候再写把。