全部产品
Search
文档中心

Realtime Compute for Apache Flink:Analisis kerapatan lalu lintas real-time dengan Flink dan Sedona

更新时间:Nov 11, 2025

Realtime Compute for Apache Flink terintegrasi dengan Apache Sedona untuk menyediakan analisis geospasial yang andal, terdistribusi, dan real-time. Topik ini menggunakan skenario analisis kerapatan lalu lintas real-time sebagai contoh untuk menunjukkan cara memanggil fungsi spasial Sedona dalam Flink dan melakukan perhitungan geospasial secara efisien pada aliran data.

Batasan

Fitur ini hanya didukung di Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.11 atau versi lebih baru.

Apache Sedona

Apache Sedona adalah kerangka kerja komputasi kinerja tinggi (HPC) terdistribusi yang dirancang untuk memproses data geospasial berskala besar.

Sedona 1.7.2 mendukung tiga fungsi agregat (AggregateFunction) dan lebih dari 200 fungsi skalar (ScalarFunction). Untuk informasi selengkapnya, lihat Dokumentasi API resmi.

Gunakan Sedona

Pekerjaan SQL

Bagian ini menjelaskan cara menulis pekerjaan SQL sederhana untuk analisis kerapatan lalu lintas real-time dan menerapkannya ke Realtime Compute for Apache Flink.

1. Unduh paket JAR

Unduh versi paket JAR yang diperlukan dari Repositori Maven Sedona, seperti sedona-flink-shaded_2.12-1.7.2.jar.

Atau, Anda dapat mengunduh paket terkompresi dari halaman unduhan resmi Sedona dan mengekstraknya untuk mendapatkan paket sedona-flink-shaded_2.12-1.7.2.jar untuk Flink.

image.png

2. Daftarkan JAR UDF

Di Konsol Realtime Compute, navigasi ke halaman Data Development > ETL/Data Query > Function. Klik tanda plus (+) untuk mendaftarkan JAR UDF. Unggah paket sedona-flink-shaded_2.12-1.7.2.jar yang telah diunduh dan pilih fungsi yang diperlukan untuk didaftarkan.

image.png

image.png

Catatan

Ketika Realtime Compute for Apache Flink secara otomatis mengekstrak nama fungsi, nama kelas luar ditambahkan sebagai awalan pada nama UDF yang merupakan kelas dalam. Perilaku ini merupakan default, seperti yang ditunjukkan pada gambar sebelumnya. Anda dapat mempertahankan nama fungsi default atau mengubahnya sesuai kebutuhan.

Setelah pendaftaran, Anda dapat melihat semua fungsi yang terdaftar di daftar fungsi pada halaman Data Development > ETL > Function.

image.png

3. Tulis pekerjaan dan daftarkan serializer

Di halaman Data Development > ETL > Stream Draft, buat draf aliran kosong dan tulis logika pekerjaan.

Penting

Pastikan untuk mengonfigurasi parameter pipeline.default-kryo-serializers di awal pekerjaan guna mendaftarkan serializer khusus, seperti yang ditunjukkan pada contoh berikut. Hal ini mengurangi beban serialisasi hingga sekitar dua pertiga. Untuk informasi selengkapnya tentang parameter tersebut, lihat pipeline.default-kryo-serializers.

SET 'pipeline.default-kryo-serializers' = 'class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde';

Realtime Compute for Apache Flink menyediakan kueri SQL berikut untuk analisis kerapatan lalu lintas real-time.

-- =============================================================================
--  0. Daftarkan serializer khusus
-- =============================================================================
SET 'pipeline.default-kryo-serializers' = 'class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde';

-- =============================================================================
--  1. Definisikan tabel sumber data
-- =============================================================================
-- Buat tabel sementara untuk mensimulasikan aliran data kontinu posisi kendaraan.
CREATE TEMPORARY TABLE vehicle_positions (
  `vehicle_id` STRING,
  `lon`        DOUBLE,
  `lat`        DOUBLE,
  `event_time` TIMESTAMP_LTZ(3),
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '2' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.vehicle_id.length' = '5',
  'fields.lon.min' = '116.3',
  'fields.lon.max' = '116.5',
  'fields.lat.min' = '39.85',
  'fields.lat.max' = '40.0'
);

-- =============================================================================
--  2. Definisikan tabel sink data
-- =============================================================================
-- Buat tabel sementara untuk mencetak hasil agregasi akhir ke konsol.
CREATE TEMPORARY TABLE print_sink (
  `area_name`     STRING,
  `vehicle_count` BIGINT,
  `window_end`    TIMESTAMP(3)
) WITH (
  'connector' = 'print',
  'print-identifier' = 'AREA_STATS_SQL'
);

-- =============================================================================
--  3. Logika ETL inti
-- =============================================================================
-- Hilangkan duplikasi dan hitung ID kendaraan dalam setiap area dan jendela.
INSERT INTO print_sink
SELECT
    F.area_name,
    COUNT(DISTINCT V.vehicle_id) AS vehicle_count,
    V.window_end
FROM
    TABLE(TUMBLE(TABLE vehicle_positions, DESCRIPTOR(event_time), INTERVAL '10' SECOND)) AS V
-- Rentang area
INNER JOIN (
    VALUES
        ('area_01', 'West District', 'POLYGON((116.30 39.90, 116.40 39.90, 116.40 39.95, 116.30 39.95, 116.30 39.90))'),
        ('area_02', 'East District', 'POLYGON((116.41 39.90, 116.48 39.90, 116.48 39.95, 116.41 39.95, 116.41 39.90))')
)
AS F(area_id, area_name, wkt_polygon)
-- Kondisi join adalah apakah kendaraan berada di dalam area yang ditentukan.
ON Predicates$ST_Contains(Constructors$ST_GeomFromWKT(F.wkt_polygon), Constructors$ST_Point(V.lon, V.lat))
GROUP BY
    V.window_start,
    V.window_end,
    F.area_id,
    F.area_name;

4. Terapkan pekerjaan

Di editor SQL, klik Terapkan di pojok kanan atas.

Setelah Anda menerapkan dan menjalankan kueri SQL dari bagian sebelumnya, Anda dapat melihat keluarannya di halaman Flink Web UI > Task Manager > Stdout, seperti yang ditunjukkan pada gambar berikut.

image.png

Pekerjaan JAR

Catatan

Dokumentasi resmi Sedona menyediakan beberapa Tutorial Table API dan sebuah Demo siap pakai sebagai referensi.

Bagian ini menjelaskan cara menulis pekerjaan JAR sederhana untuk analisis kerapatan lalu lintas real-time dan menerapkannya ke Realtime Compute for Apache Flink.

1. Tambahkan dependensi

Untuk menggunakan Sedona 1.7.2, tambahkan dependensi berikut sebagaimana dijelaskan dalam dokumentasi.

<dependency>
    <groupId>org.apache.sedona</groupId>
    <artifactId>sedona-flink-shaded_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- Opsional: https://mvnrepository.com/artifact/org.datasyslab/geotools-wrapper -->
<dependency>
    <groupId>org.datasyslab</groupId>
    <artifactId>geotools-wrapper</artifactId>
    <version>1.7.2-28.5</version>
</dependency>

2. Inisialisasi lingkungan

Sebelum menulis logika pekerjaan, gunakan kode berikut untuk mendaftarkan fungsi dan serializer Sedona.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Daftarkan fungsi dan serializer
SedonaContext.create(env, tableEnv);

Jika Anda lebih memilih antarmuka TableEnvironment, Anda dapat menggunakan kode berikut untuk pendaftaran manual. Ini setara dengan menggunakan SedonaContext.

TableEnvironment tableEnv =
    TableEnvironment.create(EnvironmentSettings.newInstance().build());

TelemetryCollector.send("flink", "java");

// Daftarkan serializer
tableEnv.getConfig()
    .set(
        PipelineOptions.KRYO_DEFAULT_SERIALIZERS.key(),
        "class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;"
        + "class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde");

// Daftarkan fungsi
Arrays.stream(Catalog.getFuncs())
    .forEach(
        (func) -> {
            tableEnv.createTemporarySystemFunction(
                func.getClass().getSimpleName(), func);
        });
Arrays.stream(Catalog.getPredicates())
    .forEach(
        (func) -> {
            tableEnv.createTemporarySystemFunction(
                func.getClass().getSimpleName(), func);
        });

3. Tulis dan kemas pekerjaan

Tulis logika pekerjaan dan kemas menjadi file JAR.

Realtime Compute for Apache Flink menyediakan contoh kode pekerjaan untuk analisis kerapatan lalu lintas real-time, FlinkSedonaExample.zip, dan paket JAR-nya yang sesuai, FlinkSedonaExample-1.0-SNAPSHOT.jar. Kode ini berbasis Realtime Compute for Apache Flink VVR 8.0.11, yang sesuai dengan Apache Flink 1.17.2.

FlinkSedonaExample.zip

FlinkSedonaExample-1.0-SNAPSHOT.jar

4. Terapkan pekerjaan

Anda dapat menerapkan pekerjaan JAR di Konsol Realtime Compute. Navigasi ke Operation Center > Job O&M > Deploy Job > JAR Job.

Setelah Anda menerapkan dan menjalankan paket JAR dari bagian sebelumnya, Anda dapat melihat keluarannya di halaman Flink Web UI > Task Manager > Stdout, seperti yang ditunjukkan pada gambar berikut.

image.png