Realtime Compute for Apache Flink: Analyze real-time traffic density with Flink and Sedona

Last Updated: Nov 14, 2025

Realtime Compute for Apache Flink integrates with Apache Sedona to provide powerful, distributed, real-time geospatial analysis. This topic uses a real-time traffic density analysis scenario to demonstrate how to use Apache Sedona's spatial functions in Flink for efficient geospatial computations on data streams.

Limitations

This feature requires Ververica Runtime (VVR) 8.0.11 or later.

Apache Sedona

Apache Sedona is a high-performance, distributed computing framework for processing large-scale geospatial data.

Apache Sedona version 1.7.2 supports three aggregate functions and over 200 scalar functions. For details, see the official API Documentation.

Procedure

SQL

This section shows how to develop a job using SQL for real-time traffic density analysis.

1. Download the sedona-flink JAR

Download the required JAR package, such as sedona-flink-shaded_2.12-1.7.2.jar, from the Apache Sedona Maven repository.

Alternatively, download the compressed package from the Apache Sedona download page, and extract sedona-flink-shaded_2.12-1.7.2.jar.

2. Register the UDF

In Realtime Compute for Apache Flink's Development Console, navigate to Development > ETL. Select the UDFs tab. Click the + icon to register a UDF. Upload the downloaded sedona-flink-shaded_2.12-1.7.2.jar package and select the required functions to register.

Note

If a UDF is implemented as an inner class, Realtime Compute for Apache Flink automatically prefixes its function name with the outer class name, for example Constructors$ST_Point. Either keep the default function name or modify it as needed.

After registration, view all registered functions on the Development > ETL > UDFs page.

3. Write job code and register serializers

Navigate to Development > ETL, create a blank stream draft, and write code.

Important

Register the dedicated serializers by setting the pipeline.default-kryo-serializers parameter at the beginning of the code. This reduces serialization overhead by approximately two-thirds. For a detailed description of the parameter, see pipeline.default-kryo-serializers. Example:

SET 'pipeline.default-kryo-serializers' = 'class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde';
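Because the value is one long semicolon-separated string, a missing separator or a misspelled class name is easy to overlook. As a minimal sketch (not part of the official procedure), the value could be generated from two lists of class names and then pasted into the SET statement. The class KryoSerializerConfig and its methods are hypothetical helpers; only the geometry, index, and serializer class names are taken from the statement above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper that assembles the pipeline.default-kryo-serializers value. */
final class KryoSerializerConfig {
    private static final String SERDE_PACKAGE = "org.apache.sedona.common.geometrySerde.";

    // Classes mapped to GeometrySerde, in the same order as the SET statement.
    static final List<String> GEOMETRY_CLASSES = Arrays.asList(
        "org.locationtech.jts.geom.Geometry",
        "org.locationtech.jts.geom.Point",
        "org.locationtech.jts.geom.LineString",
        "org.locationtech.jts.geom.Polygon",
        "org.locationtech.jts.geom.MultiPoint",
        "org.locationtech.jts.geom.MultiLineString",
        "org.locationtech.jts.geom.MultiPolygon",
        "org.locationtech.jts.geom.GeometryCollection",
        "org.apache.sedona.common.geometryObjects.Circle",
        "org.locationtech.jts.geom.Envelope");

    // Classes mapped to SpatialIndexSerde.
    static final List<String> INDEX_CLASSES = Arrays.asList(
        "org.locationtech.jts.index.quadtree.Quadtree",
        "org.locationtech.jts.index.strtree.STRtree");

    /** Formats one "class:...,serializer:..." entry. */
    static String entry(String className, String serdeSimpleName) {
        return "class:" + className + ",serializer:" + SERDE_PACKAGE + serdeSimpleName;
    }

    /** Joins all entries with ';' and no trailing separator. */
    static String build() {
        return Stream.concat(
                GEOMETRY_CLASSES.stream().map(c -> entry(c, "GeometrySerde")),
                INDEX_CLASSES.stream().map(c -> entry(c, "SpatialIndexSerde")))
            .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        // Prints a statement that can be placed at the beginning of the SQL draft.
        System.out.println("SET 'pipeline.default-kryo-serializers' = '" + build() + "';");
    }
}
```

Keeping the class names in lists makes it easier to review them one per line and to reuse the same value in a JAR job.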

Sample code:

-- =============================================================================
--  0. Register the dedicated serializer
-- =============================================================================
SET 'pipeline.default-kryo-serializers' = 'class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde';

-- =============================================================================
--  1. Define the source table
-- =============================================================================
-- Create a temporary table to simulate a continuous data stream of vehicle positions.
CREATE TEMPORARY TABLE vehicle_positions (
  `vehicle_id` STRING,
  `lon`        DOUBLE,
  `lat`        DOUBLE,
  `event_time` TIMESTAMP_LTZ(3),
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '2' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.vehicle_id.length' = '5',
  'fields.lon.min' = '116.3',
  'fields.lon.max' = '116.5',
  'fields.lat.min' = '39.85',
  'fields.lat.max' = '40.0'
);

-- =============================================================================
--  2. Define the sink table
-- =============================================================================
-- Create a temporary table to print the final aggregation results to the console.
CREATE TEMPORARY TABLE print_sink (
  `area_name`     STRING,
  `vehicle_count` BIGINT,
  `window_end`    TIMESTAMP(3)
) WITH (
  'connector' = 'print',
  'print-identifier' = 'AREA_STATS_SQL'
);

-- =============================================================================
--  3. ETL logic
-- =============================================================================
-- Deduplicate and count vehicle IDs within each area and window.
INSERT INTO print_sink
SELECT
    F.area_name,
    COUNT(DISTINCT V.vehicle_id) AS vehicle_count,
    V.window_end
FROM
    TABLE(TUMBLE(TABLE vehicle_positions, DESCRIPTOR(event_time), INTERVAL '10' SECOND)) AS V
-- Area range
INNER JOIN (
    VALUES
        ('area_01', 'West District', 'POLYGON((116.30 39.90, 116.40 39.90, 116.40 39.95, 116.30 39.95, 116.30 39.90))'),
        ('area_02', 'East District', 'POLYGON((116.41 39.90, 116.48 39.90, 116.48 39.95, 116.41 39.95, 116.41 39.90))')
)
AS F(area_id, area_name, wkt_polygon)
-- The join condition is whether the vehicle is within the defined area.
ON Predicates$ST_Contains(Constructors$ST_GeomFromWKT(F.wkt_polygon), Constructors$ST_Point(V.lon, V.lat))
GROUP BY
    V.window_start,
    V.window_end,
    F.area_id,
    F.area_name;
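To make the semantics of the query easier to follow, the plain-Java sketch below mimics it on a few in-memory records: each position is assigned to a 10-second tumbling window aligned to the epoch, tested against the two district polygons, and counted once per vehicle, area, and window. It does not use Flink or Sedona. A ray-casting test stands in for ST_Contains and may behave differently for points that lie exactly on a polygon boundary. The class name, method names, and sample records are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class TrafficDensitySketch {
    static final long WINDOW_MILLIS = 10_000L; // same size as INTERVAL '10' SECOND

    static final class Position {
        final String vehicleId;
        final double lon;
        final double lat;
        final long eventTimeMillis;

        Position(String vehicleId, double lon, double lat, long eventTimeMillis) {
            this.vehicleId = vehicleId;
            this.lon = lon;
            this.lat = lat;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    // Closed rings copied from the WKT polygons in the query, as {lon, lat} pairs.
    static final Map<String, double[][]> AREAS = new LinkedHashMap<>();
    static {
        AREAS.put("West District", new double[][] {
            {116.30, 39.90}, {116.40, 39.90}, {116.40, 39.95}, {116.30, 39.95}, {116.30, 39.90}});
        AREAS.put("East District", new double[][] {
            {116.41, 39.90}, {116.48, 39.90}, {116.48, 39.95}, {116.41, 39.95}, {116.41, 39.90}});
    }

    /** Ray-casting point-in-polygon test; a stand-in for ST_Contains on simple polygons. */
    static boolean contains(double[][] ring, double x, double y) {
        boolean inside = false;
        for (int i = 0, j = ring.length - 1; i < ring.length; j = i++) {
            double xi = ring[i][0], yi = ring[i][1];
            double xj = ring[j][0], yj = ring[j][1];
            boolean crosses = (yi > y) != (yj > y)
                && x < (xj - xi) * (y - yi) / (yj - yi) + xi;
            if (crosses) {
                inside = !inside;
            }
        }
        return inside;
    }

    /** Returns window end (epoch millis) -> area name -> distinct vehicle IDs. */
    static Map<Long, Map<String, Set<String>>> aggregate(List<Position> positions) {
        Map<Long, Map<String, Set<String>>> result = new TreeMap<>();
        for (Position p : positions) {
            long windowStart = p.eventTimeMillis - Math.floorMod(p.eventTimeMillis, WINDOW_MILLIS);
            long windowEnd = windowStart + WINDOW_MILLIS;
            for (Map.Entry<String, double[][]> area : AREAS.entrySet()) {
                if (contains(area.getValue(), p.lon, p.lat)) {
                    result.computeIfAbsent(windowEnd, k -> new TreeMap<>())
                        .computeIfAbsent(area.getKey(), k -> new TreeSet<>())
                        .add(p.vehicleId);
                }
            }
        }
        return result;
    }

    /** Made-up sample records; v4 and v5 fall outside both districts. */
    static List<Position> sample() {
        return Arrays.asList(
            new Position("v1", 116.35, 39.92, 1_000L),
            new Position("v1", 116.36, 39.93, 5_000L),
            new Position("v2", 116.45, 39.91, 7_000L),
            new Position("v4", 116.405, 39.92, 3_000L),
            new Position("v5", 116.35, 39.97, 3_000L),
            new Position("v3", 116.35, 39.92, 12_000L));
    }

    public static void main(String[] args) {
        aggregate(sample()).forEach((windowEnd, areas) ->
            areas.forEach((area, ids) ->
                System.out.println("window_end=" + windowEnd + " area=" + area
                    + " vehicle_count=" + ids.size())));
    }
}
```

In the sketch, vehicle v1 reports twice in the first window but is counted once, which corresponds to COUNT(DISTINCT V.vehicle_id) in the query.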

4. Deploy the job

In the SQL editor, click Deploy.

After deploying and starting the job, view its output on the Flink Web UI > Task Manager > Stdout page.

JAR

Note

The Apache Sedona documentation provides tutorials for the Table API and a ready-to-use demo.

This section shows how to write a simple Flink application for real-time traffic density analysis and deploy it to Realtime Compute for Apache Flink.

1. Add dependencies

If you use Apache Sedona 1.7.2, add the following dependencies, as described in the Apache Sedona documentation.

<dependency>
    <groupId>org.apache.sedona</groupId>
    <artifactId>sedona-flink-shaded_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- Optional: https://mvnrepository.com/artifact/org.datasyslab/geotools-wrapper -->
<dependency>
    <groupId>org.datasyslab</groupId>
    <artifactId>geotools-wrapper</artifactId>
    <version>1.7.2-28.5</version>
</dependency>

2. Initialize the environment

Before writing the job code, register Apache Sedona's functions and serializers:

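import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.sedona.flink.SedonaContext;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Register Apache Sedona functions and serializers
SedonaContext.create(env, tableEnv);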

If you prefer to use the TableEnvironment interface, manually register functions and serializers with the following code. This method is functionally equivalent to SedonaContext.create.

TableEnvironment tableEnv =
    TableEnvironment.create(EnvironmentSettings.newInstance().build());

TelemetryCollector.send("flink", "java");

// Register serializers
tableEnv.getConfig()
    .set(
        PipelineOptions.KRYO_DEFAULT_SERIALIZERS.key(),
        "class:org.locationtech.jts.geom.Geometry,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.Point,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.LineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.Polygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.MultiPoint,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.MultiLineString,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.MultiPolygon,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.GeometryCollection,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.apache.sedona.common.geometryObjects.Circle,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.geom.Envelope,serializer:org.apache.sedona.common.geometrySerde.GeometrySerde;"
        + "class:org.locationtech.jts.index.quadtree.Quadtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde;"
        + "class:org.locationtech.jts.index.strtree.STRtree,serializer:org.apache.sedona.common.geometrySerde.SpatialIndexSerde");

// Register functions
Arrays.stream(Catalog.getFuncs())
    .forEach(
        (func) -> {
            tableEnv.createTemporarySystemFunction(
                func.getClass().getSimpleName(), func);
        });
Arrays.stream(Catalog.getPredicates())
    .forEach(
        (func) -> {
            tableEnv.createTemporarySystemFunction(
                func.getClass().getSimpleName(), func);
        });
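Note that the registration code above uses getSimpleName(), so functions registered this way would be named without an outer-class prefix (for example ST_Contains), whereas the SQL procedure uses prefixed names such as Predicates$ST_Contains. The sketch below illustrates the difference with plain Java reflection. The Constructors and Predicates classes in it are empty, hypothetical stand-ins that only imitate nested function classes; they are not Sedona's classes, and how the console derives its default names is an assumption based on the names shown in the SQL sample.

```java
/** Compares the two naming styles for nested (inner) function classes. */
final class UdfNamingSketch {
    /** Name without the enclosing class, as produced by getSimpleName(). */
    static String simpleName(Object func) {
        return func.getClass().getSimpleName();
    }

    /** Binary class name without the package, which keeps the outer-class prefix. */
    static String prefixedName(Object func) {
        String binaryName = func.getClass().getName();
        return binaryName.substring(binaryName.lastIndexOf('.') + 1);
    }

    public static void main(String[] args) {
        Object point = new Constructors.ST_Point();
        Object contains = new Predicates.ST_Contains();
        System.out.println(simpleName(point) + " vs " + prefixedName(point));
        System.out.println(simpleName(contains) + " vs " + prefixedName(contains));
    }
}

// Hypothetical stand-in: an outer holder class with a nested function class.
final class Constructors {
    static final class ST_Point {}
}

// Hypothetical stand-in: an outer holder class with a nested predicate class.
final class Predicates {
    static final class ST_Contains {}
}
```

If SQL statements are shared between the SQL draft and the JAR job, check which naming style each environment registered before reusing them.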

3. Write job code and package the project

Write job code and package the project into a JAR.

Realtime Compute for Apache Flink provides a sample code file (FlinkSedonaExample.zip) and the corresponding JAR package (FlinkSedonaExample-1.0-SNAPSHOT.jar). The sample application uses VVR 8.0.11, which corresponds to Apache Flink 1.17.2.

FlinkSedonaExample.zip

FlinkSedonaExample-1.0-SNAPSHOT.jar

4. Deploy the job

In Realtime Compute for Apache Flink's Development Console, create a JAR deployment by navigating to O&M > Deployments > Create Deployment > JAR Deployment.

After deploying and starting the job, view its output on the Flink Web UI > Task Manager > Stdout page.