[case52]聊聊flink KeyedStream的aggregation操作

news/2024/7/1 7:43:21

本文主要研究一下flink KeyedStream的aggregation操作

实例

    @Test
    public void testMax() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        WordCount[] data = new WordCount[]{new WordCount(1,"Hello", 1), new
                WordCount(1,"World", 3), new WordCount(2,"Hello", 1)};
        env.fromElements(data)
                .keyBy("word")
                .max("frequency")
                .addSink(new SinkFunction<WordCount>() {
                    @Override
                    public void invoke(WordCount value, Context context) throws Exception {
                        LOGGER.info("value:{}",value);
                    }
                });
        env.execute("testMax");
    }
  • 这里先对word字段进行keyBy操作,然后再通过KeyedStream的max方法按frequency字段取最大的WordCount

KeyedStream.aggregate

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    public SingleOutputStreamOperator<T> sum(int positionToSum) {
        return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> sum(String field) {
        return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(int positionToMax) {
        return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(String field) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
                false, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(int positionToMin) {
        return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(String field) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
                false, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
        return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
                first, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
        return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
        return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
                first, getExecutionConfig()));
    }

    protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
        StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
                clean(aggregate), getType().createSerializer(getExecutionConfig()));
        return transform("Keyed Aggregation", getType(), operator);
    }
  • KeyedStream的aggregation方法是protected修饰的,sum、max、min、maxBy、minBy这几个方法实际都是调用aggregate方法,只是它们创建的ComparableAggregator的AggregationType不一样,分别是SUM, MAX, MIN, MAXBY, MINBY
  • 每个sum、max、min、maxBy、minBy都有两个重载方法,一个是int类型的参数,一个是String类型的参数
  • maxBy、minBy比sum、max、min多了first(boolean)参数,该参数用于指定在碰到多个compare值相等时,是否取第一个返回

ComparableAggregator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java

@Internal
public class ComparableAggregator<T> extends AggregationFunction<T> {

    private static final long serialVersionUID = 1L;

    private Comparator comparator;
    private boolean byAggregate;
    private boolean first;
    private final FieldAccessor<T, Object> fieldAccessor;

    private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) {
        this.comparator = Comparator.getForAggregation(aggregationType);
        this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
        this.first = first;
        this.fieldAccessor = fieldAccessor;
    }

    public ComparableAggregator(int positionToAggregate,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            ExecutionConfig config) {
        this(positionToAggregate, typeInfo, aggregationType, false, config);
    }

    public ComparableAggregator(int positionToAggregate,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            boolean first,
            ExecutionConfig config) {
        this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first);
    }

    public ComparableAggregator(String field,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            boolean first,
            ExecutionConfig config) {
        this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first);
    }

    @SuppressWarnings("unchecked")
    @Override
    public T reduce(T value1, T value2) throws Exception {
        Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1);
        Object o2 = fieldAccessor.get(value2);

        int c = comparator.isExtremal(o1, o2);

        if (byAggregate) {
            // if they are the same we choose based on whether we want to first or last
            // element with the min/max.
            if (c == 0) {
                return first ? value1 : value2;
            }

            return c == 1 ? value1 : value2;

        } else {
            if (c == 0) {
                value1 = fieldAccessor.set(value1, o2);
            }
            return value1;
        }
    }
}
  • ComparableAggregator继承了AggregationFunction,而AggregationFunction则实现了ReduceFunction接口,这里ComparableAggregator实现的reduce方法,它首先借助Comparator来比较两个对象,然后根据是否是byAggregate做不同处理,如果是byAggregate,则在比较值为0时,判断是否返回最先遇到的元素,如果是则返回value1,否则返回value2,比较值非0时,则取比较值最大的元素返回;如果不是byAggregate,则如果比较值为0(比较字段的值value1小于等于value2的情况),则使用反射方法将value2的比较字段的值更新到value1,最后都是返回value1

AggregationFunction

@Internal
public abstract class AggregationFunction<T> implements ReduceFunction<T> {
    private static final long serialVersionUID = 1L;

    /**
     * Aggregation types that can be used on a windowed stream or keyed stream.
     */
    public enum AggregationType {
        SUM, MIN, MAX, MINBY, MAXBY,
    }
}
  • AggregationFunction声明实现了ReduceFunction,同时定义了五种类型的AggregationType,分别是SUM, MIN, MAX, MINBY, MAXBY

Comparator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/Comparator.java

@Internal
public abstract class Comparator implements Serializable {

    private static final long serialVersionUID = 1L;

    public abstract <R> int isExtremal(Comparable<R> o1, R o2);

    public static Comparator getForAggregation(AggregationType type) {
        switch (type) {
        case MAX:
            return new MaxComparator();
        case MIN:
            return new MinComparator();
        case MINBY:
            return new MinByComparator();
        case MAXBY:
            return new MaxByComparator();
        default:
            throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    private static class MaxComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) > 0 ? 1 : 0;
        }

    }

    private static class MaxByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            int c = o1.compareTo(o2);
            if (c > 0) {
                return 1;
            }
            if (c == 0) {
                return 0;
            } else {
                return -1;
            }
        }

    }

    private static class MinByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            int c = o1.compareTo(o2);
            if (c < 0) {
                return 1;
            }
            if (c == 0) {
                return 0;
            } else {
                return -1;
            }
        }

    }

    private static class MinComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) < 0 ? 1 : 0;
        }

    }
}
  • Comparator则实现Serializable接口,定义了isExtremal抽象方法,同时提供了getForAggregation工厂方法,根据不同的AggregationType创建不同的Comparator
  • Comparator里头定义了MaxComparator、MinComparator、MinByComparator、MaxByComparator四个子类,它们都实现了isExtremal方法
  • MaxComparator直接利用Comparable接口定义的compareTo方法,不过它的返回只有0和1,compareTo大于0的时候才返回1,否则返回0,也就是大于的情况才返回1,否则返回0;MaxByComparator也先根据Comparable接口定义的compareTo方法获取值,不过它的返回值有3种,大于0的时候返回1,等于0时返回0,小于0时返回-1,也就是大于的情况返回1,相等的情况返回0,小于的情况返回-1

小结

  • KeyedStream的aggregation操作主要分为sum、max、min、maxBy、minBy这几个方法,它们内部都调用了protected修饰的aggregation方法,只是它们创建的ComparableAggregator的AggregationType不一样,分别是SUM, MAX, MIN, MAXBY, MINBY
  • ComparableAggregator继承了AggregationFunction,而AggregationFunction则实现了ReduceFunction接口,这里ComparableAggregator实现的reduce方法,它首先借助Comparator来比较两个对象,然后根据是否是byAggregate做不同处理,如果是byAggregate,则在比较值为0时,判断是否返回最先遇到的元素,如果是则返回最先遇到的,否则返回最后遇到的,比较值非0时,则取比较值最大的元素返回;如果不是byAggregate,则如果比较值为0,则使用反射方法将后者的值更新到value1,最后都是返回value1
  • Comparator里头定义了MaxComparator、MinComparator、MinByComparator、MaxByComparator四个子类,它们都实现了isExtremal方法;MaxComparator与MaxByComparator的区别在于,MaxComparator大于返回1,小于等于返回0,而MaxByComparator返回值更精细,大于返回1,等于返回0,小于返回-1;这个区别也体现在ComparableAggregator的reduce方法中,而且maxBy、minBy比其他方法多了一个first(boolean)参数,专门用于在比较值为的0的时候选择返回哪个元素;而reduce方法对于非byAggregate操作,始终返回的是value1,在比较值小于等于的时候,使用反射更新value1,然后返回value1

doc

  • DataStream Transformations

http://www.niftyadmin.cn/n/4821471.html

相关文章

【译】Swift算法俱乐部-快速排序

本文是对 Swift Algorithm Club 翻译的一篇文章。 Swift Algorithm Club是 raywenderlich.com网站出品的用Swift实现算法和数据结构的开源项目&#xff0c;目前在GitHub上有18000⭐️&#xff0c;我初略统计了一下&#xff0c;大概有一百左右个的算法和数据结构&#xff0c;基本…

如何解决安卓手机显示google play服务停止运行?

相信不少的安卓用户都遇到过这种情况&#xff1a;“很抱歉&#xff0c;‘google play服务’已停止运行”。这到底是怎么一回事呢&#xff1f;接下来就通过本文来给大家介绍一下&#xff0c;我们一起往下看&#xff01; 其实呢&#xff0c;这句话的意思就是说“您的设备不支持部…

第一次做代理,我建议你选择小程序代理!

40岁的老李&#xff0c;做了十年的家电批发&#xff0c;今年转型做了小程序代理&#xff0c;跟当地的一个购物商场合作&#xff0c;一个月赚了20多万&#xff0c;同时做了一个“吃喝玩乐”小程序平台&#xff0c;一个月能够带来5万以上的收益。 在这个世界上&#xff0c;赚钱有…

getDate方法的妙用(js判断闰年)

对于js中的Date对象&#xff0c;我们new Date()后做的最多的操作就是getTime()、getFullYear()、getMonth()、getSecond()&#xff0c;在实际开发中几乎很少会用到getDate()这个方法&#xff0c;因为应用场景太少了。在工作中我们经常会需要判断某个年份是否是闰年这个需求&…

阿里云服务器的一点小坑---端口不通问题,还是防火墙的锅

https://www.cnblogs.com/grey-wolf/p/8961581.html 一、问题概述 最近在组长支持下&#xff0c;一直在折腾jenkins&#xff0c;也推广到了两三个组。期间也加了jenkins相关的qq群&#xff0c;群里的一个哥们问题很奇怪&#xff1a; centos 7.4 64位&#xff0c;使用了如下链接…

存储过程用到的表、分组、排序、联结

查询存储过程用到的表&#xff0c;并进行分组、排序、联结&#xff1a; 1 SELECT 2 REFERENCED_OWNER,3 REFERENCED_NAME,4 LISTAGG(XH||>||NAME,,) WITHIN GROUP(ORDER BY XH ) NAME 5 FROM 6 ( 7 SELECT 8 A.REFERENCED_OWNER…

springboot和redis处理页面缓存

页面缓存是应对高并发的一个比较常见的方案&#xff0c;当请求页面的时候&#xff0c;会先查询redis缓存中是否存在&#xff0c;若存在则直接从缓存中返回页面&#xff0c;否则会通过代码逻辑去渲染页面&#xff0c;并将渲染后的页面缓存到redis中&#xff0c;然后返回。下面通…

Ribbon 框架简介及搭建

Ribbon简介1. 负载均衡框架&#xff0c;支持可插拔式的负载均衡规则2. 支持多种协议&#xff0c;如HTTP、UDP等3. 提供负载均衡客户端Ribbon子模块1. ribbon-core&#xff08;ribbon的核心&#xff0c;主要包含负载均衡器、负载均衡接口、客户端接口、内置负载均衡实现API&…