實時流計算系統設計與實現之數據處理:事件序列分析

2024年2月6日 23点热度 0人点赞

事件序列分析

CEP通過分析事件流中事件之間的關系(如時間關系、空間關系、聚合關系、依賴關系等)產生一個具有更高層次含義的復合事件。

那我們為什麼要挖掘事件流中事件之間的關系呢?

這是因為,有些時候單獨發生的一個事件,可能並沒有十分明顯或有用的業務含義,但是當這個事件是發生在特定的上下文背景中,並且與其他事件產生關聯時,這些發生在一起的多個事件就具有更加復雜的業務含義了。

根據CEP所發生的復合事件,我們可以結合其業務含義做出一些有用的推斷和決策,這就是CEP的價值所在。

下面列舉幾個CEP經常被用到的場景。

·銀行卡異常檢測。如果一張銀行卡在30分鐘內,連續3次轉賬給不同銀行卡,或者15分鐘內在2個不同城市取款,則意味著該銀行卡行為異常,有可能被盜或被騙,需要給持卡人發送告警短信並采取相應措施。

·工廠環境監控。某紙筒生產車間為了保證安全生產,在車間安裝了溫度傳感器,當溫度傳感器上報的環境溫度記錄出現1次高溫事件時,需要發送輕微告警,而當30秒內連續兩次出現高溫事件時,則需要發出嚴重告警了。

·推薦系統。如果用戶在10分鐘之類單擊了3次同類商品,那麼他很有可能對該類商品感興趣,之後可以更加主動地給他推薦同類商品。

·離職員工數據泄露檢測。如果員工最近經常訪問招聘網站,電子郵件的附件很大,還用USB復制數據,那麼該員工準備離職的可能性就比較大,公司需要提前采取措施。除了以上列舉的幾個例子外,CEP使用的場景還有很多。CEP是一個令筆者覺得非常有趣的技術,因為隻要我們設置好了感興趣的事件發生模式,之後就會從數據流中不斷冒出符合我們所設置模式的事件序列。這些事件序列有著明確的業務含義,告訴我們現在系統發生什麼,我們要做什麼。CEP的這種工作模式真有點兒“春種一粒粟,秋收萬顆子”的大豐收即視感,這正是筆者覺得它有趣的原因。

接下來,我們就來看看CEP的編程模式和方法。

CEP編程模式

CEP的實現方式有多種,比較常見的有自動機、匹配樹、Petri網、有向圖等。這裡我們不具體討論CEP的實現方式,因為這超出了本問文范圍,我們把重點放在CEP技術的使用上。提供CEP功能的產品比較豐富,如WSO2 CEP(Siddhi)、Drools、Pulsar、Esper、Flink CEP等,這些產品各有特色且名聲都不小,感興趣的讀者可以自行查閱相關資料。這裡我們以Flink CEP為例來說明如何使用CEP。

在Flink CEP的實現中,事件間的各種各樣的關系被抽象為模式(pattern)。在定義好模式後,將這個模式設置到數據流上,之後當數據流過時,如果匹配到定義的模式,就會觸發一個復合事件。這個復合事件包含所有參與這次模式匹配的事件。為了方便用戶定義事件間的關系,也就是模式,Flink CEP提供了豐富的API。Flink CEP常用API如表4-3所示。

表4-3 Flink CEP常用API

註意,表4-3隻列舉了Flink CEP的部分API,實際上Flink CEP還有很多其他API,在這裡我們就不一一列舉出來了,建議感興趣的讀者自行參考Flink官方文檔。

Flink CEP實例

下面我們以倉庫環境溫度監控為例來演示Flink CEP在實際場景中的運用。假設現在我們收到公司老板的需求,需要監控倉庫的環境溫度,以及時發現和避免火災。我們使用的溫度傳感器每秒上報一次事件到基於Flink的實時流計算系統。我們設定告警規則如下,當15秒內兩次監控溫度超過閾值時發出預警,當30秒內產生兩次預警事件且第二次預警溫度比第一次預警溫度高時就發出嚴重告警。

接下來就是具體的Flin.CEP實現了。首先,定義“15秒內兩次監控溫度超過閾值”的模式:

DataStream<JSONObject> temperatureStream = env

.addSource(new PeriodicSourceFunction())

.assignTimestampsAndWatermarks(new EventTimestampPeriodicWatermarks())

.setParallelism(1);

Pattern<JSONObject, JSONObject> alarmPattern = Pattern.<JSONObject>begin

("alarm")

.where(new SimpleCondition<JSONObject>() {

@Override

public boolean filter(JSONObject value) throws Exception {

return value.getDouble("temperature") > 100.0d;

}

})

.times(2)

.within(Time.seconds(15));

在上面的代碼中,我們用begin定義一個模式alarm,再用where指定了我們關註的是溫度高於100℃的事件;然後用times配合within,指定高溫事件在15秒內發生兩次才發出預警。

然後,我們將預警模式安裝到溫度事件流上:

DataStream<JSONObject> alarmStream = CEP.pattern(temperatureStream, alarmPattern)

.select(new PatternSelectFunction<JSONObject, JSONObject>() {

@Override public JSONObject select(Map<String, List<JSONObject>> pattern) throws

Exception {

return pattern.get("alarm").stream()

.max(Comparator.comparingDouble(o -> o.getLongValue

("temperature")))

.orElseThrow(() -> new IllegalStateException("should contains

2 events, but none"));

}

}).setParallelism(1);

在上面的代碼中,我們將預警模式alarmPattern安裝到溫度事件流temperatureStream上。當溫度事件流上有匹配到預警模式的事件時,就會發出一個預警事件,這是用select函數完成的。select函數指定了發出的預警事件是兩個高溫事件中溫度更高的那個事件。

接下來,定義嚴重告警模式:

Pattern<JSONObject, JSONObject> criticalPattern = Pattern.

<JSONObject>begin("critical")

.times(2)

.within(Time.seconds(30));

與預警模式的定義類似,在上面的代碼中,我們定義了嚴重告警模式,即“在30秒內發生兩次”。

再將告警模式安裝在告警事件流上:

DataStream<JSONObject> criticalStream = CEP.pattern(alarmStream, criticalPattern)

.flatSelect(new PatternFlatSelectFunction<JSONObject, JSONObject>() {

@Override

public void flatSelect(Map<String, List<JSONObject>> pattern,

Collector<JSONObject> out) throws Exception {

List<JSONObject> critical = pattern.get("critical");

JSONObject first = critical.get(0);

JSONObject second = critical.get(1);

if (first.getLongValue("temperature") <

second.getLongValue("temperature")) {

JSONObject jsonObject = new JSONObject();

jsonObject.putAll(second);

out.collect(jsonObject);

} }

}).setParallelism(1);

這一次,我們的告警模式不再是安裝在溫度事件流上,而是安裝在步驟2中的預警事件流上。當預警事件流中有匹配告警模式的事件(在30秒內發生兩次預警)時,就觸發告警。不過這裡還有一個要求沒有達到,即第二次預警溫度比第一次預警溫度高,這是通過flatSelect來實現的,在flatSelect中,我們設定隻有第二次預警溫度比第一次預警溫度高時,才將告警事件輸出至out.collect。

至此,一個關於倉庫環境溫度監控的CEP應用就實現了。

本篇文章給大傢講解的內容是數據處理:事件序列分析

下篇文章給大傢講解的內容是數據處理: 模型學習和預測