Java源码示例:org.apache.flink.api.common.functions.RichFoldFunction
示例1
/**
* .fold() does not support RichFoldFunction, since the fold function is used internally
* in a {@code FoldingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}
示例2
/**
* .fold() does not support RichFoldFunction, since the fold function is used internally
* in a {@code FoldingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private static final long serialVersionUID = -6448847205314995812L;
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}
示例3
/**
* .fold() does not support RichFoldFunction, since the fold function is used internally
* in a {@code FoldingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}
示例4
/**
* .fold() does not support RichFoldFunction, since the fold function is used internally
* in a {@code FoldingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private static final long serialVersionUID = -6448847205314995812L;
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}
示例5
/**
* .fold() does not support RichFoldFunction, since the fold function is used internally
* in a {@code FoldingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}
示例6
/**
* .fold() does not support RichFoldFunction, since the fold function is used internally
* in a {@code FoldingState}.
*/
@Test(expected = UnsupportedOperationException.class)
public void testFoldWithRichFolderFails() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
source
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private static final long serialVersionUID = -6448847205314995812L;
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return null;
}
});
fail("exception was not thrown");
}