Java源码示例:org.apache.flink.streaming.api.functions.aggregation.SumAggregator

示例1
/**
 * Checks the keyed sum, min and max reduce functions on field 1 of
 * {@code Tuple2<Integer, Integer>} records that are keyed by field 0.
 */
@Test
public void groupSumIntegerTest() throws Exception {

	// Expected emissions, one per loop step; the key is (step % 3).
	// NOTE(review): presumes getInputList() yields (i % 3, i) for i in 0..8 - confirm.
	List<Tuple2<Integer, Integer>> expectedSums = new ArrayList<>();
	List<Tuple2<Integer, Integer>> expectedMins = new ArrayList<>();
	List<Tuple2<Integer, Integer>> expectedMaxes = new ArrayList<>();

	// One running total per key, indexed by the key itself.
	int[] runningTotals = new int[3];

	for (int step = 0; step < 9; step++) {
		int key = step % 3;
		runningTotals[key] += step;

		expectedSums.add(new Tuple2<>(key, runningTotals[key]));
		expectedMins.add(new Tuple2<>(key, key));
		expectedMaxes.add(new Tuple2<>(key, step));
	}

	// Type information, configuration and key extraction shared by all three runs.
	TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor.getForObject(new Tuple2<>(0, 0));
	ExecutionConfig config = new ExecutionConfig();

	KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
			new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
			typeInfo, config);
	TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);

	// Reduce functions under test, all aggregating tuple position 1.
	ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
			new SumAggregator<>(1, typeInfo, config);
	ReduceFunction<Tuple2<Integer, Integer>> minFunction =
			new ComparableAggregator<>(1, typeInfo, AggregationType.MIN, config);
	ReduceFunction<Tuple2<Integer, Integer>> maxFunction =
			new ComparableAggregator<>(1, typeInfo, AggregationType.MAX, config);

	// Each run gets its own operator, its own serializer and a fresh input list.
	StreamGroupedReduce<Tuple2<Integer, Integer>> sumOperator =
			new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config));
	List<Tuple2<Integer, Integer>> actualSums =
			MockContext.createAndExecuteForKeyedStream(sumOperator, getInputList(), keySelector, keyType);

	StreamGroupedReduce<Tuple2<Integer, Integer>> minOperator =
			new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config));
	List<Tuple2<Integer, Integer>> actualMins =
			MockContext.createAndExecuteForKeyedStream(minOperator, getInputList(), keySelector, keyType);

	StreamGroupedReduce<Tuple2<Integer, Integer>> maxOperator =
			new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config));
	List<Tuple2<Integer, Integer>> actualMaxes =
			MockContext.createAndExecuteForKeyedStream(maxOperator, getInputList(), keySelector, keyType);

	assertEquals(expectedSums, actualSums);
	assertEquals(expectedMins, actualMins);
	assertEquals(expectedMaxes, actualMaxes);
}
 
示例2
/**
 * Checks the keyed sum, min and max reduce functions on field {@code "f1"} of
 * {@code MyPojo} records that are keyed by field {@code "f0"}.
 */
@Test
public void pojoGroupSumIntegerTest() throws Exception {

	// Expected emissions, one per loop step; the key is (step % 3).
	// NOTE(review): presumes getInputPojoList() yields MyPojo(i % 3, i) for i in 0..8 - confirm.
	List<MyPojo> expectedSums = new ArrayList<>();
	List<MyPojo> expectedMins = new ArrayList<>();
	List<MyPojo> expectedMaxes = new ArrayList<>();

	// One running total per key, indexed by the key itself.
	int[] runningTotals = new int[3];

	for (int step = 0; step < 9; step++) {
		int key = step % 3;
		runningTotals[key] += step;

		expectedSums.add(new MyPojo(key, runningTotals[key]));
		expectedMins.add(new MyPojo(key, key));
		expectedMaxes.add(new MyPojo(key, step));
	}

	// Type information, configuration and key extraction shared by all three runs.
	TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
	ExecutionConfig config = new ExecutionConfig();

	KeySelector<MyPojo, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
			new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
			typeInfo, config);
	TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);

	// Reduce functions under test, all aggregating the "f1" field.
	ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
	ReduceFunction<MyPojo> minFunction =
			new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN, false, config);
	ReduceFunction<MyPojo> maxFunction =
			new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX, false, config);

	// Each run gets its own operator, its own serializer and a fresh input list.
	StreamGroupedReduce<MyPojo> sumOperator =
			new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config));
	List<MyPojo> actualSums =
			MockContext.createAndExecuteForKeyedStream(sumOperator, getInputPojoList(), keySelector, keyType);

	StreamGroupedReduce<MyPojo> minOperator =
			new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config));
	List<MyPojo> actualMins =
			MockContext.createAndExecuteForKeyedStream(minOperator, getInputPojoList(), keySelector, keyType);

	StreamGroupedReduce<MyPojo> maxOperator =
			new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config));
	List<MyPojo> actualMaxes =
			MockContext.createAndExecuteForKeyedStream(maxOperator, getInputPojoList(), keySelector, keyType);

	assertEquals(expectedSums, actualSums);
	assertEquals(expectedMins, actualMins);
	assertEquals(expectedMaxes, actualMaxes);
}
 
示例3
/**
 * Exercises the sum, min and max reduce functions over tuple position 1 on a
 * stream of {@code Tuple2<Integer, Integer>} keyed by tuple position 0.
 */
@Test
public void groupSumIntegerTest() throws Exception {

	// Expected results, one entry per iteration; entries are grouped by (n % 3).
	// NOTE(review): presumes getInputList() yields (i % 3, i) for i in 0..8 - confirm.
	List<Tuple2<Integer, Integer>> wantedSums = new ArrayList<>();
	List<Tuple2<Integer, Integer>> wantedMins = new ArrayList<>();
	List<Tuple2<Integer, Integer>> wantedMaxes = new ArrayList<>();

	// Accumulated sum for each of the three groups.
	int[] sumPerGroup = new int[3];

	for (int n = 0; n < 9; n++) {
		int group = n % 3;
		sumPerGroup[group] += n;

		wantedSums.add(new Tuple2<>(group, sumPerGroup[group]));
		wantedMins.add(new Tuple2<>(group, group));
		wantedMaxes.add(new Tuple2<>(group, n));
	}

	// Shared plumbing: type info, execution config, key selector and key type.
	TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor.getForObject(new Tuple2<>(0, 0));
	ExecutionConfig config = new ExecutionConfig();

	KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
			new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
			typeInfo, config);
	TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);

	// The three aggregations being tested.
	ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
			new SumAggregator<>(1, typeInfo, config);
	ReduceFunction<Tuple2<Integer, Integer>> minFunction =
			new ComparableAggregator<>(1, typeInfo, AggregationType.MIN, config);
	ReduceFunction<Tuple2<Integer, Integer>> maxFunction =
			new ComparableAggregator<>(1, typeInfo, AggregationType.MAX, config);

	// Run each aggregation through its own operator with a newly created serializer.
	StreamGroupedReduce<Tuple2<Integer, Integer>> sumOperator =
			new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config));
	List<Tuple2<Integer, Integer>> producedSums =
			MockContext.createAndExecuteForKeyedStream(sumOperator, getInputList(), keySelector, keyType);

	StreamGroupedReduce<Tuple2<Integer, Integer>> minOperator =
			new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config));
	List<Tuple2<Integer, Integer>> producedMins =
			MockContext.createAndExecuteForKeyedStream(minOperator, getInputList(), keySelector, keyType);

	StreamGroupedReduce<Tuple2<Integer, Integer>> maxOperator =
			new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config));
	List<Tuple2<Integer, Integer>> producedMaxes =
			MockContext.createAndExecuteForKeyedStream(maxOperator, getInputList(), keySelector, keyType);

	assertEquals(wantedSums, producedSums);
	assertEquals(wantedMins, producedMins);
	assertEquals(wantedMaxes, producedMaxes);
}
 
示例4
/**
 * Exercises the sum, min and max reduce functions over field {@code "f1"} on a
 * stream of {@code MyPojo} keyed by field {@code "f0"}.
 */
@Test
public void pojoGroupSumIntegerTest() throws Exception {

	// Expected results, one entry per iteration; entries are grouped by (n % 3).
	// NOTE(review): presumes getInputPojoList() yields MyPojo(i % 3, i) for i in 0..8 - confirm.
	List<MyPojo> wantedSums = new ArrayList<>();
	List<MyPojo> wantedMins = new ArrayList<>();
	List<MyPojo> wantedMaxes = new ArrayList<>();

	// Accumulated sum for each of the three groups.
	int[] sumPerGroup = new int[3];

	for (int n = 0; n < 9; n++) {
		int group = n % 3;
		sumPerGroup[group] += n;

		wantedSums.add(new MyPojo(group, sumPerGroup[group]));
		wantedMins.add(new MyPojo(group, group));
		wantedMaxes.add(new MyPojo(group, n));
	}

	// Shared plumbing: type info, execution config, key selector and key type.
	TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
	ExecutionConfig config = new ExecutionConfig();

	KeySelector<MyPojo, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
			new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
			typeInfo, config);
	TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);

	// The three aggregations being tested.
	ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
	ReduceFunction<MyPojo> minFunction =
			new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN, false, config);
	ReduceFunction<MyPojo> maxFunction =
			new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX, false, config);

	// Run each aggregation through its own operator with a newly created serializer.
	StreamGroupedReduce<MyPojo> sumOperator =
			new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config));
	List<MyPojo> producedSums =
			MockContext.createAndExecuteForKeyedStream(sumOperator, getInputPojoList(), keySelector, keyType);

	StreamGroupedReduce<MyPojo> minOperator =
			new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config));
	List<MyPojo> producedMins =
			MockContext.createAndExecuteForKeyedStream(minOperator, getInputPojoList(), keySelector, keyType);

	StreamGroupedReduce<MyPojo> maxOperator =
			new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config));
	List<MyPojo> producedMaxes =
			MockContext.createAndExecuteForKeyedStream(maxOperator, getInputPojoList(), keySelector, keyType);

	assertEquals(wantedSums, producedSums);
	assertEquals(wantedMins, producedMins);
	assertEquals(wantedMaxes, producedMaxes);
}
 
示例5
/**
 * Tests {@code SumAggregator} and the MIN/MAX {@code ComparableAggregator} variants on
 * position 1 of {@code Tuple2<Integer, Integer>} elements, keyed by position 0.
 */
@Test
public void groupSumIntegerTest() throws Exception {

	// Reference outputs: one element per counter value, bucketed by (counter % 3).
	// NOTE(review): presumes getInputList() yields (i % 3, i) for i in 0..8 - confirm.
	List<Tuple2<Integer, Integer>> sumReference = new ArrayList<>();
	List<Tuple2<Integer, Integer>> minReference = new ArrayList<>();
	List<Tuple2<Integer, Integer>> maxReference = new ArrayList<>();

	// Slot b holds the sum accumulated so far for bucket b.
	int[] bucketSums = new int[3];

	for (int counter = 0; counter < 9; counter++) {
		int bucket = counter % 3;
		bucketSums[bucket] += counter;

		sumReference.add(new Tuple2<>(bucket, bucketSums[bucket]));
		minReference.add(new Tuple2<>(bucket, bucket));
		maxReference.add(new Tuple2<>(bucket, counter));
	}

	// Setup common to every aggregation run.
	TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor.getForObject(new Tuple2<>(0, 0));
	ExecutionConfig config = new ExecutionConfig();

	KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
			new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
			typeInfo, config);
	TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);

	// Functions under test.
	ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
			new SumAggregator<>(1, typeInfo, config);
	ReduceFunction<Tuple2<Integer, Integer>> minFunction =
			new ComparableAggregator<>(1, typeInfo, AggregationType.MIN, config);
	ReduceFunction<Tuple2<Integer, Integer>> maxFunction =
			new ComparableAggregator<>(1, typeInfo, AggregationType.MAX, config);

	// A separate operator and serializer is built for every run.
	StreamGroupedReduce<Tuple2<Integer, Integer>> sumOperator =
			new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config));
	List<Tuple2<Integer, Integer>> sumOutput =
			MockContext.createAndExecuteForKeyedStream(sumOperator, getInputList(), keySelector, keyType);

	StreamGroupedReduce<Tuple2<Integer, Integer>> minOperator =
			new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config));
	List<Tuple2<Integer, Integer>> minOutput =
			MockContext.createAndExecuteForKeyedStream(minOperator, getInputList(), keySelector, keyType);

	StreamGroupedReduce<Tuple2<Integer, Integer>> maxOperator =
			new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config));
	List<Tuple2<Integer, Integer>> maxOutput =
			MockContext.createAndExecuteForKeyedStream(maxOperator, getInputList(), keySelector, keyType);

	assertEquals(sumReference, sumOutput);
	assertEquals(minReference, minOutput);
	assertEquals(maxReference, maxOutput);
}
 
示例6
/**
 * Tests {@code SumAggregator} and the MIN/MAX {@code ComparableAggregator} variants on
 * field {@code "f1"} of {@code MyPojo} elements, keyed by field {@code "f0"}.
 */
@Test
public void pojoGroupSumIntegerTest() throws Exception {

	// Reference outputs: one element per counter value, bucketed by (counter % 3).
	// NOTE(review): presumes getInputPojoList() yields MyPojo(i % 3, i) for i in 0..8 - confirm.
	List<MyPojo> sumReference = new ArrayList<>();
	List<MyPojo> minReference = new ArrayList<>();
	List<MyPojo> maxReference = new ArrayList<>();

	// Slot b holds the sum accumulated so far for bucket b.
	int[] bucketSums = new int[3];

	for (int counter = 0; counter < 9; counter++) {
		int bucket = counter % 3;
		bucketSums[bucket] += counter;

		sumReference.add(new MyPojo(bucket, bucketSums[bucket]));
		minReference.add(new MyPojo(bucket, bucket));
		maxReference.add(new MyPojo(bucket, counter));
	}

	// Setup common to every aggregation run.
	TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
	ExecutionConfig config = new ExecutionConfig();

	KeySelector<MyPojo, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
			new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
			typeInfo, config);
	TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);

	// Functions under test.
	ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
	ReduceFunction<MyPojo> minFunction =
			new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN, false, config);
	ReduceFunction<MyPojo> maxFunction =
			new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX, false, config);

	// A separate operator and serializer is built for every run.
	StreamGroupedReduce<MyPojo> sumOperator =
			new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config));
	List<MyPojo> sumOutput =
			MockContext.createAndExecuteForKeyedStream(sumOperator, getInputPojoList(), keySelector, keyType);

	StreamGroupedReduce<MyPojo> minOperator =
			new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config));
	List<MyPojo> minOutput =
			MockContext.createAndExecuteForKeyedStream(minOperator, getInputPojoList(), keySelector, keyType);

	StreamGroupedReduce<MyPojo> maxOperator =
			new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config));
	List<MyPojo> maxOutput =
			MockContext.createAndExecuteForKeyedStream(maxOperator, getInputPojoList(), keySelector, keyType);

	assertEquals(sumReference, sumOutput);
	assertEquals(minReference, minOutput);
	assertEquals(maxReference, maxOutput);
}
 
示例7
/**
 * Sums, for every window of the data stream, the values found at the given position.
 *
 * <p>Builds a {@code SumAggregator} from the input stream's type and execution
 * config and hands it to {@code aggregate}.
 *
 * @param positionToSum The position in the tuple/array to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
	SumAggregator<T> positionSummer =
			new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig());
	return aggregate(positionSummer);
}
 
示例8
/**
 * Sums, for every window of the pojo data stream, the values of the given field.
 *
 * <p>A field expression is either the name of a public field or a getter method with
 * parentheses of the stream's underlying type. A dot can be used to drill down into objects,
 * as in {@code "field1.getInnerField2()" }.
 *
 * <p>Builds a {@code SumAggregator} from the input stream's type and execution
 * config and hands it to {@code aggregate}.
 *
 * @param field The field to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(String field) {
	SumAggregator<T> fieldSummer =
			new SumAggregator<>(field, input.getType(), input.getExecutionConfig());
	return aggregate(fieldSummer);
}
 
示例9
/**
 * Applies a per-window sum aggregation over the given position of the data stream.
 *
 * <p>The aggregator is created with the input stream's type information and
 * execution config, then passed on to {@code aggregate}.
 *
 * @param positionToSum The position in the tuple/array to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
	SumAggregator<T> aggregator =
			new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig());
	return aggregate(aggregator);
}
 
示例10
/**
 * Applies a per-window sum aggregation over the given field of the pojo data stream.
 *
 * <p>A field expression is either the name of a public field or a getter method with
 * parentheses of the stream's underlying type. A dot can be used to drill down into objects,
 * as in {@code "field1.getInnerField2()" }.
 *
 * <p>The aggregator is created with the input stream's type information and
 * execution config, then passed on to {@code aggregate}.
 *
 * @param field The field to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(String field) {
	SumAggregator<T> aggregator =
			new SumAggregator<>(field, input.getType(), input.getExecutionConfig());
	return aggregate(aggregator);
}
 
示例11
/**
 * Produces a rolling sum of the data stream at the given position, grouped by
 * the given key. Each key keeps its own independent aggregate.
 *
 * <p>Builds a {@code SumAggregator} from this stream's type and execution config
 * and hands it to {@code aggregate}.
 *
 * @param positionToSum
 *            The field position in the data points to sum. This is applicable to
 *            Tuple types, basic and primitive array types, Scala case classes,
 *            and primitive types (which is considered as having one field).
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
	SumAggregator<T> positionSummer =
			new SumAggregator<>(positionToSum, getType(), getExecutionConfig());
	return aggregate(positionSummer);
}
 
示例12
/**
 * Produces the current sum of the data stream at the given field, by the given
 * key. Each key keeps its own independent aggregate.
 *
 * <p>Builds a {@code SumAggregator} from this stream's type and execution config
 * and hands it to {@code aggregate}.
 *
 * @param field
 *            In case of a POJO, Scala case class, or Tuple type, the
 *            name of the (public) field on which to perform the aggregation.
 *            Additionally, a dot can be used to drill down into nested
 *            objects, as in {@code "field1.fieldxy" }.
 *            Furthermore "*" can be specified in case of a basic type
 *            (which is considered as having only one field).
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(String field) {
	SumAggregator<T> fieldSummer =
			new SumAggregator<>(field, getType(), getExecutionConfig());
	return aggregate(fieldSummer);
}
 
示例13
/**
 * Aggregates every window of the data stream by summing the given position.
 *
 * <p>Delegates to {@code aggregate} with a {@code SumAggregator} configured from
 * the input stream's type and execution config.
 *
 * @param positionToSum The position in the tuple/array to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
	SumAggregator<T> windowSum =
			new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig());
	return aggregate(windowSum);
}
 
示例14
/**
 * Aggregates every window of the pojo data stream by summing the given field.
 *
 * <p>A field expression is either the name of a public field or a getter method with
 * parentheses of the stream's underlying type. A dot can be used to drill down into objects,
 * as in {@code "field1.getInnerField2()" }.
 *
 * <p>Delegates to {@code aggregate} with a {@code SumAggregator} configured from
 * the input stream's type and execution config.
 *
 * @param field The field to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(String field) {
	SumAggregator<T> windowSum =
			new SumAggregator<>(field, input.getType(), input.getExecutionConfig());
	return aggregate(windowSum);
}
 
示例15
/**
 * Computes, window by window, the sum of the data stream at the given position.
 *
 * <p>A {@code SumAggregator} is set up with the input stream's type and execution
 * config and then forwarded to {@code aggregate}.
 *
 * @param positionToSum The position in the tuple/array to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
	SumAggregator<T> sumByPosition =
			new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig());
	return aggregate(sumByPosition);
}
 
示例16
/**
 * Computes, window by window, the sum of the pojo data stream at the given field.
 *
 * <p>A field expression is either the name of a public field or a getter method with
 * parentheses of the stream's underlying type. A dot can be used to drill down into objects,
 * as in {@code "field1.getInnerField2()" }.
 *
 * <p>A {@code SumAggregator} is set up with the input stream's type and execution
 * config and then forwarded to {@code aggregate}.
 *
 * @param field The field to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(String field) {
	SumAggregator<T> sumByField =
			new SumAggregator<>(field, input.getType(), input.getExecutionConfig());
	return aggregate(sumByField);
}
 
示例17
/**
 * Keeps a rolling sum of the data stream at the given position, grouped by the
 * given key. The aggregate of one key is independent of all other keys.
 *
 * <p>The aggregator is created with this stream's type information and execution
 * config, then passed on to {@code aggregate}.
 *
 * @param positionToSum
 *            The field position in the data points to sum. This is applicable to
 *            Tuple types, basic and primitive array types, Scala case classes,
 *            and primitive types (which is considered as having one field).
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
	SumAggregator<T> aggregator =
			new SumAggregator<>(positionToSum, getType(), getExecutionConfig());
	return aggregate(aggregator);
}
 
示例18
/**
 * Keeps the current sum of the data stream at the given field, by the given key.
 * The aggregate of one key is independent of all other keys.
 *
 * <p>The aggregator is created with this stream's type information and execution
 * config, then passed on to {@code aggregate}.
 *
 * @param field
 *            In case of a POJO, Scala case class, or Tuple type, the
 *            name of the (public) field on which to perform the aggregation.
 *            Additionally, a dot can be used to drill down into nested
 *            objects, as in {@code "field1.fieldxy" }.
 *            Furthermore "*" can be specified in case of a basic type
 *            (which is considered as having only one field).
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(String field) {
	SumAggregator<T> aggregator =
			new SumAggregator<>(field, getType(), getExecutionConfig());
	return aggregate(aggregator);
}
 
示例19
/**
 * Adds up the values at the given position for every window of the data stream.
 *
 * <p>Creates the {@code SumAggregator} using the input stream's type and execution
 * config before passing it to {@code aggregate}.
 *
 * @param positionToSum The position in the tuple/array to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
	SumAggregator<T> sumFunction =
			new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig());
	return aggregate(sumFunction);
}
 
示例20
/**
 * Adds up the values of the given field for every window of the pojo data stream.
 *
 * <p>A field expression is either the name of a public field or a getter method with
 * parentheses of the stream's underlying type. A dot can be used to drill down into objects,
 * as in {@code "field1.getInnerField2()" }.
 *
 * <p>Creates the {@code SumAggregator} using the input stream's type and execution
 * config before passing it to {@code aggregate}.
 *
 * @param field The field to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(String field) {
	SumAggregator<T> sumFunction =
			new SumAggregator<>(field, input.getType(), input.getExecutionConfig());
	return aggregate(sumFunction);
}
 
示例21
/**
 * Applies a sum aggregation at the given position to every window of the data stream.
 *
 * <p>The {@code SumAggregator} passed to {@code aggregate} is built from the input
 * stream's type and execution config.
 *
 * @param positionToSum The position in the tuple/array to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
	SumAggregator<T> summingFunction =
			new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig());
	return aggregate(summingFunction);
}
 
示例22
/**
 * Applies a sum aggregation on the given field to every window of the pojo data stream.
 *
 * <p>A field expression is either the name of a public field or a getter method with
 * parentheses of the stream's underlying type. A dot can be used to drill down into objects,
 * as in {@code "field1.getInnerField2()" }.
 *
 * <p>The {@code SumAggregator} passed to {@code aggregate} is built from the input
 * stream's type and execution config.
 *
 * @param field The field to sum
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(String field) {
	SumAggregator<T> summingFunction =
			new SumAggregator<>(field, input.getType(), input.getExecutionConfig());
	return aggregate(summingFunction);
}
 
示例23
/**
 * Gives a rolling sum of the data stream at the given position, grouped by the
 * given key. A separate, independent aggregate is maintained for each key.
 *
 * <p>Delegates to {@code aggregate} with a {@code SumAggregator} configured from
 * this stream's type and execution config.
 *
 * @param positionToSum
 *            The field position in the data points to sum. This is applicable to
 *            Tuple types, basic and primitive array types, Scala case classes,
 *            and primitive types (which is considered as having one field).
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
	SumAggregator<T> rollingSum =
			new SumAggregator<>(positionToSum, getType(), getExecutionConfig());
	return aggregate(rollingSum);
}
 
示例24
/**
 * Gives the current sum of the data stream at the given field, by the given key.
 * A separate, independent aggregate is maintained for each key.
 *
 * <p>Delegates to {@code aggregate} with a {@code SumAggregator} configured from
 * this stream's type and execution config.
 *
 * @param field
 *            In case of a POJO, Scala case class, or Tuple type, the
 *            name of the (public) field on which to perform the aggregation.
 *            Additionally, a dot can be used to drill down into nested
 *            objects, as in {@code "field1.fieldxy" }.
 *            Furthermore "*" can be specified in case of a basic type
 *            (which is considered as having only one field).
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<T> sum(String field) {
	SumAggregator<T> rollingSum =
			new SumAggregator<>(field, getType(), getExecutionConfig());
	return aggregate(rollingSum);
}