在 Flink 中自定義觸發(fā)器需要實(shí)現(xiàn) Trigger 接口,該接口定義如下:
public interface Trigger<T, W extends Window> extends Serializable {
// 初始化觸發(fā)器
void open(TriggerContext ctx) throws Exception;
// 每次元素到來(lái)時(shí)都會(huì)調(diào)用此方法,決定是否觸發(fā)窗口計(jì)算
TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
// 每次處理時(shí)間定時(shí)器到來(lái)時(shí)都會(huì)調(diào)用此方法,決定是否觸發(fā)窗口計(jì)算
TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 每次事件時(shí)間定時(shí)器到來(lái)時(shí)都會(huì)調(diào)用此方法,決定是否觸發(fā)窗口計(jì)算
TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
// 當(dāng)窗口計(jì)算完成時(shí)會(huì)調(diào)用此方法
void clear(W window, TriggerContext ctx) throws Exception;
// 序列化
default void write(DataOutputView out) throws IOException {}
// 反序列化
default void read(DataInputView in) throws IOException {}
}
自定義觸發(fā)器需要實(shí)現(xiàn) onElement、onProcessingTime、onEventTime、clear 這幾個(gè)方法,并在 open 方法中對(duì)觸發(fā)器進(jìn)行初始化。此外,TriggerContext 提供了一些上下文信息,可以在觸發(fā)器中使用。通過(guò)實(shí)現(xiàn) Trigger 接口,可以根據(jù)自己的業(yè)務(wù)需求定義觸發(fā)邏輯,實(shí)現(xiàn)更靈活的窗口計(jì)算方式。