目前来说dremio JdbcStoragePlugin 是没有开源的如果需要学习就需要通过反编译的手段,目前来说
参考类图
从下图可以看出jdbcstorageplugin 也是依赖storageplugin开发的,扩展了SupportsListingDatasets以及SupportsExternalQuery
SupportsListingDatasets 方便支持datasets 的处理,命名上是属于一个list类型的(包含一个iterator 方法定义)SupportsExternalQuery
可以方便的支持外部查询(当然基本上目前的版本都已经支持外部查询了,后边会讲解关于外部查询处理的细节)
SupportsListingDatasets 接口定义
SupportsListingDatasets 主要是处理datasets 的,对于不同的jdbc 驱动包含的东西不一样,information 的包含的比较多,对于普通的jdbc arp 就只需要包含
tables ,jdbc 的处理
// 使用了内部类,
class JdbcIteratorListing implements DatasetHandleListing {
private final Set<CloseableIterator<CanonicalizeTablePathResponse>> references = new HashSet();
JdbcIteratorListing() {
}
public Iterator<DatasetHandle> iterator() {
CloseableIterator<CanonicalizeTablePathResponse> iterator =
//使用JdbcStoragePlugin 的fetcher listTableNames 查询table 信息
JdbcStoragePlugin.this.fetcher.listTableNames(ListTableNamesRequest.newBuilder().build());
this.references.add(iterator);
return Iterators.transform(iterator, (input) -> {
return JdbcStoragePlugin.this.new JdbcDatasetHandle(input.getCatalog(), input.getSchema(), input.getTable());
});
}
public void close() {
try {
AutoCloseables.close(this.references);
} catch (Exception var2) {
JdbcStoragePlugin.LOGGER.warn("Error closing iterators when listing JDBC datasets.", var2);
}
}
}
JdbcSchemaFetcher 定义 (此接口还是比较重要的,我们自定义arp扩展的schema 获取就需要此接口的实现,对于标准数据库一般不需要处理,但是特殊的就需要自己开发实现了,默认已经包含了不少,一般推荐直接基于ArpSchemaFetcher 进行扩展)
public interface JdbcSchemaFetcher extends Service {
void start() throws IOException;
JdbcPluginConfig getConfig();
GetStateResponse getState(GetStateRequest var1);
CatalogOrSchemaExistsResponse catalogOrSchemaExists(CatalogOrSchemaExistsRequest var1);
CanonicalizeTablePathResponse canonicalizeTablePath(CanonicalizeTablePathRequest var1);
CloseableIterator<CanonicalizeTablePathResponse> listTableNames(ListTableNamesRequest var1);
GetTableMetadataResponse getTableMetadata(GetTableMetadataRequest var1);
GetExternalQueryMetadataResponse getExternalQueryMetadata(GetExternalQueryMetadataRequest var1);
}
目前官方的实现
SupportsExternalQuery 简单说明
接口定义
public interface SupportsExternalQuery {
String EXTERNAL_QUERY = "external_query";
int FUNCTION_CALL_NUM_PATHS = 2;
// 物理操作,后续介绍
PhysicalOperator getExternalQueryPhysicalOperator(PhysicalPlanCreator var1, ExternalQueryScanPrel var2, BatchSchema var3, String var4) throws IOException;
static Optional<Function> getExternalQueryFunction(java.util.function.Function<String, BatchSchema> schemaBuilder, java.util.function.Function<BatchSchema, RelDataType> rowTypeBuilder, StoragePluginId pluginId, List<String> tableSchemaPath) {
Preconditions.checkNotNull(schemaBuilder, "schemaBuilder cannot be null.");
Preconditions.checkNotNull(rowTypeBuilder, "rowTypeBuilder cannot be null.");
return tableSchemaPath.size() == 2 && isExternalQuery(tableSchemaPath) ? Optional.of(new ExternalQuery(schemaBuilder, rowTypeBuilder, pluginId)) : Optional.empty();
}
// 外部查询格式约定
static boolean isExternalQuery(List<String> tableSchemaPath) {
return ((String)tableSchemaPath.get(tableSchemaPath.size() - 1)).equalsIgnoreCase("external_query");
}
}
权限处理
此处对于dremio 社区版来说似乎是一个可选的扩展点,我们可以基于此进行权限的管理,默认为true,此扩展是所有存储插件支持的能力
如果查看了DatasetConfig,就会发现DatasetConfig包含创建这,以及一些配置信息,基于此我们是可以做一些简单的权限扩展的,基于用户
以及namespace 还有数据集的配置以及rbac 就可以搞一个简单的访问控制
public boolean hasAccessPermission(String user, NamespaceKey key, DatasetConfig datasetConfig) {
return true;
}
其他方法
包含一些getDatasetHandle,listDatasetHandles,getDatasetMetadata,这些对于处理还是比较重要的,jdbc 的方言类我们是可以自己扩展的
(尤其是需要自己调整查询处理的),目前还包含一个getSourceCapabilities具体功能还需研究下
getRulesFactoryClass 是所有存储扩展都应该包含的,此处属于dremio的一个扩展了,使用了默认的实现,实际上我们可以自己扩展一个
实现不用的功能,默认实现JdbcRulesFactory
此访问可以调整jdbc 的逻辑计划,物理执行计划以及一些sql 规则,很值得研究下
public class JdbcRulesFactory extends StoragePluginTypeRulesFactory {
public JdbcRulesFactory() {
}
public Set<RelOptRule> getRules(OptimizerRulesContext optimizerContext, PlannerPhase phase, SourceType pluginType) {
switch(phase) {
case LOGICAL:
Builder<RelOptRule> logicalBuilder = ImmutableSet.builder();
if (optimizerContext.getPlannerSettings().isRelPlanningEnabled()) {
return logicalBuilder.add(new JdbcScanCrelRule(pluginType)).build();
}
return logicalBuilder.build();
case JDBC_PUSHDOWN:
case POST_SUBSTITUTION:
Builder<RelOptRule> builder = ImmutableSet.builder();
builder.add(new JdbcScanCrelRule(pluginType));
builder.add(JdbcExpansionRule.INSTANCE);
builder.add(JdbcAggregateRule.CALCITE_INSTANCE);
builder.add(JdbcCalcRule.INSTANCE);
builder.add(JdbcFilterRule.CALCITE_INSTANCE);
builder.add(JdbcIntersectRule.INSTANCE);
builder.add(JdbcJoinRule.CALCITE_INSTANCE);
builder.add(JdbcMinusRule.INSTANCE);
builder.add(JdbcProjectRule.CALCITE_INSTANCE);
builder.add(JdbcSampleRule.CALCITE_INSTANCE);
builder.add(JdbcSortRule.CALCITE_INSTANCE);
builder.add(JdbcTableModificationRule.INSTANCE);
builder.add(JdbcUnionRule.CALCITE_INSTANCE);
builder.add(JdbcValuesRule.CALCITE_INSTANCE);
builder.add(JdbcFilterSetOpTransposeRule.INSTANCE);
return builder.build();
case RELATIONAL_PLANNING:
Builder<RelOptRule> jdbcBuilder = ImmutableSet.builder();
jdbcBuilder.add(JdbcExpansionRule.INSTANCE);
jdbcBuilder.add(JdbcAggregateRule.LOGICAL_INSTANCE);
jdbcBuilder.add(JdbcFilterRule.LOGICAL_INSTANCE);
jdbcBuilder.add(JdbcJoinRule.LOGICAL_INSTANCE);
jdbcBuilder.add(JdbcProjectRule.LOGICAL_INSTANCE);
jdbcBuilder.add(JdbcSampleRule.LOGICAL_INSTANCE);
jdbcBuilder.add(JdbcSortRule.LOGICAL_INSTANCE);
jdbcBuilder.add(JdbcUnionRule.LOGICAL_INSTANCE);
jdbcBuilder.add(JdbcValuesRule.LOGICAL_INSTANCE);
jdbcBuilder.add(JdbcFilterSetOpTransposeRule.INSTANCE);
jdbcBuilder.add(JdbcLimitRule.INSTANCE);
jdbcBuilder.add(JdbcSortMergeRule.INSTANCE);
jdbcBuilder.add(JdbcWindowRule.INSTANCE);
return jdbcBuilder.build();
case PHYSICAL:
Builder<RelOptRule> physicalBuilder = ImmutableSet.builder();
physicalBuilder.add(new JdbcPrule(optimizerContext.getFunctionRegistry()));
return physicalBuilder.build();
default:
return ImmutableSet.of();
}
}
}
说明
社区版的JdbcStoragePlugin 实现的了jdbc 的处理,也是我们日常使用dremio 比较多的地方,是很值得学习研究的,好多扩展点都是可以从此处发现的
jdbc 方言以及元数据处理,jdbc 规则对于jdbc 存储扩展是比较重要的东西,后续研究下,目前只是简单的介绍下