Java源码示例:org.apache.flink.streaming.api.transformations.FeedbackTransformation
示例1
/**
 * Creates an iterative stream on top of the given input stream.
 *
 * <p>The stream is backed by a new {@code FeedbackTransformation} that wraps the
 * transformation of {@code dataStream} and carries {@code maxWaitTime}. The buffer timeout
 * is copied from the environment of the input stream.
 *
 * @param dataStream the stream that is fed into the iteration
 * @param maxWaitTime the wait time handed to the {@code FeedbackTransformation}
 */
protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
    super(
            dataStream.getExecutionEnvironment(),
            new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));

    // Remember how this iteration was configured.
    this.maxWaitTime = maxWaitTime;
    this.originalInput = dataStream;

    // Use the same buffer timeout as the environment of the input stream.
    setBufferTimeout(dataStream.environment.getBufferTimeout());
}
示例2
/**
 * Builds the head of an iteration from an existing stream.
 *
 * <p>A {@code FeedbackTransformation} is created around the transformation of
 * {@code dataStream} and registered with the execution environment of that stream.
 *
 * @param dataStream the input stream of the iteration
 * @param maxWaitTime the wait time passed on to the {@code FeedbackTransformation}
 */
protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
    super(
            dataStream.getExecutionEnvironment(),
            new FeedbackTransformation<>(
                    dataStream.getTransformation(),
                    maxWaitTime));

    this.maxWaitTime = maxWaitTime;
    this.originalInput = dataStream;

    // The buffer timeout is taken over from the environment of the input stream.
    setBufferTimeout(dataStream.environment.getBufferTimeout());
}
示例3
/**
 * Wraps {@code dataStream} into an iterative stream.
 *
 * <p>The parent constructor receives the execution environment of the input stream and a
 * fresh {@code FeedbackTransformation} built from the input's transformation and
 * {@code maxWaitTime}. The input stream and the wait time are stored on this instance, and
 * the buffer timeout is initialised from the input stream's environment.
 *
 * @param dataStream the stream entering the iteration
 * @param maxWaitTime the wait time forwarded to the {@code FeedbackTransformation}
 */
protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
    super(
            dataStream.getExecutionEnvironment(),
            new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));

    this.maxWaitTime = maxWaitTime;
    this.originalInput = dataStream;
    setBufferTimeout(dataStream.environment.getBufferTimeout());
}
示例4
/**
 * Translates a {@code FeedbackTransformation} into nodes and edges of the stream graph.
 *
 * <p>The input and all feedback edges are transformed recursively. An iteration source/sink
 * pair is created through the stream graph, and every ID produced by a feedback edge is
 * connected to the iteration sink. The result is the list of input IDs followed by the ID of
 * the iteration source, so that downstream operations can be wired to both.
 *
 * @param iterate the feedback transformation to translate
 * @return the input IDs plus the iteration source ID, or the IDs already stored for
 *     {@code iterate} if the recursive transformation of the input handled it
 * @throws IllegalStateException if {@code iterate} has no feedback edges
 */
private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
    if (iterate.getFeedbackEdges().isEmpty()) {
        throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
    }

    // Transform the input stream(s) first; their IDs form the head of the result.
    List<Integer> headIds = new ArrayList<>(transform(iterate.getInput()));

    // The recursive call above may already have translated this iteration.
    if (alreadyTransformed.containsKey(iterate)) {
        return alreadyTransformed.get(iterate);
    }

    // Create the artificial iteration source/sink pair.
    Tuple2<StreamNode, StreamNode> iterationNodes = streamGraph.createIterationSourceAndSink(
            iterate.getId(),
            getNewIterationNodeId(),
            getNewIterationNodeId(),
            iterate.getWaitTime(),
            iterate.getParallelism(),
            iterate.getMaxParallelism(),
            iterate.getMinResources(),
            iterate.getPreferredResources());
    StreamNode iterationSource = iterationNodes.f0;
    StreamNode iterationSink = iterationNodes.f1;

    // Register the serializers of the iteration's output type on the source and the sink.
    streamGraph.setSerializers(
            iterationSource.getId(), null, null, iterate.getOutputType().createSerializer(env.getConfig()));
    streamGraph.setSerializers(
            iterationSink.getId(), iterate.getOutputType().createSerializer(env.getConfig()), null, null);

    // Downstream operators must take the iteration source as an additional input.
    headIds.add(iterationSource.getId());

    // Mark the iteration as seen before walking the feedback edges, so that their recursive
    // transformation stops when it reaches the iteration node again.
    alreadyTransformed.put(iterate, headIds);

    // Collect the IDs of all feedback edges to derive the slot sharing group from them.
    List<Integer> collectedFeedbackIds = new ArrayList<>();
    for (StreamTransformation<T> edge : iterate.getFeedbackEdges()) {
        Collection<Integer> edgeIds = transform(edge);
        collectedFeedbackIds.addAll(edgeIds);
        edgeIds.forEach(id -> streamGraph.addEdge(id, iterationSink.getId(), 0));
    }

    String sharedGroup = determineSlotSharingGroup(null, collectedFeedbackIds);
    iterationSink.setSlotSharingGroup(sharedGroup);
    iterationSource.setSlotSharingGroup(sharedGroup);

    return headIds;
}
示例5
/**
 * Translates a {@code FeedbackTransformation} into nodes and edges of the stream graph.
 *
 * <p>The input and all feedback edges are transformed recursively. An iteration source/sink
 * pair is created through the stream graph, and every ID produced by a feedback edge is
 * connected to the iteration sink. The result is the list of input IDs followed by the ID of
 * the iteration source, so that downstream operations can be wired to both.
 *
 * <p>If no slot sharing group can be determined from the feedback edges, a group named
 * {@code "SlotSharingGroup-" + iterate.getId()} is assigned to the source and the sink.
 *
 * @param iterate the feedback transformation to translate
 * @return the input IDs plus the iteration source ID, or the IDs already stored for
 *     {@code iterate} if the recursive transformation of the input handled it
 * @throws IllegalStateException if {@code iterate} has no feedback edges
 */
private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
    if (iterate.getFeedbackEdges().isEmpty()) {
        throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
    }

    // Transform the input stream(s) first; their IDs form the head of the result.
    List<Integer> headIds = new ArrayList<>(transform(iterate.getInput()));

    // The recursive call above may already have translated this iteration.
    if (alreadyTransformed.containsKey(iterate)) {
        return alreadyTransformed.get(iterate);
    }

    // Create the artificial iteration source/sink pair.
    Tuple2<StreamNode, StreamNode> iterationNodes = streamGraph.createIterationSourceAndSink(
            iterate.getId(),
            getNewIterationNodeId(),
            getNewIterationNodeId(),
            iterate.getWaitTime(),
            iterate.getParallelism(),
            iterate.getMaxParallelism(),
            iterate.getMinResources(),
            iterate.getPreferredResources());
    StreamNode iterationSource = iterationNodes.f0;
    StreamNode iterationSink = iterationNodes.f1;

    // Register the serializers of the iteration's output type on the source and the sink.
    streamGraph.setSerializers(
            iterationSource.getId(), null, null, iterate.getOutputType().createSerializer(executionConfig));
    streamGraph.setSerializers(
            iterationSink.getId(), iterate.getOutputType().createSerializer(executionConfig), null, null);

    // Downstream operators must take the iteration source as an additional input.
    headIds.add(iterationSource.getId());

    // Mark the iteration as seen before walking the feedback edges, so that their recursive
    // transformation stops when it reaches the iteration node again.
    alreadyTransformed.put(iterate, headIds);

    // Collect the IDs of all feedback edges to derive the slot sharing group from them.
    List<Integer> collectedFeedbackIds = new ArrayList<>();
    for (Transformation<T> edge : iterate.getFeedbackEdges()) {
        Collection<Integer> edgeIds = transform(edge);
        collectedFeedbackIds.addAll(edgeIds);
        for (Integer edgeId : edgeIds) {
            streamGraph.addEdge(edgeId, iterationSink.getId(), 0);
        }
    }

    // The iteration nodes must always end up in some slot sharing group.
    String inferredGroup = determineSlotSharingGroup(null, collectedFeedbackIds);
    String sharedGroup = inferredGroup != null
            ? inferredGroup
            : "SlotSharingGroup-" + iterate.getId();
    iterationSink.setSlotSharingGroup(sharedGroup);
    iterationSource.setSlotSharingGroup(sharedGroup);

    return headIds;
}
示例6
/**
 * Turns a {@code FeedbackTransformation} into its stream graph representation.
 *
 * <p>Steps, in order: transform the input, create the iteration source/sink pair, register
 * serializers for both nodes, transform every feedback edge and connect its IDs to the
 * iteration sink, and finally assign one slot sharing group to source and sink. The returned
 * collection contains the input IDs followed by the iteration source ID so that downstream
 * operations can be wired to both.
 *
 * <p>When the feedback edges yield no slot sharing group, the fallback name
 * {@code "SlotSharingGroup-" + iterate.getId()} is used.
 *
 * @param iterate the feedback transformation to translate
 * @return the input IDs plus the iteration source ID, or the IDs already stored for
 *     {@code iterate} if transforming the input already handled it
 * @throws IllegalStateException if {@code iterate} has no feedback edges
 */
private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
    if (iterate.getFeedbackEdges().isEmpty()) {
        throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
    }

    // Input first: its IDs are the leading entries of the result.
    List<Integer> outputIds = new ArrayList<>(transform(iterate.getInput()));

    // Transforming the input may have reached this iteration recursively.
    if (alreadyTransformed.containsKey(iterate)) {
        return alreadyTransformed.get(iterate);
    }

    // Artificial source/sink pair that carries the fed-back elements.
    Tuple2<StreamNode, StreamNode> sourceAndSink = streamGraph.createIterationSourceAndSink(
            iterate.getId(),
            getNewIterationNodeId(),
            getNewIterationNodeId(),
            iterate.getWaitTime(),
            iterate.getParallelism(),
            iterate.getMaxParallelism(),
            iterate.getMinResources(),
            iterate.getPreferredResources());
    StreamNode feedbackSource = sourceAndSink.f0;
    StreamNode feedbackSink = sourceAndSink.f1;

    // Register the serializers of the iteration's output type on both nodes.
    streamGraph.setSerializers(
            feedbackSource.getId(), null, null, iterate.getOutputType().createSerializer(executionConfig));
    streamGraph.setSerializers(
            feedbackSink.getId(), iterate.getOutputType().createSerializer(executionConfig), null, null);

    // Expose the feedback source to downstream operators as an extra input.
    outputIds.add(feedbackSource.getId());

    // Record the iteration before following the feedback edges so that the recursion
    // terminates when it reaches this node again.
    alreadyTransformed.put(iterate, outputIds);

    // IDs of all feedback edges, needed to determine the slot sharing group.
    List<Integer> feedbackNodeIds = new ArrayList<>();
    for (Transformation<T> feedback : iterate.getFeedbackEdges()) {
        Collection<Integer> ids = transform(feedback);
        feedbackNodeIds.addAll(ids);
        ids.forEach(id -> streamGraph.addEdge(id, feedbackSink.getId(), 0));
    }

    // The iteration nodes must always end up in some slot sharing group.
    String determinedGroup = determineSlotSharingGroup(null, feedbackNodeIds);
    String group = determinedGroup == null
            ? "SlotSharingGroup-" + iterate.getId()
            : determinedGroup;
    feedbackSink.setSlotSharingGroup(group);
    feedbackSource.setSlotSharingGroup(group);

    return outputIds;
}
示例7
/**
 * Closes the iteration by registering {@code feedbackStream} as a feedback edge of this
 * iteration's {@code FeedbackTransformation}. The feedback stream marks the end of the
 * iterative program part whose output is fed back to the start of the iteration.
 *
 * <p>A common usage pattern for streaming iterations is to use output splitting to send
 * only a part of the closing data stream back to the head. Refer to
 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
 * for more information.
 *
 * @param feedbackStream
 *            {@link DataStream} that will be used as input to the iteration head.
 *
 * @return The feedback stream.
 *
 * @throws UnsupportedOperationException
 *             if the transformation of this iteration is not among the transitive
 *             predecessors of the feedback stream's transformation.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public DataStream<T> closeWith(DataStream<T> feedbackStream) {
    Collection<StreamTransformation<?>> upstream =
            feedbackStream.getTransformation().getTransitivePredecessors();

    if (upstream.contains(this.transformation)) {
        // Raw type on purpose: the element type of the feedback edge is not checked here.
        FeedbackTransformation iterationHead = (FeedbackTransformation) getTransformation();
        iterationHead.addFeedbackEdge(feedbackStream.getTransformation());
        return feedbackStream;
    }

    throw new UnsupportedOperationException(
            "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
}
示例8
/**
 * Closes the iteration by registering {@code feedbackStream} as a feedback edge of this
 * iteration's {@code FeedbackTransformation}. The feedback stream marks the end of the
 * iterative program part whose output is fed back to the start of the iteration.
 *
 * <p>A common usage pattern for streaming iterations is to use output splitting to send
 * only a part of the closing data stream back to the head. Refer to
 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
 * for more information.
 *
 * @param feedbackStream
 *            {@link DataStream} that will be used as input to the iteration head.
 *
 * @return The feedback stream.
 *
 * @throws UnsupportedOperationException
 *             if the transformation of this iteration is not among the transitive
 *             predecessors of the feedback stream's transformation.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public DataStream<T> closeWith(DataStream<T> feedbackStream) {
    Collection<Transformation<?>> upstream =
            feedbackStream.getTransformation().getTransitivePredecessors();

    if (upstream.contains(this.transformation)) {
        // Raw type on purpose: the element type of the feedback edge is not checked here.
        FeedbackTransformation iterationHead = (FeedbackTransformation) getTransformation();
        iterationHead.addFeedbackEdge(feedbackStream.getTransformation());
        return feedbackStream;
    }

    throw new UnsupportedOperationException(
            "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
}
示例9
/**
 * Ends the iterative part of the program: the given stream is added as a feedback edge to
 * the {@code FeedbackTransformation} of this iteration, so its elements are fed back to the
 * start of the iteration.
 *
 * <p>Streaming iterations commonly use output splitting so that only part of the closing
 * data stream is sent back to the head. See
 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
 * for more information.
 *
 * @param feedbackStream
 *            {@link DataStream} that will be used as input to the iteration head.
 *
 * @return The feedback stream.
 *
 * @throws UnsupportedOperationException
 *             if the feedback stream's transformation does not have this iteration's
 *             transformation among its transitive predecessors.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public DataStream<T> closeWith(DataStream<T> feedbackStream) {
    Collection<Transformation<?>> ancestors =
            feedbackStream.getTransformation().getTransitivePredecessors();
    boolean originatesFromIteration = ancestors.contains(this.transformation);

    if (originatesFromIteration) {
        // Raw type on purpose: the element type of the feedback edge is not checked here.
        FeedbackTransformation head = (FeedbackTransformation) getTransformation();
        head.addFeedbackEdge(feedbackStream.getTransformation());
        return feedbackStream;
    }

    throw new UnsupportedOperationException(
            "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
}