Java源码示例:org.apache.flink.streaming.api.transformations.SideOutputTransformation
示例1
/**
 * Returns the {@link DataStream} of elements that an operation emitted into the side output
 * identified by the given {@link OutputTag}.
 *
 * <p>Every requested tag is recorded in {@code requestedSideOutputs}. Requesting a tag that
 * matches an existing entry is accepted only when the recorded type information equals the
 * type information carried by the new tag.
 *
 * @param sideOutputTag tag identifying the side output to read
 * @param <X> element type of the side output
 * @return a new stream backed by a {@link SideOutputTransformation} on this stream's transformation
 * @throws UnsupportedOperationException if {@code split()} was applied to this stream, or if a
 *     matching side output was already requested with a different type
 * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
 */
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
    if (wasSplitApplied) {
        throw new UnsupportedOperationException(
            "getSideOutput() and split() may not be called on the same DataStream. " +
            "As a work-around, please add a no-op map function before the split() call.");
    }

    final OutputTag<X> cleanedTag = clean(requireNonNull(sideOutputTag));
    // Defensive copy: from here on only our own tag instance is stored and handed out.
    final OutputTag<X> tag = new OutputTag<X>(cleanedTag.getId(), cleanedTag.getTypeInfo());

    final TypeInformation<?> knownType = requestedSideOutputs.get(tag);
    final boolean typeConflict = knownType != null && !knownType.equals(tag.getTypeInfo());
    if (typeConflict) {
        throw new UnsupportedOperationException(
            "A side output with a matching id was " +
            "already requested with a different type. This is not allowed, side output " +
            "ids need to be unique.");
    }
    requestedSideOutputs.put(tag, tag.getTypeInfo());

    final SideOutputTransformation<X> sideOutput =
        new SideOutputTransformation<>(getTransformation(), tag);
    return new DataStream<>(getExecutionEnvironment(), sideOutput);
}
示例2
/**
 * Transforms a {@code SideOutputTransformation}.
 *
 * <p>The input is transformed first; then, for every node id the input produced, a virtual
 * side-output node carrying the {@link org.apache.flink.util.OutputTag} is registered in the
 * {@code StreamGraph}.
 *
 * @param sideOutput the side-output transformation to translate
 * @param <T> element type of the side output
 * @return the ids of the newly created virtual nodes, or the ids already recorded for
 *     {@code sideOutput} if transforming the input has transformed it as well
 * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
 */
private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
    Collection<Integer> upstreamIds = transform(sideOutput.getInput());

    // Transforming the input is recursive and may already have handled this node.
    if (alreadyTransformed.containsKey(sideOutput)) {
        return alreadyTransformed.get(sideOutput);
    }

    List<Integer> virtualNodeIds = new ArrayList<>();
    for (int upstreamId : upstreamIds) {
        int freshId = StreamTransformation.getNewNodeId();
        streamGraph.addVirtualSideOutputNode(upstreamId, freshId, sideOutput.getOutputTag());
        virtualNodeIds.add(freshId);
    }
    return virtualNodeIds;
}
示例3
/**
 * Returns the {@link DataStream} of elements that an operation emitted into the side output
 * identified by the given {@link OutputTag}.
 *
 * <p>Every requested tag is recorded in {@code requestedSideOutputs}. Requesting a tag that
 * matches an existing entry is accepted only when the recorded type information equals the
 * type information carried by the new tag.
 *
 * @param sideOutputTag tag identifying the side output to read
 * @param <X> element type of the side output
 * @return a new stream backed by a {@link SideOutputTransformation} on this stream's transformation
 * @throws UnsupportedOperationException if {@code split()} was applied to this stream, or if a
 *     matching side output was already requested with a different type
 * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
 */
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
    if (wasSplitApplied) {
        throw new UnsupportedOperationException(
            "getSideOutput() and split() may not be called on the same DataStream. " +
            "As a work-around, please add a no-op map function before the split() call.");
    }

    final OutputTag<X> cleanedTag = clean(requireNonNull(sideOutputTag));
    // Defensive copy: from here on only our own tag instance is stored and handed out.
    final OutputTag<X> tag = new OutputTag<X>(cleanedTag.getId(), cleanedTag.getTypeInfo());

    final TypeInformation<?> knownType = requestedSideOutputs.get(tag);
    final boolean typeConflict = knownType != null && !knownType.equals(tag.getTypeInfo());
    if (typeConflict) {
        throw new UnsupportedOperationException(
            "A side output with a matching id was " +
            "already requested with a different type. This is not allowed, side output " +
            "ids need to be unique.");
    }
    requestedSideOutputs.put(tag, tag.getTypeInfo());

    final SideOutputTransformation<X> sideOutput =
        new SideOutputTransformation<>(getTransformation(), tag);
    return new DataStream<>(getExecutionEnvironment(), sideOutput);
}
示例4
/**
 * Transforms a {@code SideOutputTransformation}.
 *
 * <p>The input is transformed first; then, for every node id the input produced, a virtual
 * side-output node carrying the {@link org.apache.flink.util.OutputTag} is registered in the
 * {@code StreamGraph}.
 *
 * @param sideOutput the side-output transformation to translate
 * @param <T> element type of the side output
 * @return the ids of the newly created virtual nodes, or the ids already recorded for
 *     {@code sideOutput} if transforming the input has transformed it as well
 * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
 */
private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
    Collection<Integer> upstreamIds = transform(sideOutput.getInput());

    // Transforming the input is recursive and may already have handled this node.
    if (alreadyTransformed.containsKey(sideOutput)) {
        return alreadyTransformed.get(sideOutput);
    }

    List<Integer> virtualNodeIds = new ArrayList<>();
    for (int upstreamId : upstreamIds) {
        int freshId = Transformation.getNewNodeId();
        streamGraph.addVirtualSideOutputNode(upstreamId, freshId, sideOutput.getOutputTag());
        virtualNodeIds.add(freshId);
    }
    return virtualNodeIds;
}
示例5
/**
 * Returns the {@link DataStream} of elements that an operation emitted into the side output
 * identified by the given {@link OutputTag}.
 *
 * <p>Every requested tag is recorded in {@code requestedSideOutputs}. Requesting a tag that
 * matches an existing entry is accepted only when the recorded type information equals the
 * type information carried by the new tag.
 *
 * @param sideOutputTag tag identifying the side output to read
 * @param <X> element type of the side output
 * @return a new stream backed by a {@link SideOutputTransformation} on this stream's transformation
 * @throws UnsupportedOperationException if {@code split()} was applied to this stream, or if a
 *     matching side output was already requested with a different type
 * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
 */
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
    if (wasSplitApplied) {
        throw new UnsupportedOperationException(
            "getSideOutput() and split() may not be called on the same DataStream. " +
            "As a work-around, please add a no-op map function before the split() call.");
    }

    final OutputTag<X> cleanedTag = clean(requireNonNull(sideOutputTag));
    // Defensive copy: from here on only our own tag instance is stored and handed out.
    final OutputTag<X> tag = new OutputTag<X>(cleanedTag.getId(), cleanedTag.getTypeInfo());

    final TypeInformation<?> knownType = requestedSideOutputs.get(tag);
    final boolean typeConflict = knownType != null && !knownType.equals(tag.getTypeInfo());
    if (typeConflict) {
        throw new UnsupportedOperationException(
            "A side output with a matching id was " +
            "already requested with a different type. This is not allowed, side output " +
            "ids need to be unique.");
    }
    requestedSideOutputs.put(tag, tag.getTypeInfo());

    final SideOutputTransformation<X> sideOutput =
        new SideOutputTransformation<>(getTransformation(), tag);
    return new DataStream<>(getExecutionEnvironment(), sideOutput);
}
示例6
/**
 * Transforms a {@code SideOutputTransformation}.
 *
 * <p>The input is transformed first; then, for every node id the input produced, a virtual
 * side-output node carrying the {@link org.apache.flink.util.OutputTag} is registered in the
 * {@code StreamGraph}.
 *
 * @param sideOutput the side-output transformation to translate
 * @param <T> element type of the side output
 * @return the ids of the newly created virtual nodes, or the ids already recorded for
 *     {@code sideOutput} if transforming the input has transformed it as well
 * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
 */
private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
    Collection<Integer> upstreamIds = transform(sideOutput.getInput());

    // Transforming the input is recursive and may already have handled this node.
    if (alreadyTransformed.containsKey(sideOutput)) {
        return alreadyTransformed.get(sideOutput);
    }

    List<Integer> virtualNodeIds = new ArrayList<>();
    for (int upstreamId : upstreamIds) {
        int freshId = Transformation.getNewNodeId();
        streamGraph.addVirtualSideOutputNode(upstreamId, freshId, sideOutput.getOutputTag());
        virtualNodeIds.add(freshId);
    }
    return virtualNodeIds;
}
示例7
/**
 * Checks that a split may be applied on top of the given transformation.
 *
 * <p>The check walks upstream through union and partition transformations and fails as soon
 * as it reaches a select, split or side-output transformation. Any other kind of
 * transformation ends the walk without error.
 *
 * @param input the transformation the split would consume
 * @param <T> element type produced by {@code input}
 * @throws IllegalStateException if a select, split or side-output transformation is reachable
 *     from {@code input} through union/partition transformations only
 */
private <T> void validateSplitTransformation(StreamTransformation<T> input) {
    if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
        throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
    }
    if (input instanceof SideOutputTransformation) {
        throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
    }

    if (input instanceof UnionTransformation) {
        // A union has several inputs; each one must be valid on its own.
        for (StreamTransformation<T> unionInput : ((UnionTransformation<T>) input).getInputs()) {
            validateSplitTransformation(unionInput);
        }
    } else if (input instanceof PartitionTransformation) {
        // Parameterized cast (instead of the raw type) keeps the recursive call type-checked.
        validateSplitTransformation(((PartitionTransformation<T>) input).getInput());
    }
}
示例8
/**
 * Checks that a split may be applied on top of the given transformation.
 *
 * <p>The check walks upstream through union and partition transformations and fails as soon
 * as it reaches a select, split or side-output transformation. Any other kind of
 * transformation ends the walk without error.
 *
 * @param input the transformation the split would consume
 * @param <T> element type produced by {@code input}
 * @throws IllegalStateException if a select, split or side-output transformation is reachable
 *     from {@code input} through union/partition transformations only
 */
private <T> void validateSplitTransformation(Transformation<T> input) {
    if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
        throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
    }
    if (input instanceof SideOutputTransformation) {
        throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
    }

    if (input instanceof UnionTransformation) {
        // A union has several inputs; each one must be valid on its own.
        for (Transformation<T> unionInput : ((UnionTransformation<T>) input).getInputs()) {
            validateSplitTransformation(unionInput);
        }
    } else if (input instanceof PartitionTransformation) {
        // Parameterized cast (instead of the raw type) keeps the recursive call type-checked.
        validateSplitTransformation(((PartitionTransformation<T>) input).getInput());
    }
}
示例9
/**
 * Checks that a split may be applied on top of the given transformation.
 *
 * <p>The check walks upstream through union and partition transformations and fails as soon
 * as it reaches a select, split or side-output transformation. Any other kind of
 * transformation ends the walk without error.
 *
 * @param input the transformation the split would consume
 * @param <T> element type produced by {@code input}
 * @throws IllegalStateException if a select, split or side-output transformation is reachable
 *     from {@code input} through union/partition transformations only
 */
private <T> void validateSplitTransformation(Transformation<T> input) {
    if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
        throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
    }
    if (input instanceof SideOutputTransformation) {
        throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
    }

    if (input instanceof UnionTransformation) {
        // A union has several inputs; each one must be valid on its own.
        for (Transformation<T> unionInput : ((UnionTransformation<T>) input).getInputs()) {
            validateSplitTransformation(unionInput);
        }
    } else if (input instanceof PartitionTransformation) {
        // Parameterized cast (instead of the raw type) keeps the recursive call type-checked.
        validateSplitTransformation(((PartitionTransformation<T>) input).getInput());
    }
}