Process Function
處理函式(Process Function)是一個低階的流轉換操作,相比MapFunction除了基本功能,還可以訪問記錄的時間戳和水位,並支援註冊一個在將來某個特定時間觸發的計時器,處理函式的側輸出功能還允許將記錄傳送到多個側輸出流。Flink SQL支援的大多數功能都是用處理函式實現的。
Flink提供了8種不同的處理函式,ProcessFunction、KeyedProcessFunction、ProcessWindowFunction、ProcessAllWindowFunction。。。。,這些函式適用於不同的上下文,但是功能都是相似的,我們以KeyedProcessFunction為例討論這些函式的通用的功能及其實現。
Process Function
KeyedProcessFunction作用於KeyedStream上,生成 一個新的DataStream,該轉換函式會針對流中的每條記錄呼叫一次,並emit零個、一個或多個記錄。函式內定義了以下兩個核心方法:
processElement(I value, Context ctx, Collector
onTimer(long timestamp, OnTimerContext ctx, Collector
Process Function有兩個類Context、OnTimerContext,這兩個類都提供了timerService()方法,用於獲取到時間服務(TimerService)。這兩個類分別作為引數傳遞到了processElement、onTimer方法內。
應用KeyedProcessFunction的流程:
stream。keyBy(。。。)。process(new MyProcessFunction())
KeyedProcessFunction
TimerService提供了一下方法:
currentProcessingTime:返回當前的處理時間
currentWatermark:返回當前水位時間戳
registerProcessingTimeTImer:針對當前key註冊一個processing time計時器,當處理時間到達註冊的時間時,計時器會被觸發
registerEventTimeTimer:針對當前key註冊一個event-time計時器,當水位大於等於註冊的時間時,計時器會被觸發
對於每個key和時間戳只能註冊一個計時器,也就是說每個key可以有多個計時器,具體到每個時間戳只能有一個,註冊方法會做去重操作,onTime函式僅僅會呼叫一次。預設情況下event-time、processing-time計時器會分別放到了兩個不同的優先佇列堆裡。以過去時間戳註冊的計時器不會被刪除,同樣也會被處理,處理時間的計時器會在註冊方法返回後立即觸發,事件時間計時器會在處理下一條水位線時觸發。
TimerService的一個簡單實現是SimpleTimerService類,該類中計時器由InternalTimerServiceManager來管理,呼叫計時器的註冊方法,會進一步呼叫manager的註冊方法。計時器的觸發也是由manager管理的,event-time是水位觸發、processing-time是定時觸發。
event-time計時器觸發機制:manager接收到一個新生成的水位時,會將小於等於水位的所有計時器依次觸發:
processing time是用ScheduledThreadPoolExecutor定時觸發的:
當計時器觸發時,Process Function的onTimer函式會被呼叫,下面會分析onTimer是怎麼被回撥執行的。
KeyedProcessFunction
接下來分析Process Function呼叫過程,這裡還以KeyedProcessFunction為例,KeyedProcessFunction會委託給KeyedProcessOperator執行,KeyedProcessOperator是一個實現了OneInputStreamOperator作用在單流操作上的運算元,OneInputStreamOperator其中兩個函式:
processElement:處理到達運算元的元素,operator傳遞給Process Function的processElement方法做進一步處理;
processWatermark:處理接收到新生成的水位,呼叫InternalTimerServiceManager的advanceWatermark函式(該函式會觸發event-time計時器),併發送水位;
KeyedProcessOperator同時實現了Triggerable介面,介面定義了兩個函式:
onEventTime:當event-time計時器觸發時被呼叫
onProcessingTime:當processing-time計時器觸發時被呼叫
KeyedProcessOperator在實現這兩個方法內直接呼叫Process Function的onTimer函式,所以當觸發計時器時,會回撥Process Function onTimer函式:
這樣我們可以在Process Function內實現一些基於時間的自定義視窗邏輯或者類似某些鍵不再使用後清除鍵狀態的功能。