2023-01-18 flink 11.6 时间水印 和 窗口周期的关系计算方法

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

forBoundedOutOfOrderness 和 TumblingEventTimeWindows

forBoundedOutOfOrderness 的最大乱序时长：M

TumblingEventTimeWindows 的窗口长度：N

第一条数据的事件时间：TS1

第一个窗口期公式

窗口开始时间

win_start = ((TS1-M)/N) * N（其中 / 为整数除法，即向下取整；假设时间戳为正且窗口 offset 为 0）

窗口结束时间

win_end = win_start+N

数据过期

凡是事件时间 < win_start 的数据都是过期（迟到）数据，会被丢弃或进入侧输出（前提：allowedLateness = 0，且第一条数据到达后已经发出过水印）。

第一个窗口汇总计算触发

与数据之间的接收的间隔时间无关,与总时长也无关。

只与接收到的数据的时间TS2有关。

当 TS2 >= win_end + M 时（此时水印 = TS2 - M - 1 >= win_end - 1），会把事件时间满足 >= win_start && < win_end 的数据交给 apply（窗口是左闭右开区间 [win_start, win_end)）。

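在数据流结束之前，TS2 >= win_end + M 是唯一的触发条件（水印默认每 200ms 周期性发出，因此实际触发会稍晚于该数据到达）。有界数据源结束时，Flink 会发出值为 Long.MAX_VALUE 的最终水印，所有尚未触发的窗口会一次性全部触发。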


import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * Demo of how a bounded-out-of-orderness watermark (M = 3 s) interacts with a tumbling event-time
 * window (N = 5 s) in Flink: which records are routed to the late-data side output and when each
 * window fires.
 *
 * <p>The static fields are shared between operator instances only because the job runs in a local
 * (single-JVM) environment; on a real cluster each TaskManager would have its own copy.
 */
public class WaterMarkTest {
    public WaterMarkTest() {

    }

    /**
     * Log collected from several operator threads (source/timestamp assigner, window function,
     * side-output map). StringBuilder is not thread-safe, so always write through {@link #appendLog}.
     */
    static StringBuilder sb = new StringBuilder();
    /** Wall-clock start of {@link #run()}; volatile because operator threads read it. */
    static volatile long sts = 0L;
    /** Wall-clock time of the latest window evaluation; written by the window operator thread. */
    static volatile long ets = 0L;
    /** Pause (ms) after each record of the first burst; keep it above the 200 ms watermark interval. */
    static long sleeps = 500;

    /**
     * Appends {@code text} to {@link #sb} while holding its monitor, so concurrent writers from
     * different operator threads cannot corrupt the buffer.
     *
     * @param text text to append verbatim (callers add their own line breaks)
     */
    private static void appendLog(String text) {
        synchronized (sb) {
            sb.append(text);
        }
    }

    /**
     * Builds and executes the demo job, blocking until the bounded source has been fully consumed.
     *
     * <p>The type parameter {@code R} is unused; it is kept so existing call sites keep compiling.
     *
     * @throws Exception if the Flink job fails
     */
    public <R> void run() throws Exception {
        sts = System.currentTimeMillis();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        OutputTag<Tuple2<String, String>> lateOutputTag =
                new OutputTag<Tuple2<String, String>>("late-data-lx") {
                    private static final long serialVersionUID = 154621L;
                };

        DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
            private static final long serialVersionUID = 1134546L;

            /** Cleared by cancel() so the emit loop stops early when the job is cancelled. */
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // The first burst uses the configurable pause; it should not be shorter than the
                // 200 ms watermark interval, otherwise several records share one watermark.
                final long fast = sleeps;
                final long slow = 1000L;
                // Each row: {event timestamp in ms, pause in ms after emitting it}; 0 = no pause.
                long[][] events = {
                    {1553503188000L, fast},
                    {1553503186000L, fast},
                    {1553503183000L, fast}, // late -> side output (see sample output below)
                    {1553503180000L, fast}, // late -> side output
                    {1553503185000L, fast},
                    {1553503188000L, fast},
                    {1553503189000L, slow},
                    {1553503188000L, slow},
                    {1553503189000L, slow},
                    {1553503190000L, slow},
                    {1553503191000L, slow},
                    {1553503186000L, slow},
                    {1553503187000L, slow},
                    {1553503185000L, slow},
                    {1553503184000L, slow}, // late -> side output
                    {1553503183000L, slow}, // late -> side output
                    {1553503190000L, slow},
                    {1553503192000L, slow},
                    {1553503193000L, slow},
                    {1553503194000L, slow},
                    {1553503195000L, slow},
                    {1553503196000L, slow},
                    {1553503197000L, slow},
                    {1553503198000L, slow},
                    {1553503199000L, slow},
                    {1553503200000L, slow},
                    {1553503201000L, slow},
                    {1553503202000L, 0L},
                };
                for (long[] event : events) {
                    if (!running) {
                        break;
                    }
                    ctx.collect("hello," + event[0]);
                    if (event[1] > 0) {
                        Thread.sleep(event[1]);
                    }
                }
                System.out.println("1 ============================================================");
                // Read the volatile once so both printed values are consistent.
                long lastWindowTs = ets;
                appendLog("time use 1=" + (lastWindowTs - sts) + ", ets=" + lastWindowTs + "\n");
            }

            @Override
            public void cancel() {
                running = false;
            }
        }, "source1")
        /*
         * assignTimestampsAndWatermarks (summary of the Flink Javadoc):
         * Assigns timestamps to the elements in the data stream and generates watermarks to signal
         * event time progress. The given WatermarkStrategy is used to create a TimestampAssigner and
         * a WatermarkGenerator. For each event in the data stream,
         * TimestampAssigner.extractTimestamp(Object, long) is called to assign an event timestamp,
         * and WatermarkGenerator.onEvent(Object, long, WatermarkOutput) is called as well.
         */
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        // Periodically emits watermark = (max event time seen) - 3 s - 1 ms.
                        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        // Take the event time from the "key,epochMillis" record.
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<String>() {
                                    private static final long serialVersionUID = 134231L;
                                    /** Largest event time seen so far (logged as "recordTimestamp"). */
                                    long recordTimestamp = 0L;
                                    /** Wall-clock time of the first record, for elapsed-time logging. */
                                    long lst_ts = 0L;

                                    @Override
                                    public long extractTimestamp(String element, long _recordTimestamp) {
                                        // _recordTimestamp is the element's current internal timestamp
                                        // (negative if none has been assigned); it is not used here.
                                        String[] fields = element.split(",");
                                        long eventTime = Long.parseLong(fields[1]);
                                        long now = System.currentTimeMillis();
                                        String msg = now + "[" + (lst_ts > 0 ? (now - lst_ts) : 0) + "] Key-> "
                                                + fields[0] + ",EventTime:" + eventTime
                                                + ", recordTimestamp=" + recordTimestamp;
                                        System.out.println(msg);
                                        appendLog(msg + "\n");
                                        if (lst_ts == 0) {
                                            lst_ts = now;
                                        }
                                        recordTimestamp = Math.max(eventTime, recordTimestamp);
                                        return eventTime;
                                    }
                                }));

        dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
            private static final long serialVersionUID = 12342L;

            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Tuple2<String, String>(fields[0], fields[1]);
            }
        }).keyBy(f -> f.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5-second tumbling windows
        // With allowedLateness > 0, a window that has already fired fires again for every late
        // record still within the lateness bound. With 0, such records go to the side output.
        .allowedLateness(Time.seconds(0))
        .sideOutputLateData(lateOutputTag)
        .apply(new WindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
            private static final long serialVersionUID = 1112151L;
            /** Wall-clock time of the previous evaluation; starts at the job start time. */
            private long last_deal_ts = 0L;

            @Override
            public void apply(String key, TimeWindow window, Iterable<Tuple2<String, String>> input,
                    Collector<String> out) throws Exception {
                long cur_ts = System.currentTimeMillis();
                if (last_deal_ts == 0) {
                    last_deal_ts = sts;
                }
                String msg = cur_ts + "[" + (last_deal_ts > 0 ? (cur_ts - last_deal_ts) : "-") + "]"
                        + " 当前窗口开始时间[" + window.getStart() + ",结束时间" + window.getEnd() + ")";
                appendLog(msg + "\n");
                System.out.println(msg);
                last_deal_ts = cur_ts;

                List<Tuple2<String, String>> list = new ArrayList<>();
                input.forEach(list::add);
                // Compare numerically: lexicographic string order is only correct while every
                // timestamp has the same number of digits.
                list.sort((a, b) -> Long.compare(Long.parseLong(a.f1), Long.parseLong(b.f1)));

                // The summary log keeps only the earliest and (if different) the latest record.
                String first = "";
                String last = "";
                for (int i = 0; i < list.size(); i++) {
                    Tuple2<String, String> record = list.get(i);
                    if (i == 0) {
                        first = "> " + record.f1 + "\n";
                    } else {
                        last = "> " + record.f1 + "\n";
                    }
                    out.collect(" - " + record.f1);
                    System.out.println("> " + record.f1);
                }
                appendLog(first + last);
                ets = System.currentTimeMillis();
            }
        })
        .getSideOutput(lateOutputTag).map(new MapFunction<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 341902L;

            @Override
            public String map(Tuple2<String, String> value) throws Exception {
                String msg = "[Expire Data]> " + value.f0 + "->" + value.f1;
                appendLog(msg + "\n");
                return msg;
            }
        }).print();

        env.execute("Flink WaterMark Test1");

        /* Sample console output of one run (log text as produced by the program):
2 ============================================================
1674029100314[0] Key-> hello,EventTime:1553503188000, recordTimestamp=0
1674029100832[518] Key-> hello,EventTime:1553503186000, recordTimestamp=1553503188000
1674029101341[1027] Key-> hello,EventTime:1553503183000, recordTimestamp=1553503188000
[Expire Data]> hello->1553503183000
1674029101850[1536] Key-> hello,EventTime:1553503180000, recordTimestamp=1553503188000
[Expire Data]> hello->1553503180000
1674029102362[2048] Key-> hello,EventTime:1553503185000, recordTimestamp=1553503188000
1674029102872[2558] Key-> hello,EventTime:1553503188000, recordTimestamp=1553503188000
1674029103384[3070] Key-> hello,EventTime:1553503189000, recordTimestamp=1553503188000
1674029104392[4078] Key-> hello,EventTime:1553503188000, recordTimestamp=1553503189000
1674029105400[5086] Key-> hello,EventTime:1553503189000, recordTimestamp=1553503189000
1674029106404[6090] Key-> hello,EventTime:1553503190000, recordTimestamp=1553503189000
1674029107408[7094] Key-> hello,EventTime:1553503191000, recordTimestamp=1553503190000
1674029108420[8106] Key-> hello,EventTime:1553503186000, recordTimestamp=1553503191000
1674029109426[9112] Key-> hello,EventTime:1553503187000, recordTimestamp=1553503191000
1674029110433[10119] Key-> hello,EventTime:1553503185000, recordTimestamp=1553503191000
1674029111438[11124] Key-> hello,EventTime:1553503184000, recordTimestamp=1553503191000
[Expire Data]> hello->1553503184000
1674029112444[12130] Key-> hello,EventTime:1553503183000, recordTimestamp=1553503191000
[Expire Data]> hello->1553503183000
1674029113450[13136] Key-> hello,EventTime:1553503190000, recordTimestamp=1553503191000
1674029114464[14150] Key-> hello,EventTime:1553503192000, recordTimestamp=1553503191000
1674029115467[15153] Key-> hello,EventTime:1553503193000, recordTimestamp=1553503192000
1674029115625[19265] 当前窗口开始时间[1553503185000,结束时间1553503190000)
> 1553503185000
> 1553503189000
1674029116473[16159] Key-> hello,EventTime:1553503194000, recordTimestamp=1553503193000
1674029117488[17174] Key-> hello,EventTime:1553503195000, recordTimestamp=1553503194000
1674029118489[18175] Key-> hello,EventTime:1553503196000, recordTimestamp=1553503195000
1674029119489[19175] Key-> hello,EventTime:1553503197000, recordTimestamp=1553503196000
1674029120496[20182] Key-> hello,EventTime:1553503198000, recordTimestamp=1553503197000
1674029120716[5091] 当前窗口开始时间[1553503190000,结束时间1553503195000)
> 1553503190000
> 1553503194000
1674029121508[21194] Key-> hello,EventTime:1553503199000, recordTimestamp=1553503198000
1674029122516[22202] Key-> hello,EventTime:1553503200000, recordTimestamp=1553503199000
1674029123517[23203] Key-> hello,EventTime:1553503201000, recordTimestamp=1553503200000
1674029124533[24219] Key-> hello,EventTime:1553503202000, recordTimestamp=1553503201000
time use 1=24357, ets=1674029120717
1674029124553[3837] 当前窗口开始时间[1553503195000,结束时间1553503200000)
> 1553503195000
> 1553503199000
1674029124554[1] 当前窗口开始时间[1553503200000,结束时间1553503205000)
> 1553503200000
> 1553503202000
time use2=28256, sts=1674029096360

         */

    }

    /**
     * Runs the demo job and then prints the log collected from all operators.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        WaterMarkTest test = new WaterMarkTest();
        test.run();

        System.out.println("2 ============================================================");
        appendLog("time use2=" + (System.currentTimeMillis() - sts) + ", sts=" + sts + "\n");
        String log;
        synchronized (sb) {
            log = sb.toString();
        }
        System.out.println(log);
    }
}

参考

https://blog.csdn.net/Vector97/article/details/110150925

https://blog.csdn.net/RonieWhite/article/details/114386907

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6