すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:SESSION

最終更新日:Apr 14, 2025

このトピックでは、Realtime Compute for Apache Flink で SESSION 関数を使用する方法について説明します。

定義

SESSION 関数は、セッションアクティビティによって要素をグループ化します。タンブリングウィンドウやスライドウィンドウとは異なり、セッションウィンドウは重複せず、サイズも固定されていません。セッションウィンドウが特定の期間内に要素を受信しない場合、セッションは切断され、ウィンドウは閉じられます。

構文

GROUP BY 句で SESSION 関数を使用して、セッションウィンドウを定義できます。

SESSION(<time-attr>, <gap-interval>)

入力パラメーター

パラメーター

説明

time-attr

パラメーターは、ストリーム内の有効な時間属性フィールドである必要があります。このパラメーターは、時間が処理時間かイベント時間かを指定します。詳細については、「ウィンドウ関数」をご参照ください。

-

gap-interval

セッションのタイムアウト期間または非アクティブ間隔。セッションの最後の要素が到着した後、<gap-interval> で指定された期間内に新しい要素が到着しない場合、セッションは閉じられます。後続に到着した要素は、新しいセッションに割り当てられます。 gap-interval パラメーターは、INTERVAL 'num' timeUnit 形式です。

INTERVAL '10' SECOND は、セッションのタイムアウト期間が 10 秒であることを示します。

ウィンドウ識別子関数

ウィンドウ識別子関数は、ウィンドウの開始時刻、終了時刻、または時間属性を指定します。時間属性は、下位レベルのウィンドウを集計するために使用されます。

関数

戻り値の型

説明

SESSION_START (<time-attr>, <gap-interval>)

Timestamp

境界値を含むウィンドウの開始時刻を返します。たとえば、ウィンドウの時間範囲が [00:10, 00:15] の場合、00:10 が返されます。戻り値は、セッションウィンドウの最初のレコードの時間です。

SESSION_END (<time-attr>, <gap-interval>)

Timestamp

境界値を含むウィンドウの終了時刻を返します。たとえば、ウィンドウの時間範囲が [00:00, 00:15] の場合、00:15 が返されます。戻り値は、セッションウィンドウの最後のレコードの時間 + <gap-interval> です。

SESSION_ROWTIME (<time-attr>, <gap-interval>)

TIMESTAMP (rowtime-attr)

境界値を除くウィンドウの終了時刻を返します。たとえば、ウィンドウの時間範囲が (00:00, 00:15) の場合、00:14:59.999 が返されます。戻り値は、時間操作を実行できる基準となる行時間属性です。この関数は、カスケードウィンドウなどのイベント時間に基づいて定義されたウィンドウでのみ使用できます。詳細については、「カスケードウィンドウ」をご参照ください。この関数は、イベント時間に基づくウィンドウにのみ適用されます。

SESSION_PROCTIME(<time-attr>, <gap-interval>)

TIMESTAMP (rowtime-attr)

境界値を除くウィンドウの終了時刻を返します。たとえば、ウィンドウの時間範囲が (00:00, 00:15) の場合、00:14:59.999 が返されます。戻り値は、時間操作を実行できる基準となる処理時間属性です。この関数は、カスケードウィンドウなどの処理時間に基づいて定義されたウィンドウでのみ使用できます。詳細については、「カスケードウィンドウ」をご参照ください。この関数は、処理時間に基づくウィンドウにのみ適用されます。

次の例では、アクティブなセッションごとにユーザーごとのクリック数を計算する方法について説明します。セッションのタイムアウト間隔は 30 秒です。

  • user_clicks テーブルのテストデータ

    username (VARCHAR)

    click_url (VARCHAR)

    eventtime (VARCHAR)

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:00.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:10.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:49.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:01:05.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:01:58.0

    Timo

    http://taobao.com/xxx

    2024-10-10 10:02:10.0

  • テストステートメント

    CREATE TEMPORARY TABLE user_clicks(
      username varchar,
      click_url varchar,
      eventtime varchar,                            
      ts AS TO_TIMESTAMP(eventtime),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- 行時間のウォーターマークを定義します。
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<brokers>',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE session_output(
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      'connector'='print',
      'logger'='true'
    );
    
    INSERT INTO session_output
    SELECT
    SESSION_START(ts, INTERVAL '30' SECOND),
    SESSION_END(ts, INTERVAL '30' SECOND),
    username,
    COUNT(click_url)
    FROM user_clicks
    GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
  • テスト結果

    window_start (TIMESTAMP)

    window_end (TIMESTAMP)

    username (VARCHAR)

    clicks (BIGINT)

    2024-10-10 10:00:00.0

    2024-10-10 10:00:40.0

    Jark

    2

    2024-10-10 10:00:49.0

    2024-10-10 10:01:35.0

    Jark

    2

    2024-10-10 10:01:58.0

    2024-10-10 10:02:28.0

    Jark

    1

    2024-10-10 10:02:10.0

    2024-10-10 10:02:40.0

    Timo

    1