Realtime Compute for Apache Flink terintegrasi dengan Apache Sedona untuk menyediakan analisis geospasial yang andal, terdistribusi, dan real-time. Topik ini menggunakan skenario analisis kerapatan lalu lintas real-time sebagai contoh untuk menunjukkan cara memanggil fungsi spasial Sedona dalam Flink dan melakukan perhitungan geospasial secara efisien pada aliran data.
Batasan
Fitur ini hanya didukung di Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.11 atau versi lebih baru.
Apache Sedona
Apache Sedona adalah kerangka kerja komputasi kinerja tinggi (HPC) terdistribusi yang dirancang untuk memproses data geospasial berskala besar.
Sedona 1.7.2 mendukung tiga fungsi agregat (AggregateFunction) dan lebih dari 200 fungsi skalar (ScalarFunction). Untuk informasi selengkapnya, lihat Dokumentasi API resmi.
Gunakan Sedona
Pekerjaan SQL
Bagian ini menjelaskan cara menulis pekerjaan SQL sederhana untuk analisis kerapatan lalu lintas real-time dan menerapkannya ke Realtime Compute for Apache Flink.
1. Unduh paket JAR
Unduh versi paket JAR yang diperlukan dari Repositori Maven Sedona, seperti sedona-flink-shaded_2.12-1.7.2.jar.
Atau, Anda dapat mengunduh paket terkompresi dari halaman unduhan resmi Sedona dan mengekstraknya untuk mendapatkan paket sedona-flink-shaded_2.12-1.7.2.jar untuk Flink.

2. Daftarkan JAR UDF
Di Konsol Realtime Compute, navigasi ke halaman Data Development > ETL/Data Query > Function. Klik tanda plus (+) untuk mendaftarkan JAR UDF. Unggah paket sedona-flink-shaded_2.12-1.7.2.jar yang telah diunduh dan pilih fungsi yang diperlukan untuk didaftarkan.


Ketika Realtime Compute for Apache Flink secara otomatis mengekstrak nama fungsi, nama kelas luar ditambahkan sebagai awalan pada nama UDF yang merupakan kelas dalam. Perilaku ini merupakan default, seperti yang ditunjukkan pada gambar sebelumnya. Anda dapat mempertahankan nama fungsi default atau mengubahnya sesuai kebutuhan.
Setelah pendaftaran, Anda dapat melihat semua fungsi yang terdaftar di daftar fungsi pada halaman Data Development > ETL > Function.

3. Tulis pekerjaan dan daftarkan serializer
Di halaman Data Development > ETL > Stream Draft, buat draf aliran kosong dan tulis logika pekerjaan.
Pastikan untuk mengonfigurasi parameter pipeline.default-kryo-serializers di awal pekerjaan guna mendaftarkan serializer khusus, seperti yang ditunjukkan pada contoh berikut. Hal ini mengurangi beban serialisasi hingga sekitar dua pertiga. Untuk informasi selengkapnya tentang parameter tersebut, lihat pipeline.default-kryo-serializers.
SET 'pipeline.default-kryo-serializers' = 'class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde';Realtime Compute for Apache Flink menyediakan kueri SQL berikut untuk analisis kerapatan lalu lintas real-time.
-- =============================================================================
-- 0. Daftarkan serializer khusus
-- =============================================================================
SET 'pipeline.default-kryo-serializers' = 'class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde';
-- =============================================================================
-- 1. Definisikan tabel sumber data
-- =============================================================================
-- Buat tabel sementara untuk mensimulasikan aliran data kontinu posisi kendaraan.
CREATE TEMPORARY TABLE vehicle_positions (
`vehicle_id` STRING,
`lon` DOUBLE,
`lat` DOUBLE,
`event_time` TIMESTAMP_LTZ(3),
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '2' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.vehicle_id.length' = '5',
'fields.lon.min' = '116.3',
'fields.lon.max' = '116.5',
'fields.lat.min' = '39.85',
'fields.lat.max' = '40.0'
);
-- =============================================================================
-- 2. Definisikan tabel sink data
-- =============================================================================
-- Buat tabel sementara untuk mencetak hasil agregasi akhir ke konsol.
CREATE TEMPORARY TABLE print_sink (
`area_name` STRING,
`vehicle_count` BIGINT,
`window_end` TIMESTAMP(3)
) WITH (
'connector' = 'print',
'print-identifier' = 'AREA_STATS_SQL'
);
-- =============================================================================
-- 3. Logika ETL inti
-- =============================================================================
-- Hilangkan duplikasi dan hitung ID kendaraan dalam setiap area dan jendela.
INSERT INTO print_sink
SELECT
F.area_name,
COUNT(DISTINCT V.vehicle_id) AS vehicle_count,
V.window_end
FROM
TABLE(TUMBLE(TABLE vehicle_positions, DESCRIPTOR(event_time), INTERVAL '10' SECOND)) AS V
-- Rentang area
INNER JOIN (
VALUES
('area_01', 'West District', 'POLYGON((116.30 39.90, 116.40 39.90, 116.40 39.95, 116.30 39.95, 116.30 39.90))'),
('area_02', 'East District', 'POLYGON((116.41 39.90, 116.48 39.90, 116.48 39.95, 116.41 39.95, 116.41 39.90))')
)
AS F(area_id, area_name, wkt_polygon)
-- Kondisi join adalah apakah kendaraan berada di dalam area yang ditentukan.
ON Predicates$ST_Contains(Constructors$ST_GeomFromWKT(F.wkt_polygon), Constructors$ST_Point(V.lon, V.lat))
GROUP BY
V.window_start,
V.window_end,
F.area_id,
F.area_name;4. Terapkan pekerjaan
Di editor SQL, klik Terapkan di pojok kanan atas.
Setelah Anda menerapkan dan menjalankan kueri SQL dari bagian sebelumnya, Anda dapat melihat keluarannya di halaman Flink Web UI > Task Manager > Stdout, seperti yang ditunjukkan pada gambar berikut.

Pekerjaan JAR
Bagian ini menjelaskan cara menulis pekerjaan JAR sederhana untuk analisis kerapatan lalu lintas real-time dan menerapkannya ke Realtime Compute for Apache Flink.
1. Tambahkan dependensi
Untuk menggunakan Sedona 1.7.2, tambahkan dependensi berikut sebagaimana dijelaskan dalam dokumentasi.
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-flink-shaded_2.12</artifactId>
<version>1.7.2</version>
</dependency>
<!-- Opsional: https://mvnrepository.com/artifact/org.datasyslab/geotools-wrapper -->
<dependency>
<groupId>org.datasyslab</groupId>
<artifactId>geotools-wrapper</artifactId>
<version>1.7.2-28.5</version>
</dependency>2. Inisialisasi lingkungan
Sebelum menulis logika pekerjaan, gunakan kode berikut untuk mendaftarkan fungsi dan serializer Sedona.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Daftarkan fungsi dan serializer
SedonaContext.create(env, tableEnv);Jika Anda lebih memilih antarmuka TableEnvironment, Anda dapat menggunakan kode berikut untuk pendaftaran manual. Ini setara dengan menggunakan SedonaContext.
TableEnvironment tableEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().build());
TelemetryCollector.send("flink", "java");
// Daftarkan serializer
tableEnv.getConfig()
.set(
PipelineOptions.KRYO_DEFAULT_SERIALIZERS.key(),
"class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
+ "class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;"
+ "class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde");
// Daftarkan fungsi
Arrays.stream(Catalog.getFuncs())
.forEach(
(func) -> {
tableEnv.createTemporarySystemFunction(
func.getClass().getSimpleName(), func);
});
Arrays.stream(Catalog.getPredicates())
.forEach(
(func) -> {
tableEnv.createTemporarySystemFunction(
func.getClass().getSimpleName(), func);
});3. Tulis dan kemas pekerjaan
Tulis logika pekerjaan dan kemas menjadi file JAR.
Realtime Compute for Apache Flink menyediakan contoh kode pekerjaan untuk analisis kerapatan lalu lintas real-time, FlinkSedonaExample.zip, dan paket JAR-nya yang sesuai, FlinkSedonaExample-1.0-SNAPSHOT.jar. Kode ini berbasis Realtime Compute for Apache Flink VVR 8.0.11, yang sesuai dengan Apache Flink 1.17.2.
4. Terapkan pekerjaan
Anda dapat menerapkan pekerjaan JAR di Konsol Realtime Compute. Navigasi ke Operation Center > Job O&M > Deploy Job > JAR Job.
Setelah Anda menerapkan dan menjalankan paket JAR dari bagian sebelumnya, Anda dapat melihat keluarannya di halaman Flink Web UI > Task Manager > Stdout, seperti yang ditunjukkan pada gambar berikut.
