Java源码示例:org.apache.flink.streaming.api.windowing.windows.GlobalWindow

示例1
static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner() throws Exception {
	@SuppressWarnings("unchecked")
	WindowAssigner<T, GlobalWindow> mockAssigner = mock(WindowAssigner.class);

	when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new GlobalWindow.Serializer());
	when(mockAssigner.isEventTime()).thenReturn(true);
	when(mockAssigner.assignWindows(Mockito.<T>any(), anyLong(), anyAssignerContext())).thenReturn(Collections.singletonList(GlobalWindow.get()));

	return mockAssigner;
}
 
示例2
private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {

		WindowAssigner<Integer, GlobalWindow> mockAssigner = mockGlobalWindowAssigner();
		timeAdaptor.setIsEventTime(mockAssigner);
		Trigger<Integer, GlobalWindow> mockTrigger = mockTrigger();
		InternalWindowFunction<Iterable<Integer>, Void, Integer, GlobalWindow> mockWindowFunction = mockWindowFunction();

		// this needs to be true for the test to succeed
		assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp());

		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);

		testHarness.open();

		assertEquals(0, testHarness.getOutput().size());
		assertEquals(0, testHarness.numKeyedStateEntries());

		testHarness.processElement(new StreamRecord<>(0, 0L));

		// just the window contents
		assertEquals(1, testHarness.numKeyedStateEntries());

		// verify we have no timers for either time domain
		assertEquals(0, testHarness.numEventTimeTimers());
		assertEquals(0, testHarness.numProcessingTimeTimers());
	}
 
示例3
@Test
public void testWindowAssignment() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	GlobalWindows assigner = GlobalWindows.create();

	assertThat(assigner.assignWindows("String", 0L, mockContext), contains(GlobalWindow.get()));
	assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(GlobalWindow.get()));
	assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(GlobalWindow.get()));
}
 
示例4
static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner() throws Exception {
	@SuppressWarnings("unchecked")
	WindowAssigner<T, GlobalWindow> mockAssigner = mock(WindowAssigner.class);

	when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new GlobalWindow.Serializer());
	when(mockAssigner.isEventTime()).thenReturn(true);
	when(mockAssigner.assignWindows(Mockito.<T>any(), anyLong(), anyAssignerContext())).thenReturn(Collections.singletonList(GlobalWindow.get()));

	return mockAssigner;
}
 
示例5
@Test
public void testWindowAssignment() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	GlobalWindows assigner = GlobalWindows.create();

	assertThat(assigner.assignWindows("String", 0L, mockContext), contains(GlobalWindow.get()));
	assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(GlobalWindow.get()));
	assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(GlobalWindow.get()));
}
 
示例6
private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {

		WindowAssigner<Integer, GlobalWindow> mockAssigner = mockGlobalWindowAssigner();
		timeAdaptor.setIsEventTime(mockAssigner);
		Trigger<Integer, GlobalWindow> mockTrigger = mockTrigger();
		InternalWindowFunction<Iterable<Integer>, Void, Integer, GlobalWindow> mockWindowFunction = mockWindowFunction();

		// this needs to be true for the test to succeed
		assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp());

		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);

		testHarness.open();

		assertEquals(0, testHarness.getOutput().size());
		assertEquals(0, testHarness.numKeyedStateEntries());

		testHarness.processElement(new StreamRecord<>(0, 0L));

		// just the window contents
		assertEquals(1, testHarness.numKeyedStateEntries());

		// verify we have no timers for either time domain
		assertEquals(0, testHarness.numEventTimeTimers());
		assertEquals(0, testHarness.numProcessingTimeTimers());
	}
 
示例7
@Test
public void testWindowAssignment() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	GlobalWindows assigner = GlobalWindows.create();

	assertThat(assigner.assignWindows("String", 0L, mockContext), contains(GlobalWindow.get()));
	assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(GlobalWindow.get()));
	assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(GlobalWindow.get()));
}
 
示例8
@Test
public void testProperties() {
	GlobalWindows assigner = GlobalWindows.create();

	assertFalse(assigner.isEventTime());
	assertEquals(new GlobalWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
	assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(GlobalWindows.NeverTrigger.class));
}
 
示例9
@Override
public TriggerResult onElement(ConnectedCarEvent event, long timestamp, GlobalWindow window, TriggerContext context) throws Exception {

	// if this is a stop event, set a timer
	if (event.speed == 0.0) {
		context.registerEventTimeTimer(event.timestamp);
	}

	return TriggerResult.CONTINUE;
}
 
示例10
@Override
public void evictAfter(Iterable<TimestampedValue<ConnectedCarEvent>> elements, int size, GlobalWindow window, EvictorContext ctx) {
	long firstStop = ConnectedCarEvent.earliestStopElement(elements);

	// remove all events up to (and including) the first stop event (which is the event that triggered the window)
	for (Iterator<TimestampedValue<ConnectedCarEvent>> iterator = elements.iterator(); iterator.hasNext(); ) {
		TimestampedValue<ConnectedCarEvent> element = iterator.next();
		if (element.getTimestamp() <= firstStop) {
			iterator.remove();
		}
	}
}
 
示例11
@Override
public void apply(Tuple key, GlobalWindow window, Iterable<ConnectedCarEvent> events, Collector<StoppedSegment> out) {
	StoppedSegment seg = new StoppedSegment(events);
	if (seg.length > 0) {
		out.collect(seg);
	}
}
 
示例12
@Test
public void testProperties() {
	GlobalWindows assigner = GlobalWindows.create();

	assertFalse(assigner.isEventTime());
	assertEquals(new GlobalWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
	assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(GlobalWindows.NeverTrigger.class));
}
 
示例13
@Override
public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
	return Collections.singletonList(GlobalWindow.get());
}
 
示例14
@Override
public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
	return new NeverTrigger();
}
 
示例15
@Override
public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
	return TriggerResult.CONTINUE;
}
 
示例16
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
	return TriggerResult.CONTINUE;
}
 
示例17
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
	return TriggerResult.CONTINUE;
}
 
示例18
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
 
示例19
@Override
public void onMerge(GlobalWindow window, OnMergeContext ctx) {
}
 
示例20
@Override
public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
	return new GlobalWindow.Serializer();
}
 
示例21
/**
 * Tests CountEvictor evictAfter behavior.
    */
@Test
public void testCountEvictorEvictAfter() throws Exception {
	AtomicInteger closeCalled = new AtomicInteger(0);
	final int windowSize = 4;
	final int triggerCount = 2;
	final boolean evictAfter = true;

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
		(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
		new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
		GlobalWindows.create(),
		new GlobalWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
		CountTrigger.of(triggerCount),
		CountEvictor.of(windowSize, evictAfter),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
		new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();

	Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
 
示例22
/**
 * Tests TimeEvictor evictAfter behavior.
 */
@Test
public void testTimeEvictorEvictAfter() throws Exception {
	AtomicInteger closeCalled = new AtomicInteger(0);
	final int triggerCount = 2;
	final boolean evictAfter = true;

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
		(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
		new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
		GlobalWindows.create(),
		new GlobalWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
		CountTrigger.of(triggerCount),
		TimeEvictor.of(Time.seconds(2), evictAfter),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
		new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4000));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3500));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1002));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();

	Assert.assertEquals("Close was not called.", 1, closeCalled.get());

}
 
示例23
/**
 * Tests time evictor, if no timestamp information in the StreamRecord.
 * No element will be evicted from the window.
 */
@Test
public void testTimeEvictorNoTimestamp() throws Exception {
	AtomicInteger closeCalled = new AtomicInteger(0);
	final int triggerCount = 2;
	final boolean evictAfter = true;

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
		(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
		new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
		GlobalWindows.create(),
		new GlobalWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
		CountTrigger.of(triggerCount),
		TimeEvictor.of(Time.seconds(2), evictAfter),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
		new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();

	Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
 
示例24
/**
 * Tests DeltaEvictor, evictBefore behavior.
 */
@Test
public void testDeltaEvictorEvictBefore() throws Exception {
	AtomicInteger closeCalled = new AtomicInteger(0);
	final int triggerCount = 2;
	final boolean evictAfter = false;
	final int threshold = 2;

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
		(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
		new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
		GlobalWindows.create(),
		new GlobalWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
		CountTrigger.of(triggerCount),
		DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
			@Override
			public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
				return newDataPoint.f1 - oldDataPoint.f1;
			}
		}, evictAfter),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
		new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 11), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 10999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 8), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 10), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();

	Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
 
示例25
/**
 * Tests DeltaEvictor, evictAfter behavior.
 */
@Test
public void testDeltaEvictorEvictAfter() throws Exception {
	AtomicInteger closeCalled = new AtomicInteger(0);
	final int triggerCount = 2;
	final boolean evictAfter = true;
	final int threshold = 2;

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
		(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
		new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
		GlobalWindows.create(),
		new GlobalWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
		CountTrigger.of(triggerCount),
		DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
			@Override
			public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
				return newDataPoint.f1 - oldDataPoint.f1;
			}
		}, evictAfter),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
		new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 15), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 9), initialTime + 10999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 16), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 22), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();

	Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
 
示例26
@Test
@SuppressWarnings("unchecked")
public void testCountTrigger() throws Exception {

	final int windowSize = 4;
	final int windowSlide = 2;

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
			new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
			GlobalWindows.create(),
			new GlobalWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalIterableWindowFunction<>(
					new ReduceApplyWindowFunction<>(
							new SumReducer(),
							// on some versions of Java we seem to need the explicit type
							new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>())),
			CountTrigger.of(windowSlide),
			CountEvictor.of(windowSize),
			0,
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// The global window actually ignores these timestamps...

	// add elements out-of-order
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();
}
 
示例27
@Test
@SuppressWarnings("unchecked")
public void testCountTriggerWithApply() throws Exception {
	AtomicInteger closeCalled = new AtomicInteger(0);

	final int windowSize = 4;
	final int windowSlide = 2;

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
			new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
		GlobalWindows.create(),
		new GlobalWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
		CountTrigger.of(windowSlide),
		CountEvictor.of(windowSize),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// The global window actually ignores these timestamps...

	// add elements out-of-order
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();

	Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
 
示例28
@Test
@SuppressWarnings("unchecked")
public void testContinuousWatermarkTrigger() throws Exception {
	closeCalled.set(0);

	final int windowSize = 3;

	ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
			new SumReducer(),
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
			GlobalWindows.create(),
			new GlobalWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
			ContinuousEventTimeTrigger.of(Time.of(windowSize, TimeUnit.SECONDS)),
			0,
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
			createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// The global window actually ignores these timestamps...

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));

	// add elements out-of-order
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

	testHarness.processWatermark(new Watermark(1000));
	expectedOutput.add(new Watermark(1000));
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());

	testHarness.processWatermark(new Watermark(2000));
	expectedOutput.add(new Watermark(2000));
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());

	testHarness.processWatermark(new Watermark(3000));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
	expectedOutput.add(new Watermark(3000));
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());

	testHarness.processWatermark(new Watermark(4000));
	expectedOutput.add(new Watermark(4000));
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());

	testHarness.processWatermark(new Watermark(5000));
	expectedOutput.add(new Watermark(5000));
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());

	testHarness.processWatermark(new Watermark(6000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
	expectedOutput.add(new Watermark(6000));
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());

	// those don't have any effect...
	testHarness.processWatermark(new Watermark(7000));
	testHarness.processWatermark(new Watermark(8000));
	expectedOutput.add(new Watermark(7000));
	expectedOutput.add(new Watermark(8000));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());

	testHarness.close();
}
 
示例29
@Test
@SuppressWarnings("unchecked")
public void testCountTrigger() throws Exception {
	closeCalled.set(0);

	final int windowSize = 4;

	ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
			new SumReducer(),
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
			GlobalWindows.create(),
			new GlobalWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
			PurgingTrigger.of(CountTrigger.of(windowSize)),
			0,
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
			createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// The global window actually ignores these timestamps...

	// add elements out-of-order
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));

	// do a snapshot, close and restore again
	OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);

	testHarness.close();

	ConcurrentLinkedQueue<Object> outputBeforeClose = testHarness.getOutput();

	stateDesc = new ReducingStateDescriptor<>("window-contents",
			new SumReducer(),
			STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	operator = new WindowOperator<>(
			GlobalWindows.create(),
			new GlobalWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
			PurgingTrigger.of(CountTrigger.of(windowSize)),
			0,
			null /* late data output tag */);

	testHarness = createTestHarness(operator);

	testHarness.setup();
	testHarness.initializeState(snapshot);
	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, Iterables.concat(outputBeforeClose, testHarness.getOutput()), new Tuple2ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, Iterables.concat(outputBeforeClose, testHarness.getOutput()), new Tuple2ResultSortComparator());

	testHarness.close();
}
 
示例30
@Test
@SuppressWarnings("unchecked")
public void testCountTrigger() throws Exception {

	final int windowSize = 4;
	final int windowSlide = 2;

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
			(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
			new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
			GlobalWindows.create(),
			new GlobalWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalIterableWindowFunction<>(
					new ReduceApplyWindowFunction<>(
							new SumReducer(),
							// on some versions of Java we seem to need the explicit type
							new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>())),
			CountTrigger.of(windowSlide),
			CountEvictor.of(windowSize),
			0,
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// The global window actually ignores these timestamps...

	// add elements out-of-order
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();
}