上一篇我们介绍了flink的滚动计数窗口函数,这篇文章我们介绍下Flink的滑动计数窗口函数案例。
滑动技术窗口主要其实就是在滚动技术窗口的countWindow里面添加一个滑动大小的值,其他的都是一模一样的。我们直接上主类
package com.test.demo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.flink.demo.function.Splitter; import com.flink.demo.sources.MySources; public class CountWindowJob { public static void main(String[] args) throws Exception { // 初始化一个DataStream API得到执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // 由于是需要使用到时间窗口,因此这里我们需要设置下时间语义,如果没有设置这个时间语义,就会报错 environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 添加一个数据源 DataStreamSource<String> source = environment.addSource(new MySources()); source.flatMap(new Splitter()) // 根据第一个数据进行聚合 .keyBy(0) // 滑动窗口的数量是5,滑动大小是1 .countWindow(5,1) // 根据key进行聚合求和 .sum(1) // 打印输出出来 .print("CountWindowJob"); environment.execute(); } }
然后我们运行一下看下结果
最后我们看到滑动计数窗口比滚动技术窗口有更直观的表现,也就是接收到了几个数据,就会统计几个,不同等到设置的阈值再进行计算逻辑。
还没有评论,来说两句吧...