Java源码示例:org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows
示例1
/**
 * Merges a set of windows in which {@code (1,1)} is numerically contained in {@code (0,2)} and
 * {@code (5,6)} in {@code (4,7)}, and expects exactly two merge callbacks, each reporting the
 * covering window as the merge result.
 */
@Test
public void testMergeCoveringWindow() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    // Expected merge results for the two groups.
    TimeWindow lowerCover = new TimeWindow(0, 2);
    TimeWindow upperCover = new TimeWindow(4, 7);

    sessionAssigner.mergeWindows(
        Lists.newArrayList(
            new TimeWindow(1, 1),
            new TimeWindow(0, 2),
            new TimeWindow(4, 7),
            new TimeWindow(5, 6)),
        mergeCallback);

    // (1,1) and (0,2) are reported together with result (0,2).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))),
        eq(lowerCover));

    // (5,6) and (4,7) are reported together with result (4,7).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))),
        eq(upperCover));

    // No merge calls beyond those two.
    verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
示例2
/**
 * Merges a set of windows in which {@code (1,1)} is numerically contained in {@code (0,2)} and
 * {@code (5,6)} in {@code (4,7)}, and expects exactly two merge callbacks, each reporting the
 * covering window as the merge result.
 */
@Test
public void testMergeCoveringWindow() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    // Expected merge results for the two groups.
    TimeWindow lowerCover = new TimeWindow(0, 2);
    TimeWindow upperCover = new TimeWindow(4, 7);

    sessionAssigner.mergeWindows(
        Lists.newArrayList(
            new TimeWindow(1, 1),
            new TimeWindow(0, 2),
            new TimeWindow(4, 7),
            new TimeWindow(5, 6)),
        mergeCallback);

    // (1,1) and (0,2) are reported together with result (0,2).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))),
        eq(lowerCover));

    // (5,6) and (4,7) are reported together with result (4,7).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))),
        eq(upperCover));

    // No merge calls beyond those two.
    verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
示例3
/**
 * Merges a set of windows in which {@code (1,1)} is numerically contained in {@code (0,2)} and
 * {@code (5,6)} in {@code (4,7)}, and expects exactly two merge callbacks, each reporting the
 * covering window as the merge result.
 */
@Test
public void testMergeCoveringWindow() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    // Expected merge results for the two groups.
    TimeWindow lowerCover = new TimeWindow(0, 2);
    TimeWindow upperCover = new TimeWindow(4, 7);

    sessionAssigner.mergeWindows(
        Lists.newArrayList(
            new TimeWindow(1, 1),
            new TimeWindow(0, 2),
            new TimeWindow(4, 7),
            new TimeWindow(5, 6)),
        mergeCallback);

    // (1,1) and (0,2) are reported together with result (0,2).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))),
        eq(lowerCover));

    // (5,6) and (4,7) are reported together with result (4,7).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))),
        eq(upperCover));

    // No merge calls beyond those two.
    verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
示例4
/**
 * Verifies that {@code EventTimeSessionWindows.withDynamicGap} returns a non-null
 * {@code DynamicEventTimeSessionWindows} assigner whose {@code isEventTime()} is true.
 */
@Test
public void testDynamicGapProperties() {
    SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);

    DynamicEventTimeSessionWindows<String> dynamicAssigner =
        EventTimeSessionWindows.withDynamicGap(gapExtractor);

    assertNotNull(dynamicAssigner);
    assertTrue(dynamicAssigner.isEventTime());
}
示例5
/**
 * Assigns windows for three elements with different stubbed gaps and expects each resulting
 * window to match {@code timeWindow(timestamp, timestamp + gap)}.
 */
@Test
public void testWindowAssignment() {
    SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    // Each element string is stubbed to the gap value its name carries.
    when(gapExtractor.extract(eq("gap5000"))).thenReturn(5000L);
    when(gapExtractor.extract(eq("gap4000"))).thenReturn(4000L);
    when(gapExtractor.extract(eq("gap9000"))).thenReturn(9000L);

    WindowAssigner.WindowAssignerContext assignerContext = mock(WindowAssigner.WindowAssignerContext.class);
    DynamicEventTimeSessionWindows<String> sessionAssigner =
        DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    // 0 + 5000
    assertThat(
        sessionAssigner.assignWindows("gap5000", 0L, assignerContext),
        contains(timeWindow(0, 5000)));
    // 4999 + 4000
    assertThat(
        sessionAssigner.assignWindows("gap4000", 4999L, assignerContext),
        contains(timeWindow(4999, 8999)));
    // 5000 + 9000
    assertThat(
        sessionAssigner.assignWindows("gap9000", 5000L, assignerContext),
        contains(timeWindow(5000, 14000)));
}
示例6
/**
 * Passes a single window {@code (0,0)} to {@code mergeWindows} and expects the merge callback
 * never to be invoked.
 */
@Test
public void testMergeSinglePointWindow() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    TimeWindow pointWindow = new TimeWindow(0, 0);
    sessionAssigner.mergeWindows(Lists.newArrayList(pointWindow), mergeCallback);

    verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
示例7
/**
 * Passes a single window {@code (0,1)} to {@code mergeWindows} and expects the merge callback
 * never to be invoked.
 */
@Test
public void testMergeSingleWindow() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    TimeWindow onlyWindow = new TimeWindow(0, 1);
    sessionAssigner.mergeWindows(Lists.newArrayList(onlyWindow), mergeCallback);

    verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
示例8
/**
 * Merges two runs of back-to-back windows, {@code (0,1),(1,2),(2,3)} and {@code (4,5),(5,6)},
 * and expects exactly two merge callbacks with results {@code (0,3)} and {@code (4,6)}.
 */
@Test
public void testMergeConsecutiveWindows() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    // Expected merge results for the two runs.
    TimeWindow firstRunResult = new TimeWindow(0, 3);
    TimeWindow secondRunResult = new TimeWindow(4, 6);

    sessionAssigner.mergeWindows(
        Lists.newArrayList(
            new TimeWindow(0, 1),
            new TimeWindow(1, 2),
            new TimeWindow(2, 3),
            new TimeWindow(4, 5),
            new TimeWindow(5, 6)),
        mergeCallback);

    // First run: three windows reported together with result (0,3).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new TimeWindow(2, 3))),
        eq(firstRunResult));

    // Second run: two windows reported together with result (4,6).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))),
        eq(secondRunResult));

    // No merge calls beyond those two.
    verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
示例9
/**
 * Checks three properties of the dynamic-gap assigner: {@code isEventTime()} is true, the window
 * serializer equals {@code new TimeWindow.Serializer()}, and the default trigger is an instance
 * of {@code EventTimeTrigger}.
 */
@Test
public void testProperties() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    assertTrue(sessionAssigner.isEventTime());
    assertEquals(
        new TimeWindow.Serializer(),
        sessionAssigner.getWindowSerializer(new ExecutionConfig()));
    assertThat(
        sessionAssigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)),
        instanceOf(EventTimeTrigger.class));
}
示例10
/**
 * Verifies that {@code EventTimeSessionWindows.withDynamicGap} returns a non-null
 * {@code DynamicEventTimeSessionWindows} assigner whose {@code isEventTime()} is true.
 */
@Test
public void testDynamicGapProperties() {
    SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);

    DynamicEventTimeSessionWindows<String> dynamicAssigner =
        EventTimeSessionWindows.withDynamicGap(gapExtractor);

    assertNotNull(dynamicAssigner);
    assertTrue(dynamicAssigner.isEventTime());
}
示例11
/**
 * Assigns windows for three elements with different stubbed gaps and expects each resulting
 * window to match {@code timeWindow(timestamp, timestamp + gap)}.
 */
@Test
public void testWindowAssignment() {
    SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    // Each element string is stubbed to the gap value its name carries.
    when(gapExtractor.extract(eq("gap5000"))).thenReturn(5000L);
    when(gapExtractor.extract(eq("gap4000"))).thenReturn(4000L);
    when(gapExtractor.extract(eq("gap9000"))).thenReturn(9000L);

    WindowAssigner.WindowAssignerContext assignerContext = mock(WindowAssigner.WindowAssignerContext.class);
    DynamicEventTimeSessionWindows<String> sessionAssigner =
        DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    // 0 + 5000
    assertThat(
        sessionAssigner.assignWindows("gap5000", 0L, assignerContext),
        contains(timeWindow(0, 5000)));
    // 4999 + 4000
    assertThat(
        sessionAssigner.assignWindows("gap4000", 4999L, assignerContext),
        contains(timeWindow(4999, 8999)));
    // 5000 + 9000
    assertThat(
        sessionAssigner.assignWindows("gap9000", 5000L, assignerContext),
        contains(timeWindow(5000, 14000)));
}
示例12
/**
 * Passes a single window {@code (0,0)} to {@code mergeWindows} and expects the merge callback
 * never to be invoked.
 */
@Test
public void testMergeSinglePointWindow() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    TimeWindow pointWindow = new TimeWindow(0, 0);
    sessionAssigner.mergeWindows(Lists.newArrayList(pointWindow), mergeCallback);

    verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
示例13
/**
 * Passes a single window {@code (0,1)} to {@code mergeWindows} and expects the merge callback
 * never to be invoked.
 */
@Test
public void testMergeSingleWindow() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    TimeWindow onlyWindow = new TimeWindow(0, 1);
    sessionAssigner.mergeWindows(Lists.newArrayList(onlyWindow), mergeCallback);

    verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
示例14
/**
 * Merges two runs of back-to-back windows, {@code (0,1),(1,2),(2,3)} and {@code (4,5),(5,6)},
 * and expects exactly two merge callbacks with results {@code (0,3)} and {@code (4,6)}.
 */
@Test
public void testMergeConsecutiveWindows() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    // Expected merge results for the two runs.
    TimeWindow firstRunResult = new TimeWindow(0, 3);
    TimeWindow secondRunResult = new TimeWindow(4, 6);

    sessionAssigner.mergeWindows(
        Lists.newArrayList(
            new TimeWindow(0, 1),
            new TimeWindow(1, 2),
            new TimeWindow(2, 3),
            new TimeWindow(4, 5),
            new TimeWindow(5, 6)),
        mergeCallback);

    // First run: three windows reported together with result (0,3).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new TimeWindow(2, 3))),
        eq(firstRunResult));

    // Second run: two windows reported together with result (4,6).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))),
        eq(secondRunResult));

    // No merge calls beyond those two.
    verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
示例15
/**
 * Checks three properties of the dynamic-gap assigner: {@code isEventTime()} is true, the window
 * serializer equals {@code new TimeWindow.Serializer()}, and the default trigger is an instance
 * of {@code EventTimeTrigger}.
 */
@Test
public void testProperties() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    assertTrue(sessionAssigner.isEventTime());
    assertEquals(
        new TimeWindow.Serializer(),
        sessionAssigner.getWindowSerializer(new ExecutionConfig()));
    assertThat(
        sessionAssigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)),
        instanceOf(EventTimeTrigger.class));
}
示例16
/**
 * Verifies that {@code EventTimeSessionWindows.withDynamicGap} returns a non-null
 * {@code DynamicEventTimeSessionWindows} assigner whose {@code isEventTime()} is true.
 */
@Test
public void testDynamicGapProperties() {
    SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);

    DynamicEventTimeSessionWindows<String> dynamicAssigner =
        EventTimeSessionWindows.withDynamicGap(gapExtractor);

    assertNotNull(dynamicAssigner);
    assertTrue(dynamicAssigner.isEventTime());
}
示例17
/**
 * Assigns windows for three elements with different stubbed gaps and expects each resulting
 * window to match {@code timeWindow(timestamp, timestamp + gap)}.
 */
@Test
public void testWindowAssignment() {
    SessionWindowTimeGapExtractor<String> gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    // Each element string is stubbed to the gap value its name carries.
    when(gapExtractor.extract(eq("gap5000"))).thenReturn(5000L);
    when(gapExtractor.extract(eq("gap4000"))).thenReturn(4000L);
    when(gapExtractor.extract(eq("gap9000"))).thenReturn(9000L);

    WindowAssigner.WindowAssignerContext assignerContext = mock(WindowAssigner.WindowAssignerContext.class);
    DynamicEventTimeSessionWindows<String> sessionAssigner =
        DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    // 0 + 5000
    assertThat(
        sessionAssigner.assignWindows("gap5000", 0L, assignerContext),
        contains(timeWindow(0, 5000)));
    // 4999 + 4000
    assertThat(
        sessionAssigner.assignWindows("gap4000", 4999L, assignerContext),
        contains(timeWindow(4999, 8999)));
    // 5000 + 9000
    assertThat(
        sessionAssigner.assignWindows("gap9000", 5000L, assignerContext),
        contains(timeWindow(5000, 14000)));
}
示例18
/**
 * Passes a single window {@code (0,0)} to {@code mergeWindows} and expects the merge callback
 * never to be invoked.
 */
@Test
public void testMergeSinglePointWindow() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    TimeWindow pointWindow = new TimeWindow(0, 0);
    sessionAssigner.mergeWindows(Lists.newArrayList(pointWindow), mergeCallback);

    verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
示例19
/**
 * Passes a single window {@code (0,1)} to {@code mergeWindows} and expects the merge callback
 * never to be invoked.
 */
@Test
public void testMergeSingleWindow() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    TimeWindow onlyWindow = new TimeWindow(0, 1);
    sessionAssigner.mergeWindows(Lists.newArrayList(onlyWindow), mergeCallback);

    verify(mergeCallback, never()).merge(anyCollection(), Matchers.anyObject());
}
示例20
/**
 * Merges two runs of back-to-back windows, {@code (0,1),(1,2),(2,3)} and {@code (4,5),(5,6)},
 * and expects exactly two merge callbacks with results {@code (0,3)} and {@code (4,6)}.
 */
@Test
public void testMergeConsecutiveWindows() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    MergingWindowAssigner.MergeCallback mergeCallback = mock(MergingWindowAssigner.MergeCallback.class);
    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    // Expected merge results for the two runs.
    TimeWindow firstRunResult = new TimeWindow(0, 3);
    TimeWindow secondRunResult = new TimeWindow(4, 6);

    sessionAssigner.mergeWindows(
        Lists.newArrayList(
            new TimeWindow(0, 1),
            new TimeWindow(1, 2),
            new TimeWindow(2, 3),
            new TimeWindow(4, 5),
            new TimeWindow(5, 6)),
        mergeCallback);

    // First run: three windows reported together with result (0,3).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new TimeWindow(2, 3))),
        eq(firstRunResult));

    // Second run: two windows reported together with result (4,6).
    verify(mergeCallback, times(1)).merge(
        (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))),
        eq(secondRunResult));

    // No merge calls beyond those two.
    verify(mergeCallback, times(2)).merge(anyCollection(), Matchers.anyObject());
}
示例21
/**
 * Checks three properties of the dynamic-gap assigner: {@code isEventTime()} is true, the window
 * serializer equals {@code new TimeWindow.Serializer()}, and the default trigger is an instance
 * of {@code EventTimeTrigger}.
 */
@Test
public void testProperties() {
    // The extractor stub answers 5000L for every element.
    SessionWindowTimeGapExtractor gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any())).thenReturn(5000L);

    DynamicEventTimeSessionWindows sessionAssigner = DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor);

    assertTrue(sessionAssigner.isEventTime());
    assertEquals(
        new TimeWindow.Serializer(),
        sessionAssigner.getWindowSerializer(new ExecutionConfig()));
    assertThat(
        sessionAssigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)),
        instanceOf(EventTimeTrigger.class));
}
示例22
/**
 * Drives a {@code WindowOperator} built on {@code DynamicEventTimeSessionWindows} through a
 * one-input test harness and compares the emitted records and watermarks with an expected queue.
 *
 * <p>The stubbed extractor returns {@code 3000L} for elements keyed {@code "key1"}; for
 * {@code "key2"} it returns {@code 1000L} when the integer field is 10 and {@code 2000L}
 * otherwise; any other key yields {@code 0L}.
 *
 * @throws Exception propagated from the test harness calls
 */
@Test
@SuppressWarnings("unchecked")
public void testDynamicEventTimeSessionWindows() throws Exception {
    closeCalled.set(0);

    SessionWindowTimeGapExtractor<Tuple2<String, Integer>> gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any(Tuple2.class))).thenAnswer(invocation -> {
        Tuple2<String, Integer> record = (Tuple2<String, Integer>) invocation.getArguments()[0];
        switch (record.f0) {
            case "key1":
                return 3000L;
            case "key2":
                // The gap for key2 depends on the element's integer field.
                return record.f1 == 10 ? 1000L : 2000L;
            default:
                return 0L;
        }
    });

    ListStateDescriptor<Tuple2<String, Integer>> windowContents = new ListStateDescriptor<>(
        "window-contents",
        STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> windowOperator =
        new WindowOperator<>(
            DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor),
            new TimeWindow.Serializer(),
            new TupleKeySelector(),
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
            windowContents,
            new InternalIterableWindowFunction<>(new SessionWindowFunction()),
            EventTimeTrigger.create(),
            0,
            null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> harness =
        createTestHarness(windowOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    harness.open();

    // test different gaps for different keys
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 10));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
    harness.processWatermark(new Watermark(8999));

    expected.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 3010L), 3009));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-9", 5000L, 8000L), 7999));
    expected.add(new Watermark(8999));

    // test gap when it produces an end time before current timeout
    // the furthest timeout is respected
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 9000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 10000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 10500));
    harness.processWatermark(new Watermark(12999));

    expected.add(new StreamRecord<>(new Tuple3<>("key2-13", 9000L, 12000L), 11999));
    expected.add(new Watermark(12999));

    // test gap when it produces an end time after current timeout
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14000));
    harness.processWatermark(new Watermark(16999));

    expected.add(new StreamRecord<>(new Tuple3<>("key2-21", 13000L, 16000L), 15999));
    expected.add(new Watermark(16999));

    TestHarnessUtil.assertOutputEqualsSorted(
        "Output was not correct.",
        expected,
        harness.getOutput(),
        new Tuple3ResultSortComparator());

    harness.close();
}
示例23
/**
 * Drives a {@code WindowOperator} built on {@code DynamicEventTimeSessionWindows} through a
 * one-input test harness and compares the emitted records and watermarks with an expected queue.
 *
 * <p>The stubbed extractor returns {@code 3000L} for elements keyed {@code "key1"}; for
 * {@code "key2"} it returns {@code 1000L} when the integer field is 10 and {@code 2000L}
 * otherwise; any other key yields {@code 0L}.
 *
 * @throws Exception propagated from the test harness calls
 */
@Test
@SuppressWarnings("unchecked")
public void testDynamicEventTimeSessionWindows() throws Exception {
    closeCalled.set(0);

    SessionWindowTimeGapExtractor<Tuple2<String, Integer>> gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any(Tuple2.class))).thenAnswer(invocation -> {
        Tuple2<String, Integer> record = (Tuple2<String, Integer>) invocation.getArguments()[0];
        switch (record.f0) {
            case "key1":
                return 3000L;
            case "key2":
                // The gap for key2 depends on the element's integer field.
                return record.f1 == 10 ? 1000L : 2000L;
            default:
                return 0L;
        }
    });

    ListStateDescriptor<Tuple2<String, Integer>> windowContents = new ListStateDescriptor<>(
        "window-contents",
        STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> windowOperator =
        new WindowOperator<>(
            DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor),
            new TimeWindow.Serializer(),
            new TupleKeySelector(),
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
            windowContents,
            new InternalIterableWindowFunction<>(new SessionWindowFunction()),
            EventTimeTrigger.create(),
            0,
            null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> harness =
        createTestHarness(windowOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    harness.open();

    // test different gaps for different keys
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 10));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
    harness.processWatermark(new Watermark(8999));

    expected.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 3010L), 3009));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-9", 5000L, 8000L), 7999));
    expected.add(new Watermark(8999));

    // test gap when it produces an end time before current timeout
    // the furthest timeout is respected
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 9000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 10000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 10500));
    harness.processWatermark(new Watermark(12999));

    expected.add(new StreamRecord<>(new Tuple3<>("key2-13", 9000L, 12000L), 11999));
    expected.add(new Watermark(12999));

    // test gap when it produces an end time after current timeout
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14000));
    harness.processWatermark(new Watermark(16999));

    expected.add(new StreamRecord<>(new Tuple3<>("key2-21", 13000L, 16000L), 15999));
    expected.add(new Watermark(16999));

    TestHarnessUtil.assertOutputEqualsSorted(
        "Output was not correct.",
        expected,
        harness.getOutput(),
        new Tuple3ResultSortComparator());

    harness.close();
}
示例24
/**
 * Drives a {@code WindowOperator} built on {@code DynamicEventTimeSessionWindows} through a
 * one-input test harness and compares the emitted records and watermarks with an expected queue.
 *
 * <p>The stubbed extractor returns {@code 3000L} for elements keyed {@code "key1"}; for
 * {@code "key2"} it returns {@code 1000L} when the integer field is 10 and {@code 2000L}
 * otherwise; any other key yields {@code 0L}.
 *
 * @throws Exception propagated from the test harness calls
 */
@Test
@SuppressWarnings("unchecked")
public void testDynamicEventTimeSessionWindows() throws Exception {
    closeCalled.set(0);

    SessionWindowTimeGapExtractor<Tuple2<String, Integer>> gapExtractor = mock(SessionWindowTimeGapExtractor.class);
    when(gapExtractor.extract(any(Tuple2.class))).thenAnswer(invocation -> {
        Tuple2<String, Integer> record = (Tuple2<String, Integer>) invocation.getArguments()[0];
        switch (record.f0) {
            case "key1":
                return 3000L;
            case "key2":
                // The gap for key2 depends on the element's integer field.
                return record.f1 == 10 ? 1000L : 2000L;
            default:
                return 0L;
        }
    });

    ListStateDescriptor<Tuple2<String, Integer>> windowContents = new ListStateDescriptor<>(
        "window-contents",
        STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> windowOperator =
        new WindowOperator<>(
            DynamicEventTimeSessionWindows.withDynamicGap(gapExtractor),
            new TimeWindow.Serializer(),
            new TupleKeySelector(),
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
            windowContents,
            new InternalIterableWindowFunction<>(new SessionWindowFunction()),
            EventTimeTrigger.create(),
            0,
            null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> harness =
        createTestHarness(windowOperator);
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    harness.open();

    // test different gaps for different keys
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 10));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
    harness.processWatermark(new Watermark(8999));

    expected.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 3010L), 3009));
    expected.add(new StreamRecord<>(new Tuple3<>("key2-9", 5000L, 8000L), 7999));
    expected.add(new Watermark(8999));

    // test gap when it produces an end time before current timeout
    // the furthest timeout is respected
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 9000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 10000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 10500));
    harness.processWatermark(new Watermark(12999));

    expected.add(new StreamRecord<>(new Tuple3<>("key2-13", 9000L, 12000L), 11999));
    expected.add(new Watermark(12999));

    // test gap when it produces an end time after current timeout
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13500));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14000));
    harness.processWatermark(new Watermark(16999));

    expected.add(new StreamRecord<>(new Tuple3<>("key2-21", 13000L, 16000L), 15999));
    expected.add(new Watermark(16999));

    TestHarnessUtil.assertOutputEqualsSorted(
        "Output was not correct.",
        expected,
        harness.getOutput(),
        new Tuple3ResultSortComparator());

    harness.close();
}