Dremio supports CREATE TABLE. The official documentation is fairly brief and only says that tables can be created directly, but Dremio's CREATE TABLE is actually quite a bit more powerful than that.
The data download/export feature in the Dremio UI is built on this capability: by default the export runs on the master node, and the data is written to a fixed space named __datasetDownload. A table is created dynamically based on the job id, its byte stream is then read back, and a wrapping HTTP servlet handles the download response.
Supported formats
text (CSV), JSON, and Parquet, each with quite a few configuration options (for example fieldDelimiter, lineDelimiter, and outputExtension).
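As a quick illustration, here are minimal CTAS statements distilled from the test case below (the dfs_test source and the cp."region.json" sample dataset come from the Dremio test environment, and the example_* table names are just placeholders):

-- CSV/text output with an explicit field delimiter
CREATE TABLE dfs_test.example_csv
STORE AS (type => 'text', fieldDelimiter => ',')
AS SELECT region_id, sales_city FROM cp."region.json" LIMIT 2

-- JSON output
CREATE TABLE dfs_test.example_json
STORE AS (type => 'json')
AS SELECT region_id, sales_city FROM cp."region.json" LIMIT 2

-- Parquet output, partitioned by a column
CREATE TABLE dfs_test.example_parquet
PARTITION BY (region_id)
STORE AS (type => 'parquet')
AS SELECT region_id, sales_city FROM cp."region.json" LIMIT 2

-- reading a text table back, passing the format options to the TABLE() function
SELECT * FROM TABLE("dfs_test"."example_csv"(type => 'text', fieldDelimiter => ',', extractHeader => true))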
Reference test case
Of course, CREATE TABLE requires CTAS to be enabled on the target source. Reading the source code from time to time is a useful way to learn more about how the official mechanisms work, and the following test case from the Dremio source is excellent learning material.
package com.dremio.exec.sql;

import org.junit.Test;

import com.dremio.PlanTestBase;

public class TestCTASWithOptions extends PlanTestBase {

  @Test
  public void csv() throws Exception {
    try {
      test("CREATE TABLE dfs_test.testCsv " +
          "STORE AS (type => 'text', fieldDelimiter => ',') " +
          "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2");

      testBuilder()
          .sqlQuery("SELECT * FROM TABLE(\"dfs_test\".\"testCsv\"" +
              "(type => 'text', fieldDelimiter => ',', extractHeader => true))")
          .unOrdered()
          .baselineColumns("region_id", "sales_city")
          .baselineValues("0", "None")
          .baselineValues("1", "San Francisco")
          .go();
    } finally {
      test("DROP TABLE dfs_test.testCsv");
    }
  }

  @Test
  public void csvWithCustomExtension() throws Exception {
    try {
      test("CREATE TABLE dfs_test.csvWithCustomExtension " +
          "STORE AS (type => 'text', fieldDelimiter => ',', outputExtension => 'myparquet') " +
          "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2");

      testBuilder()
          .sqlQuery("SELECT * FROM TABLE(\"dfs_test\".\"csvWithCustomExtension\"" +
              "(type => 'text', fieldDelimiter => ',', extractHeader => true))")
          .unOrdered()
          .baselineColumns("region_id", "sales_city")
          .baselineValues("0", "None")
          .baselineValues("1", "San Francisco")
          .go();
    } finally {
      // DROP TABLE doesn't support custom extensions
      //test("DROP TABLE dfs_test.csvWithCustomExtension");
    }
  }

  @Test
  public void csvUnordered() throws Exception {
    try {
      // order the options differently
      test("CREATE TABLE dfs_test.testCsvUnordered " +
          "STORE AS (fieldDelimiter => ',', type => 'text') " +
          "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2");

      testBuilder()
          .sqlQuery("SELECT * FROM TABLE(\"dfs_test\".\"testCsvUnordered\"" +
              "(type => 'text', fieldDelimiter => ',', extractHeader => true))")
          .unOrdered()
          .baselineColumns("region_id", "sales_city")
          .baselineValues("0", "None")
          .baselineValues("1", "San Francisco")
          .go();
    } finally {
      test("DROP TABLE dfs_test.testCsvUnordered");
    }
  }

  @Test
  public void csvTabRecordDelimiter() throws Exception {
    try {
      test("CREATE TABLE dfs_test.testCsvTabRecordDelimiter " +
          "STORE AS (type => 'text', fieldDelimiter => ',', lineDelimiter => '\t') " +
          "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2");

      testBuilder()
          .sqlQuery("SELECT * FROM " +
              "TABLE(\"dfs_test\".\"testCsvTabRecordDelimiter\"" +
              "(type => 'text', fieldDelimiter => ',', lineDelimiter => '\t', extractHeader => true))")
          .unOrdered()
          .baselineColumns("region_id", "sales_city")
          .baselineValues("0", "None")
          .baselineValues("1", "San Francisco")
          .go();
    } finally {
      test("DROP TABLE dfs_test.testCsvTabRecordDelimiter");
    }
  }

  @Test
  public void tsv() throws Exception {
    try {
      test("CREATE TABLE dfs_test.testTsv STORE AS (type => 'teXt', fieldDelimiter => '\t') " +
          "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2");

      testBuilder()
          .sqlQuery("SELECT * FROM " +
              "TABLE(\"dfs_test\".\"testTsv\"(type => 'text', fieldDelimiter => '\t', extractHeader => true))")
          .unOrdered()
          .baselineColumns("region_id", "sales_city")
          .baselineValues("0", "None")
          .baselineValues("1", "San Francisco")
          .go();
    } finally {
      test("DROP TABLE dfs_test.testTsv");
    }
  }

  @Test
  public void json() throws Exception {
    try {
      test("CREATE TABLE dfs_test.testJson " +
          "STORE AS (type => 'json') " +
          "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2");

      testBuilder()
          .sqlQuery("SELECT * FROM " +
              "TABLE(\"dfs_test\".\"testJson\"(type => 'json'))")
          .unOrdered()
          .baselineColumns("region_id", "sales_city")
          .baselineValues(0L, "None")
          .baselineValues(1L, "San Francisco")
          .go();
    } finally {
      test("DROP TABLE dfs_test.testJson");
    }
  }

  @Test
  public void jsonWithCustomExtension() throws Exception {
    try {
      test("CREATE TABLE dfs_test.jsonWithCustomExtension " +
          "STORE AS (type => 'json', outputExtension => 'myjson') " +
          "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2");

      testBuilder()
          .sqlQuery("SELECT * FROM " +
              "TABLE(\"dfs_test\".\"jsonWithCustomExtension\"(type => 'json'))")
          .unOrdered()
          .baselineColumns("region_id", "sales_city")
          .baselineValues(0L, "None")
          .baselineValues(1L, "San Francisco")
          .go();
    } finally {
      // DROP TABLE doesn't support custom extensions
      //test("DROP TABLE dfs_test.jsonWithCustomExtension");
    }
  }

  @Test
  public void parquet() throws Exception {
    try {
      test("CREATE TABLE dfs_test.testParquet " +
          "STORE AS (type => 'parquet') " +
          "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2");

      testBuilder()
          .sqlQuery("SELECT * FROM " +
              "TABLE(\"dfs_test\".\"testParquet\"(type => 'parquet'))")
          .unOrdered()
          .baselineColumns("region_id", "sales_city")
          .baselineValues(0L, "None")
          .baselineValues(1L, "San Francisco")
          .go();
    } finally {
      test("DROP TABLE dfs_test.testParquet");
    }
  }

  @Test
  public void parquetWithCustomExtension() throws Exception {
    try {
      test("CREATE TABLE dfs_test.parquetWithCustomExtension " +
          "STORE AS (type => 'parquet', outputExtension => 'myparquet') " +
          "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2");

      testBuilder()
          .sqlQuery("SELECT * FROM " +
              "TABLE(\"dfs_test\".\"parquetWithCustomExtension\"(type => 'parquet'))")
          .unOrdered()
          .baselineColumns("region_id", "sales_city")
          .baselineValues(0L, "None")
          .baselineValues(1L, "San Francisco")
          .go();
    } finally {
      // DROP TABLE doesn't support custom extensions
      //test("DROP TABLE dfs_test.parquetWithCustomExtension");
    }
  }

  @Test
  public void parquetWithPartition() throws Exception {
    try {
      test("CREATE TABLE dfs_test.testParquetWithPartition " +
          "PARTITION BY (region_id) " +
          "STORE AS (type => 'parquet') " +
          "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2");

      testBuilder()
          .sqlQuery("SELECT dir0, region_id, sales_city FROM TABLE(\"dfs_test\".\"testParquetWithPartition\"(type => 'parquet'))")
          .unOrdered()
          .baselineColumns("dir0", "region_id", "sales_city")
          .baselineValues("0_0", 0L, "None")
          .baselineValues("1_1", 1L, "San Francisco")
          .go();
    } finally {
      test("DROP TABLE dfs_test.testParquetWithPartition");
    }
  }

  @Test
  public void negativeCaseUnsupportedType() throws Exception {
    final String query = "CREATE TABLE dfs_test.negativeCaseUnsupportedType " +
        "STORE AS (type => 'unknownFormat') " +
        "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2";
    errorMsgTestHelper(query, "unknown type unknownFormat, expected one of");
  }

  @Test
  public void negativeCaseUnknownOption() throws Exception {
    final String query = "CREATE TABLE dfs_test.negativeCaseUnknownOptions " +
        "STORE AS (type => 'json', unknownOption => 'sd') " +
        "AS SELECT region_id, sales_city FROM cp.\"region.json\" ORDER BY region_id LIMIT 2";
    errorMsgTestHelper(query, "Unknown storage option(s): {unknownOption=sd}");
  }

  @Test
  public void csvWithSingleWriter() throws Exception {
    try {
      final String query = "CREATE TABLE dfs_test.csvWithSingleWriter " +
          "STORE AS (type => 'text', fieldDelimiter => ',') " +
          "WITH SINGLE WRITER " +
          "AS SELECT region_id, count(*) cnt FROM cp.\"region.json\" GROUP BY region_id ORDER BY region_id LIMIT 2";
      test(query);

      testBuilder()
          .sqlQuery("SELECT * FROM TABLE(\"dfs_test\".\"csvWithSingleWriter\"" +
              "(type => 'text', fieldDelimiter => ',', extractHeader => true))")
          .unOrdered()
          .baselineColumns("region_id", "cnt")
          .baselineValues("0", "1")
          .baselineValues("1", "1")
          .go();
    } finally {
      test("DROP TABLE dfs_test.csvWithSingleWriter");
    }
  }
}
Notes
We can borrow this approach ourselves: enable CTAS on an S3 source, write the data that needs to be exported into S3 with CREATE TABLE, and then run the unified export processing against S3. The benefit is that this sidesteps the UI's 1,000,000-row export limit and lets the data be laid out differently for different scenarios; CREATE TABLE also supports partitioning the data as well as sorting it.
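A rough sketch of what such an export could look like, assuming an S3 source named s3_export with CTAS enabled and a hypothetical dataset myspace.orders; PARTITION BY and STORE AS appear in the test case above, while LOCALSORT BY comes from Dremio's CREATE TABLE grammar and is not exercised there, so treat the statement as an assumption rather than a verified recipe:

-- export filtered data to the S3 source as partitioned, locally sorted Parquet files
CREATE TABLE s3_export.exports.orders_2021
PARTITION BY (order_month)
LOCALSORT BY (customer_id)
STORE AS (type => 'parquet')
AS SELECT * FROM myspace.orders WHERE order_year = 2021

The resulting files can then be picked up straight from the S3 bucket for downstream processing instead of going through the UI download.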