Java source code examples: org.apache.flink.table.factories.TableSourceFactory
Example 1
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
    TableSource<?> tableSource;
    Optional<TableFactory> tableFactory = catalog.getTableFactory();
    if (tableFactory.isPresent()) {
        TableFactory tf = tableFactory.get();
        if (tf instanceof TableSourceFactory) {
            tableSource = ((TableSourceFactory) tf).createTableSource(tablePath, table);
        } else {
            throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory",
                catalog.getClass()));
        }
    } else {
        tableSource = TableFactoryUtil.findAndCreateTableSource(table);
    }
    if (!(tableSource instanceof StreamTableSource)) {
        throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
    }
    return new TableSourceTable<>(
        tableSource,
        !((StreamTableSource<?>) tableSource).isBounded(),
        FlinkStatistic.UNKNOWN()
    );
}
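For context on the fallback branch above: when the catalog does not provide its own TableFactory, `TableFactoryUtil.findAndCreateTableSource(table)` resolves a factory from the table's flattened properties through the legacy `TableFactoryService` lookup (Java SPI). The following is only an illustrative sketch of what that fallback amounts to, not code taken from Flink; `FallbackLookupSketch` is a made-up name and the exact delegation inside `TableFactoryUtil` may differ.

import java.util.Map;

import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sources.TableSource;

public class FallbackLookupSketch {

    // Sketch of the fallback path: match the flattened table properties against
    // registered TableSourceFactory implementations and let the matching factory
    // build the TableSource.
    public static TableSource<?> lookup(CatalogTable table) {
        // toProperties() flattens schema, connector and format options into the flat
        // key space that requiredContext()/supportedProperties() are matched against.
        Map<String, String> properties = table.toProperties();
        TableSourceFactory<?> factory =
            TableFactoryService.find(TableSourceFactory.class, properties);
        return factory.createTableSource(properties);
    }
}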
Example 2
@Override
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
    CatalogTable table = checkNotNull(context.getTable());
    Preconditions.checkArgument(table instanceof CatalogTableImpl);
    boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
    if (!isGeneric) {
        return new HiveTableSource(
            new JobConf(hiveConf),
            context.getConfiguration(),
            context.getObjectIdentifier().toObjectPath(),
            table);
    } else {
        return TableFactoryUtil.findAndCreateTableSource(context);
    }
}
Example 3
private Optional<TableSource<?>> findAndCreateTableSource() {
    Optional<TableSource<?>> tableSource = Optional.empty();
    try {
        if (lookupResult.getTable() instanceof CatalogTable) {
            // Use an empty config for TableSourceFactoryContextImpl since we can't fetch the
            // actual TableConfig here. Currently the empty config does not affect the logic.
            ReadableConfig config = new Configuration();
            TableSourceFactory.Context context =
                new TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable) lookupResult.getTable(), config);
            TableSource<?> source = TableFactoryUtil.findAndCreateTableSource(context);
            if (source instanceof StreamTableSource) {
                if (!isStreamingMode && !((StreamTableSource<?>) source).isBounded()) {
                    throw new ValidationException("Cannot query on an unbounded source in batch mode, but " +
                        tableIdentifier.asSummaryString() + " is unbounded.");
                }
                tableSource = Optional.of(source);
            } else {
                throw new ValidationException("Catalog tables only support " +
                    "StreamTableSource and InputFormatTableSource.");
            }
        }
    } catch (Exception e) {
        tableSource = Optional.empty();
    }
    return tableSource;
}
Example 4
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
    TableSource<?> tableSource;
    Optional<TableFactory> tableFactory = catalog.getTableFactory();
    if (tableFactory.isPresent()) {
        TableFactory tf = tableFactory.get();
        if (tf instanceof TableSourceFactory) {
            tableSource = ((TableSourceFactory) tf).createTableSource(tablePath, table);
        } else {
            throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory",
                catalog.getClass()));
        }
    } else {
        tableSource = TableFactoryUtil.findAndCreateTableSource(table);
    }
    if (!(tableSource instanceof StreamTableSource)) {
        throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
    }
    return new TableSourceTable<>(
        tableSource,
        // this means the TableSource extends from StreamTableSource, this is needed for the
        // legacy Planner. Blink Planner should use the information that comes from the TableSource
        // itself to determine if it is a streaming or batch source.
        isStreamingMode,
        FlinkStatistic.UNKNOWN()
    );
}
Example 5
private void testSourceConfig(boolean fallbackMR, boolean inferParallelism) throws Exception {
    HiveTableFactory tableFactorySpy = spy((HiveTableFactory) hiveCatalog.getTableFactory().get());

    doAnswer(invocation -> {
        TableSourceFactory.Context context = invocation.getArgument(0);
        return new TestConfigSource(
            new JobConf(hiveCatalog.getHiveConf()),
            context.getConfiguration(),
            context.getObjectIdentifier().toObjectPath(),
            context.getTable(),
            fallbackMR,
            inferParallelism);
    }).when(tableFactorySpy).createTableSource(any(TableSourceFactory.Context.class));

    HiveCatalog catalogSpy = spy(hiveCatalog);
    doReturn(Optional.of(tableFactorySpy)).when(catalogSpy).getTableFactory();

    TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
    tableEnv.getConfig().getConfiguration().setBoolean(
        HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, fallbackMR);
    tableEnv.getConfig().getConfiguration().setBoolean(
        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, inferParallelism);
    tableEnv.getConfig().getConfiguration().setInteger(
        ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    tableEnv.registerCatalog(catalogSpy.getName(), catalogSpy);
    tableEnv.useCatalog(catalogSpy.getName());

    List<Row> results = Lists.newArrayList(
        tableEnv.sqlQuery("select * from db1.src order by x").execute().collect());
    assertEquals("[1,a, 2,b]", results.toString());
}
Example 6
@Override
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
    Configuration conf = new Configuration();
    context.getTable().getOptions().forEach(conf::setString);
    return new FileSystemTableSource(
        context.getTable().getSchema(),
        getPath(conf),
        context.getTable().getPartitionKeys(),
        conf.get(PARTITION_DEFAULT_NAME),
        context.getTable().getProperties());
}
Example 7
private Table convertCatalogTable(
        ObjectIdentifier identifier,
        CatalogTable table,
        TableSchema resolvedSchema,
        @Nullable TableFactory tableFactory) {
    final TableSource<?> tableSource;
    final TableSourceFactory.Context context = new TableSourceFactoryContextImpl(
        identifier, table, tableConfig.getConfiguration());
    if (tableFactory != null) {
        if (tableFactory instanceof TableSourceFactory) {
            tableSource = ((TableSourceFactory<?>) tableFactory).createTableSource(context);
        } else {
            throw new TableException(
                "Cannot query a sink-only table. TableFactory provided by catalog must implement TableSourceFactory");
        }
    } else {
        tableSource = TableFactoryUtil.findAndCreateTableSource(context);
    }
    if (!(tableSource instanceof StreamTableSource)) {
        throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
    }
    return new TableSourceTable<>(
        resolvedSchema,
        tableSource,
        // this means the TableSource extends from StreamTableSource, this is needed for the
        // legacy Planner. Blink Planner should use the information that comes from the TableSource
        // itself to determine if it is a streaming or batch source.
        isStreamingMode,
        FlinkStatistic.UNKNOWN()
    );
}
Example 8
@Override
public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
    TableSchema schema = context.getTable().getSchema();
    final DescriptorProperties params = new DescriptorProperties(true);
    params.putProperties(context.getTable().toProperties());
    final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
    final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
    return new TestTableSource(
        schema,
        context.getTable().getProperties().get(testProperty),
        proctime.orElse(null),
        rowtime);
}