Realtime Compute for Apache Flink は、DataStream プログラムでの動的 Flink 複雑なイベント処理 (CEP) をサポートし、ルールを動的に更新します。このトピックでは、最新のルールを動的に読み込んでアップストリームの Kafka メッセージを処理する Flink ジョブを開発する方法について説明します。
動的 Flink CEP のユースケース
Flink CEP は、Apache Flink の分散型、サブミリ秒の待機時間でのデータ処理能力を活用して、データストリーム内の複雑なデータパターンを検出します。これには、以下を含む幅広い実世界のアプリケーションがあります。
リアルタイムのリスク管理: Flink CEP は、ユーザーの行動ログを分析して、異常な顧客にフラグを立てることができます。たとえば、5 分間のウィンドウ内で 10 回のトランザクションで合計 10,000 米ドルを転送した場合などです。
リアルタイムマーケティング: ユーザーの行動ログを使用することで、Flink CEP はマーケティング戦略を最適化するための洞察を提供できます。たとえば、10 分以内に 3 つ以上の商品をカートに追加したが支払いをしないユーザーを特定するなどです。Flink CEP は、リアルタイムマーケティングにおける詐欺検出にも役立ちます。
IoT: Flink CEP は、異常検出とアラートを可能にします。たとえば、特定のエリアから 15 分以上離れている共有自転車にフラグを立てることができます。Flink CEP の別の例は、IoT センサーデータに基づいて工業生産における組立ラインの異常を特定することです。たとえば、センサーからの温度データが 3 つの連続したタイムウィンドウにわたって一貫してしきい値を超えた場合、アラートがトリガーされます。
概要
このトピックでは、動的 Flink CEP を実装するための実践的なガイドを提供します。例として、Flink が Kafka からユーザーのクリックストリームデータを消費し、MySQL から動的にルールをフェッチし、一致するイベントを検出する方法を示します。パターンの一致が見つかると、Flink ジョブはアラートをトリガーしたり、結果をデータストアに書き込んだりできます。データパイプラインのグラフは次のとおりです。
最初に、Flink ジョブを開始し、ルール 1 を挿入します。ルール 1 は、action = 0 のイベントが 3 回発生し、その後に action != 1 のイベントが続くというものです。このルールは、支払いをせずに製品ページを 3 回連続で表示したユーザーにフラグを立てます。次に、ルール 1 はルール 2 に更新され、適時性の制約が導入されます。action = 0 の 3 つの連続したイベントが 15 分のタイムウィンドウ内で発生するというものです。更新されたルールは、30 分以内に製品を購入せずに製品ページを繰り返し表示するユーザーを特定します。
前提条件
Realtime Compute for Apache Flink ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink をアクティブ化する」をご参照ください。
RAM ユーザーまたは RAM ロールに、Realtime Compute for Apache Flink のコンソールにアクセスするために必要な権限があること。詳細については、「権限管理」をご参照ください。
アップストリームおよびダウンストリームストレージ
ApsaraDB RDS for MySQL インスタンスが作成されていること。詳細については、「RDS for MySQL インスタンスの作成」をご参照ください。
ApsaraMQ for Kafka インスタンスが作成されていること。詳細については、「ApsaraMQ for Kafka の概要」をご参照ください。
手順
以下のセクションでは、事前定義されたルールに一致するアクションを持つユーザーを検索し、ルールを動的に更新する Flink プログラムを開発する方法について説明します。手順は次のとおりです。
ステップ 1: テストデータを準備する
アップストリーム Kafka Topic の作成
ApsaraMQ for Kafka コンソールにログオンします。
シミュレートされたユーザー行動ログを保存するために、demo_topic という名前の Topic を作成します。
詳細については、「ステップ 1: Topic を作成する」をご参照ください。
ApsaraDB RDS for MySQL データベースの準備
Data Management (DMS) コンソールで、ApsaraDB RDS for MySQL データベースにテストデータを準備します。
特権アカウントを使用して ApsaraDB RDS for MySQL インスタンスにログオンします。
詳細については、「DMS コンソールで RDS インスタンスにログオンする」をご参照ください。
ルールを保存するための rds_demo という名前のテーブルと、見つかった一致を受け取るための match_results という名前のテーブルを作成します。
次のステートメントを SQL エディターにコピーし、[Execute(F8)] をクリックします。
CREATE DATABASE cep_demo_db; USE cep_demo_db; CREATE TABLE rds_demo ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); CREATE TABLE match_results ( rule_id INT, rule_version INT, user_id INT, user_name VARCHAR(255), production_id INT, PRIMARY KEY (rule_id,rule_version,user_id,production_id) );rds_demo テーブルの各行はルールを表し、id (ルールの一意の識別子)、version、pattern、および function (一致したイベントシーケンスの処理方法を定義) の 4 つのフィールドで構成されます。
match_results テーブルの各行は、見つかった一致を表し、特定のパターンに沿ったユーザーの行動を示します。このレコードは、マーケティングチームがユーザーにクーポンを提供するなど、情報に基づいた意思決定を行うのに役立ちます。
ステップ 2: IP アドレスホワイトリストを構成する
Flink ワークスペースが ApsaraDB RDS for MySQL インスタンスにアクセスできるようにするには、Flink ワークスペースの CIDR ブロックを ApsaraDB RDS for MySQL インスタンスの IP アドレスホワイトリストに追加する必要があります。
Flink ワークスペースで使用される vSwitch の CIDR ブロックを取得します。
ターゲットワークスペースを見つけ、[アクション] 列で を選択します。
[ワークスペース詳細] ダイアログボックスで、vSwitch の CIDR ブロックをコピーします。

CIDR ブロックを ApsaraDB RDS for MySQL インスタンスの IP アドレスホワイトリストに追加します。
詳細については、ApsaraDB RDS for MySQL ドキュメントの「IP アドレスホワイトリストの構成」をご参照ください。

ステップ 3: 動的 CEP ジョブを開発する
このトピックのすべてのコードファイルは GitHub リポジトリからダウンロードできます。デモンストレーションの目的で、このトピックのサンプルコードは timeOrMoreAndWindow ブランチで変更されています。完全なコードについては、ververica-cep-demo-master.zip パッケージをダウンロードしてください。
プロジェクトの pom.xml ファイルに flink-cep 依存関係を追加します。
依存関係の構成と競合の処理については、「Apache Flink の環境依存関係を構成する」をご参照ください。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-cep</artifactId> <version>1.17-vvr-8.0.8</version> <scope>provided</scope> </dependency>動的 CEP プログラムのコードを記述します。
Kafka ソースを作成します。
詳細については、「Kafka DataStream コネクタ」をご参照ください。
CEP.dynamicPatterns() メソッドを使用して変換を定義します。
動的なルール変更を有効にし、複数のルールをサポートするために、Alibaba Cloud Realtime Compute for Apache Flink は CEP.dynamicPatterns() メソッドを定義しています。
public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns( DataStream<T> input, PatternProcessorDiscovererFactory<T> discovererFactory, TimeBehaviour timeBehaviour, TypeInformation<R> outTypeInfo)次の表に、必須パラメーターを示します。プレースホルダーの値を実際の構成値に置き換えてください。
パラメーター
説明
DataStream<T> input
入力イベントストリーム。
PatternProcessorDiscovererFactory<T> discovererFactory
PatternProcessorDiscoverer を構築するファクトリオブジェクト。PatternProcessorDiscoverer は最新のルールを取得して PatternProcessor インターフェイスを構築します。
TimeBehaviour timeBehaviour
Flink CEP ジョブがイベントを処理する方法を定義する時間属性。有効な値:
TimeBehaviour.ProcessingTime: イベントは処理時間に基づいて処理されます。
TimeBehaviour.EventTime: イベントはイベント時間に基づいて処理されます。
TypeInformation<R> outTypeInfo
出力ストリームの型情報。
DataStream、TimeBehavior、TypeInformation などの概念については、「Flink DataStream API プログラミングガイド」、「時間の概念: イベント時間と処理時間」、および「クラス TypeInformation<T>」をご参照ください。
PatternProcessor インターフェイスには、イベントを照合する方法を定義する Pattern と、アラートの送信など、照合されたイベントを処理する方法を指定する PatternProcessFunction が含まれています。さらに、id や version などの識別プロパティも含まれています。詳細については、「FLIP-200: 複数のルールと動的なルール変更のサポート (Flink CEP)」をご参照ください。
PatternProcessorDiscovererFactory は、最新の PatternProcessor をフェッチする PatternProcessorDiscoverer を構築します。Realtime Compute for Apache Flink は、外部ストレージを定期的にスキャンする抽象クラスを提供します。次のコードは、外部ストレージを定期的にポーリングして最新の PatternProcessor をフェッチするタイマーを構築します。
public abstract class PeriodicPatternProcessorDiscoverer<T> implements PatternProcessorDiscoverer<T> { ... @Override public void discoverPatternProcessorUpdates( PatternProcessorManager<T> patternProcessorManager) { // パターンプロセッサの更新を定期的に検出します。 timer.schedule( new TimerTask() { @Override public void run() { if (arePatternProcessorsUpdated()) { List<PatternProcessor<T>> patternProcessors = null; try { patternProcessors = getLatestPatternProcessors(); } catch (Exception e) { e.printStackTrace(); } patternProcessorManager.onPatternProcessorsUpdated(patternProcessors); } } }, 0, intervalMillis); } ... }Realtime Compute for Apache Flink は、ApsaraDB RDS や Hologres などの JDBC データベースから最新のルールをフェッチするための JDBCPeriodicPatternProcessorDiscoverer の実装も提供しています。次の表に、必須パラメーターを示します。
パラメーター
説明
jdbcUrl
データベースの JDBC URL。
jdbcDriver
データベースドライバークラスの名前。
tableName
データベース内のテーブルの名前。
initialPatternProcessors
初期 PatternProcessors のリスト。
intervalMillis
データベースがポーリングされる間隔。
次のサンプルコードは、JDBCPeriodicPatternProcessorDiscoverer の使用例を示しています。一致するイベントは TaskManager のログに出力されます。
// import ...... public class CepDemo { public static void main(String[] args) throws Exception { ...... // DataStream ソース DataStreamSource<Event> source = env.fromSource( kafkaSource, WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner((event, ts) -> event.getEventTime()), "Kafka Source"); env.setParallelism(1); // userId と productionId で keyBy // 注意: 同じキーを持つイベントのみが処理され、一致があるかどうかが確認されます KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream = source.assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5))) ).keyBy(new KeySelector<Event, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> getKey(Event value) throws Exception { return Tuple2.of(value.getId(), value.getProductionId()); } }); SingleOutputStreamOperator<String> output = CEP.dynamicPatterns( keyedStream, new JDBCPeriodicPatternProcessorDiscovererFactory<>( params.get(JDBC_URL_ARG), JDBC_DRIVE, params.get(TABLE_NAME_ARG), null, Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))), Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ? TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime, TypeInformation.of(new TypeHint<String>() {})); output.print(); // ジョブをコンパイルして送信します env.execute("CEPDemo"); } }説明デモコードでは、
CEP.dynamicPatterns()が呼び出される前に、入力データストリームが userId と productionId でキー付けされます。これにより、Flink は同じ userId と productionId を持つイベント内のパターンのみを検索します。
Realtime Compute for Apache Flink コンソールで、プログラム JAR をアップロードし、JAR デプロイメントを作成します。
次の表に、デプロイメントの作成時に構成する必要があるパラメーターを示します。
説明テストを容易にするために、cep-demo.jar をダウンロードし、そこから JAR デプロイメントを作成します。アップストリームの Kafka ソースにデータが保存されておらず、ルールテーブルが空であるため、デプロイメントを開始しても出力は返されません。
パラメーター
説明
デプロイメントモード
ストリームモードを選択します。
デプロイメント名
JAR デプロイメントの名前を入力します。
エンジンバージョン
エンジンバージョンの詳細については、「エンジンバージョン」および「ライフサイクルポリシー」をご参照ください。推奨バージョンまたは安定バージョンの使用をお勧めします。エンジンバージョンは次のタイプに分類されます。
推奨バージョン: 最新のメジャーバージョンの最新のマイナーバージョン。
安定バージョン: プロダクトのサービス期間内にあるメジャーバージョンの最新のマイナーバージョン。以前のバージョンの不具合は、このようなバージョンで修正されています。
通常バージョン: プロダクトのサービス期間内にあるその他のマイナーバージョン。
非推奨バージョン: プロダクトのサービス期間を超えたバージョン。
JAR URL
プログラム JAR またはテスト用の cep-demo.jar をアップロードします。
エントリポイントクラス
値を
com.alibaba.ververica.cep.demo.CepDemoに設定します。エントリポイントのメイン引数
アップストリームおよびダウンストリームシステムが構成されたプログラム JAR を使用する場合は、このフィールドをスキップします。提供されているテスト用の cep-demo.jar を使用する場合は、次のコードをフィールドにコピーします。
--kafkaBrokers YOUR_KAFKA_BROKERS --inputTopic YOUR_KAFKA_TOPIC --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD --tableName YOUR_TABLE_NAME --jdbcIntervalMs 3000 --usingEventTime false上記のコードのプレースホルダーの値を次のように置き換えます。
kafkaBrokers: Kafka ブローカーのアドレス。
inputTopic: Kafka Topic の名前。
inputTopicGroup: Kafka 使用者グループ。
jdbcUrl: データベースの JDBC URL。
説明JDBC URL を介してデータベースに接続するには、文字と数字のみで構成されるパスワードを持つ標準アカウントを使用します。ビジネス要件に基づいて、デプロイメントに認証方式を使用できます。
tableName: 宛先テーブルの名前。
jdbcIntervalMs: データベースがポーリングされる間隔。
usingEventTime: イベント時間を使用するかどうかを指定します。有効な値: true と false。
説明プレースホルダーの値を実際の構成値に置き換えてください。
本番環境では、プレーンテキストの認証情報ではなく変数を使用してください。詳細については、「変数の管理」をご参照ください。
[デプロイメント] ページの [デプロイメント] タブで、[パラメーター] セクションの [編集] をクリックします。次に、[その他の構成] フィールドに次のパラメーターを追加します。
flink-cep依存関係は AppClassLoader に依存し、ユーザー JAR 内のaviatorクラスは UserCodeClassLoader に依存します。読み込みの失敗を防ぎ、AppClassLoader がユーザー JAR 内のクラスにアクセスできるようにするには、次の構成を追加します。kubernetes.application-mode.classpath.include-user-jar: 'true' classloader.resolve-order: parent-first[パラメーター] セクションでパラメーターを構成する方法の詳細については、「パラメーター」をご参照ください。
に移動します。ターゲットデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。
詳細については、「デプロイメントの開始」をご参照ください。
ステップ 4: ルールを追加する
JAR デプロイメントが開始された後、ルール 1 を追加します。ルール 1 は、action = 0 の 3 つの連続したイベントの後に、action != 1 のイベントが続くというものです。このイベントシーケンスは、ユーザーが製品ページを 3 回表示したが支払いをしなかったことを示します。
ApsaraDB RDS for MySQL コンソールにログオンします。
ルールを追加します。
パターンを定義する JSON 文字列を id、version、および function フィールドと連結し、INSERT INTO 文を実行して ApsaraDB RDS for MySQL データベースのルールテーブルにデータを挿入します。
INSERT INTO rds_demo ( `id`, `version`, `pattern`, `function` ) values( '1', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}', 'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction') ;データベース内の pattern フィールドの読みやすさを向上させるため、Realtime Compute for Apache Flink では JSON フォーマットでパターンを定義できます。詳細については、「動的 Flink CEP における JSON フォーマットでのルールの定義」をご参照ください。前の SQL 文の pattern フィールドの値は、Realtime Compute for Apache Flink でサポートされている JSON フォーマットのシリアル化されたパターン文字列の例です。このパターンは、action = 0 の 3 つの連続したイベントの直後に action != 1 のイベントが続くイベントシーケンスに一致します。
説明EndCondition は、「action != 1」をチェックするためにコードで定義されています。
Pattern API を使用してパターンを定義します。
Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new StartCondition("action == 0")) .timesOrMore(3) .followedBy("end") .where(new EndCondition());CepJsonUtils の convertPatternToJSONString メソッドを使用して JSON 文字列に変換できます。
public void printTestPattern(Pattern<?, ?> pattern) throws JsonProcessingException { System.out.println(CepJsonUtils.convertPatternToJSONString(pattern)); }JSON でパターンを定義します。
Kafka クライアントを使用して demo_topic Topic にメッセージを送信します。
このデモでは、ApsaraMQ for Kafka コンソールの demo_topic Topic の [メッセージの送受信を開始] パネルでテストメッセージを送信することもできます。
1,Ken,0,1,1662022777000 1,Ken,0,1,1662022778000 1,Ken,0,1,1662022779000 1,Ken,0,1,1662022780000
次の表に、demo_topic Topic のメッセージのフィールドを示します。
フィールド
説明
id
ユーザー ID。
username
ユーザー名。
action
ユーザーのアクション。有効な値:
0: 表示操作。
1: 購入操作。
product_id
プロダクト ID。
event_time
アクションが実行されたイベント時間。
JobManager ログに出力された最新のルールと、TaskManager ログに出力された見つかった一致を表示します。
JobManager ログで、JDBCPeriodicPatternProcessorDiscoverer をキーワードとして使用して最新のルールを検索します。

[ログ] タブの [実行中のタスクマネージャー] サブタブで、.out サフィックスが付いたログファイルを選択します。
A match for Pattern of (id, version): (1, 1)を検索し、ログエントリを表示します。
match_results テーブルで、
SELECT * FROM `match_results` ;を実行して見つかった一致をクエリします。
ステップ 5: ルールを更新する
カスタマイズされたマーケティング戦略では、通常、適時性が考慮されます。したがって、ルール 2 が挿入されます。ルール 2 は、action = 0 の 3 つの連続したイベントが 15 分のタイムウィンドウ内で発生するというものです。
usingEventTimeをtrueに設定します。に移動します。ターゲットデプロイメントを見つけ、[アクション] 列の [キャンセル] をクリックします。
デプロイメントの名前をクリックします。デプロイメント詳細ページで、[構成] タブを選択します。[基本] セクションの右上隅にある [編集] をクリックします。[エントリポイントのメイン引数] フィールドで、
usingEventTimeをtrueに設定します。[保存] をクリックします。デプロイメントを再度[開始] します。
新しいルールを挿入します。
Pattern API を使用して Java でパターンを定義します。
Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new StartCondition("action == 0")) .timesOrMore(3,Time.minutes(15)) .followedBy("end") .where(new EndCondition()); printTestPattern(pattern);rds_demo テーブルに新しいルールを挿入します。
# デモンストレーションのため、ルール 1 は削除されます。 DELETE FROM `rds_demo` WHERE `id` = 1; # ルール 2 を挿入: action = 0 の 3 つの連続したイベントが 15 分のウィンドウで発生し、その後に action != 1 のイベントが続く。ルールバージョンは (1,2) INSERT INTO rds_demo (`id`,`version`,`pattern`,`function`) values('1',2,'{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":{"unit":"MINUTES","size":15}},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');ApsaraMQ for Kafka コンソールで 8 つの簡略化されたメッセージを送信します。
例:
2,Tom,0,1,1739584800000 #10:00 2,Tom,0,1,1739585400000 #10:10 2,Tom,0,1,1739585700000 #10:15 2,Tom,0,1,1739586000000 #10:20 3,Ali,0,1,1739586600000 #10:30 3,Ali,0,1,1739588400000 #11:00 3,Ali,0,1,1739589000000 #11:10 3,Ali,0,1,1739590200000 #11:30match_results テーブルで、
SELECT * FROM `match_results` ;を実行して見つかった一致をクエリします。
この結果は、Tom の行動は事前定義されたパターンに一致するが、Ali の行動は一致しないことを示しています。これは、Ali のクリックが 15 分の時間制約を満たしていないためです。これらの洞察により、マーケティングチームは期間限定のセール中にターゲットを絞った介入を実施できます。たとえば、特定の時間枠内に製品ページを繰り返し表示するユーザーにクーポンを発行して、購入を促すことができます。