Java源码示例:org.apache.calcite.rel.core.Exchange
示例1
public RelNode convert(RelOptPlanner planner, RelNode rel,
RelDistribution toDistribution, boolean allowInfiniteCostConverters) {
if (toDistribution == RelDistributions.ANY) {
return rel;
}
// Create a logical sort, then ask the planner to convert its remaining
// traits (e.g. convert it to an EnumerableSortRel if rel is enumerable
// convention)
final Exchange exchange = LogicalExchange.create(rel, toDistribution);
RelNode newRel = planner.register(exchange, rel);
final RelTraitSet newTraitSet = rel.getTraitSet().replace(toDistribution);
if (!newRel.getTraitSet().equals(newTraitSet)) {
newRel = planner.changeTraits(newRel, newTraitSet);
}
return newRel;
}
示例2
public RelNode convert(RelOptPlanner planner, RelNode rel,
RelDistribution toDistribution, boolean allowInfiniteCostConverters) {
if (toDistribution == RelDistributions.ANY) {
return rel;
}
// Create a logical sort, then ask the planner to convert its remaining
// traits (e.g. convert it to an EnumerableSortRel if rel is enumerable
// convention)
final Exchange exchange = LogicalExchange.create(rel, toDistribution);
RelNode newRel = planner.register(exchange, rel);
final RelTraitSet newTraitSet = rel.getTraitSet().replace(toDistribution);
if (!newRel.getTraitSet().equals(newTraitSet)) {
newRel = planner.changeTraits(newRel, newTraitSet);
}
return newRel;
}
示例3
@Test void testNodeTypeCountExchange() {
final RelNode rel = convertSql("select * from emp");
final RelDistribution dist = RelDistributions.hash(ImmutableList.<Integer>of());
final LogicalExchange exchange = LogicalExchange.create(rel, dist);
final Map<Class<? extends RelNode>, Integer> expected = new HashMap<>();
expected.put(TableScan.class, 1);
expected.put(Exchange.class, 1);
expected.put(Project.class, 1);
final RelMetadataQuery mq = rel.getCluster().getMetadataQuery();
final Multimap<Class<? extends RelNode>, RelNode> result = mq.getNodeTypes(exchange);
assertThat(result, notNullValue());
final Map<Class<? extends RelNode>, Integer> resultCount = new HashMap<>();
for (Entry<Class<? extends RelNode>, Collection<RelNode>> e : result.asMap().entrySet()) {
resultCount.put(e.getKey(), e.getValue().size());
}
assertThat(expected, equalTo(resultCount));
}
示例4
public final T traverse(RelNode n) throws Exception {
List<T> inputStreams = new ArrayList<>();
for (RelNode input : n.getInputs()) {
inputStreams.add(traverse(input));
}
if (n instanceof Aggregate) {
return visitAggregate((Aggregate) n, inputStreams);
} else if (n instanceof Calc) {
return visitCalc((Calc) n, inputStreams);
} else if (n instanceof Collect) {
return visitCollect((Collect) n, inputStreams);
} else if (n instanceof Correlate) {
return visitCorrelate((Correlate) n, inputStreams);
} else if (n instanceof Delta) {
return visitDelta((Delta) n, inputStreams);
} else if (n instanceof Exchange) {
return visitExchange((Exchange) n, inputStreams);
} else if (n instanceof Project) {
return visitProject((Project) n, inputStreams);
} else if (n instanceof Filter) {
return visitFilter((Filter) n, inputStreams);
} else if (n instanceof Sample) {
return visitSample((Sample) n, inputStreams);
} else if (n instanceof Sort) {
return visitSort((Sort) n, inputStreams);
} else if (n instanceof TableModify) {
return visitTableModify((TableModify) n, inputStreams);
} else if (n instanceof TableScan) {
return visitTableScan((TableScan) n, inputStreams);
} else if (n instanceof Uncollect) {
return visitUncollect((Uncollect) n, inputStreams);
} else if (n instanceof Window) {
return visitWindow((Window) n, inputStreams);
} else if (n instanceof Join) {
return visitJoin((Join) n, inputStreams);
} else {
return defaultValue(n, inputStreams);
}
}
示例5
@Override public void onMatch(RelOptRuleCall call) {
final Exchange exchange = call.rel(0);
final RelMetadataQuery mq = call.getMetadataQuery();
final RelNode input = exchange.getInput();
final RelOptPredicateList predicates = mq.getPulledUpPredicates(input);
if (predicates == null) {
return;
}
final Set<Integer> constants = new HashSet<>();
predicates.constantMap.keySet().forEach(key -> {
if (key instanceof RexInputRef) {
constants.add(((RexInputRef) key).getIndex());
}
});
if (constants.isEmpty()) {
return;
}
final List<Integer> distributionKeys = simplifyDistributionKeys(
exchange.getDistribution(), constants);
if (distributionKeys.size() != exchange.getDistribution().getKeys()
.size()) {
call.transformTo(call.builder()
.push(exchange.getInput())
.exchange(distributionKeys.isEmpty()
? RelDistributions.SINGLETON
: RelDistributions.hash(distributionKeys))
.build());
call.getPlanner().prune(exchange);
}
}
示例6
public TrimResult trimFields(
Exchange exchange,
ImmutableBitSet fieldsUsed,
Set<RelDataTypeField> extraFields) {
final RelDataType rowType = exchange.getRowType();
final int fieldCount = rowType.getFieldCount();
final RelDistribution distribution = exchange.getDistribution();
final RelNode input = exchange.getInput();
// We use the fields used by the consumer, plus any fields used as exchange
// keys.
final ImmutableBitSet.Builder inputFieldsUsed = fieldsUsed.rebuild();
for (int keyIndex : distribution.getKeys()) {
inputFieldsUsed.set(keyIndex);
}
// Create input with trimmed columns.
final Set<RelDataTypeField> inputExtraFields = Collections.emptySet();
final TrimResult trimResult =
trimChild(exchange, input, inputFieldsUsed.build(), inputExtraFields);
final RelNode newInput = trimResult.left;
final Mapping inputMapping = trimResult.right;
// If the input is unchanged, and we need to project all columns,
// there's nothing we can do.
if (newInput == input
&& inputMapping.isIdentity()
&& fieldsUsed.cardinality() == fieldCount) {
return result(exchange, Mappings.createIdentity(fieldCount));
}
relBuilder.push(newInput);
final RelDistribution newDistribution = distribution.apply(inputMapping);
relBuilder.exchange(newDistribution);
return result(relBuilder.build(), inputMapping);
}
示例7
@Override public Exchange copy(RelTraitSet traitSet, RelNode newInput,
RelDistribution newDistribution) {
return new LogicalExchange(getCluster(), traitSet, newInput,
newDistribution);
}
示例8
public Set<RelColumnOrigin> getColumnOrigins(Exchange rel,
RelMetadataQuery mq, int iOutputColumn) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
示例9
/**
* Table references from Exchange.
*/
public Set<RelTableRef> getTableReferences(Exchange rel, RelMetadataQuery mq) {
return mq.getTableReferences(rel.getInput());
}
示例10
/**
* Expression lineage from Exchange.
*/
public Set<RexNode> getExpressionLineage(Exchange rel,
RelMetadataQuery mq, RexNode outputExpression) {
return mq.getExpressionLineage(rel.getInput(), outputExpression);
}
示例11
public Double getDistinctRowCount(Exchange rel, RelMetadataQuery mq,
ImmutableBitSet groupKey, RexNode predicate) {
return mq.getDistinctRowCount(rel.getInput(), groupKey, predicate);
}
示例12
public List<Double> averageColumnSizes(Exchange rel, RelMetadataQuery mq) {
return mq.getAverageColumnSizes(rel.getInput());
}
示例13
public RelDistribution distribution(Exchange exchange, RelMetadataQuery mq) {
return exchange(exchange.distribution);
}
示例14
public Double getPopulationSize(Exchange rel, RelMetadataQuery mq,
ImmutableBitSet groupKey) {
return mq.getPopulationSize(rel.getInput(), groupKey);
}
示例15
public Boolean areColumnsUnique(Exchange rel, RelMetadataQuery mq,
ImmutableBitSet columns, boolean ignoreNulls) {
return mq.areColumnsUnique(rel.getInput(), columns, ignoreNulls);
}
示例16
public Boolean isPhaseTransition(Exchange rel, RelMetadataQuery mq) {
return true;
}
示例17
/**
* Extract predicates for an Exchange.
*/
public RelOptPredicateList getAllPredicates(Exchange exchange,
RelMetadataQuery mq) {
return mq.getAllPredicates(exchange.getInput());
}
示例18
/**
* Infers predicates for an Exchange.
*/
public RelOptPredicateList getPredicates(Exchange exchange,
RelMetadataQuery mq) {
RelNode input = exchange.getInput();
return mq.getPulledUpPredicates(input);
}
示例19
public T visitExchange(Exchange exchange, List<T> inputStreams) throws Exception {
return defaultValue(exchange, inputStreams);
}
示例20
@Override public Exchange copy(RelTraitSet traitSet, RelNode newInput,
RelDistribution newDistribution) {
return new LogicalExchange(getCluster(), traitSet, newInput,
newDistribution);
}
示例21
public Set<RelColumnOrigin> getColumnOrigins(Exchange rel,
RelMetadataQuery mq, int iOutputColumn) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
示例22
/**
* Table references from Exchange.
*/
public Set<RelTableRef> getTableReferences(Exchange rel, RelMetadataQuery mq) {
return mq.getTableReferences(rel.getInput());
}
示例23
public Multimap<Class<? extends RelNode>, RelNode> getNodeTypes(Exchange rel,
RelMetadataQuery mq) {
return getNodeTypes(rel, Exchange.class, mq);
}
示例24
/**
* Expression lineage from Exchange.
*/
public Set<RexNode> getExpressionLineage(Exchange rel,
RelMetadataQuery mq, RexNode outputExpression) {
return mq.getExpressionLineage(rel.getInput(), outputExpression);
}
示例25
public Double getDistinctRowCount(Exchange rel, RelMetadataQuery mq,
ImmutableBitSet groupKey, RexNode predicate) {
return mq.getDistinctRowCount(rel.getInput(), groupKey, predicate);
}
示例26
public List<Double> averageColumnSizes(Exchange rel, RelMetadataQuery mq) {
return mq.getAverageColumnSizes(rel.getInput());
}
示例27
public RelDistribution distribution(Exchange exchange, RelMetadataQuery mq) {
return exchange(exchange.distribution);
}
示例28
public Double getPopulationSize(Exchange rel, RelMetadataQuery mq,
ImmutableBitSet groupKey) {
return mq.getPopulationSize(rel.getInput(), groupKey);
}
示例29
public Double getMaxRowCount(Exchange rel, RelMetadataQuery mq) {
return mq.getMaxRowCount(rel.getInput());
}
示例30
public Double getMinRowCount(Exchange rel, RelMetadataQuery mq) {
return mq.getMinRowCount(rel.getInput());
}