Java源码示例:org.apache.flink.streaming.api.windowing.windows.GlobalWindow
示例1
/**
 * Creates a Mockito stub of {@link WindowAssigner} that puts every element into the single
 * {@link GlobalWindow} instance.
 *
 * <p>The stub reports {@code isEventTime() == true} and hands out a fresh
 * {@link GlobalWindow.Serializer} for any {@link ExecutionConfig}.
 *
 * @param <T> element type accepted by the assigner
 * @return the stubbed assigner
 * @throws Exception propagated from the stubbing calls, if any of them throws
 */
static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner() throws Exception {
    // mock(Class) only knows the raw type, so the generic assignment is unchecked.
    @SuppressWarnings("unchecked")
    WindowAssigner<T, GlobalWindow> assignerStub = mock(WindowAssigner.class);

    when(assignerStub.isEventTime())
            .thenReturn(true);
    when(assignerStub.getWindowSerializer(Mockito.<ExecutionConfig>any()))
            .thenReturn(new GlobalWindow.Serializer());
    // Whatever element, timestamp or context is passed, answer with the one global window.
    when(assignerStub.assignWindows(Mockito.<T>any(), anyLong(), anyAssignerContext()))
            .thenReturn(Collections.singletonList(GlobalWindow.get()));

    return assignerStub;
}
示例2
/**
 * Checks that processing one element into a {@link GlobalWindow} leaves exactly one keyed
 * state entry and registers no timers in either time domain.
 *
 * @param timeAdaptor decides, via {@code setIsEventTime}, which time domain the mocked
 *     assigner reports
 * @throws Exception if setting up or driving the test harness fails
 */
private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
    // Precondition: this needs to be true for the test to succeed.
    assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp());

    final WindowAssigner<Integer, GlobalWindow> assigner = mockGlobalWindowAssigner();
    timeAdaptor.setIsEventTime(assigner);
    final Trigger<Integer, GlobalWindow> trigger = mockTrigger();
    final InternalWindowFunction<Iterable<Integer>, Void, Integer, GlobalWindow> windowFunction =
            mockWindowFunction();

    final KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> harness =
            createWindowOperator(assigner, trigger, 0L, windowFunction);
    harness.open();

    // Nothing emitted and nothing stored before the first element arrives.
    assertEquals(0, harness.getOutput().size());
    assertEquals(0, harness.numKeyedStateEntries());

    harness.processElement(new StreamRecord<>(0, 0L));

    // The only state entry is the window contents ...
    assertEquals(1, harness.numKeyedStateEntries());
    // ... and no timer was registered for either time domain.
    assertEquals(0, harness.numEventTimeTimers());
    assertEquals(0, harness.numProcessingTimeTimers());
}
示例3
/**
 * Checks that {@link GlobalWindows} assigns the single {@link GlobalWindow} instance to an
 * element for each of the probed timestamps (0, 4999 and 5000).
 */
@Test
public void testWindowAssignment() {
    final GlobalWindows assigner = GlobalWindows.create();
    final WindowAssigner.WindowAssignerContext context =
            mock(WindowAssigner.WindowAssignerContext.class);

    // Same assertion for every probed timestamp, in the original order.
    for (long timestamp : new long[] {0L, 4999L, 5000L}) {
        assertThat(assigner.assignWindows("String", timestamp, context), contains(GlobalWindow.get()));
    }
}
示例4
/**
 * Creates a Mockito stub of {@link WindowAssigner} that puts every element into the single
 * {@link GlobalWindow} instance.
 *
 * <p>The stub reports {@code isEventTime() == true} and hands out a fresh
 * {@link GlobalWindow.Serializer} for any {@link ExecutionConfig}.
 *
 * @param <T> element type accepted by the assigner
 * @return the stubbed assigner
 * @throws Exception propagated from the stubbing calls, if any of them throws
 */
static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner() throws Exception {
    // mock(Class) only knows the raw type, so the generic assignment is unchecked.
    @SuppressWarnings("unchecked")
    WindowAssigner<T, GlobalWindow> assignerStub = mock(WindowAssigner.class);

    when(assignerStub.isEventTime())
            .thenReturn(true);
    when(assignerStub.getWindowSerializer(Mockito.<ExecutionConfig>any()))
            .thenReturn(new GlobalWindow.Serializer());
    // Whatever element, timestamp or context is passed, answer with the one global window.
    when(assignerStub.assignWindows(Mockito.<T>any(), anyLong(), anyAssignerContext()))
            .thenReturn(Collections.singletonList(GlobalWindow.get()));

    return assignerStub;
}
示例5
/**
 * Checks that {@link GlobalWindows} assigns the single {@link GlobalWindow} instance to an
 * element for each of the probed timestamps (0, 4999 and 5000).
 */
@Test
public void testWindowAssignment() {
    final GlobalWindows assigner = GlobalWindows.create();
    final WindowAssigner.WindowAssignerContext context =
            mock(WindowAssigner.WindowAssignerContext.class);

    // Same assertion for every probed timestamp, in the original order.
    for (long timestamp : new long[] {0L, 4999L, 5000L}) {
        assertThat(assigner.assignWindows("String", timestamp, context), contains(GlobalWindow.get()));
    }
}
示例6
/**
 * Checks that processing one element into a {@link GlobalWindow} leaves exactly one keyed
 * state entry and registers no timers in either time domain.
 *
 * @param timeAdaptor decides, via {@code setIsEventTime}, which time domain the mocked
 *     assigner reports
 * @throws Exception if setting up or driving the test harness fails
 */
private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
    // Precondition: this needs to be true for the test to succeed.
    assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp());

    final WindowAssigner<Integer, GlobalWindow> assigner = mockGlobalWindowAssigner();
    timeAdaptor.setIsEventTime(assigner);
    final Trigger<Integer, GlobalWindow> trigger = mockTrigger();
    final InternalWindowFunction<Iterable<Integer>, Void, Integer, GlobalWindow> windowFunction =
            mockWindowFunction();

    final KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> harness =
            createWindowOperator(assigner, trigger, 0L, windowFunction);
    harness.open();

    // Nothing emitted and nothing stored before the first element arrives.
    assertEquals(0, harness.getOutput().size());
    assertEquals(0, harness.numKeyedStateEntries());

    harness.processElement(new StreamRecord<>(0, 0L));

    // The only state entry is the window contents ...
    assertEquals(1, harness.numKeyedStateEntries());
    // ... and no timer was registered for either time domain.
    assertEquals(0, harness.numEventTimeTimers());
    assertEquals(0, harness.numProcessingTimeTimers());
}
示例7
/**
 * Checks that {@link GlobalWindows} assigns the single {@link GlobalWindow} instance to an
 * element for each of the probed timestamps (0, 4999 and 5000).
 */
@Test
public void testWindowAssignment() {
    final GlobalWindows assigner = GlobalWindows.create();
    final WindowAssigner.WindowAssignerContext context =
            mock(WindowAssigner.WindowAssignerContext.class);

    // Same assertion for every probed timestamp, in the original order.
    for (long timestamp : new long[] {0L, 4999L, 5000L}) {
        assertThat(assigner.assignWindows("String", timestamp, context), contains(GlobalWindow.get()));
    }
}
示例8
/**
 * Checks the static properties of {@link GlobalWindows}: it is not event-time based, its
 * window serializer equals a {@link GlobalWindow.Serializer}, and its default trigger is a
 * {@link GlobalWindows.NeverTrigger}.
 */
@Test
public void testProperties() {
    final GlobalWindows assigner = GlobalWindows.create();

    assertFalse(assigner.isEventTime());

    final ExecutionConfig config = new ExecutionConfig();
    assertEquals(new GlobalWindow.Serializer(), assigner.getWindowSerializer(config));

    final StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
    assertThat(assigner.getDefaultTrigger(env), instanceOf(GlobalWindows.NeverTrigger.class));
}
示例9
/**
 * Registers an event-time timer whenever a stop event (speed exactly {@code 0.0}) arrives and
 * always lets the window continue.
 *
 * <p>The timer is registered at {@code event.timestamp}, i.e. the field on the event, not the
 * {@code timestamp} argument.
 *
 * @return always {@link TriggerResult#CONTINUE}
 */
@Override
public TriggerResult onElement(ConnectedCarEvent event, long timestamp, GlobalWindow window, TriggerContext context) throws Exception {
    final boolean isStopEvent = event.speed == 0.0;
    if (isStopEvent) {
        // if this is a stop event, set a timer
        context.registerEventTimeTimer(event.timestamp);
    }
    return TriggerResult.CONTINUE;
}
示例10
/**
 * Drops every element whose timestamp is at or before the earliest stop event found in
 * {@code elements}.
 *
 * <p>Removal goes through the iterator, so the backing collection must support
 * {@link Iterator#remove()}.
 */
@Override
public void evictAfter(Iterable<TimestampedValue<ConnectedCarEvent>> elements, int size, GlobalWindow window, EvictorContext ctx) {
    final long firstStop = ConnectedCarEvent.earliestStopElement(elements);

    // remove all events up to (and including) the first stop event
    // (which is the event that triggered the window)
    final Iterator<TimestampedValue<ConnectedCarEvent>> cursor = elements.iterator();
    while (cursor.hasNext()) {
        final long elementTimestamp = cursor.next().getTimestamp();
        if (elementTimestamp <= firstStop) {
            cursor.remove();
        }
    }
}
示例11
/**
 * Builds a {@link StoppedSegment} from the window's events and emits it only when its
 * {@code length} is greater than zero.
 */
@Override
public void apply(Tuple key, GlobalWindow window, Iterable<ConnectedCarEvent> events, Collector<StoppedSegment> out) {
    final StoppedSegment segment = new StoppedSegment(events);
    // Segments without a positive length are not emitted.
    if (!(segment.length > 0)) {
        return;
    }
    out.collect(segment);
}
示例12
/**
 * Checks the static properties of {@link GlobalWindows}: it is not event-time based, its
 * window serializer equals a {@link GlobalWindow.Serializer}, and its default trigger is a
 * {@link GlobalWindows.NeverTrigger}.
 */
@Test
public void testProperties() {
    final GlobalWindows assigner = GlobalWindows.create();

    assertFalse(assigner.isEventTime());

    final ExecutionConfig config = new ExecutionConfig();
    assertEquals(new GlobalWindow.Serializer(), assigner.getWindowSerializer(config));

    final StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
    assertThat(assigner.getDefaultTrigger(env), instanceOf(GlobalWindows.NeverTrigger.class));
}
示例13
/**
 * Assigns every element to the one shared {@link GlobalWindow}; the element, timestamp and
 * context are not inspected.
 *
 * @return a single-element collection holding {@code GlobalWindow.get()}
 */
@Override
public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    final GlobalWindow theOnlyWindow = GlobalWindow.get();
    return Collections.singletonList(theOnlyWindow);
}
示例14
/**
 * Supplies the default trigger for this assigner: a fresh {@code NeverTrigger} on every call.
 * The environment argument is not used.
 */
@Override
public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
    final NeverTrigger defaultTrigger = new NeverTrigger();
    return defaultTrigger;
}
示例15
/**
 * Element callback: unconditionally returns {@link TriggerResult#CONTINUE}; none of the
 * arguments is inspected.
 */
@Override
public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
示例16
/**
 * Event-time callback: unconditionally returns {@link TriggerResult#CONTINUE}; none of the
 * arguments is inspected.
 */
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
示例17
/**
 * Processing-time callback: unconditionally returns {@link TriggerResult#CONTINUE}; none of
 * the arguments is inspected.
 */
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
示例18
/**
 * Intentionally a no-op: the body is empty, so nothing is cleared for the given window.
 */
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
示例19
/**
 * Intentionally a no-op: the body is empty, so merging windows changes nothing here.
 */
@Override
public void onMerge(GlobalWindow window, OnMergeContext ctx) {
}
示例20
/**
 * Supplies the serializer for {@link GlobalWindow}: a fresh {@link GlobalWindow.Serializer}
 * on every call. The execution config is not consulted.
 */
@Override
public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
    final GlobalWindow.Serializer windowSerializer = new GlobalWindow.Serializer();
    return windowSerializer;
}
示例21
/**
 * Tests CountEvictor evictAfter behavior.
 *
 * <p>Runs an {@code EvictingWindowOperator} over {@code GlobalWindows} with
 * {@code CountTrigger.of(2)} and {@code CountEvictor.of(4, true)}, feeds records for the keys
 * "key1" and "key2" in three batches, and after each batch compares the sorted harness output
 * with the expected per-key sums. Finally asserts that {@code closeCalled} is 1 once the
 * harness has been closed.
 */
@Test
public void testCountEvictorEvictAfter() throws Exception {
// Counter handed to RichSumReducer; asserted to be 1 at the end of the test.
AtomicInteger closeCalled = new AtomicInteger(0);
final int windowSize = 4;
final int triggerCount = 2;
final boolean evictAfter = true;
// The raw StreamElementSerializer is cast to the typed serializer, hence the suppressed warnings.
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
// List state named "window-contents" that stores the records of a window.
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
// Operator under test: global windows, count trigger and count evictor with evictAfter = true.
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(triggerCount),
CountEvictor.of(windowSize, evictAfter),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
// initialTime is 0, so the offsets below are the absolute record timestamps.
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// Batch 1: five records for key2 and three for key1, each carrying the value 1.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
// Expected after batch 1: key2 -> 2, key2 -> 4, key1 -> 2, all stamped Long.MAX_VALUE.
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
// Batch 2: one more record per key; expected additions are key1 -> 4 and key2 -> 6.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
// Batch 3: two more key2 records; the expected sum is 6 again.
// NOTE(review): presumably because the evictor trimmed key2's window to windowSize (4)
// after the previous firing - confirm against CountEvictor.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.close();
// After closing the harness, closeCalled must be exactly 1.
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
示例22
/**
 * Tests TimeEvictor evictAfter behavior.
 *
 * <p>Runs an {@code EvictingWindowOperator} over {@code GlobalWindows} with
 * {@code CountTrigger.of(2)} and {@code TimeEvictor.of(Time.seconds(2), true)}, feeds
 * timestamped records for "key1" and "key2" in two batches, and after each batch compares the
 * sorted harness output with the expected per-key sums. Finally asserts that
 * {@code closeCalled} is 1 once the harness has been closed.
 */
@Test
public void testTimeEvictorEvictAfter() throws Exception {
// Counter handed to RichSumReducer; asserted to be 1 at the end of the test.
AtomicInteger closeCalled = new AtomicInteger(0);
final int triggerCount = 2;
final boolean evictAfter = true;
// The raw StreamElementSerializer is cast to the typed serializer, hence the suppressed warnings.
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
// List state named "window-contents" that stores the records of a window.
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
// Operator under test: global windows, count trigger and a 2-second time evictor with evictAfter = true.
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(triggerCount),
TimeEvictor.of(Time.seconds(2), evictAfter),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
// initialTime is 0, so the offsets below are the absolute record timestamps.
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// Batch 1: five records for key2 and three for key1, each carrying the value 1.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3500));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));
// Expected after batch 1: key2 -> 2, key1 -> 2, key2 -> 3, all stamped Long.MAX_VALUE.
// NOTE(review): the second key2 result being 3 rather than 4 presumably reflects the
// record at timestamp 1000 having been evicted after the first firing - confirm against TimeEvictor.
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
// Batch 2: one more record per key; expected additions are key1 -> 4 and key2 -> 5.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1002));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.close();
// After closing the harness, closeCalled must be exactly 1.
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
示例23
/**
 * Tests time evictor, if no timestamp information in the StreamRecord.
 * No element will be evicted from the window.
 *
 * <p>Same setup as the TimeEvictor evictAfter test ({@code CountTrigger.of(2)},
 * {@code TimeEvictor.of(Time.seconds(2), true)}), but every record is built with the
 * single-argument {@code StreamRecord} constructor, i.e. without a timestamp. The expected
 * sums therefore keep growing with every firing. Finally asserts that {@code closeCalled}
 * is 1 once the harness has been closed.
 */
@Test
public void testTimeEvictorNoTimestamp() throws Exception {
// Counter handed to RichSumReducer; asserted to be 1 at the end of the test.
AtomicInteger closeCalled = new AtomicInteger(0);
final int triggerCount = 2;
final boolean evictAfter = true;
// The raw StreamElementSerializer is cast to the typed serializer, hence the suppressed warnings.
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
// List state named "window-contents" that stores the records of a window.
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
// Operator under test: global windows, count trigger and a 2-second time evictor with evictAfter = true.
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(triggerCount),
TimeEvictor.of(Time.seconds(2), evictAfter),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// Batch 1: five records for key2 and three for key1, each carrying the value 1 and no timestamp.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
// Expected after batch 1: key2 -> 2, key1 -> 2, key2 -> 4, all stamped Long.MAX_VALUE.
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
// Batch 2: one more record per key; expected additions are key1 -> 4 and key2 -> 6.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.close();
// After closing the harness, closeCalled must be exactly 1.
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
示例24
/**
 * Tests DeltaEvictor, evictBefore behavior.
 *
 * <p>Runs an {@code EvictingWindowOperator} over {@code GlobalWindows} with
 * {@code CountTrigger.of(2)} and a {@code DeltaEvictor} (threshold 2, {@code evictAfter = false})
 * whose delta is {@code newDataPoint.f1 - oldDataPoint.f1}. Feeds records for "key1" and "key2"
 * in two batches and after each batch compares the sorted harness output with the expected
 * per-key sums. Finally asserts that {@code closeCalled} is 1 once the harness has been closed.
 */
@Test
public void testDeltaEvictorEvictBefore() throws Exception {
// Counter handed to RichSumReducer; asserted to be 1 at the end of the test.
AtomicInteger closeCalled = new AtomicInteger(0);
final int triggerCount = 2;
// false selects eviction before the window function runs.
final boolean evictAfter = false;
final int threshold = 2;
// The raw StreamElementSerializer is cast to the typed serializer, hence the suppressed warnings.
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
// List state named "window-contents" that stores the records of a window.
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
// Operator under test: global windows, count trigger and the delta evictor described above.
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(triggerCount),
DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
// Delta is the difference of the integer fields: new minus old.
@Override
public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
return newDataPoint.f1 - oldDataPoint.f1;
}
}, evictAfter),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
// initialTime is 0, so the offsets below are the absolute record timestamps.
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// Batch 1: key2 receives the values 1, 4, 5, 6, 1 and key1 receives 1, 1, 5.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
// Expected after batch 1: key2 -> 4, key2 -> 11, key1 -> 2, all stamped Long.MAX_VALUE.
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 11), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
// Batch 2: key1 gets 3 and key2 gets 10; expected additions are key1 -> 8 and key2 -> 10.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 10999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 8), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 10), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.close();
// After closing the harness, closeCalled must be exactly 1.
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
示例25
/**
 * Tests DeltaEvictor, evictAfter behavior.
 *
 * <p>Runs an {@code EvictingWindowOperator} over {@code GlobalWindows} with
 * {@code CountTrigger.of(2)} and a {@code DeltaEvictor} (threshold 2, {@code evictAfter = true})
 * whose delta is {@code newDataPoint.f1 - oldDataPoint.f1}. Feeds records for "key1" and "key2"
 * in two batches and after each batch compares the sorted harness output with the expected
 * per-key sums. Finally asserts that {@code closeCalled} is 1 once the harness has been closed.
 */
@Test
public void testDeltaEvictorEvictAfter() throws Exception {
// Counter handed to RichSumReducer; asserted to be 1 at the end of the test.
AtomicInteger closeCalled = new AtomicInteger(0);
final int triggerCount = 2;
// true selects eviction after the window function has run.
final boolean evictAfter = true;
final int threshold = 2;
// The raw StreamElementSerializer is cast to the typed serializer, hence the suppressed warnings.
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
// List state named "window-contents" that stores the records of a window.
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
// Operator under test: global windows, count trigger and the delta evictor described above.
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(triggerCount),
DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
// Delta is the difference of the integer fields: new minus old.
@Override
public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
return newDataPoint.f1 - oldDataPoint.f1;
}
}, evictAfter),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
// initialTime is 0, so the offsets below are the absolute record timestamps.
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// Batch 1: key2 receives the values 1, 4, 5, 6, 1 and key1 receives 1, 1, 5.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 3999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 5), initialTime + 999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), initialTime + 1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
// Expected after batch 1: key2 -> 5, key2 -> 15, key1 -> 2, all stamped Long.MAX_VALUE.
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 15), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
// Batch 2: key1 gets 9 and key2 gets 10; expected additions are key1 -> 16 and key2 -> 22.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 9), initialTime + 10999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), initialTime + 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 16), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 22), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.close();
// After closing the harness, closeCalled must be exactly 1.
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
示例26
/**
 * Tests an {@code EvictingWindowOperator} over {@code GlobalWindows} that combines
 * {@code CountTrigger.of(2)} with {@code CountEvictor.of(4)} and reduces the window contents
 * through a {@code SumReducer} wrapped in a {@code ReduceApplyWindowFunction}.
 *
 * <p>Records for "key1" and "key2" are fed in two batches; after each batch the sorted harness
 * output is compared with the expected per-key sums.
 */
// NOTE(review): the method-level "unchecked" suppression already covers the local one below.
@Test
@SuppressWarnings("unchecked")
public void testCountTrigger() throws Exception {
final int windowSize = 4;
final int windowSlide = 2;
// The raw StreamElementSerializer is cast to the typed serializer, hence the suppressed warnings.
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
// List state named "window-contents" that stores the records of a window.
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
// Operator under test: global windows, trigger count = windowSlide, evictor count = windowSize.
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(
new ReduceApplyWindowFunction<>(
new SumReducer(),
// on some versions of Java we seem to need the explicit type
new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>())),
CountTrigger.of(windowSlide),
CountEvictor.of(windowSize),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
// initialTime is 0, so the offsets below are the absolute record timestamps.
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// The global window actually ignores these timestamps...
// add elements out-of-order
// Batch 1: five records for key2 and three for key1, each carrying the value 1.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
// Expected after batch 1: key2 -> 2, key2 -> 4, key1 -> 2, all stamped Long.MAX_VALUE.
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
// Batch 2: one more record per key; expected additions are key1 -> 4 and key2 -> 4.
// NOTE(review): key2 staying at 4 presumably reflects the evictor capping the window at
// windowSize elements before the function runs - confirm against CountEvictor.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.close();
}
示例27
/**
 * Tests an {@code EvictingWindowOperator} over {@code GlobalWindows} that combines
 * {@code CountTrigger.of(2)} with {@code CountEvictor.of(4)} and uses a {@code RichSumReducer}
 * as window function.
 *
 * <p>Records for "key1" and "key2" are fed in two batches; after each batch the sorted harness
 * output is compared with the expected per-key sums. Finally asserts that {@code closeCalled}
 * is 1 once the harness has been closed.
 */
// NOTE(review): the method-level "unchecked" suppression already covers the local one below.
@Test
@SuppressWarnings("unchecked")
public void testCountTriggerWithApply() throws Exception {
// Counter handed to RichSumReducer; asserted to be 1 at the end of the test.
AtomicInteger closeCalled = new AtomicInteger(0);
final int windowSize = 4;
final int windowSlide = 2;
// The raw StreamElementSerializer is cast to the typed serializer, hence the suppressed warnings.
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
// List state named "window-contents" that stores the records of a window.
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
// Operator under test: global windows, trigger count = windowSlide, evictor count = windowSize.
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(windowSlide),
CountEvictor.of(windowSize),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
// initialTime is 0, so the offsets below are the absolute record timestamps.
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// The global window actually ignores these timestamps...
// add elements out-of-order
// Batch 1: five records for key2 and three for key1, each carrying the value 1.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
// Expected after batch 1: key2 -> 2, key2 -> 4, key1 -> 2, all stamped Long.MAX_VALUE.
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
// Batch 2: one more record per key; expected additions are key1 -> 4 and key2 -> 4.
// NOTE(review): key2 staying at 4 presumably reflects the evictor capping the window at
// windowSize elements before the function runs - confirm against CountEvictor.
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
testHarness.close();
// After closing the harness, closeCalled must be exactly 1.
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
示例28
/**
 * Runs a {@code WindowOperator} over {@code GlobalWindows} with a
 * {@code ContinuousEventTimeTrigger} configured with a 3 second interval and a summing
 * reducing state, then checks the operator output after every watermark from 1000 to 8000.
 *
 * <p>The expectations encoded below are: no window result for watermarks 1000 and 2000,
 * {@code (key1, 3)} at watermark 3000, nothing at 4000 and 5000, {@code (key1, 3)} plus
 * {@code (key2, 5)} at watermark 6000, and nothing further at 7000 and 8000. Every watermark
 * itself is expected to be forwarded.
 */
@Test
@SuppressWarnings("unchecked")
public void testContinuousWatermarkTrigger() throws Exception {
    closeCalled.set(0);

    final int windowSize = 3;

    ReducingStateDescriptor<Tuple2<String, Integer>> windowContents =
            new ReducingStateDescriptor<>(
                    "window-contents",
                    new SumReducer(),
                    STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> windowOperator =
            new WindowOperator<>(
                    GlobalWindows.create(),
                    new GlobalWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    windowContents,
                    new InternalSingleValueWindowFunction<>(
                            new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
                    ContinuousEventTimeTrigger.of(Time.of(windowSize, TimeUnit.SECONDS)),
                    0,
                    null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> harness =
            createTestHarness(windowOperator);

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    // One comparator instance is shared by all output assertions in this test.
    Tuple2ResultSortComparator sortOrder = new Tuple2ResultSortComparator();

    harness.open();

    // The global window actually ignores these timestamps; the records are nevertheless
    // fed out of order. Entry i of both arrays describes the i-th record (value is always 1).
    final String[] keys = {"key1", "key2", "key2", "key1", "key1", "key2", "key2", "key2"};
    final long[] timestamps = {0, 3000, 3999, 20, 999, 1998, 1999, 1000};
    for (int i = 0; i < keys.length; i++) {
        harness.processElement(new StreamRecord<>(new Tuple2<>(keys[i], 1), timestamps[i]));
    }

    // Watermarks 1000 and 2000: only the watermark itself is expected downstream.
    for (long watermark = 1000; watermark <= 2000; watermark += 1000) {
        harness.processWatermark(new Watermark(watermark));
        expected.add(new Watermark(watermark));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput(), sortOrder);
    }

    // Watermark 3000: a result for key1 is expected in addition to the watermark.
    harness.processWatermark(new Watermark(3000));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
    expected.add(new Watermark(3000));
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput(), sortOrder);

    // Watermarks 4000 and 5000: again only the watermark is expected.
    for (long watermark = 4000; watermark <= 5000; watermark += 1000) {
        harness.processWatermark(new Watermark(watermark));
        expected.add(new Watermark(watermark));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput(), sortOrder);
    }

    // Watermark 6000: results for both keys are expected.
    harness.processWatermark(new Watermark(6000));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
    expected.add(new Watermark(6000));
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput(), sortOrder);

    // Those don't have any effect beyond being forwarded; both are processed before the check.
    harness.processWatermark(new Watermark(7000));
    harness.processWatermark(new Watermark(8000));
    expected.add(new Watermark(7000));
    expected.add(new Watermark(8000));
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput(), sortOrder);

    harness.close();
}
示例29
/**
 * Runs a {@code WindowOperator} over {@code GlobalWindows} with a purging
 * {@code CountTrigger} of 4 and a summing reducing state, taking a snapshot part-way through
 * and continuing on a freshly built operator that is initialized from that snapshot.
 *
 * <p>Output is always checked against the concatenation of what the first harness produced
 * before it was closed and what the restored harness has produced so far.
 */
@Test
@SuppressWarnings("unchecked")
public void testCountTrigger() throws Exception {
    closeCalled.set(0);

    final int windowSize = 4;

    ReducingStateDescriptor<Tuple2<String, Integer>> initialState =
            new ReducingStateDescriptor<>(
                    "window-contents",
                    new SumReducer(),
                    STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> initialOperator =
            new WindowOperator<>(
                    GlobalWindows.create(),
                    new GlobalWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    initialState,
                    new InternalSingleValueWindowFunction<>(
                            new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
                    PurgingTrigger.of(CountTrigger.of(windowSize)),
                    0,
                    null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> initialHarness =
            createTestHarness(initialOperator);

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    // One comparator instance is shared by both output assertions in this test.
    Tuple2ResultSortComparator sortOrder = new Tuple2ResultSortComparator();

    initialHarness.open();

    // The global window actually ignores these timestamps...
    // add elements out-of-order
    initialHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
    initialHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));

    initialHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
    initialHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
    initialHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));

    initialHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
    initialHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));

    // Snapshot the operator, close it, and keep whatever it emitted up to this point.
    OperatorSubtaskState snapshot = initialHarness.snapshot(0L, 0L);
    initialHarness.close();
    ConcurrentLinkedQueue<Object> outputBeforeClose = initialHarness.getOutput();

    // Build an identically configured operator and bring it up from the snapshot.
    ReducingStateDescriptor<Tuple2<String, Integer>> restoredState =
            new ReducingStateDescriptor<>(
                    "window-contents",
                    new SumReducer(),
                    STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> restoredOperator =
            new WindowOperator<>(
                    GlobalWindows.create(),
                    new GlobalWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    restoredState,
                    new InternalSingleValueWindowFunction<>(
                            new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
                    PurgingTrigger.of(CountTrigger.of(windowSize)),
                    0,
                    null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> restoredHarness =
            createTestHarness(restoredOperator);
    restoredHarness.setup();
    restoredHarness.initializeState(snapshot);
    restoredHarness.open();

    restoredHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

    // Across both harnesses a single (key2, 4) result is expected at this point.
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
    TestHarnessUtil.assertOutputEqualsSorted(
            "Output was not correct.",
            expected,
            Iterables.concat(outputBeforeClose, restoredHarness.getOutput()),
            sortOrder);

    restoredHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10999));

    // Three more identical records for key2.
    for (int i = 0; i < 3; i++) {
        restoredHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
    }

    // One additional result per key is expected after the records above.
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
    TestHarnessUtil.assertOutputEqualsSorted(
            "Output was not correct.",
            expected,
            Iterables.concat(outputBeforeClose, restoredHarness.getOutput()),
            sortOrder);

    restoredHarness.close();
}
示例30
/**
 * Runs an {@code EvictingWindowOperator} over {@code GlobalWindows} with a
 * {@code CountTrigger} of 2 and a {@code CountEvictor} of 4; window contents are kept as a
 * list of stream records and summed by a {@code ReduceApplyWindowFunction} when emitted.
 *
 * <p>The expectations encoded below are {@code (key2, 2)}, {@code (key2, 4)} and
 * {@code (key1, 2)} after the first eight records, followed by {@code (key1, 4)} and
 * {@code (key2, 4)} after two more records.
 */
@Test
@SuppressWarnings("unchecked")
public void testCountTrigger() throws Exception {
    final int windowSize = 4;
    final int windowSlide = 2;

    // The raw StreamElementSerializer has to be cast to the record-typed serializer.
    @SuppressWarnings({"unchecked", "rawtypes"})
    TypeSerializer<StreamRecord<Tuple2<String, Integer>>> recordSerializer =
            (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>)
                    new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

    ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> windowContents =
            new ListStateDescriptor<>("window-contents", recordSerializer);

    EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> evictingOperator =
            new EvictingWindowOperator<>(
                    GlobalWindows.create(),
                    new GlobalWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    windowContents,
                    new InternalIterableWindowFunction<>(
                            new ReduceApplyWindowFunction<>(
                                    new SumReducer(),
                                    // on some versions of Java we seem to need the explicit type
                                    new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>())),
                    CountTrigger.of(windowSlide),
                    CountEvictor.of(windowSize),
                    0,
                    null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    evictingOperator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

    // All record timestamps below are offsets from this base.
    long baseTime = 0L;

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    // One comparator instance is shared by both output assertions in this test.
    ResultSortComparator sortOrder = new ResultSortComparator();

    harness.open();

    // The global window actually ignores these timestamps...
    // add elements out-of-order
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), baseTime + 3000));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), baseTime + 3999));

    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), baseTime + 20));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), baseTime));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), baseTime + 999));

    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), baseTime + 1998));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), baseTime + 1999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), baseTime + 1000));

    // Results expected after the first eight records.
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput(), sortOrder);

    harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), baseTime + 10999));
    harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), baseTime + 1000));

    // One further result per key is expected after the two records above.
    expected.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput(), sortOrder);

    harness.close();
}