* Summarize a DataSet of Tuples by collecting single pass statistics for all columns.
* <p>Example usage:
* <pre>
* {@code
* Dataset<Tuple3<Double, String, Boolean>> input = // [...]
* Tuple3<NumericColumnSummary,StringColumnSummary, BooleanColumnSummary> summary = DataSetUtils.summarize(input)
* summary.f0.getStandardDeviation()
* summary.f1.getMaxLength()
* }
* </pre>
* @return the summary as a Tuple the same width as input rows
public static <R extends Tuple, T extends Tuple> R summarize(DataSet<T> input) throws Exception {
if (!input.getType().isTupleType()) {
throw new IllegalArgumentException("summarize() is only implemented for DataSet's of Tuples");
final TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getType();
DataSet<TupleSummaryAggregator<R>> result = input.mapPartition(new MapPartitionFunction<T, TupleSummaryAggregator<R>>() {
public void mapPartition(Iterable<T> values, Collector<TupleSummaryAggregator<R>> out) throws Exception {
TupleSummaryAggregator<R> aggregator = SummaryAggregatorFactory.create(inType);
for (Tuple value : values) {
}).reduce(new ReduceFunction<TupleSummaryAggregator<R>>() {
public TupleSummaryAggregator<R> reduce(TupleSummaryAggregator<R> agg1, TupleSummaryAggregator<R> agg2) throws Exception {
return agg1;
return result.collect().get(0).result();
* Summarize a DataSet of Tuples by collecting single pass statistics for all columns.
* <p>Example usage:
* <pre>
* {@code
* Dataset<Tuple3<Double, String, Boolean>> input = // [...]
* Tuple3<NumericColumnSummary,StringColumnSummary, BooleanColumnSummary> summary = DataSetUtils.summarize(input)
* summary.f0.getStandardDeviation()
* summary.f1.getMaxLength()
* }
* </pre>
* @return the summary as a Tuple the same width as input rows
public static <R extends Tuple, T extends Tuple> R summarize(DataSet<T> input) throws Exception {
if (!input.getType().isTupleType()) {
throw new IllegalArgumentException("summarize() is only implemented for DataSet's of Tuples");
final TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getType();
DataSet<TupleSummaryAggregator<R>> result = input.mapPartition(new MapPartitionFunction<T, TupleSummaryAggregator<R>>() {
public void mapPartition(Iterable<T> values, Collector<TupleSummaryAggregator<R>> out) throws Exception {
TupleSummaryAggregator<R> aggregator = SummaryAggregatorFactory.create(inType);
for (Tuple value : values) {
}).reduce(new ReduceFunction<TupleSummaryAggregator<R>>() {
public TupleSummaryAggregator<R> reduce(TupleSummaryAggregator<R> agg1, TupleSummaryAggregator<R> agg2) throws Exception {
return agg1;
return result.collect().get(0).result();
* Summarize a DataSet of Tuples by collecting single pass statistics for all columns.
* <p>Example usage:
* <pre>
* {@code
* Dataset<Tuple3<Double, String, Boolean>> input = // [...]
* Tuple3<NumericColumnSummary,StringColumnSummary, BooleanColumnSummary> summary = DataSetUtils.summarize(input)
* summary.f0.getStandardDeviation()
* summary.f1.getMaxLength()
* }
* </pre>
* @return the summary as a Tuple the same width as input rows
public static <R extends Tuple, T extends Tuple> R summarize(DataSet<T> input) throws Exception {
if (!input.getType().isTupleType()) {
throw new IllegalArgumentException("summarize() is only implemented for DataSet's of Tuples");
final TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getType();
DataSet<TupleSummaryAggregator<R>> result = input.mapPartition(new MapPartitionFunction<T, TupleSummaryAggregator<R>>() {
public void mapPartition(Iterable<T> values, Collector<TupleSummaryAggregator<R>> out) throws Exception {
TupleSummaryAggregator<R> aggregator = SummaryAggregatorFactory.create(inType);
for (Tuple value : values) {
}).reduce(new ReduceFunction<TupleSummaryAggregator<R>>() {
public TupleSummaryAggregator<R> reduce(TupleSummaryAggregator<R> agg1, TupleSummaryAggregator<R> agg2) throws Exception {
return agg1;
return result.collect().get(0).result();