环球时讯:聊聊Flink必知必会(四)
概述
Flink Streaming API借鉴了谷歌数据流模型(Google Data Flow Model),它的流API支持不同的时间概念。Flink明确支持以下3个不同的时间概念。
(资料图)
Flink明确支持以下3个不同的时间概念。(1)事件时间:事件发生的时间,由产生(或存储)事件的设备记录。
(2)接入时间:Flink在接入事件时记录的时间戳。
(3)处理时间:管道中特定操作符处理事件的时间。
支持事件时间的流处理器需要一种方法来度量事件时间的进度。在Flink中测量事件时间进展的机制是水印(watermark)。水印是一种特殊类型的事件,是告诉系统事件时间进度的一种方式。水印流是数据流的一部分,并带有时间戳t。水印(t)声明事件时间已经到达该流中的时间t,这意味着时间戳t′≤t(时间戳更早或等于水印的事件)的流中不应该有更多的元素。
Flink中水印的处理
水印的时间戳
时间t的水印标记了数据流中的一个位置,并断言此时的流在时间t之前已经完成。为了执行基于事件时间的事件处理,Flink需要知道与每个事件相关联的时间,它还需要包含水印的流。水印就是系统事件时间的时钟。水印触发是基于事件时间的计时器的触发。
事件流的类型有两种,一个是顺序的,一个是无序的。先看顺序场景下,水印的排列。
对于无序流,水印是至关重要的,其中事件不是按照它们的时间戳排序的。
例如,当操作符接收到w(11)这条水印时,可以认为时间戳小于或等于11的数据已经到达,此时可以触发计算。同样,当接收到w(17)这条水印时,可以认为时间戳小于或等于17的数据已经到达,此时可以触发计算。
可以看出,水印的时间戳是单调递增的,时间戳为t的水印意味着所有后续记录的时间戳将大于t。一般来讲,水印是一种声明,在流中的那个点之前,即在某个时间戳之前的所有事件都应该已经到达。
水印是在源函数处或直接在源函数之后生成的。源函数的每个并行子任务通常可以独立地生成水印。这些水印定义了特定并行源处的事件时间。
水印的生成
Flink提供了用于处理事件时间、时间戳和水印的API。
为了处理事件时间,Flink流程序需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过TimestampAssigner从元素中的某个字段访问/提取时间戳实现的。
Flink提供了两种方式创建水印。
1.使用WatermarkStrategy上的静态辅助方法实现公共水印策略:
2.实现WatermarkStrategy接口,自定义TimestampAssigner与WatermarkGenerator捆绑在一起:
@Publicpublic interface WatermarkStrategy extends TimestampAssignerSupplier, WatermarkGeneratorSupplier { @Override WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); @Override default TimestampAssigner createTimestampAssigner( TimestampAssignerSupplier.Context context) { return new RecordTimestampAssigner<>(); } @Experimental default WatermarkAlignmentParams getAlignmentParameters() { return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED; } default WatermarkStrategy withTimestampAssigner( TimestampAssignerSupplier timestampAssigner) { checkNotNull(timestampAssigner, "timestampAssigner"); return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner); } default WatermarkStrategy withTimestampAssigner( SerializableTimestampAssigner timestampAssigner) { checkNotNull(timestampAssigner, "timestampAssigner"); return new WatermarkStrategyWithTimestampAssigner<>( this, TimestampAssignerSupplier.of(timestampAssigner)); } default WatermarkStrategy withIdleness(Duration idleTimeout) { checkNotNull(idleTimeout, "idleTimeout"); checkArgument( !(idleTimeout.isZero() || idleTimeout.isNegative()), "idleTimeout must be greater than zero"); return new WatermarkStrategyWithIdleness<>(this, idleTimeout); } @Experimental default WatermarkStrategy withWatermarkAlignment( String watermarkGroup, Duration maxAllowedWatermarkDrift) { return withWatermarkAlignment( watermarkGroup, maxAllowedWatermarkDrift, WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL); } @Experimental default WatermarkStrategy withWatermarkAlignment( String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) { return new WatermarksWithWatermarkAlignment( this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval); } static WatermarkStrategy forMonotonousTimestamps() { return (ctx) -> new AscendingTimestampsWatermarks<>(); } static WatermarkStrategy forBoundedOutOfOrderness(Duration maxOutOfOrderness) { return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness); } static WatermarkStrategy forGenerator(WatermarkGeneratorSupplier generatorSupplier) { return generatorSupplier::createWatermarkGenerator; } static WatermarkStrategy noWatermarks() { return (ctx) -> new NoWatermarksGenerator<>(); }}
这里面提供了很多静态的方法和带有缺省实现的方法,只有一个方法是非default和没有缺省实现的,就是createWatermarkGenerator
方法。
所以默认情况下,我们只需要实现这个方法就行了,这个方法主要是返回一个 WatermarkGenerator。
@Publicpublic interface WatermarkGenerator {/** * Called for every event, allows the watermark generator to examine and remember the * event timestamps, or to emit a watermark based on the event itself. */void onEvent(T event, long eventTimestamp, WatermarkOutput output);/** * Called periodically, and might emit a new watermark, or not. * * The interval in which this method is called and Watermarks are generated * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. */void onPeriodicEmit(WatermarkOutput output);}
这个方法简单明了,主要是有两个方法:
onEvent :每个元素都会调用这个方法,如果我们想依赖每个元素生成一个水印,然后发射到下游(可选,就是看是否用output来收集水印),我们可以实现这个方法.
onPeriodicEmit : 如果数据量比较大的时候,我们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的方法。这个水印的生成周期可以这样设置:
env.getConfig().setAutoWatermarkInterval(5000L)