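A Trigger determines when a window (as defined by the window assigner) is ready to be processed by the window function. Every WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom one with trigger(…).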
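Window evaluation is driven by the window's trigger: every window type has a matching trigger mechanism and a default trigger, and the trigger's job is to control when the computation fires. Flink ships with several built-in triggers, each suited to particular WindowAssigners.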
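The Trigger abstract class provides five methods for reacting to different events: onElement(), onEventTime(), onProcessingTime(), onMerge(), and clear(). Its source is as follows: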
    @PublicEvolving
    public abstract class Trigger<T, W extends Window> implements Serializable {

        private static final long serialVersionUID = -4104633972991191369L;

        /**
         * Called for every element that gets added to a pane. The result of this will determine whether
         * the pane is evaluated to emit results.
         *
         * @param element The element that arrived.
         * @param timestamp The timestamp of the element that arrived.
         * @param window The window to which the element is being added.
         * @param ctx A context object that can be used to register timer callbacks.
         */
        public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
                throws Exception;

        /**
         * Called when a processing-time timer that was set using the trigger context fires.
         *
         * @param time The timestamp at which the timer fired.
         * @param window The window for which the timer fired.
         * @param ctx A context object that can be used to register timer callbacks.
         */
        public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
                throws Exception;

        /**
         * Called when an event-time timer that was set using the trigger context fires.
         *
         * @param time The timestamp at which the timer fired.
         * @param window The window for which the timer fired.
         * @param ctx A context object that can be used to register timer callbacks.
         */
        public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
                throws Exception;

        /**
         * Returns true if this trigger supports merging of trigger state and can therefore be used with
         * a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
         *
         * <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
         * OnMergeContext)}
         */
        public boolean canMerge() {
            return false;
        }

        /**
         * Called when several windows have been merged into one window by the {@link
         * org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
         *
         * @param window The new window that results from the merge.
         * @param ctx A context object that can be used to register timer callbacks and access state.
         */
        public void onMerge(W window, OnMergeContext ctx) throws Exception {
            throw new UnsupportedOperationException("This trigger does not support merging.");
        }

        /**
         * Clears any state that the trigger might still hold for the given window. This is called when
         * a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
         * {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
         * state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
         */
        public abstract void clear(W window, TriggerContext ctx) throws Exception;

        // ------------------------------------------------------------------------

        /**
         * A context object that is given to {@link Trigger} methods to allow them to register timer
         * callbacks and deal with state.
         */
        public interface TriggerContext {
            // ...
        }

        /**
         * Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,
         * OnMergeContext)}.
         */
        public interface OnMergeContext extends TriggerContext {
            <S extends MergingState<?, ?>> void mergePartitionedState(
                    StateDescriptor<S, ?> stateDescriptor);
        }
    }
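A few points are worth noting about the methods above:
(1) The first three methods return a TriggerResult, an enum with four values: CONTINUE, FIRE, PURGE, and FIRE_AND_PURGE. Its source is as follows: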
    public enum TriggerResult {

        // Do not fire and do not remove any elements.
        CONTINUE(false, false),

        // Fire the window, then remove the elements in the window.
        FIRE_AND_PURGE(true, true),

        // Fire the window but keep its elements.
        FIRE(true, false),

        // Do not fire; discard the window and remove its elements.
        PURGE(false, true);

        // ------------------------------------------------------------------------

        private final boolean fire;
        private final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.purge = purge;
            this.fire = fire;
        }

        public boolean isFire() {
            return fire;
        }

        public boolean isPurge() {
            return purge;
        }
    }
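(2) Every window assigner has its own default Trigger. A Trigger can register timers that decide when a window is evaluated or purged; when a timer fires, the corresponding callback is invoked and returns a TriggerResult. The result can be CONTINUE (do nothing), FIRE (evaluate the window), PURGE (discard the window and its data), or FIRE_AND_PURGE. If a Trigger returns only FIRE, the window is evaluated and left as it is: its elements are retained and are evaluated again the next time the Trigger fires. A window can therefore be evaluated repeatedly until it is purged, and until then it keeps occupying memory.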
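The difference between FIRE and FIRE_AND_PURGE is easier to see in a toy model. The sketch below does not use Flink at all: Result is a hypothetical stand-in for TriggerResult, and apply() is an assumed, simplified version of what a window operator does with the result (here the "window function" is just a sum).

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerResultDemo {

    // Hypothetical stand-in for Flink's TriggerResult (same four values and flags).
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    // Applies one trigger result to a window buffer.
    // Returns the emitted sum, or null if the window was not evaluated.
    static Integer apply(Result result, List<Integer> buffer) {
        Integer emitted = null;
        if (result.fire) {
            emitted = buffer.stream().mapToInt(Integer::intValue).sum();
        }
        if (result.purge) {
            buffer.clear();
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>(List.of(1, 2, 3));
        System.out.println(apply(Result.FIRE, buffer));           // 6, elements kept
        buffer.add(4);
        System.out.println(apply(Result.FIRE_AND_PURGE, buffer)); // 10, elements removed
        System.out.println(buffer.isEmpty());                     // true
    }
}
```

Because FIRE keeps the buffer, the second evaluation sees the earlier elements again; this is the repeated evaluation described above.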
Take ProcessingTimeTrigger as an example. Its source is as follows:

    @PublicEvolving
    public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

        private static final long serialVersionUID = 1L;

        private ProcessingTimeTrigger() {}

        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.FIRE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        }

        @Override
        public boolean canMerge() {
            return true;
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) {
            // only register a timer if the time is not yet past the end of the merged window
            // this is in line with the logic in onElement(). If the time is past the end of
            // the window onElement() will fire and setting a timer here would fire the window twice.
            long windowMaxTimestamp = window.maxTimestamp();
            if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
                ctx.registerProcessingTimeTimer(windowMaxTimestamp);
            }
        }

        @Override
        public String toString() {
            return "ProcessingTimeTrigger()";
        }

        /** Creates a new trigger that fires once system time passes the end of the window. */
        public static ProcessingTimeTrigger create() {
            return new ProcessingTimeTrigger();
        }
    }
In onElement(), ctx.registerProcessingTimeTimer(window.maxTimestamp()) registers a processing-time timer for window.maxTimestamp(), the last timestamp that still belongs to the window. When processing time reaches that timestamp, the timer fires and onProcessingTime() is called. It returns TriggerResult.FIRE, which evaluates the window but keeps its elements.
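Note that ProcessingTimeTrigger only fires the window function when the window's max timestamp is reached, and the trigger itself does not purge the window contents afterwards: a trigger removes elements only by returning PURGE or FIRE_AND_PURGE. In fact, none of Flink's built-in triggers purge on firing except PurgingTrigger, which wraps another trigger and turns FIRE into FIRE_AND_PURGE. For time windows this does not mean the data is kept forever, because the window operator clears a window's state once the window expires (its end plus any allowed lateness). For GlobalWindows, which never expire, the contents stay in state until the trigger purges them.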
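The wrapping idea behind PurgingTrigger can be sketched without Flink. Everything in the block below is hypothetical scaffolding (the Result enum, the SimpleTrigger interface, and the everyNth helper are not Flink types); only the rule "a firing result becomes FIRE_AND_PURGE, anything else passes through" is what the sketch is meant to show.

```java
public class PurgingSketch {

    // Hypothetical stand-in for Flink's TriggerResult.
    enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    // Hypothetical, heavily reduced trigger: it only reacts to incoming elements.
    interface SimpleTrigger {
        Result onElement(long timestamp);
    }

    // A toy count-based trigger that fires on every n-th element.
    static SimpleTrigger everyNth(int n) {
        int[] count = {0};
        return ts -> (++count[0] % n == 0) ? Result.FIRE : Result.CONTINUE;
    }

    // Wraps a trigger so that every firing result also purges the window.
    static SimpleTrigger purging(SimpleTrigger nested) {
        return ts -> {
            Result r = nested.onElement(ts);
            boolean fires = (r == Result.FIRE || r == Result.FIRE_AND_PURGE);
            return fires ? Result.FIRE_AND_PURGE : r;
        };
    }

    public static void main(String[] args) {
        SimpleTrigger trigger = purging(everyNth(3));
        System.out.println(trigger.onElement(1L)); // CONTINUE
        System.out.println(trigger.onElement(2L)); // CONTINUE
        System.out.println(trigger.onElement(3L)); // FIRE_AND_PURGE
    }
}
```

In a real job the equivalent would be a call such as PurgingTrigger.of(CountTrigger.of(3)) passed to trigger(…).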
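EventTimeTrigger also registers its timer in onElement(): if the watermark has already passed window.maxTimestamp(), it returns FIRE immediately; otherwise it registers an event-time timer for window.maxTimestamp() and returns CONTINUE.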
Event-time timers are registered with registerEventTimeTimer and fire once the operator's internal watermark reaches or passes the timer's timestamp.
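As a rough illustration of that firing rule, the following sketch models a timer service with a sorted set. It is not Flink's timer service: the class and the advanceWatermark method are assumptions made for this example, and only the method name registerEventTimeTimer is borrowed from TriggerContext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class EventTimeTimerSketch {

    // Sorted set of pending timers; registering the same timestamp twice keeps one timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    // Advances the watermark (it never moves backwards) and returns, in ascending
    // order, every timer whose timestamp is less than or equal to the watermark.
    List<Long> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        // Two elements of the window [0, 5000) both register maxTimestamp = 4999.
        service.registerEventTimeTimer(4999L);
        service.registerEventTimeTimer(4999L);
        service.registerEventTimeTimer(9999L);

        System.out.println(service.advanceWatermark(4998L));  // []
        System.out.println(service.advanceWatermark(4999L));  // [4999]
        System.out.println(service.advanceWatermark(20000L)); // [9999]
    }
}
```

The duplicate registration matters because onElement() is called for every element, so the same window timer is requested many times but should fire once.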
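After an element enters the stream, it typically passes through the window assigner (Window), the Trigger, the Evictor, and the window function (WindowFunction), in that order.
Now that we have a rough idea of the role the Evictor plays and where it sits in the pipeline, let's look at why and how it is used.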
The Evictor interface defines two methods, evictBefore() and evictAfter().
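evictBefore() contains the eviction logic applied before the window function, and evictAfter() contains the eviction logic applied after it. Elements evicted before the window function runs are not processed by it.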
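A pane is the bucket of elements that share the same key and the same window, so elements with the same key in the same window always belong to the same pane. An element can be in several panes (when it is assigned to multiple windows), and each of these panes has its own Evictor instance.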
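To make the "one element, several panes" case concrete, here is a small sketch with sliding windows. It is a simplified model, not Flink code: the class, the string pane key of the form key@windowStart, and both helper methods are assumptions made for this example, and the window-start arithmetic assumes a zero offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PaneSketch {

    // Start timestamps of all sliding windows (given size and slide, zero offset)
    // that contain the timestamp, newest window first.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Adds a value to every pane it belongs to. A pane is identified by key plus window start.
    static void add(Map<String, List<Integer>> panes, String key, long timestamp, int value,
                    long size, long slide) {
        for (long start : windowStarts(timestamp, size, slide)) {
            panes.computeIfAbsent(key + "@" + start, k -> new ArrayList<>()).add(value);
        }
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> panes = new TreeMap<>();
        long size = 10_000L;  // 10 s windows
        long slide = 5_000L;  // sliding every 5 s
        add(panes, "a", 7_000L, 1, size, slide);   // goes to a@5000 and a@0
        add(panes, "a", 12_000L, 2, size, slide);  // goes to a@10000 and a@5000
        add(panes, "b", 7_000L, 3, size, slide);   // same windows, different key

        System.out.println(windowStarts(7_000L, size, slide)); // [5000, 0]
        System.out.println(panes.get("a@5000"));               // [1, 2]
        System.out.println(panes.get("b@5000"));               // [3]
    }
}
```

Elements with key "a" and key "b" share the window [5000, 15000) but land in different panes, which is why eviction is always applied per pane.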
Note: a window has no evictor by default. Once an Evictor is specified for a window, the window is handled by the EvictingWindowOperator class.
2.2 Flink's Built-in Evictors
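CountEvictor keeps at most maxCount elements in the window and evicts the surplus from the front of the buffer. Its implementation is as follows:

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (size <= maxCount) {
            // Not more than the maximum count: nothing to do.
            return;
        } else {
            int evictedCount = 0;
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                    iterator.hasNext(); ) {
                iterator.next();
                evictedCount++;
                if (evictedCount > size - maxCount) {
                    break;
                } else {
                    // Remove the first (size - maxCount) elements so that only the last maxCount remain.
                    iterator.remove();
                }
            }
        }
    }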
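The same count-based rule can be tried outside Flink on a plain list. The sketch below is a simplified re-implementation for illustration only; the class name and the evict(List, int) signature are assumptions and differ from the Flink method shown above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class CountEvictSketch {

    // Removes elements from the front of the list until at most maxCount remain.
    static <T> void evict(List<T> elements, int maxCount) {
        int toEvict = elements.size() - maxCount;
        Iterator<T> iterator = elements.iterator();
        while (toEvict > 0 && iterator.hasNext()) {
            iterator.next();
            iterator.remove();
            toEvict--;
        }
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(List.of(1, 2, 3, 4, 5));
        evict(window, 3);
        System.out.println(window); // [3, 4, 5]

        evict(window, 10);          // fewer elements than maxCount: nothing is removed
        System.out.println(window); // [3, 4, 5]
    }
}
```

In a Flink job the built-in version is attached with evictor(CountEvictor.of(3)) on the windowed stream.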
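DeltaEvictor computes a DeltaFunction between each element and the last element in the window and compares the result with a threshold. If the delta is greater than or equal to the threshold, the element is evicted. The implementation of DeltaEvictor is as follows: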
    private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
        // Get the last element.
        TimestampedValue<T> lastElement = Iterables.getLast(elements);
        for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
            TimestampedValue<T> element = iterator.next();
            // Compute the delta between each element and the last element and compare it with
            // the threshold; evict the element if the delta is greater than or equal to it.
            if (deltaFunction.getDelta(element.getValue(), lastElement.getValue())
                    >= this.threshold) {
                iterator.remove();
            }
        }
    }
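A worked example helps to see which elements survive. The sketch below re-creates the rule on a plain list; it is not the Flink class. The class name, the use of ToDoubleBiFunction in place of Flink's DeltaFunction, and the temperature readings are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToDoubleBiFunction;

public class DeltaEvictSketch {

    // Evicts every element whose delta to the last element is >= threshold.
    static <T> void evict(List<T> elements, ToDoubleBiFunction<T, T> delta, double threshold) {
        if (elements.isEmpty()) {
            return;
        }
        T last = elements.get(elements.size() - 1);
        elements.removeIf(e -> delta.applyAsDouble(e, last) >= threshold);
    }

    public static void main(String[] args) {
        // Hypothetical temperature readings; the last reading is 21.0.
        List<Double> readings = new ArrayList<>(List.of(10.0, 19.0, 25.0, 21.0));

        // Delta is the absolute difference to the last reading; the threshold is 5.0.
        evict(readings, (a, b) -> Math.abs(a - b), 5.0);

        // |10 - 21| = 11 is evicted; 19, 25 and 21 differ by less than 5 and stay.
        System.out.println(readings); // [19.0, 25.0, 21.0]
    }
}
```

With an absolute-difference delta the last element always has a delta of 0 to itself, so it is never evicted as long as the threshold is positive.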
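TimeEvictor decides by time whether an element is evicted. It takes the largest timestamp among the elements in the window, currentTime, and subtracts the evictor's windowSize (the span of time to keep) to obtain evictCutoff, the cutoff timestamp. It then iterates over the elements in the window and evicts every element whose timestamp is less than or equal to evictCutoff; all other elements are kept.
The implementation of TimeEvictor is as follows: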
    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        // If the elements carry no timestamp, return immediately.
        if (!hasTimestamp(elements)) {
            return;
        }
        // Largest timestamp among the elements (the time of the latest element).
        long currentTime = getMaxTimestamp(elements);
        // Cutoff = time of the latest element - windowSize (how much recent data to keep).
        long evictCutoff = currentTime - windowSize;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            // Evict every element whose timestamp is less than or equal to the cutoff.
            if (record.getTimestamp() <= evictCutoff) {
                iterator.remove();
            }
        }
    }
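The cutoff arithmetic is easiest to check with numbers. The following sketch applies the same rule to a plain list of timestamps; it is an illustration rather than the Flink class, and the class name, the keepMillis parameter, and the sample timestamps are assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TimeEvictSketch {

    // Keeps only timestamps strictly greater than (largest timestamp - keepMillis).
    static void evict(List<Long> timestamps, long keepMillis) {
        if (timestamps.isEmpty()) {
            return;
        }
        long currentTime = Collections.max(timestamps);
        long evictCutoff = currentTime - keepMillis;
        timestamps.removeIf(ts -> ts <= evictCutoff);
    }

    public static void main(String[] args) {
        List<Long> timestamps = new ArrayList<>(List.of(1_000L, 4_000L, 7_000L, 10_000L));

        // currentTime = 10000, keepMillis = 5000, so evictCutoff = 5000.
        evict(timestamps, 5_000L);

        // 1000 and 4000 are at or below the cutoff and are evicted.
        System.out.println(timestamps); // [7000, 10000]
    }
}
```

An element whose timestamp equals the cutoff exactly is also evicted, because the comparison is less than or equal.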