Java源码示例:org.apache.flink.streaming.api.functions.aggregation.SumAggregator
示例1
@Test
public void groupSumIntegerTest() throws Exception {
// preparing expected outputs
List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<>();
List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<>();
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
int groupedSum;
switch (i % 3) {
case 0:
groupedSum = groupedSum0 += i;
break;
case 1:
groupedSum = groupedSum1 += i;
break;
default:
groupedSum = groupedSum2 += i;
break;
}
expectedGroupSumList.add(new Tuple2<>(i % 3, groupedSum));
expectedGroupMinList.add(new Tuple2<>(i % 3, i % 3));
expectedGroupMaxList.add(new Tuple2<>(i % 3, i));
}
// some necessary boiler plate
TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor.getForObject(new Tuple2<>(0, 0));
ExecutionConfig config = new ExecutionConfig();
KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
typeInfo, config);
TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
// aggregations tested
ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
new SumAggregator<>(1, typeInfo, config);
ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<>(
1, typeInfo, AggregationType.MIN, config);
ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
1, typeInfo, AggregationType.MAX, config);
List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
getInputList(),
keySelector, keyType);
List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
getInputList(),
keySelector, keyType);
List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
getInputList(),
keySelector, keyType);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
}
示例2
@Test
public void pojoGroupSumIntegerTest() throws Exception {
// preparing expected outputs
List<MyPojo> expectedGroupSumList = new ArrayList<>();
List<MyPojo> expectedGroupMinList = new ArrayList<>();
List<MyPojo> expectedGroupMaxList = new ArrayList<>();
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
int groupedSum;
switch (i % 3) {
case 0:
groupedSum = groupedSum0 += i;
break;
case 1:
groupedSum = groupedSum1 += i;
break;
default:
groupedSum = groupedSum2 += i;
break;
}
expectedGroupSumList.add(new MyPojo(i % 3, groupedSum));
expectedGroupMinList.add(new MyPojo(i % 3, i % 3));
expectedGroupMaxList.add(new MyPojo(i % 3, i));
}
// some necessary boiler plate
TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
ExecutionConfig config = new ExecutionConfig();
KeySelector<MyPojo, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
typeInfo, config);
TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
// aggregations tested
ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
ReduceFunction<MyPojo> minFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN,
false, config);
ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
false, config);
List<MyPojo> groupedSumList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
getInputPojoList(),
keySelector, keyType);
List<MyPojo> groupedMinList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
getInputPojoList(),
keySelector, keyType);
List<MyPojo> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
getInputPojoList(),
keySelector, keyType);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
}
示例3
@Test
public void groupSumIntegerTest() throws Exception {
// preparing expected outputs
List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<>();
List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<>();
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
int groupedSum;
switch (i % 3) {
case 0:
groupedSum = groupedSum0 += i;
break;
case 1:
groupedSum = groupedSum1 += i;
break;
default:
groupedSum = groupedSum2 += i;
break;
}
expectedGroupSumList.add(new Tuple2<>(i % 3, groupedSum));
expectedGroupMinList.add(new Tuple2<>(i % 3, i % 3));
expectedGroupMaxList.add(new Tuple2<>(i % 3, i));
}
// some necessary boiler plate
TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor.getForObject(new Tuple2<>(0, 0));
ExecutionConfig config = new ExecutionConfig();
KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
typeInfo, config);
TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
// aggregations tested
ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
new SumAggregator<>(1, typeInfo, config);
ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<>(
1, typeInfo, AggregationType.MIN, config);
ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
1, typeInfo, AggregationType.MAX, config);
List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
getInputList(),
keySelector, keyType);
List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
getInputList(),
keySelector, keyType);
List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
getInputList(),
keySelector, keyType);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
}
示例4
@Test
public void pojoGroupSumIntegerTest() throws Exception {
// preparing expected outputs
List<MyPojo> expectedGroupSumList = new ArrayList<>();
List<MyPojo> expectedGroupMinList = new ArrayList<>();
List<MyPojo> expectedGroupMaxList = new ArrayList<>();
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
int groupedSum;
switch (i % 3) {
case 0:
groupedSum = groupedSum0 += i;
break;
case 1:
groupedSum = groupedSum1 += i;
break;
default:
groupedSum = groupedSum2 += i;
break;
}
expectedGroupSumList.add(new MyPojo(i % 3, groupedSum));
expectedGroupMinList.add(new MyPojo(i % 3, i % 3));
expectedGroupMaxList.add(new MyPojo(i % 3, i));
}
// some necessary boiler plate
TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
ExecutionConfig config = new ExecutionConfig();
KeySelector<MyPojo, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
typeInfo, config);
TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
// aggregations tested
ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
ReduceFunction<MyPojo> minFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN,
false, config);
ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
false, config);
List<MyPojo> groupedSumList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
getInputPojoList(),
keySelector, keyType);
List<MyPojo> groupedMinList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
getInputPojoList(),
keySelector, keyType);
List<MyPojo> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
getInputPojoList(),
keySelector, keyType);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
}
示例5
@Test
public void groupSumIntegerTest() throws Exception {
// preparing expected outputs
List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<>();
List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<>();
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
int groupedSum;
switch (i % 3) {
case 0:
groupedSum = groupedSum0 += i;
break;
case 1:
groupedSum = groupedSum1 += i;
break;
default:
groupedSum = groupedSum2 += i;
break;
}
expectedGroupSumList.add(new Tuple2<>(i % 3, groupedSum));
expectedGroupMinList.add(new Tuple2<>(i % 3, i % 3));
expectedGroupMaxList.add(new Tuple2<>(i % 3, i));
}
// some necessary boiler plate
TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor.getForObject(new Tuple2<>(0, 0));
ExecutionConfig config = new ExecutionConfig();
KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
typeInfo, config);
TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
// aggregations tested
ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
new SumAggregator<>(1, typeInfo, config);
ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<>(
1, typeInfo, AggregationType.MIN, config);
ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
1, typeInfo, AggregationType.MAX, config);
List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
getInputList(),
keySelector, keyType);
List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
getInputList(),
keySelector, keyType);
List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
getInputList(),
keySelector, keyType);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
}
示例6
@Test
public void pojoGroupSumIntegerTest() throws Exception {
// preparing expected outputs
List<MyPojo> expectedGroupSumList = new ArrayList<>();
List<MyPojo> expectedGroupMinList = new ArrayList<>();
List<MyPojo> expectedGroupMaxList = new ArrayList<>();
int groupedSum0 = 0;
int groupedSum1 = 0;
int groupedSum2 = 0;
for (int i = 0; i < 9; i++) {
int groupedSum;
switch (i % 3) {
case 0:
groupedSum = groupedSum0 += i;
break;
case 1:
groupedSum = groupedSum1 += i;
break;
default:
groupedSum = groupedSum2 += i;
break;
}
expectedGroupSumList.add(new MyPojo(i % 3, groupedSum));
expectedGroupMinList.add(new MyPojo(i % 3, i % 3));
expectedGroupMaxList.add(new MyPojo(i % 3, i));
}
// some necessary boiler plate
TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
ExecutionConfig config = new ExecutionConfig();
KeySelector<MyPojo, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
typeInfo, config);
TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
// aggregations tested
ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
ReduceFunction<MyPojo> minFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN,
false, config);
ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
false, config);
List<MyPojo> groupedSumList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
getInputPojoList(),
keySelector, keyType);
List<MyPojo> groupedMinList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
getInputPojoList(),
keySelector, keyType);
List<MyPojo> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
getInputPojoList(),
keySelector, keyType);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
}
示例7
/**
* Applies an aggregation that sums every window of the data stream at the
* given position.
*
* @param positionToSum The position in the tuple/array to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}
示例8
/**
* Applies an aggregation that sums every window of the pojo data stream at the given field for
* every window.
*
* <p>A field expression is either the name of a public field or a getter method with
* parentheses of the stream's underlying type. A dot can be used to drill down into objects,
* as in {@code "field1.getInnerField2()" }.
*
* @param field The field to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
}
示例9
/**
* Applies an aggregation that sums every window of the data stream at the
* given position.
*
* @param positionToSum The position in the tuple/array to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}
示例10
/**
* Applies an aggregation that sums every window of the pojo data stream at
* the given field for every window.
*
* <p>A field expression is either the name of a public field or a getter method with
* parentheses of the stream's underlying type. A dot can be used to drill down into objects,
* as in {@code "field1.getInnerField2()" }.
*
* @param field The field to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
}
示例11
/**
* Applies an aggregation that gives a rolling sum of the data stream at the
* given position grouped by the given key. An independent aggregate is kept
* per key.
*
* @param positionToSum
* The field position in the data points to sum. This is applicable to
* Tuple types, basic and primitive array types, Scala case classes,
* and primitive types (which is considered as having one field).
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
}
示例12
/**
* Applies an aggregation that gives the current sum of the data
* stream at the given field by the given key. An independent
* aggregate is kept per key.
*
* @param field
* In case of a POJO, Scala case class, or Tuple type, the
* name of the (public) field on which to perform the aggregation.
* Additionally, a dot can be used to drill down into nested
* objects, as in {@code "field1.fieldxy" }.
* Furthermore "*" can be specified in case of a basic type
* (which is considered as having only one field).
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
}
示例13
/**
* Applies an aggregation that sums every window of the data stream at the
* given position.
*
* @param positionToSum The position in the tuple/array to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}
示例14
/**
* Applies an aggregation that sums every window of the pojo data stream at the given field for
* every window.
*
* <p>A field expression is either the name of a public field or a getter method with
* parentheses of the stream's underlying type. A dot can be used to drill down into objects,
* as in {@code "field1.getInnerField2()" }.
*
* @param field The field to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
}
示例15
/**
* Applies an aggregation that sums every window of the data stream at the
* given position.
*
* @param positionToSum The position in the tuple/array to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}
示例16
/**
* Applies an aggregation that sums every window of the pojo data stream at
* the given field for every window.
*
* <p>A field expression is either the name of a public field or a getter method with
* parentheses of the stream's underlying type. A dot can be used to drill down into objects,
* as in {@code "field1.getInnerField2()" }.
*
* @param field The field to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
}
示例17
/**
* Applies an aggregation that gives a rolling sum of the data stream at the
* given position grouped by the given key. An independent aggregate is kept
* per key.
*
* @param positionToSum
* The field position in the data points to sum. This is applicable to
* Tuple types, basic and primitive array types, Scala case classes,
* and primitive types (which is considered as having one field).
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
}
示例18
/**
* Applies an aggregation that gives the current sum of the data
* stream at the given field by the given key. An independent
* aggregate is kept per key.
*
* @param field
* In case of a POJO, Scala case class, or Tuple type, the
* name of the (public) field on which to perform the aggregation.
* Additionally, a dot can be used to drill down into nested
* objects, as in {@code "field1.fieldxy" }.
* Furthermore "*" can be specified in case of a basic type
* (which is considered as having only one field).
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
}
示例19
/**
* Applies an aggregation that sums every window of the data stream at the
* given position.
*
* @param positionToSum The position in the tuple/array to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}
示例20
/**
* Applies an aggregation that sums every window of the pojo data stream at the given field for
* every window.
*
* <p>A field expression is either the name of a public field or a getter method with
* parentheses of the stream's underlying type. A dot can be used to drill down into objects,
* as in {@code "field1.getInnerField2()" }.
*
* @param field The field to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
}
示例21
/**
* Applies an aggregation that sums every window of the data stream at the
* given position.
*
* @param positionToSum The position in the tuple/array to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}
示例22
/**
* Applies an aggregation that sums every window of the pojo data stream at
* the given field for every window.
*
* <p>A field expression is either the name of a public field or a getter method with
* parentheses of the stream's underlying type. A dot can be used to drill down into objects,
* as in {@code "field1.getInnerField2()" }.
*
* @param field The field to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
}
示例23
/**
* Applies an aggregation that gives a rolling sum of the data stream at the
* given position grouped by the given key. An independent aggregate is kept
* per key.
*
* @param positionToSum
* The field position in the data points to sum. This is applicable to
* Tuple types, basic and primitive array types, Scala case classes,
* and primitive types (which is considered as having one field).
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
}
示例24
/**
* Applies an aggregation that gives the current sum of the data
* stream at the given field by the given key. An independent
* aggregate is kept per key.
*
* @param field
* In case of a POJO, Scala case class, or Tuple type, the
* name of the (public) field on which to perform the aggregation.
* Additionally, a dot can be used to drill down into nested
* objects, as in {@code "field1.fieldxy" }.
* Furthermore "*" can be specified in case of a basic type
* (which is considered as having only one field).
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
}