Java源码示例:org.apache.flink.streaming.api.transformations.FeedbackTransformation

示例1
protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
	super(dataStream.getExecutionEnvironment(),
			new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
	this.originalInput = dataStream;
	this.maxWaitTime = maxWaitTime;
	setBufferTimeout(dataStream.environment.getBufferTimeout());
}
 
示例2
protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
	super(dataStream.getExecutionEnvironment(),
			new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
	this.originalInput = dataStream;
	this.maxWaitTime = maxWaitTime;
	setBufferTimeout(dataStream.environment.getBufferTimeout());
}
 
示例3
protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
	super(dataStream.getExecutionEnvironment(),
			new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
	this.originalInput = dataStream;
	this.maxWaitTime = maxWaitTime;
	setBufferTimeout(dataStream.environment.getBufferTimeout());
}
 
示例4
/**
 * Transforms a {@code FeedbackTransformation}.
 *
 * <p>This recursively transforms the input and the feedback edges. The returned IDs are the
 * input IDs followed by the ID of the iteration source. Downstream operations are therefore wired
 * both to the original input and to the elements fed back through the iteration source. The
 * feedback edges themselves are connected to the iteration sink.
 *
 * <p>This is responsible for creating the IterationSource and IterationSink which are used to
 * feed back the elements.
 *
 * @param iterate the feedback transformation to translate
 * @return the IDs downstream operations should use as their inputs
 * @throws IllegalStateException if the iteration has no feedback edges
 */
private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {

	if (iterate.getFeedbackEdges().isEmpty()) {
		throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
	}

	StreamTransformation<T> input = iterate.getInput();
	List<Integer> resultIds = new ArrayList<>();

	// first transform the input stream(s) and store the result IDs
	Collection<Integer> inputIds = transform(input);
	resultIds.addAll(inputIds);

	// the recursive transform might have already transformed this
	if (alreadyTransformed.containsKey(iterate)) {
		return alreadyTransformed.get(iterate);
	}

	// create the fake iteration source/sink pair
	Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
		iterate.getId(),
		getNewIterationNodeId(),
		getNewIterationNodeId(),
		iterate.getWaitTime(),
		iterate.getParallelism(),
		iterate.getMaxParallelism(),
		iterate.getMinResources(),
		iterate.getPreferredResources());

	StreamNode itSource = itSourceAndSink.f0;
	StreamNode itSink = itSourceAndSink.f1;

	// the source only produces and the sink only consumes elements of the iteration's output type
	streamGraph.setSerializers(itSource.getId(), null, null, iterate.getOutputType().createSerializer(env.getConfig()));
	streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(env.getConfig()), null, null);

	// also add the feedback source ID to the result IDs, so that downstream operators will
	// add both as input
	resultIds.add(itSource.getId());

	// add the iterate to the already-seen set with the result IDs, so that we can transform
	// the feedback edges and let them stop when encountering the iterate node
	alreadyTransformed.put(iterate, resultIds);

	// so that we can determine the slot sharing group from all feedback edges
	List<Integer> allFeedbackIds = new ArrayList<>();

	for (StreamTransformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
		Collection<Integer> feedbackIds = transform(feedbackEdge);
		allFeedbackIds.addAll(feedbackIds);
		// every feedback operator sends its output into the iteration sink
		for (Integer feedbackId : feedbackIds) {
			streamGraph.addEdge(feedbackId,
					itSink.getId(),
					0
			);
		}
	}

	String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
	// the iteration source/sink must be assigned a slot sharing group; if none could be
	// determined from the feedback edges, derive one from the iteration ID
	if (slotSharingGroup == null) {
		slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
	}

	itSink.setSlotSharingGroup(slotSharingGroup);
	itSource.setSlotSharingGroup(slotSharingGroup);

	return resultIds;
}
 
示例5
/**
 * Transforms a {@code FeedbackTransformation}.
 *
 * <p>This recursively transforms the input and the feedback edges. The returned IDs are the
 * input IDs followed by the ID of the iteration source. Downstream operations are therefore wired
 * both to the original input and to the elements fed back through the iteration source. The
 * feedback edges themselves are connected to the iteration sink.
 *
 * <p>This is responsible for creating the IterationSource and IterationSink which are used to
 * feed back the elements.
 *
 * @param iterate the feedback transformation to translate
 * @return the IDs downstream operations should use as their inputs
 * @throws IllegalStateException if the iteration has no feedback edges
 */
private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {

	if (iterate.getFeedbackEdges().isEmpty()) {
		throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
	}

	Transformation<T> input = iterate.getInput();
	List<Integer> resultIds = new ArrayList<>();

	// first transform the input stream(s) and store the result IDs
	Collection<Integer> inputIds = transform(input);
	resultIds.addAll(inputIds);

	// the recursive transform might have already transformed this
	if (alreadyTransformed.containsKey(iterate)) {
		return alreadyTransformed.get(iterate);
	}

	// create the fake iteration source/sink pair
	Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
		iterate.getId(),
		getNewIterationNodeId(),
		getNewIterationNodeId(),
		iterate.getWaitTime(),
		iterate.getParallelism(),
		iterate.getMaxParallelism(),
		iterate.getMinResources(),
		iterate.getPreferredResources());

	StreamNode itSource = itSourceAndSink.f0;
	StreamNode itSink = itSourceAndSink.f1;

	// the source only produces and the sink only consumes elements of the iteration's output type
	streamGraph.setSerializers(itSource.getId(), null, null, iterate.getOutputType().createSerializer(executionConfig));
	streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(executionConfig), null, null);

	// also add the feedback source ID to the result IDs, so that downstream operators will
	// add both as input
	resultIds.add(itSource.getId());

	// add the iterate to the already-seen set with the result IDs, so that we can transform
	// the feedback edges and let them stop when encountering the iterate node
	alreadyTransformed.put(iterate, resultIds);

	// so that we can determine the slot sharing group from all feedback edges
	List<Integer> allFeedbackIds = new ArrayList<>();

	for (Transformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
		Collection<Integer> feedbackIds = transform(feedbackEdge);
		allFeedbackIds.addAll(feedbackIds);
		// every feedback operator sends its output into the iteration sink
		for (Integer feedbackId : feedbackIds) {
			streamGraph.addEdge(feedbackId,
					itSink.getId(),
					0
			);
		}
	}

	String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
	// slot sharing group of iteration node must exist
	if (slotSharingGroup == null) {
		slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
	}

	itSink.setSlotSharingGroup(slotSharingGroup);
	itSource.setSlotSharingGroup(slotSharingGroup);

	return resultIds;
}
 
示例6
/**
 * Transforms a {@code FeedbackTransformation}.
 *
 * <p>This recursively transforms the input and the feedback edges. The returned IDs are the
 * input IDs followed by the ID of the iteration source. Downstream operations are therefore wired
 * both to the original input and to the elements fed back through the iteration source. The
 * feedback edges themselves are connected to the iteration sink.
 *
 * <p>This is responsible for creating the IterationSource and IterationSink which are used to
 * feed back the elements.
 *
 * @param iterate the feedback transformation to translate
 * @return the IDs downstream operations should use as their inputs
 * @throws IllegalStateException if the iteration has no feedback edges
 */
private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {

	if (iterate.getFeedbackEdges().isEmpty()) {
		throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
	}

	Transformation<T> input = iterate.getInput();
	List<Integer> resultIds = new ArrayList<>();

	// first transform the input stream(s) and store the result IDs
	Collection<Integer> inputIds = transform(input);
	resultIds.addAll(inputIds);

	// the recursive transform might have already transformed this
	if (alreadyTransformed.containsKey(iterate)) {
		return alreadyTransformed.get(iterate);
	}

	// create the fake iteration source/sink pair
	Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
		iterate.getId(),
		getNewIterationNodeId(),
		getNewIterationNodeId(),
		iterate.getWaitTime(),
		iterate.getParallelism(),
		iterate.getMaxParallelism(),
		iterate.getMinResources(),
		iterate.getPreferredResources());

	StreamNode itSource = itSourceAndSink.f0;
	StreamNode itSink = itSourceAndSink.f1;

	// the source only produces and the sink only consumes elements of the iteration's output type
	streamGraph.setSerializers(itSource.getId(), null, null, iterate.getOutputType().createSerializer(executionConfig));
	streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(executionConfig), null, null);

	// also add the feedback source ID to the result IDs, so that downstream operators will
	// add both as input
	resultIds.add(itSource.getId());

	// add the iterate to the already-seen set with the result IDs, so that we can transform
	// the feedback edges and let them stop when encountering the iterate node
	alreadyTransformed.put(iterate, resultIds);

	// so that we can determine the slot sharing group from all feedback edges
	List<Integer> allFeedbackIds = new ArrayList<>();

	for (Transformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
		Collection<Integer> feedbackIds = transform(feedbackEdge);
		allFeedbackIds.addAll(feedbackIds);
		// every feedback operator sends its output into the iteration sink
		for (Integer feedbackId : feedbackIds) {
			streamGraph.addEdge(feedbackId,
					itSink.getId(),
					0
			);
		}
	}

	String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
	// slot sharing group of iteration node must exist
	if (slotSharingGroup == null) {
		slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
	}

	itSink.setSlotSharingGroup(slotSharingGroup);
	itSource.setSlotSharingGroup(slotSharingGroup);

	return resultIds;
}
 
示例7
/**
 * Closes the iteration. This method defines the end of the iterative
 * program part that will be fed back to the start of the iteration.
 *
 * <p>A common usage pattern for streaming iterations is to use output
 * splitting to send a part of the closing data stream to the head. Refer to
 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
 * for more information.
 *
 * @param feedbackStream
 *            {@link DataStream} that will be used as input to the iteration
 *            head.
 *
 * @return The feedback stream.
 *
 * @throws UnsupportedOperationException if this iteration's transformation is not among the
 *             transitive predecessors of {@code feedbackStream}, i.e. the feedback stream
 *             does not originate from this iteration
 */
@SuppressWarnings("unchecked")
public DataStream<T> closeWith(DataStream<T> feedbackStream) {

	Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

	// only a stream derived from this iteration's head can close the loop
	if (!predecessors.contains(this.transformation)) {
		throw new UnsupportedOperationException(
				"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
	}

	// register the feedback stream's transformation as a feedback edge of the iteration head
	FeedbackTransformation<T> iterationHead = (FeedbackTransformation<T>) getTransformation();
	iterationHead.addFeedbackEdge(feedbackStream.getTransformation());

	return feedbackStream;
}
 
示例8
/**
 * Closes the iteration. This method defines the end of the iterative
 * program part that will be fed back to the start of the iteration.
 *
 * <p>A common usage pattern for streaming iterations is to use output
 * splitting to send a part of the closing data stream to the head. Refer to
 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
 * for more information.
 *
 * @param feedbackStream
 *            {@link DataStream} that will be used as input to the iteration
 *            head.
 *
 * @return The feedback stream.
 *
 * @throws UnsupportedOperationException if this iteration's transformation is not among the
 *             transitive predecessors of {@code feedbackStream}, i.e. the feedback stream
 *             does not originate from this iteration
 */
@SuppressWarnings("unchecked")
public DataStream<T> closeWith(DataStream<T> feedbackStream) {

	Collection<Transformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

	// only a stream derived from this iteration's head can close the loop
	if (!predecessors.contains(this.transformation)) {
		throw new UnsupportedOperationException(
				"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
	}

	// register the feedback stream's transformation as a feedback edge of the iteration head
	FeedbackTransformation<T> iterationHead = (FeedbackTransformation<T>) getTransformation();
	iterationHead.addFeedbackEdge(feedbackStream.getTransformation());

	return feedbackStream;
}
 
示例9
/**
 * Closes the iteration. This method defines the end of the iterative
 * program part that will be fed back to the start of the iteration.
 *
 * <p>A common usage pattern for streaming iterations is to use output
 * splitting to send a part of the closing data stream to the head. Refer to
 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
 * for more information.
 *
 * @param feedbackStream
 *            {@link DataStream} that will be used as input to the iteration
 *            head.
 *
 * @return The feedback stream.
 *
 * @throws UnsupportedOperationException if this iteration's transformation is not among the
 *             transitive predecessors of {@code feedbackStream}, i.e. the feedback stream
 *             does not originate from this iteration
 */
@SuppressWarnings("unchecked")
public DataStream<T> closeWith(DataStream<T> feedbackStream) {

	Collection<Transformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

	// only a stream derived from this iteration's head can close the loop
	if (!predecessors.contains(this.transformation)) {
		throw new UnsupportedOperationException(
				"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
	}

	// register the feedback stream's transformation as a feedback edge of the iteration head
	FeedbackTransformation<T> iterationHead = (FeedbackTransformation<T>) getTransformation();
	iterationHead.addFeedbackEdge(feedbackStream.getTransformation());

	return feedbackStream;
}