すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink と Sedona を使用したリアルタイム交通密度の分析

最終更新日:Nov 10, 2025

Realtime Compute for Apache Flink は Apache Sedona と統合され、強力で分散されたリアルタイムの地理空間分析を提供します。このトピックでは、リアルタイムの交通密度分析シナリオを例として、Flink で Sedona の空間関数を呼び出し、データストリームに対して効率的に地理空間計算を実行する方法を説明します。

制限

この機能は、Ververica Runtime (VVR) 8.0.11 以降を使用する Realtime Compute for Apache Flink でのみサポートされています。

Apache Sedona

Apache Sedona は、大規模な地理空間データを処理するために設計された、分散型のハイパフォーマンスコンピューティング (HPC) フレームワークです。

Sedona 1.7.2 は、3 つの集計関数 (AggregateFunction) と 200 以上のスカラー関数 (ScalarFunction) をサポートしています。詳細については、公式の「API Docs」をご参照ください。

Sedona の使用

SQL ジョブ

このセクションでは、リアルタイム交通密度分析用のシンプルな SQL ジョブを作成し、Realtime Compute for Apache Flink にデプロイする方法について説明します。

1. JAR パッケージのダウンロード

Sedona Maven リポジトリ から、sedona-flink-shaded_2.12-1.7.2.jar などの必要な JAR パッケージバージョンをダウンロードします。

または、Sedona 公式ダウンロードページ から圧縮パッケージをダウンロードし、解凍して Flink 用の sedona-flink-shaded_2.12-1.7.2.jar パッケージを取得することもできます。

image.png

2. UDF JAR の登録

Realtime Compute コンソールで、[データ開発] > [ETL]/[データクエリ] > [関数] ページに移動します。プラス記号 (+) をクリックして UDF JAR を登録します。ダウンロードした sedona-flink-shaded_2.12-1.7.2.jar パッケージをアップロードし、登録する必要な関数を選択します。

image.png

image.png

説明

Realtime Compute for Apache Flink が関数名を自動的に抽出する際、内部クラスである UDF の名前には、外部クラス名がプレフィックスとして追加されます。この動作は、上の図に示すようにデフォルトです。デフォルトの関数名をそのまま使用することも、必要に応じて変更することもできます。

登録後、[データ開発] > [ETL] > [関数] ページの関数リストですべての登録済み関数を表示できます。

image.png

3. ジョブの作成とシリアライザーの登録

[データ開発] > [ETL] > [ストリームドラフト] ページで、空のストリームドラフトを作成し、ジョブログを記述します。

重要

次の例に示すように、ジョブの冒頭で pipeline.default-kryo-serializers パラメーターを設定して、専用のシリアライザーを登録してください。これにより、シリアル化のオーバーヘッドが約 3 分の 2 削減されます。パラメーターの詳細については、「pipeline.default-kryo-serializers」をご参照ください。

SET 'pipeline.default-kryo-serializers' = 'class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde';

Realtime Compute for Apache Flink は、リアルタイム交通密度分析のために次の SQL クエリを提供します。

-- =============================================================================
--  0. 専用シリアライザーの登録
-- =============================================================================
SET 'pipeline.default-kryo-serializers' = 'class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde';

-- =============================================================================
--  1. データソーステーブルの定義
-- =============================================================================
-- 車両位置の連続データストリームをシミュレートするための一時テーブルを作成します。
CREATE TEMPORARY TABLE vehicle_positions (
  `vehicle_id` STRING,
  `lon`        DOUBLE,
  `lat`        DOUBLE,
  `event_time` TIMESTAMP_LTZ(3),
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '2' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.vehicle_id.length' = '5',
  'fields.lon.min' = '116.3',
  'fields.lon.max' = '116.5',
  'fields.lat.min' = '39.85',
  'fields.lat.max' = '40.0'
);

-- =============================================================================
--  2. データシンクテーブルの定義
-- =============================================================================
-- 最終的な集約結果をコンソールに出力するための一時テーブルを作成します。
CREATE TEMPORARY TABLE print_sink (
  `area_name`     STRING,
  `vehicle_count` BIGINT,
  `window_end`    TIMESTAMP(3)
) WITH (
  'connector' = 'print',
  'print-identifier' = 'AREA_STATS_SQL'
);

-- =============================================================================
--  3. コア ETL ロジック
-- =============================================================================
-- 各エリアとウィンドウ内で車両 ID を重複排除してカウントします。
INSERT INTO print_sink
SELECT
    F.area_name,
    COUNT(DISTINCT V.vehicle_id) AS vehicle_count,
    V.window_end
FROM
    TABLE(TUMBLE(TABLE vehicle_positions, DESCRIPTOR(event_time), INTERVAL '10' SECOND)) AS V
-- エリア範囲
INNER JOIN (
    VALUES
        ('area_01', 'West District', 'POLYGON((116.30 39.90, 116.40 39.90, 116.40 39.95, 116.30 39.95, 116.30 39.90))'),
        ('area_02', 'East District', 'POLYGON((116.41 39.90, 116.48 39.90, 116.48 39.95, 116.41 39.95, 116.41 39.90))')
)
AS F(area_id, area_name, wkt_polygon)
-- 結合条件は、車両が定義されたエリア内にあるかどうかです。
ON Predicates$ST_Contains(Constructors$ST_GeomFromWKT(F.wkt_polygon), Constructors$ST_Point(V.lon, V.lat))
GROUP BY
    V.window_start,
    V.window_end,
    F.area_id,
    F.area_name;

4. ジョブのデプロイ

SQL エディターで、右上隅にある [デプロイ] をクリックします。

前のセクションの SQL クエリをデプロイして開始した後、次の図に示すように、[Flink Web UI] > [タスクマネージャー] > [Stdout] ページで出力を表示できます。

image.png

JAR ジョブ

説明

Sedona の公式ドキュメントには、参考としていくつかの Table API チュートリアル と、すぐに使用できる デモ が用意されています。

このセクションでは、リアルタイム交通密度分析用のシンプルな JAR ジョブを作成し、Realtime Compute for Apache Flink にデプロイする方法について説明します。

1. 依存関係の追加

ドキュメント の説明に従って、Sedona 1.7.2 を使用するには、次の依存関係を追加します。

<dependency>
    <groupId>org.apache.sedona</groupId>
    <artifactId>sedona-flink-shaded_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- Optional: https://mvnrepository.com/artifact/org.datasyslab/geotools-wrapper -->
<dependency>
    <groupId>org.datasyslab</groupId>
    <artifactId>geotools-wrapper</artifactId>
    <version>1.7.2-28.5</version>
</dependency>

2. 環境の初期化

ジョブログを記述する前に、次のコードを使用して Sedona の関数とシリアライザーを登録します。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Register functions and serializers
SedonaContext.create(env, tableEnv);

TableEnvironment インターフェイスを使用する場合は、次のコードを使用して手動で登録できます。これは SedonaContext を使用するのと同じです。

TableEnvironment tableEnv =
    TableEnvironment.create(EnvironmentSettings.newInstance().build());

TelemetryCollector.send("flink", "java");

// シリアライザーの登録
tableEnv.getConfig()
    .set(
        PipelineOptions.KRYO_DEFAULT_SERIALIZERS.key(),
        "class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;"
        + "class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde");

// 関数の登録
Arrays.stream(Catalog.getFuncs())
    .forEach(
        (func) -> {
            tableEnv.createTemporarySystemFunction(
                func.getClass().getSimpleName(), func);
        });
Arrays.stream(Catalog.getPredicates())
    .forEach(
        (func) -> {
            tableEnv.createTemporarySystemFunction(
                func.getClass().getSimpleName(), func);
        });

3. ジョブの作成とパッケージ化

ジョブログを記述し、JAR ファイルにパッケージ化します。

Realtime Compute for Apache Flink は、リアルタイム交通密度分析用のサンプルジョブコード FlinkSedonaExample.zip と、それに対応する JAR パッケージ FlinkSedonaExample-1.0-SNAPSHOT.jar を提供します。このコードは、Apache Flink 1.17.2 に対応する Realtime Compute for Apache Flink VVR 8.0.11 に基づいています。

FlinkSedonaExample.zip

FlinkSedonaExample-1.0-SNAPSHOT.jar

4. ジョブのデプロイ

Realtime Compute コンソールで JAR ジョブをデプロイできます。[オペレーションセンター] > [ジョブ O&M] > [ジョブのデプロイ] > [JAR ジョブ] に移動します。

前のセクションの JAR パッケージをデプロイして開始した後、次の図に示すように、[Flink Web UI] > [タスクマネージャー] > [Stdout] ページで出力を表示できます。

image.png