全部产品
Search
文档中心

Realtime Compute for Apache Flink:Pernyataan temporal join berdasarkan waktu pemrosesan

更新时间:Jun 19, 2025

Di Realtime Compute for Apache Flink, setiap aliran data dapat dikaitkan dengan tabel dimensi dari sumber data eksternal. Hal ini memungkinkan Anda melakukan kueri terkait di Realtime Compute for Apache Flink. Topik ini menjelaskan cara menggunakan temporal join berdasarkan waktu pemrosesan.

Latar Belakang

Temporal join berdasarkan waktu pemrosesan menggunakan atribut waktu pemrosesan untuk menghubungkan baris dalam tabel fakta dengan versi terbaru dari kunci yang sesuai di tabel dimensi. Tidak seperti temporal join berdasarkan waktu kejadian yang menghubungkan baris berdasarkan waktu terjadinya peristiwa, temporal join berdasarkan waktu pemrosesan menghubungkan baris berdasarkan waktu kedatangan data.

Prasyarat

  • Realtime Compute for Apache Flink menggunakan Ververica Runtime (VVR) 8.0.10 atau yang lebih baru.

  • Tabel dimensi MySQL telah dibuat.

Catatan penggunaan

  • Nonaktifkan pengecekan titik selama sinkronisasi penuh dengan mengonfigurasi execution.checkpointing.interval-during-backlog = 0, karena Realtime Compute for Apache Flink tidak mendukung pengecekan titik selama sinkronisasi penuh. Konfigurasi ini tidak akan memengaruhi pengecekan titik selama sinkronisasi data inkremental.

  • Untuk menggunakan temporal join berdasarkan waktu pemrosesan, konfigurasikan table.optimizer.proctime-temporal-join-strategy = TEMPORAL_JOIN.

Sintaks

Sintaks untuk temporal join berdasarkan waktu pemrosesan sama dengan sintaks untuk menggabungkan tabel dimensi:

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;

Contoh

  • Data Uji

    • Tabel 1 kafka_input

      id (bigint)

      name (varchar)

      age (bigint)

      1

      Lee

      22

      2

      Harry

      20

      3

      Liban

      28

    • Tabel 2 phoneNumber

      name (varchar)

      phoneNumber (bigint)

      David

      1390000111

      Brooks

      1390000222

      Liban

      1390000333

      Lee

      1390000444

  • Kode Uji

    SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN';  -- Gunakan temporal join berdasarkan waktu pemrosesan.
    SET 'execution.checkpointing.interval-during-backlog' = '0';              -- Nonaktifkan pengecekan titik selama sinkronisasi penuh.
    
    CREATE TEMPORARY TABLE kafka_input (
      id   BIGINT,
      name VARCHAR,
      age  BIGINT,
      proc_time AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    -- Definisikan sebuah tabel dimensi.
    CREATE TEMPORARY TABLE phoneNumber(
      name VARCHAR,
      phoneNumber BIGINT,
      PRIMARY KEY(name) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE result_infor(
      id BIGINT,
      phoneNumber BIGINT,
      name VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO result_infor
    SELECT
      t.id,
      w.phoneNumber,
      t.name
    FROM kafka_input as t
    JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time as w
    ON t.name = w.name;
  • Hasil Uji

    id (bigint)

    phoneNumber (bigint)

    name (varchar)

    1

    1390000444

    Lee

    3

    1390000333

    Liban