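A Trigger determines when a window (as defined by the window assigner) is ready to be processed by the window function. Every WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom one by calling trigger(…).
The Trigger abstract class has five methods that let a trigger react to different events (onElement, onProcessingTime, onEventTime, onMerge, and clear):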
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    /**
     * Called for every element that gets added to a pane. The result of this will determine whether
     * the pane is evaluated to emit results.
     *
     * @param element The element that arrived.
     * @param timestamp The timestamp of the element that arrived.
     * @param window The window to which the element is being added.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Called when a processing-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Called when an event-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Returns true if this trigger supports merging of trigger state and can therefore be used with
     * a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
     *
     * <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
     * OnMergeContext)}
     */
    public boolean canMerge() {
        return false;
    }

    /**
     * Called when several windows have been merged into one window by the {@link
     * org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
     *
     * @param window The new window that results from the merge.
     * @param ctx A context object that can be used to register timer callbacks and access state.
     */
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    /**
     * Clears any state that the trigger might still hold for the given window. This is called when
     * a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
     * {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
     * state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
     */
    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    /**
     * A context object that is given to {@link Trigger} methods to allow them to register timer
     * callbacks and deal with state.
     */
    public interface TriggerContext {
        // ...
    }

    /**
     * Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,
     * OnMergeContext)}.
     */
    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(
                StateDescriptor<S, ?> stateDescriptor);
    }
}
public enum TriggerResult {

    // No action: the window is neither fired nor purged.
    CONTINUE(false, false),

    // Fire the window, then purge the elements in the window.
    FIRE_AND_PURGE(true, true),

    // Fire the window but keep its elements.
    FIRE(true, false),

    // Do not fire; discard the window and purge its elements.
    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}
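(2) Every window assigner has its own Trigger. A Trigger registers timers that decide when a window can be evaluated or purged; when a timer fires, the corresponding callback is invoked and returns a TriggerResult. The result can be continue (do nothing), fire (evaluate the window's data), purge (remove the window and the data in it), or fire + purge. If a Trigger call returns only fire, the window is evaluated and then left as it is: the data in the window stays unchanged and is evaluated again the next time the Trigger fires. A window can therefore be evaluated many times until it is purged. Until it is purged, the window keeps occupying memory.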
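The difference between fire and fire + purge can be illustrated with a small toy model. This is only a sketch in plain Java, not Flink code: the names TriggerResultSketch, Result, and ToyWindow are hypothetical, and "evaluating" the window is assumed to mean summing its buffered integers.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of how a window reacts to each trigger result. */
final class TriggerResultSketch {

    /** Mirrors the four results described above as (fire, purge) flag pairs. */
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    /** Hypothetical stand-in for a window's buffered elements. */
    static final class ToyWindow {
        final List<Integer> buffer = new ArrayList<>();
        final List<Integer> emitted = new ArrayList<>();

        void add(int value) {
            buffer.add(value);
        }

        /** Evaluates the window (sum of the buffer) if asked to fire, then clears it if asked to purge. */
        void apply(Result result) {
            if (result.fire) {
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                emitted.add(sum);
            }
            if (result.purge) {
                buffer.clear();
            }
        }
    }

    public static void main(String[] args) {
        ToyWindow window = new ToyWindow();
        window.add(1);
        window.add(2);
        window.apply(Result.FIRE);            // emits 3, elements are kept
        window.add(3);
        window.apply(Result.FIRE_AND_PURGE);  // emits 6 (1 + 2 + 3), then the buffer is cleared
        System.out.println(window.emitted + " " + window.buffer);
    }
}
```

In this model the second evaluation sees the earlier elements again, which is the repeated-evaluation behaviour described above; only the purge releases the buffered data.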
@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {}

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // only register a timer if the time is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the time is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    /** Creates a new trigger that fires once system time passes the end of the window. */
    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}
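In onElement(), the call ctx.registerProcessingTimeTimer(window.maxTimestamp()) registers a processing-time timer whose timestamp is window.maxTimestamp(), the last timestamp that still belongs to the window. When processing time reaches that timestamp, the timer fires and onProcessingTime() is called. onProcessingTime() returns TriggerResult.FIRE, which triggers the evaluation of the window's data while keeping the window's elements.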
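The timer flow just described can be modelled without Flink. The following is a minimal sketch under stated assumptions: ProcessingTimeTimerSketch and its methods are hypothetical names, the window is the half-open interval [start, end) so its max timestamp is end - 1, and timers are kept in a set so that registering the same timestamp for every element leaves a single pending timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model (not Flink code) of the timer flow in ProcessingTimeTrigger. */
final class ProcessingTimeTimerSketch {

    // Pending timers; a set, so registering the same timestamp twice keeps one timer.
    private final TreeSet<Long> timers = new TreeSet<>();

    // Timestamps at which the toy window "fired".
    final List<Long> firedAt = new ArrayList<>();

    /** Last timestamp inside the window [windowStart, windowEnd), i.e. windowEnd - 1. */
    static long maxTimestamp(long windowStart, long windowEnd) {
        return windowEnd - 1;
    }

    /** Stand-in for onElement(): register a timer at the window's max timestamp. */
    void onElement(long windowStart, long windowEnd) {
        timers.add(maxTimestamp(windowStart, windowEnd));
    }

    /** Stand-in for the runtime advancing the clock and firing every due timer. */
    void advanceProcessingTime(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            firedAt.add(timers.pollFirst());
        }
    }

    int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        ProcessingTimeTimerSketch sketch = new ProcessingTimeTimerSketch();
        // Three elements arrive for the same 5-second window [0, 5000).
        sketch.onElement(0L, 5000L);
        sketch.onElement(0L, 5000L);
        sketch.onElement(0L, 5000L);
        sketch.advanceProcessingTime(4998L); // too early, nothing fires
        sketch.advanceProcessingTime(4999L); // reaches maxTimestamp, the timer fires once
        System.out.println(sketch.firedAt);
    }
}
```

The sketch only covers the register-then-fire path; merging and clear() are left out.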
2.2 Flink's Built-in Evictors
// CountEvictor: keeps at most maxCount elements in the window.
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        // Not more than the maximum count: nothing to evict.
        return;
    } else {
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                // Remove the first (size - maxCount) elements so that only the last
                // maxCount elements remain.
                iterator.remove();
            }
        }
    }
}
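To see the count-based rule in isolation, here is a minimal sketch of the same idea on a plain java.util.List. It is not the Flink class: CountEvictionSketch and evictOldest are hypothetical names, and the list order is assumed to be the arrival order of the elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Toy version (not Flink code) of count-based eviction on a plain list. */
final class CountEvictionSketch {

    /** Removes elements from the front until at most maxCount elements remain. */
    static <T> void evictOldest(List<T> elements, int maxCount) {
        int size = elements.size();
        if (size <= maxCount) {
            return; // within the limit, nothing to evict
        }
        int toEvict = size - maxCount;
        Iterator<T> iterator = elements.iterator();
        for (int i = 0; i < toEvict && iterator.hasNext(); i++) {
            iterator.next();   // advance first, otherwise remove() would throw
            iterator.remove(); // drop the element just returned
        }
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
        evictOldest(window, 2);
        System.out.println(window); // prints [4, 5]
    }
}
```

Note that iterator.next() has to be called before iterator.remove(); without it the iterator has no current element to remove.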
// DeltaEvictor: evicts elements whose delta to the last element reaches the threshold.
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
    // Get the last element.
    TimestampedValue<T> lastElement = Iterables.getLast(elements);
    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<T> element = iterator.next();
        // Compute the delta between each element and the last element and compare it
        // with the threshold. The element is removed if the delta is greater than or
        // equal to the threshold.
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue())
                >= this.threshold) {
            iterator.remove();
        }
    }
}
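The delta rule can also be tried on plain values. The sketch below is an illustration, not the Flink implementation: DeltaEvictionSketch and evictByDelta are hypothetical names, and the delta function is assumed to be the absolute difference between two doubles.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToDoubleBiFunction;

/** Toy version (not Flink code) of delta-based eviction on a plain list. */
final class DeltaEvictionSketch {

    /** Removes every element whose delta to the last element is >= threshold. */
    static <T> void evictByDelta(
            List<T> elements, ToDoubleBiFunction<T, T> delta, double threshold) {
        if (elements.isEmpty()) {
            return;
        }
        // Capture the reference element before anything is removed.
        T last = elements.get(elements.size() - 1);
        for (Iterator<T> iterator = elements.iterator(); iterator.hasNext(); ) {
            T element = iterator.next();
            if (delta.applyAsDouble(element, last) >= threshold) {
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Double> window = new ArrayList<>(Arrays.asList(1.0, 8.0, 9.5, 10.0));
        // Assumed delta function: absolute difference to the last element (10.0).
        evictByDelta(window, (a, b) -> Math.abs(a - b), 2.0);
        System.out.println(window); // prints [9.5, 10.0]
    }
}
```

The value 8.0 is evicted because its delta of exactly 2.0 equals the threshold, which matches the >= comparison in the code above.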
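TimeEvictor uses time as the criterion for deciding whether an element is removed. It first finds the maximum timestamp among all elements in the window, currentTime. Subtracting the window size (windowSize) from currentTime gives evictCutoff, the timestamp boundary for the elements to keep. It then iterates over the elements in the window and removes every element whose timestamp is less than or equal to evictCutoff; all other elements are kept. The logic is shown below: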
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    // Return immediately if the elements carry no timestamp.
    if (!hasTimestamp(elements)) {
        return;
    }
    // Largest timestamp among the elements.
    long currentTime = getMaxTimestamp(elements);
    // Cutoff = largest timestamp - window size
    // (in other words, how far back elements are kept).
    long evictCutoff = currentTime - windowSize;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
            iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        // Remove every element whose timestamp is less than or equal to the cutoff.
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}
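A worked example makes the cutoff boundary concrete. This is a sketch on a plain list of timestamps rather than the Flink class: TimeEvictionSketch and evictOlderThanWindow are hypothetical names, and every element is assumed to carry a timestamp, so the hasTimestamp check is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy version (not Flink code) of time-based eviction on a list of timestamps. */
final class TimeEvictionSketch {

    /** Removes every timestamp that is <= (max timestamp - windowSize). */
    static void evictOlderThanWindow(List<Long> timestamps, long windowSize) {
        if (timestamps.isEmpty()) {
            return;
        }
        long currentTime = Collections.max(timestamps);
        long evictCutoff = currentTime - windowSize;
        timestamps.removeIf(ts -> ts <= evictCutoff);
    }

    public static void main(String[] args) {
        List<Long> window =
                new ArrayList<>(Arrays.asList(1000L, 4000L, 6000L, 9000L, 10000L));
        // currentTime = 10000, windowSize = 4000, so evictCutoff = 6000.
        evictOlderThanWindow(window, 4000L);
        System.out.println(window); // prints [9000, 10000]
    }
}
```

The element at 6000 sits exactly on the cutoff and is evicted, because the comparison is <= rather than <.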