全部产品
Search
文档中心

Realtime Compute for Apache Flink:Interval join

更新时间:Jun 19, 2025

Topik ini menjelaskan cara menggunakan interval join.

Informasi latar belakang

Interval join memungkinkan dua aliran data digabungkan berdasarkan kunci umum. Fungsinya menghubungkan elemen dari setiap aliran ketika timestamp mereka berada dalam interval waktu relatif yang ditentukan. Setelah kedua aliran digabungkan, kolom timestamp dalam aliran input dipertahankan, sehingga memungkinkan pemrosesan lebih lanjut berdasarkan waktu kejadian.

Sintaksis

SELECT nama-kolom
FROM tabel1  [AS <alias1>]
[INNER | LEFT | RIGHT |FULL ] JOIN tabel2 
ON tabel1.nama-kolom1 = tabel2.nama-kunci1 AND EKSPRESI_WAKTU_TERBATAS
null
  • INNER JOIN, LEFT JOIN, RIGHT JOIN, dan FULL JOIN didukung. Jika Anda menggunakan JOIN tanpa spesifikasi, INNER JOIN akan dijalankan secara default.

  • SEMI JOIN dan ANTI JOIN tidak didukung.

  • EKSPRESI_WAKTU_TERBATAS adalah ekspresi kondisional interval pada kolom atribut waktu dari dua aliran data. Ekspresi kondisional berikut didukung:

    • ltime = rtime

    • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

    • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

Contoh

Contoh ini menunjukkan barang yang dikirim dalam waktu 4 jam setelah penempatan pesanan.

  • Data Uji

    • Tabel Pesanan (orders)

      id

      productName

      orderTime

      1

      telepon

      2024-04-01 10:00:00.0

      2

      laptop

      2024-04-01 10:02:00.0

      3

      jam tangan

      2024-04-01 10:03:00.0

      4

      tablet

      2024-04-01 10:05:00.0

    • Tabel Logistik (Shipments)

      shipId

      orderId

      status

      shiptime

      0

      1

      dikirim

      2024-04-01 11:00:00.0

      1

      2

      dikirim

      2024-04-01 17:00:00.0

      2

      3

      dikirim

      2024-04-01 12:00:00.0

      3

      4

      dikirim

      2024-04-01 11:30:00.0

  • Pernyataan Uji

    CREATE TEMPORARY TABLE Orders(
      id BIGINT,
      productName VARCHAR,
      ordertime TIMESTAMP(3),
      WATERMARK wk FOR ordertime as withOffset(ordertime, 2000)  -- Tentukan kolom ordertime sebagai atribut waktu kejadian tabel dan gunakan strategi watermark tertunda 2 detik.
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<topikAnda>',
      'properties.bootstrap.servers' = '<brokerAnda>',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE Shipments(
      shipId BIGINT,
      orderId BIGINT,
      status VARCHAR,
      shiptime TIMESTAMP(3),
      WATERMARK wk FOR shiptime as withOffset(shiptime, 2000)  -- Tentukan kolom ordertime sebagai atribut waktu kejadian tabel dan gunakan strategi watermark tertunda 2 detik.
    ) WITH (
      'connector' = 'kafka',
       'topic' = '<topikAnda>',
      'properties.bootstrap.servers' = '<brokerAnda>',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    -- Buat sink MySQL.
    CREATE TEMPORARY TABLE rds_output(
      id BIGINT,
      productName VARCHAR,
      status VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<hostAnda>',
      'port' = '3306',
      'username' = '<namaPenggunaAnda>',
      'password' = '<kataSandiAnda>',
      'database-name' = '<namaBasisDataAnda>',
      'table-name' = '<namaTabelAnda>'
    );
    
    INSERT INTO rds_output
    SELECT id, productName, status
    FROM Orders AS o
    JOIN Shipments AS s on o.id = s.orderId AND
         o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;
  • Hasil Uji

    id(bigint)

    productName(varchar)

    status(varchar)

    1

    telepon

    dikirim

    3

    jam tangan

    dikirim

    4

    tablet

    dikirim