すべてのプロダクト
Search
ドキュメントセンター

Tablestore:Realtime Compute for Apache Flink を使用して Tablestore データを処理する

最終更新日:Jul 08, 2025

このトピックでは、Realtime Compute for Apache Flink を使用して Tablestore データを計算する方法について説明します。Tablestore 内のデータテーブルまたは時系列テーブルは、Realtime Compute for Apache Flink を使用したデータ処理のソーステーブルまたは結果テーブルとして使用できます。

前提条件

リアルタイムコンピューティングジョブを開発する

ステップ 1:SQL ドラフトを作成する

  1. ドラフト作成ページに移動します。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. 管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで、[開発] > [ETL] を選択します。

  2. [新規] をクリックします。[新規ドラフト] ダイアログボックスで、[空のストリームドラフト] を選択し、[次へ] をクリックします。

    説明

    Realtime Compute for Apache Flink はさまざまなコード テンプレートを提供し、データ同期をサポートしています。各コード テンプレートは特定のシナリオに適しており、コード サンプルと手順を提供します。テンプレートをクリックすると、Realtime Compute for Apache Flink の機能と関連構文について学習し、ビジネス ロジックを実装できます。詳細については、「コード テンプレート」および「データ同期テンプレート」をご参照ください。

  3. ドラフトのパラメータを構成します。 次の表にパラメータを示します。

    パラメータ

    説明

    名前

    作成するドラフトの名前。

    説明

    ドラフト名は現在のプロジェクト内で一意である必要があります。

    flink-test

    場所

    ドラフトのコードファイルが保存されるフォルダ。

    既存のフォルダの右側にある 新建文件夹 アイコンをクリックして、サブフォルダを作成することもできます。

    ドラフト

    エンジンバージョン

    現在のドラフトで使用させる Flink エンジンのバージョン。エンジンバージョンの詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。

    vvr-8.0.10-flink-1.17

  4. [作成] をクリックします。

ステップ 2:ドラフトのコードを記述する

説明

このステップの例では、データテーブルから別のデータテーブルにデータを同期するためのコードが記述されています。その他のサンプル SQL 文については、「サンプル SQL 文」をご参照ください。

  1. ソーステーブルと結果テーブルのための一時テーブルを作成します。

    詳細については、「付録 1:Tablestore コネクタ」をご参照ください。

    -- ソーステーブル用に tablestore_stream という名前の一時テーブルを作成します。
    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector' = 'ots', -- ソーステーブルのコネクタタイプを指定します。値は ots で、変更できません。
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tablestore インスタンスの VPC エンドポイントを指定します。
        'instanceName' = 'xxx', -- Tablestore インスタンスの名前を指定します。
        'tableName' = 'flink_source_table', -- ソーステーブルの名前を指定します。
        'tunnelName' = 'flink_source_tunnel', -- ソーステーブル用に作成されたトンネルの名前を指定します。
        'accessId' = 'xxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。
        'ignoreDelete' = 'false' -- 削除操作によって生成されたリアルタイムデータを無視するかどうかを指定します。この例では、このパラメータは false に設定されています。
    );
    
    -- 結果テーブル用に tablestore_sink という名前の一時テーブルを作成します。
    CREATE TEMPORARY TABLE tablestore_sink(
       `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED -- プライマリキーを指定します。
    ) WITH (
        'connector' = 'ots', -- 結果テーブルのコネクタタイプを指定します。値は ots で、変更できません。
        'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tablestore インスタンスの VPC エンドポイントを指定します。
        'instanceName' = 'xxx', -- Tablestore インスタンスの名前を指定します。
        'tableName' = 'flink_sink_table', -- 結果テーブルの名前を指定します。
        'accessId' = 'xxxxxxxxxxx',  -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。
        'valueColumns'='customerid,customername' -- 結果テーブルに挿入する列の名前を指定します。
    );
  2. ドラフトロジックを記述します。

    次のサンプル SQL 文は、ソーステーブルから結果テーブルにデータを挿入する方法の例を示しています。

    -- ソーステーブルから結果テーブルにデータを挿入します。
    INSERT INTO tablestore_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

ステップ 3:(オプション)構成情報を表示する

SQL エディタの右側のタブで、構成を表示したり、パラメータを構成したりできます。次の表にパラメータを示します。

タブ名

説明

[構成]

  • エンジンバージョン:現在のジョブで使用される Flink エンジンのバージョン。

  • 追加の依存関係:ドラフトで使用される追加の依存関係(一時関数など)。

    Ververica Runtime (VVR) 依存関係をダウンロードし、「アーティファクトのアップロード」ページで VVR 依存関係をアップロードしてから、[追加の依存関係] にアップロードされた VVR 依存関係を選択できます。詳細については、「付録 2:VVR 依存関係を構成する」をご参照ください。

[構造]

  • フロー図:データのフロー方向を表示できるフロー図。

  • ツリー図:データの処理元を表示できるツリー図。

[バージョン]

ドラフトのエンジンバージョンを表示できます。「ドラフトバージョン」パネルの [アクション] 列で実行できる操作の詳細については、「デプロイバージョンを管理する」をご参照ください。

ステップ 4:(オプション)構文チェックを実行する

ドラフトの SQL セマンティクス、ネットワーク接続、およびドラフトで使用されるテーブルのメタデータ情報を確認します。計算結果の [SQL アドバイス] をクリックして、SQL リスクと関連する最適化の提案に関する情報を表示することもできます。

  1. SQL エディタの右上隅にある [検証] をクリックします。

  2. [検証] ダイアログボックスで、[確認] をクリックします。

ステップ 5:(オプション)ドラフトをデバッグする

デバッグ機能を使用して、デプロイ実行をシミュレートし、出力を確認し、SELECT 文と INSERT 文のビジネスロジックを検証できます。この機能は開発効率を向上させ、データ品質低下のリスクを軽減します。

  1. SQL エディタの右上隅にある [デバッグ] をクリックします。

  2. [デバッグ] ダイアログボックスで、デバッグするクラスタを選択し、[次へ] をクリックします。

    使用可能なクラスタがない場合は、セッションクラスタを作成します。セッションクラスタが SQL ドラフトと同じエンジンバージョンを使用し、セッションクラスタが実行されていることを確認します。詳細については、「セッションクラスタを作成する」をご参照ください。

  3. デバッグデータを構成します。

    • オンラインデータを使用する場合は、この操作をスキップします。

    • デバッグデータを使用する場合は、[モックデータテンプレートをダウンロード] をクリックし、テンプレートにデバッグデータを入力してから、[モックデータをアップロード] をクリックしてデバッグデータをアップロードします。詳細については、「ドラフトをデバッグする」をご参照ください。

  4. [確認] をクリックします。

ステップ 6:ドラフトをデプロイする

SQL エディターの右上隅にある、[デプロイ] をクリックします。[ドラフトのデプロイ] ダイアログボックスで、関連パラメーターを構成し、[確認] をクリックします。

説明

セッション クラスターは、開発環境やテスト環境などの非運用環境に適しています。セッション クラスターにドラフトをデプロイまたはデバッグして、JobManager のリソース使用率を向上させ、デプロイメントの起動を高速化できます。ただし、運用環境のドラフトをセッション クラスターにデプロイすることはお勧めしません。そうしないと、安定性の問題が発生する可能性があります。

ステップ 7:ドラフトのデプロイを開始し、計算結果を表示する

  1. 左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] を選択します。

  2. 開始するジョブを見つけ、[アクション] 列の [開始] をクリックします。

    [ジョブの開始] パネルで、[初期モード] を選択し、[開始] をクリックします。デプロイ ステータスが [実行中] に変わると、デプロイは想定どおりに実行されます。デプロイの開始時に構成する必要があるパラメーターの詳細については、「デプロイメントの開始」をご参照ください。

    説明
    • Realtime Compute for Apache Flink の各 TaskManager に 2 つの CPU コアと 4 GB のメモリを構成して、各 TaskManager の計算能力を最大限に活用することをお勧めします。1 つの TaskManager は 1 秒あたり 10,000 行を書き込むことができます。

    • ソーステーブルのパーティション数が多い場合は、Realtime Compute for Apache Flink で並列度を 16 未満に設定することをお勧めします。書き込み速度は並列度に比例して線形的に増加します。

  3. 「デプロイメント」ページで、計算結果を表示します。

    1. 左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] を選択します。「デプロイメント」ページで、管理するデプロイの名前をクリックします。

    2. [ログ] タブで、[実行中のタスクマネージャ] タブをクリックし、[パス、ID] 列の値をクリックします。

    3. [ログ] をクリックします。「ログ」タブで、関連するログ情報を表示します。

  4. (オプション)デプロイをキャンセルします。

    デプロイの SQL コードを変更したり、WITH 句にパラメータを追加または削除したり、デプロイのバージョンを変更したりする場合は、デプロイのドラフトをデプロイし、デプロイをキャンセルしてから、変更を有効にするためにデプロイを開始する必要があります。デプロイが失敗し、状態データを再利用して回復できない場合、または動的に有効にならないパラメータ設定を更新する場合は、デプロイをキャンセルしてから再起動する必要があります。デプロイのキャンセル方法の詳細については、「デプロイをキャンセルする」をご参照ください。

付録

付録 1:Tablestore コネクタ

Realtime Compute for Apache Flink は、Tablestore データを読み取り、書き込み、同期するための組み込みの Tablestore コネクタ を提供します。

ソーステーブル

DDL 構文
データテーブル

次のサンプルコードは、ソーステーブルのための一時テーブルを作成するための DDL 文の例を示しています。

-- ソーステーブル用に tablestore_stream という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false'
);
時系列テーブル

次のサンプルコードは、ソーステーブルのための一時テーブルを作成するための DDL 文の例を示しています。

-- ソーステーブル用に tablestore_stream という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

データが消費される必要があるフィールド、およびトンネルサービスの戻りデータの OtsRecordType フィールドと OtsRecordTimestamp フィールドは、属性列として読み書きできます。次の表にフィールドを示します。

フィールド

Realtime Compute for Apache Flink でマップされたフィールド

説明

OtsRecordType

type

操作タイプ。

OtsRecordTimestamp

timestamp

データ操作時間。単位:マイクロ秒。

説明

Realtime Compute for Apache Flink に完全データを読み取らせる場合は、このフィールドを 0 に設定します。

WITH 句のパラメータ

accessKey

パラメータ

該当するテーブル

必須

説明

connector

全般

はい

ソーステーブルのコネクタタイプ。値は ots で、変更できません。

endPoint

全般

はい

Tablestore インスタンスのエンドポイント。VPC エンドポイントを使用する必要があります。詳細については、「エンドポイント」をご参照ください。

instanceName

全般

はい

Tablestore インスタンスの名前。

tableName

全般

はい

Tablestore 内のソーステーブルの名前。

tunnelName

全般

はい

Tablestore 内のソーステーブルのトンネルの名前。トンネルの作成方法については、「トンネルを作成する」をご参照ください。

accessId

全般

はい

Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ペア (AccessKey ID と AccessKey シークレット)。

重要

AccessKey ペアを保護するために、変数を使用して AccessKey ペアを指定することをお勧めします。詳細については、「変数を管理する」をご参照ください。

全般

はい

Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。

AccessKey ペアを保護するために、変数を使用して AccessKey ペアを指定することをお勧めします。詳細については、「変数を管理する」をご参照ください。

connectTimeout

全般

いいえ

Tablestore コネクタが Tablestore に接続するためのタイムアウト期間。単位:ミリ秒。デフォルト値:30000。

socketTimeout

全般

いいえ

Tablestore コネクタが Tablestore に接続するためのソケットタイムアウト期間。単位:ミリ秒。デフォルト値:30000。

ioThreadCount

全般

いいえ

I/O スレッドの数。デフォルト値:4。

callbackThreadPoolSize

全般

いいえ

コールバックスレッドプールのサイズ。デフォルト値:4。

ignoreDelete

データテーブル

いいえ

削除操作によって生成されたリアルタイムデータを無視するかどうかを指定します。デフォルト値:false。削除操作によって生成されたリアルタイムデータは無視されません。

skipInvalidData

全般

いいえ

ダーティデータを無視するかどうかを指定します。デフォルト値:false。ダーティデータは無視されません。ダーティデータが無視されない場合、システムがダーティデータを処理するときにエラーが報告されます。

重要

VVR 8.0.4 以降を使用する Realtime Compute for Apache Flink のみ、このパラメータをサポートしています。

retryStrategy

全般

いいえ

再試行ポリシー。有効な値:

  • TIME:retryTimeoutMs パラメータで指定されたタイムアウト期間が終了するまで、システムは継続的に再試行します。これがデフォルト値です。

  • COUNT:retryCount パラメータで指定された最大再試行回数に達するまで、システムは継続的に再試行します。

retryCount

全般

いいえ

最大再試行回数。retryStrategy パラメータを COUNT に設定した場合、このパラメータを構成できます。デフォルト値:3。

retryTimeoutMs

全般

いいえ

再試行のタイムアウト期間。単位:ミリ秒。retryStrategy パラメータを TIME に設定した場合、このパラメータを構成できます。デフォルト値:180000。

streamOriginColumnMapping

全般

いいえ

ソーステーブルの列名と一時テーブルの列名のマッピング。

説明

ソーステーブルの列名と一時テーブルの列名をコロン (:) で区切ります。複数のマッピングはコンマ (,) で区切ります。例:origin_col1:col1,origin_col2:col2

outputSpecificRowType

全般

いいえ

特定の行タイプを渡すかどうかを指定します。有効な値:

  • false:特定の行タイプを渡しません。すべてのデータの行タイプは INSERT です。これがデフォルト値です。

  • true:特定の行タイプを渡します。データの行タイプは、INSERT、DELETE、または UPDATE_AFTER です。

データ型マッピング

Tablestore のフィールドデータ型

Realtime Compute for Apache Flink のフィールドデータ型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

結果テーブル

DDL 構文
データテーブル

次のサンプルコードは、結果テーブルのための一時テーブルを作成するための DDL 文の例を示しています。

-- 結果テーブル用に tablestore_sink という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_sink(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);
説明

Tablestore 結果テーブルのプライマリキースキーマと少なくとも 1 つの属性列を指定する必要があります。出力データは Tablestore 結果テーブルに追加され、テーブルデータが更新されます。

時系列テーブル

結果テーブルとして時系列テーブルを使用する場合、結果テーブルに次のプライマリキー列を指定する必要があります。_m_name_data_source_tags_time。その他の構成は、データテーブルを結果テーブルとして使用する場合と同じです。WITH 句のパラメータ、SINK テーブルのプライマリキー、および Map 形式のプライマリキーを使用して、時系列テーブルのプライマリキー列を指定できます。上記の 3 つの方法を同時に使用して時系列テーブルのプライマリキー列を指定する場合、WITH 句のパラメータで指定されたプライマリキー列が最も優先されます。

WITH 句のパラメータを使用する

次のサンプルコードは、WITH 句のパラメータを使用して DDL 構文を定義する方法の例を示しています。

-- 結果テーブル用に tablestore_sink という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name", "datasource":"_data_source", "tag_a":"_tags", "tag_b":"_tags", "tag_c":"_tags", "tag_d":"_tags", "tag_e":"_tags", "tag_f":"_tags", "time":"_time"}'
);

-- ソーステーブルから結果テーブルにデータを挿入します。
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
Map 形式のプライマリキーを使用する

次のサンプルコードは、Map 形式のプライマリキーを使用して DDL 構文を定義する方法の例を示しています。

説明

Tablestore は、TimeSeries モデルで時系列テーブルの _tags 列の生成を容易にするために、Flink の Map データ型を提供します。Map データ型は、列名の変更や単純な関数などのマッピング操作をサポートしています。Map を使用する場合、_tags プライマリキー列が 3 番目の位置にあることを確認してください。

-- 結果テーブル用に tablestore_sink という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- ソーステーブルから結果テーブルにデータを挿入します。
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    MAP[`tag_a`, `tag_b`, `tag_c`, `tag_d`, `tag_e`, `tag_f`] AS tags,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
SINK テーブルのプライマリキーを使用する

次のサンプルコードは、SINK テーブルのプライマリキーを使用して DDL 構文を定義する方法の例を示しています。最初のプライマリキー列は _m_name 列で、メジャーメント名を指定します。2 番目のプライマリキー列は _data_source 列で、データソースを指定します。最後のプライマリキー列は _time 列で、タイムスタンプを指定します。中央のプライマリキー列は _tags 列で、時系列のタグを指定します。

-- 結果テーブル用に tablestore_sink という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
    PRIMARY KEY(measurement, datasource, tag_a, tag_b, tag_c, tag_d, tag_e, tag_f, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- ソーステーブルから結果テーブルにデータを挿入します。
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
WITH 句のパラメーター

パラメーター

該当するテーブル

必須

説明

connector

全般

はい

結果テーブルのコネクタタイプ。値は ots で、変更できません。

endPoint

全般

はい

Tablestore インスタンスのエンドポイント。VPC エンドポイントを使用する必要があります。詳細については、「エンドポイント」をご参照ください。

instanceName

全般

はい

Tablestore インスタンスの名前。

tableName

全般

はい

Tablestore の時系列テーブルの名前。

accessId

全般

はい

Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ペア(AccessKey ID と AccessKey シークレット)。

重要

AccessKey ペアを保護するために、変数を使用して AccessKey ペアを指定することをお勧めします。詳細については、「変数の管理」をご参照ください。

accessKey

全般

はい

valueColumns

データテーブル

はい

挿入する列の名前。複数のフィールドはカンマ (,) で区切ります。例: ID,NAME

storageType

全般

いいえ

重要

時系列テーブルを結果テーブルとして使用する場合は、このパラメーターを TIMESERIES に設定します。

テーブルのタイプ。有効な値:

  • WIDE_COLUMN: データテーブル。これがデフォルト値です。

  • TIMESERIES: 時系列テーブル。

timeseriesSchema

時系列テーブル

いいえ

重要

時系列テーブルを結果テーブルとして使用する場合、WITH 句のパラメーターを使用して時系列テーブルのプライマリキーを指定する場合は、このパラメーターを設定する必要があります。

時系列テーブルのプライマリキー列として指定する列。

  • JSON 形式のキーと値のペアを使用して、このパラメーターの値を指定します。例: {"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}

  • 指定するプライマリキー列のタイプは、時系列テーブルのプライマリキー列のタイプと同じである必要があります。タグのプライマリキー列は、複数の列で構成できます。

connectTimeout

全般

いいえ

Tablestore コネクタが Tablestore に接続するためのタイムアウト期間。単位: ミリ秒。デフォルト値: 30000。

socketTimeout

全般

いいえ

Tablestore コネクタが Tablestore に接続するためのソケットタイムアウト期間。単位: ミリ秒。デフォルト値: 30000。

ioThreadCount

全般

いいえ

I/O スレッドの数。デフォルト値: 4。

callbackThreadPoolSize

全般

いいえ

コールバックスレッドプールのサイズ。デフォルト値: 4。

retryIntervalMs

全般

いいえ

再試行間隔。単位: ミリ秒。デフォルト値: 1000。

maxRetryTimes

全般

いいえ

最大再試行回数。デフォルト値: 10。

bufferSize

全般

いいえ

データが結果テーブルに書き込まれる前にバッファーに格納できるデータレコードの最大数。デフォルト値: 5000。これは、バッファー内のデータレコード数が 5,000 に達すると、データが結果テーブルに書き込まれることを指定します。

batchWriteTimeoutMs

全般

いいえ

書き込みタイムアウト期間。単位: ミリ秒。デフォルト値: 5000。これは、バッファー内のデータレコード数が 5,000 ミリ秒以内に bufferSize パラメーターで指定された値に達しない場合、バッファー内のすべてのデータが結果テーブルに書き込まれることを指定します。

batchSize

全般

いいえ

結果テーブルに同時に書き込むことができるデータレコードの数。デフォルト値: 100。最大値: 200。

ignoreDelete

全般

いいえ

削除操作によって生成されたリアルタイムデータを無視するかどうかを指定します。デフォルト値: false。これは、削除操作によって生成されたリアルタイムデータが無視されないことを指定します。

重要

データテーブルをソーステーブルとして使用する場合は、ビジネス要件に基づいてこのパラメーターを設定できます。

autoIncrementKey

データテーブル

いいえ

結果テーブルに自動採番プライマリキー列が含まれている場合の、結果テーブルの自動採番プライマリキー列の名前。結果テーブルに自動採番プライマリキー列がない場合は、このパラメーターを設定する必要はありません。

重要

VVR 8.0.4 以降を使用する Apache Flink 用の Realtime Compute のみ、このパラメーターをサポートしています。

overwriteMode

全般

いいえ

データの上書きモード。有効な値:

  • PUT: データは PUT モードで Tablestore テーブルに書き込まれます。これがデフォルト値です。

  • UPDATE: データは UPDATE モードで Tablestore テーブルに書き込まれます。

説明

動的カラムモードでは、UPDATE モードのみがサポートされています。

defaultTimestampInMillisecond

全般

いいえ

Tablestore テーブルにデータを書き込むために使用されるデフォルトのタイムスタンプ。このパラメーターを空のままにすると、現在のシステム時刻のタイムスタンプが使用されます。

dynamicColumnSink

全般

いいえ

動的カラムモードを有効にするかどうかを指定します。デフォルト値: false。これは、動的カラムモードが無効になっていることを指定します。

説明
  • 動的カラムモードは、テーブルに列が指定されておらず、デプロイ状況に基づいてデータ列がテーブルに挿入されるシナリオに適しています。テーブル作成ステートメントで最初のいくつかの列をプライマリキー列として指定する必要があります。2 番目から最後の列の値は列名変数として使用され、最後の列の値は列名変数の値として使用され、2 番目から最後の列のデータ型は String である必要があります。

  • 動的カラムモードを有効にした場合、自動採番プライマリキー列機能はサポートされず、overwriteMode パラメーターを UPDATE に設定する必要があります。

checkSinkTableMeta

全般

いいえ

結果テーブルのメタデータを確認するかどうかを指定します。デフォルト値: true。これは、システムが Tablestore テーブルのプライマリキー列がテーブル作成ステートメントで指定されたプライマリキー列と同じかどうかを確認することを指定します。

enableRequestCompression

全般

いいえ

データ書き込み中にデータ圧縮を有効にするかどうかを指定します。デフォルト値: false。これは、データ書き込み中にデータ圧縮が無効になっていることを指定します。

データ型マッピング

Realtime Compute for Apache Flink のフィールドデータ型

Tablestore のフィールドデータ型

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

サンプル SQL 文

ソーステーブルから結果テーブルにデータを同期する
データテーブルから時系列テーブルにデータを同期する

flink_source_table という名前のデータテーブルからデータを読み取り、flink_sink_table という名前の時系列テーブルにデータを書き込みます。

サンプル SQL 文:

-- ソーステーブル用に tablestore_stream という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- WITH 句のパラメータを使用して、結果テーブル用に tablestore_sink という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_sink(
     measurement STRING,
     datasource STRING,
     tag_a STRING,
     `time` BIGINT,
     binary_value BINARY,
     bool_value BOOLEAN,
     double_value DOUBLE,
     long_value BIGINT,
     string_value STRING,
     tag_b STRING,
     tag_c STRING,
     tag_d STRING,
     tag_e STRING,
     tag_f STRING,
     PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
 ) WITH (
     'connector' = 'ots',
     'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
     'instanceName' = 'xxx',
     'tableName' = 'flink_sink_table',
     'accessId' = 'xxxxxxxxxxx',
     'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
     'storageType' = 'TIMESERIES',
     'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
 );
 
-- ソーステーブルから結果テーブルにデータを挿入します。
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
時系列テーブルからデータテーブルにデータを同期する

flink_source_table という名前の時系列テーブルからデータを読み取り、flink_sink_table という名前のデータテーブルにデータを書き込みます。

サンプル SQL 文:

-- ソーステーブル用に tablestore_stream という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

-- 結果テーブル用に print_table という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_target(
    measurement STRING,
    datasource STRING,
    tags STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY (measurement,datasource, tags, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='binary_value,bool_value,double_value,long_value,string_value'
);

-- ソーステーブルから結果テーブルにデータを挿入します。
INSERT INTO tablestore_target
SELECT
    _m_name,
    _data_source,
    _tags,
    _time,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from tablestore_stream;
ソーステーブルからデータを読み取り、Tablestore コンソールにデータを印刷する

flink_source_table という名前のソーステーブルからデータをバッチで読み取ります。デプロイデバッグ機能を使用して、デプロイの実行をシミュレートできます。デバッグ結果は、SQL エディタの下部に表示されます。

サンプル SQL 文:

-- ソースデータテーブル用に tablestore_stream という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- ソーステーブルからデータを読み取ります。
SELECT * FROM tablestore_stream LIMIT 100;
ソーステーブルからデータを読み取り、TaskManager ログにデータを印刷する

flink_source_table という名前のソーステーブルからデータを読み取り、Print コネクタを使用して結果をTaskManager ログに出力します。

サンプル SQL 文:

-- ソースデータテーブル用に tablestore_stream という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 結果テーブル用に print_table という名前の一時テーブルを作成します。
CREATE TEMPORARY TABLE print_table(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
  'connector' = 'print',   -- Print コネクタを使用します。
  'logger' = 'true'        -- Realtime Compute for Apache Flink の開発コンソールに計算結果を表示します。
);

-- ソーステーブルのフィールドを出力します。
INSERT INTO print_table
SELECT `order`,orderid,customerid,customername from tablestore_stream;

付録 2:VVR 依存関係を構成する

  1. VVR 依存関係 をダウンロードします。

  2. VVR 依存関係をアップロードします。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. 管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで、[アーティファクト] をクリックします。

    4. [アーティファクト] ページで、[アーティファクトのアップロード] をクリックし、VVR 依存関係が格納されている JAR パッケージを選択します。

  3. 管理するジョブの SQL エディタの右側にある [構成] をクリックします。「構成」パネルの [追加の依存関係] フィールドで、VVR 依存関係が格納されている JAR パッケージを選択します。