Realtime Compute for Apache Flink は Apache Sedona と統合され、強力で分散されたリアルタイムの地理空間分析を提供します。このトピックでは、リアルタイムの交通密度分析シナリオを例として、Flink で Sedona の空間関数を呼び出し、データストリームに対して効率的に地理空間計算を実行する方法を説明します。
制限
この機能は、Ververica Runtime (VVR) 8.0.11 以降を使用する Realtime Compute for Apache Flink でのみサポートされています。
Apache Sedona
Apache Sedona は、大規模な地理空間データを処理するために設計された、分散型のハイパフォーマンスコンピューティング (HPC) フレームワークです。
Sedona 1.7.2 は、3 つの集計関数 (AggregateFunction) と 200 以上のスカラー関数 (ScalarFunction) をサポートしています。詳細については、公式の「API Docs」をご参照ください。
Sedona の使用
SQL ジョブ
このセクションでは、リアルタイム交通密度分析用のシンプルな SQL ジョブを作成し、Realtime Compute for Apache Flink にデプロイする方法について説明します。
1. JAR パッケージのダウンロード
Sedona Maven リポジトリ から、sedona-flink-shaded_2.12-1.7.2.jar などの必要な JAR パッケージバージョンをダウンロードします。
または、Sedona 公式ダウンロードページ から圧縮パッケージをダウンロードし、解凍して Flink 用の sedona-flink-shaded_2.12-1.7.2.jar パッケージを取得することもできます。

2. UDF JAR の登録
Realtime Compute コンソールで、[データ開発] > [ETL]/[データクエリ] > [関数] ページに移動します。プラス記号 (+) をクリックして UDF JAR を登録します。ダウンロードした sedona-flink-shaded_2.12-1.7.2.jar パッケージをアップロードし、登録する必要な関数を選択します。


Realtime Compute for Apache Flink が関数名を自動的に抽出する際、内部クラスである UDF の名前には、外部クラス名がプレフィックスとして追加されます。この動作は、上の図に示すようにデフォルトです。デフォルトの関数名をそのまま使用することも、必要に応じて変更することもできます。
登録後、[データ開発] > [ETL] > [関数] ページの関数リストですべての登録済み関数を表示できます。

3. ジョブの作成とシリアライザーの登録
[データ開発] > [ETL] > [ストリームドラフト] ページで、空のストリームドラフトを作成し、ジョブログを記述します。
次の例に示すように、ジョブの冒頭で pipeline.default-kryo-serializers パラメーターを設定して、専用のシリアライザーを登録してください。これにより、シリアル化のオーバーヘッドが約 3 分の 2 削減されます。パラメーターの詳細については、「pipeline.default-kryo-serializers」をご参照ください。
SET 'pipeline.default-kryo-serializers' = 'class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde';Realtime Compute for Apache Flink は、リアルタイム交通密度分析のために次の SQL クエリを提供します。
-- =============================================================================
-- 0. 専用シリアライザーの登録
-- =============================================================================
SET 'pipeline.default-kryo-serializers' = 'class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde';
-- =============================================================================
-- 1. データソーステーブルの定義
-- =============================================================================
-- 車両位置の連続データストリームをシミュレートするための一時テーブルを作成します。
CREATE TEMPORARY TABLE vehicle_positions (
`vehicle_id` STRING,
`lon` DOUBLE,
`lat` DOUBLE,
`event_time` TIMESTAMP_LTZ(3),
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '2' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.vehicle_id.length' = '5',
'fields.lon.min' = '116.3',
'fields.lon.max' = '116.5',
'fields.lat.min' = '39.85',
'fields.lat.max' = '40.0'
);
-- =============================================================================
-- 2. データシンクテーブルの定義
-- =============================================================================
-- 最終的な集約結果をコンソールに出力するための一時テーブルを作成します。
CREATE TEMPORARY TABLE print_sink (
`area_name` STRING,
`vehicle_count` BIGINT,
`window_end` TIMESTAMP(3)
) WITH (
'connector' = 'print',
'print-identifier' = 'AREA_STATS_SQL'
);
-- =============================================================================
-- 3. コア ETL ロジック
-- =============================================================================
-- 各エリアとウィンドウ内で車両 ID を重複排除してカウントします。
INSERT INTO print_sink
SELECT
F.area_name,
COUNT(DISTINCT V.vehicle_id) AS vehicle_count,
V.window_end
FROM
TABLE(TUMBLE(TABLE vehicle_positions, DESCRIPTOR(event_time), INTERVAL '10' SECOND)) AS V
-- エリア範囲
INNER JOIN (
VALUES
('area_01', 'West District', 'POLYGON((116.30 39.90, 116.40 39.90, 116.40 39.95, 116.30 39.95, 116.30 39.90))'),
('area_02', 'East District', 'POLYGON((116.41 39.90, 116.48 39.90, 116.48 39.95, 116.41 39.95, 116.41 39.90))')
)
AS F(area_id, area_name, wkt_polygon)
-- 結合条件は、車両が定義されたエリア内にあるかどうかです。
ON Predicates$ST_Contains(Constructors$ST_GeomFromWKT(F.wkt_polygon), Constructors$ST_Point(V.lon, V.lat))
GROUP BY
V.window_start,
V.window_end,
F.area_id,
F.area_name;4. ジョブのデプロイ
SQL エディターで、右上隅にある [デプロイ] をクリックします。
前のセクションの SQL クエリをデプロイして開始した後、次の図に示すように、[Flink Web UI] > [タスクマネージャー] > [Stdout] ページで出力を表示できます。

JAR ジョブ
このセクションでは、リアルタイム交通密度分析用のシンプルな JAR ジョブを作成し、Realtime Compute for Apache Flink にデプロイする方法について説明します。
1. 依存関係の追加
ドキュメント の説明に従って、Sedona 1.7.2 を使用するには、次の依存関係を追加します。
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-flink-shaded_2.12</artifactId>
<version>1.7.2</version>
</dependency>
<!-- Optional: https://mvnrepository.com/artifact/org.datasyslab/geotools-wrapper -->
<dependency>
<groupId>org.datasyslab</groupId>
<artifactId>geotools-wrapper</artifactId>
<version>1.7.2-28.5</version>
</dependency>2. 環境の初期化
ジョブログを記述する前に、次のコードを使用して Sedona の関数とシリアライザーを登録します。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Register functions and serializers
SedonaContext.create(env, tableEnv);TableEnvironment インターフェイスを使用する場合は、次のコードを使用して手動で登録できます。これは SedonaContext を使用するのと同じです。
TableEnvironment tableEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().build());
TelemetryCollector.send("flink", "java");
// シリアライザーの登録
tableEnv.getConfig()
.set(
PipelineOptions.KRYO_DEFAULT_SERIALIZERS.key(),
"class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;"
+ "class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde");
// 関数の登録
Arrays.stream(Catalog.getFuncs())
.forEach(
(func) -> {
tableEnv.createTemporarySystemFunction(
func.getClass().getSimpleName(), func);
});
Arrays.stream(Catalog.getPredicates())
.forEach(
(func) -> {
tableEnv.createTemporarySystemFunction(
func.getClass().getSimpleName(), func);
});3. ジョブの作成とパッケージ化
ジョブログを記述し、JAR ファイルにパッケージ化します。
Realtime Compute for Apache Flink は、リアルタイム交通密度分析用のサンプルジョブコード FlinkSedonaExample.zip と、それに対応する JAR パッケージ FlinkSedonaExample-1.0-SNAPSHOT.jar を提供します。このコードは、Apache Flink 1.17.2 に対応する Realtime Compute for Apache Flink VVR 8.0.11 に基づいています。
4. ジョブのデプロイ
Realtime Compute コンソールで JAR ジョブをデプロイできます。[オペレーションセンター] > [ジョブ O&M] > [ジョブのデプロイ] > [JAR ジョブ] に移動します。
前のセクションの JAR パッケージをデプロイして開始した後、次の図に示すように、[Flink Web UI] > [タスクマネージャー] > [Stdout] ページで出力を表示できます。
