すべてのプロダクト
Search
ドキュメントセンター

Tablestore:Realtime Compute for Apache Flink を使用して Tablestore データを処理する

最終更新日:May 09, 2026

このトピックでは、Realtime Compute for Apache Flink を使用して Tablestore データを処理する方法について説明します。Tablestore のデータテーブルまたは時系列テーブルを、Realtime Compute for Apache Flink によるデータ処理のソーステーブルまたは結果テーブルとして使用できます。

前提条件

リアルタイムコンピューティングジョブの開発

ステップ 1:SQL ドラフトの作成

  1. ドラフト作成ページに移動します。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. 対象のワークスペースの 操作 列で、コンソール をクリックします。

    3. 左側のナビゲーションウィンドウで、開発 > ETL をクリックします。

  2. 新規作成 をクリックします。新規ドラフト ダイアログボックスで、空のストリームドラフト を選択し、次へ をクリックします。

    説明

    Realtime Compute for Apache Flink には、さまざまなコードテンプレートが用意されており、データ同期もサポートしています。各コードテンプレートは特定のシナリオに適しており、コードサンプルと Realtime Compute for Apache Flink の機能および関連構文に関する説明が提供されています。テンプレートをクリックして、ビジネスロジックを実装できます。詳細については、「コードテンプレート」および「データ同期テンプレート」をご参照ください。

  3. ジョブ情報 を入力します。

    パラメーター

    説明

    ファイル名

    作成するドラフトの名前です。

    説明

    ドラフト名は現在のプロジェクト内で一意である必要があります。

    flink-test

    保存場所

    ドラフトのコードファイルを保存するフォルダです。

    既存のフォルダの右側にある 新建文件夹 アイコンをクリックして、サブフォルダを作成することもできます。

    Draft

    エンジンバージョン

    現在のドラフトで使用する Flink エンジンのバージョンです。エンジンバージョンの詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。

    vvr-8.0.10-flink-1.17

  4. 作成 をクリックします。

ステップ 2:SQL ドラフトの記述

説明

このステップの例では、データテーブルから別のデータテーブルへのデータ同期を行うコードを記述します。その他の SQL ステートメントのサンプルについては、「SQL ステートメントのサンプル」をご参照ください。

  1. ソーステーブルおよび結果テーブル用の一時テーブルを作成します。

    詳細については、「付録 1:Tablestore コネクタ」をご参照ください。

    -- ソーステーブル用の一時テーブル tablestore_stream を作成します。
    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector' = 'ots', -- ソーステーブルのコネクタタイプを指定します。値は ots で、変更できません。
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tablestore インスタンスの VPC エンドポイントを指定します。
        'instanceName' = 'xxx', -- Tablestore インスタンスの名前を指定します。
        'tableName' = 'flink_source_table', -- ソーステーブルの名前を指定します。
        'tunnelName' = 'flink_source_tunnel', -- ソーステーブル用に作成されたトンネルの名前を指定します。
        'accessId' = 'xxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret を指定します。
        'ignoreDelete' = 'false' -- 削除操作によって生成されるリアルタイムデータを無視するかどうかを指定します。この例では false に設定されています。
    );
    
    -- 結果テーブル用の一時テーブル tablestore_sink を作成します。
    CREATE TEMPORARY TABLE tablestore_sink(
       `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED -- プライマリキーを指定します。
    ) WITH (
        'connector' = 'ots', -- 結果テーブルのコネクタタイプを指定します。値は ots で、変更できません。
        'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tablestore インスタンスの VPC エンドポイントを指定します。
        'instanceName' = 'xxx', -- Tablestore インスタンスの名前を指定します。
        'tableName' = 'flink_sink_table', -- 結果テーブルの名前を指定します。
        'accessId' = 'xxxxxxxxxxx',  -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret を指定します。
        'valueColumns'='customerid,customername' -- 結果テーブルに挿入する列の名前を指定します。
    );
  2. ドラフトのロジックを記述します。

    次の SQL ステートメントのサンプルは、ソーステーブルから結果テーブルにデータを挿入する方法の例です。

    -- ソーステーブルから結果テーブルにデータを挿入します。
    INSERT INTO tablestore_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

ステップ 3:(オプション)構成情報の確認

SQL エディターの右側のタブで、構成を確認またはパラメーターを設定できます。次の表にパラメーターを示します。

タブ名

説明

構成

  • エンジンバージョン:ドラフトで使用する Flink エンジンのバージョンです。

  • 追加依存関係:ジョブに必要な追加の依存関係(UDF など)です。

    Ververica Runtime (VVR) の依存関係をダウンロードし、リソースファイルページでアップロードしてから、アップロードしたファイルを 追加依存関係 として選択できます。詳細については、「付録 2:VVR 依存関係の設定」をご参照ください。

構造

  • データフロー図:ジョブのデータフローを可視化します。

  • ツリー構造図:ジョブのデータリネージを可視化します。

バージョン

ドラフトのバージョン履歴を表示します。操作 列の機能の詳細については、「ジョブバージョンの管理」をご参照ください。

ステップ 4:(オプション)構文チェックの実行

検証により、ジョブの SQL セマンティクス、ネットワーク接続、およびテーブルメタデータをチェックします。結果エリアで SQL アドバイス をクリックすると、潜在的な SQL リスクおよび最適化の提案を確認できます。

  1. SQL エディターの右上隅で、検証 をクリックします。

  2. 検証 ダイアログボックスで、確認 をクリックします。

ステップ 5:(オプション)ドラフトのデバッグ

デバッグ機能を使用して、デプロイメントの実行をシミュレートし、出力を確認して SELECT 文および INSERT 文のビジネスロジックを検証できます。これにより、開発効率が向上し、データ品質の低下リスクが軽減されます。

  1. SQL エディターの右上隅で、デバッグ をクリックします。

  2. デバッグ ダイアログボックスで、セッションクラスターを選択し、次へ をクリックします。

    利用可能なクラスターがない場合は、セッションクラスターを作成してください。セッションクラスターが SQL ドラフトと同じエンジンバージョンを使用しており、実行中であることを確認してください。詳細については、「セッションクラスターの作成」をご参照ください。

  3. デバッグデータを設定します。

    • オンラインデータを使用する場合は、この操作をスキップします。

    • デバッグデータを使用する場合は、デバッグデータテンプレートのダウンロード をクリックして、テンプレートにデータを入力し、ファイルをアップロードします。詳細については、「ジョブのデバッグ」をご参照ください。

  4. データを設定したら、OK をクリックします。

ステップ 6:ドラフトのデプロイ

SQL エディターの右上隅で、デプロイ をクリックします。新規バージョンのデプロイ ダイアログボックスで、デプロイパラメーターを設定し、OK をクリックします。

説明

セッションクラスターは、開発環境やテスト環境など、本番環境以外の環境に適しています。セッションクラスターでドラフトをデプロイまたはデバッグすることで、JobManager のリソース利用率が向上し、デプロイの起動が高速化されます。ただし、本番環境用のドラフトをセッションクラスターにデプロイしないことを推奨します。そうしないと、安定性の問題が発生する可能性があります。

ステップ 7:ドラフトのデプロイメントを開始し、計算結果を確認する

  1. 左側のナビゲーションウィンドウで、O&M > デプロイメント をクリックします。

  2. 対象のデプロイメントの 操作 列で、開始 をクリックします。

    状態なしで開始 を選択し、開始 をクリックします。実行中 のステータスは、デプロイメントが正常に動作していることを示します。起動パラメーターの詳細については、「ジョブの開始」をご参照ください。

    説明
    • Realtime Compute for Apache Flink の各 TaskManager の計算能力を最大限に活用するため、各 TaskManager に CPU コア 2 個とメモリ 4 GB を割り当てることを推奨します。TaskManager は 1 秒あたり 10,000 行を書き込むことができます。

    • ソーステーブルのパーティション数が多い場合は、Realtime Compute for Apache Flink の同時実行数を 16 未満に設定することを推奨します。書き込みレートは同時実行数に比例して線形に増加します。

  3. デプロイメントページで、計算結果を確認します。

    1. O&M > デプロイメント ページで、対象のデプロイメントの名前をクリックします。

    2. ジョブログ タブで、実行中のタスクマネージャー タブをクリックし、パス,ID 列で対象のタスクをクリックします。

    3. ログ をクリックして、ログ情報を確認します。

  4. (オプション) デプロイメントのキャンセル

    デプロイメントの SQL コードを変更したり、WITH 句にパラメーターを追加または削除したり、デプロイメントのバージョンを変更したりする場合は、デプロイメントのドラフトを再デプロイし、デプロイメントをキャンセルしてから再度開始する必要があります。デプロイメントが失敗し、状態データを再利用して復旧できない場合や、動的に適用されないパラメーター設定を更新したい場合も、デプロイメントをキャンセルしてから再開する必要があります。デプロイメントのキャンセル方法の詳細については、「デプロイメントのキャンセル」をご参照ください。

付録

付録 1:Tablestore コネクタ

Realtime Compute for Apache Flink には、Tablestore コネクタ が組み込まれており、Tablestore データの読み取り、書き込み、および同期が可能です。

ソーステーブル

DDL 構文
データテーブル

次のサンプルコードは、ソーステーブル用の一時テーブルを作成する DDL ステートメントの例です。

-- ソーステーブル用の一時テーブル tablestore_stream を作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false'
);
時系列テーブル

次のサンプルコードは、ソーステーブル用の一時テーブルを作成する DDL ステートメントの例です。

-- ソーステーブル用の一時テーブル tablestore_stream を作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

トンネルサービスのメタデータフィールド(OtsRecordTypeOtsRecordTimestamp など)を、ソーステーブルの通常の列として読み取ることができます。次の表にこれらのフィールドを示します。

フィールド

Realtime Compute for Apache Flink でのマッピングフィールド

説明

OtsRecordType

type

操作タイプです。

OtsRecordTimestamp

timestamp

データ操作時間。単位: マイクロ秒。

説明

Realtime Compute for Apache Flink が完全データを読み取るようにするには、このフィールドを 0 に設定します。

WITH 句のパラメーター

パラメーター

適用テーブル

必須

説明

connector

共通

はい

ソーステーブルのコネクタタイプです。値は ots で、変更できません。

endPoint

共通

はい

Tablestore インスタンスのエンドポイントです。VPC エンドポイントを使用する必要があります。詳細については、「エンドポイント」をご参照ください。

instanceName

共通

はい

Tablestore インスタンスの名前です。

tableName

共通

はい

Tablestore のソーステーブル名です。

tunnelName

共通

はい

Tablestore のソーステーブル用トンネルの名前です。トンネルの作成方法については、「トンネルの作成」をご参照ください。

accessId

一般

はい

Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ペア(AccessKey ID および AccessKey Secret)です。

重要

AccessKey ペアを保護するため、変数を使用して AccessKey ペアを指定することを推奨します。詳細については、「変数の管理」をご参照ください。

accessKey

共通

はい

connectTimeout

共通

いいえ

Tablestore コネクタが Tablestore に接続する際のタイムアウト期間です。単位:ミリ秒。デフォルト値:30000。

socketTimeout

共通

いいえ

Tablestore コネクタが Tablestore に接続する際のソケットタイムアウト期間です。単位:ミリ秒。デフォルト値:30000。

ioThreadCount

共通

いいえ

I/O スレッド数です。デフォルト値:4。

callbackThreadPoolSize

共通

いいえ

コールバックスレッドプールのサイズです。デフォルト値:4。

ignoreDelete

データテーブル

いいえ

削除操作によって生成されるリアルタイムデータを無視するかどうかを指定します。デフォルト値:false(削除操作によって生成されるリアルタイムデータを無視しません)。

skipInvalidData

共通

いいえ

ダーティデータを無視するかどうかを指定します。デフォルト値:false(ダーティデータを無視しません)。ダーティデータを無視しない場合、システムがダーティデータを処理するとエラーが報告されます。

重要

VVR 8.0.4 以降を使用する Realtime Compute for Apache Flink のみがこのパラメーターをサポートします。

retryStrategy

共通

いいえ

リトライポリシーです。有効な値:

  • TIME:retryTimeoutMs パラメーターで指定されたタイムアウト期間が終了するまで、システムが継続的にリトライします。これがデフォルト値です。

  • COUNT:retryCount パラメーターで指定された最大リトライ回数に達するまで、システムが継続的にリトライします。

retryCount

共通

いいえ

最大リトライ回数です。retryStrategy パラメーターを COUNT に設定した場合に、このパラメーターを設定できます。デフォルト値:3。

retryTimeoutMs

共通

いいえ

リトライのタイムアウト期間です。単位:ミリ秒。retryStrategy パラメーターを TIME に設定した場合に、このパラメーターを設定できます。デフォルト値:180000。

streamOriginColumnMapping

共通

いいえ

ソーステーブルの列名と一時テーブルの列名のマッピングです。

説明

元の列名と実際の列名はコロン(:)で区切ります。複数のマッピングはカンマ(,)で区切ります。例:origin_col1:col1,origin_col2:col2

outputSpecificRowType

共通

いいえ

特定の行タイプをパススルーするかどうかを指定します。有効な値:

  • false:特定の行タイプをパススルーしません。すべてのデータの行タイプは INSERT になります。これがデフォルト値です。

  • true:特定の行タイプをパススルーします。データの行タイプは INSERT、DELETE、または UPDATE_AFTER になります。

データ型のマッピング

Tablestore のフィールドデータ型

Realtime Compute for Apache Flink のフィールドデータ型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

結果テーブル

DDL 構文
データテーブル

次のサンプルコードは、結果テーブル用の一時テーブルを作成する DDL ステートメントの例です。

-- 結果テーブル用の一時テーブル tablestore_sink を作成します。
CREATE TEMPORARY TABLE tablestore_sink(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);
説明

Tablestore の結果テーブルには、プライマリキーのスキーマと少なくとも 1 つの属性列を指定する必要があります。出力データは Tablestore の結果テーブルに追加され、テーブルデータが更新されます。

時系列テーブル

時系列結果テーブルには、4 つのプライマリキー(_m_name_data_source_tags、および _time)が必要です。これらのプライマリキーは、3 つの方法で指定できます。WITH パラメーターを使用する方法、結果テーブルのプライマリキー定義を使用する方法、Map 形式のプライマリキーを使用する方法です。_tags 列を定義する際、WITH パラメーターの方法が最も優先度が高く、次に Map 形式、最後に結果テーブルのプライマリキー定義となります。

WITH 句のパラメーターを使用する

次のサンプルコードは、WITH 句のパラメーターを使用して DDL 構文を定義する方法の例です。

-- 結果テーブル用の一時テーブル tablestore_sink を作成します。
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name", "datasource":"_data_source", "tag_a":"_tags", "tag_b":"_tags", "tag_c":"_tags", "tag_d":"_tags", "tag_e":"_tags", "tag_f":"_tags", "time":"_time"}'
);

-- ソーステーブルから結果テーブルにデータを挿入します。
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
Map 形式のプライマリキーを使用する

次のサンプルコードは、Map 形式のプライマリキーを使用して DDL 構文を定義する方法の例です。

説明

Tablestore は、TimeSeries モデルの時系列テーブルの _tags 列を生成しやすくするために、Flink の Map データ型を提供しています。Map データ型は、列名の変更や単純な関数などのマッピング操作をサポートします。Map を使用する際は、_tags プライマリキーカラムが 3 番目の位置にあることを確認してください。

-- 結果テーブル用の一時テーブル tablestore_sink を作成します。
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- ソーステーブルから結果テーブルにデータを挿入します。
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    MAP[`tag_a`, `tag_b`, `tag_c`, `tag_d`, `tag_e`, `tag_f`] AS tags,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
SINK テーブルのプライマリキーを使用する

次のサンプルコードは、SINK テーブルのプライマリキーを使用して DDL 構文を定義する方法の例です。最初のプライマリキーカラムは測定名を指定する _m_name カラムです。2 番目のプライマリキーカラムはデータソースを指定する _data_source カラムです。最後のプライマリキーカラムはタイムスタンプを指定する _time カラムです。中央のプライマリキーカラムは時系列のタグを指定する _tags カラムです。

-- 結果テーブル用の一時テーブル tablestore_sink を作成します。
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
    PRIMARY KEY(measurement, datasource, tag_a, tag_b, tag_c, tag_d, tag_e, tag_f, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- ソーステーブルから結果テーブルにデータを挿入します。
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
WITH 句のパラメーター

パラメーター

適用テーブル

必須

説明

connector

共通

はい

結果テーブルのコネクタタイプです。値は ots で、変更できません。

endPoint

共通

はい

Tablestore インスタンスのエンドポイントです。VPC エンドポイントを使用する必要があります。詳細については、「エンドポイント」をご参照ください。

instanceName

共通

はい

Tablestore インスタンスの名前です。

tableName

共通

はい

Tablestore の時系列テーブル名です。

accessId

共通

はい

Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ペア(AccessKey ID および AccessKey Secret)です。

重要

AccessKey ペアを保護するため、変数を使用して AccessKey ペアを指定することを推奨します。詳細については、「変数の管理」をご参照ください。

accessKey

共通

はい

valueColumns

データテーブル

はい

データを書き込む列の名前です。複数の列名はカンマ(,)で区切ります。例:ID,NAME

storageType

共通

いいえ

重要

結果テーブルとして時系列テーブルを使用する場合は、このパラメーターを TIMESERIES に設定します。

テーブルのタイプです。有効な値:

  • WIDE_COLUMN:データテーブル。これがデフォルト値です。

  • TIMESERIES:時系列テーブル。

timeseriesSchema

時系列テーブル

いいえ

重要

結果テーブルとして時系列テーブルを使用し、WITH 句のパラメーターを使用して時系列テーブルのプライマリキーを指定する場合は、このパラメーターを設定する必要があります。

時系列テーブルのプライマリキーカラムとして指定するカラムです。

  • 時系列テーブルのプライマリキーを JSON 形式のキーと値のペアで指定します。例:{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}

  • 指定するプライマリキーカラムのタイプは、時系列テーブルのプライマリキーカラムのタイプと同じである必要があります。tags プライマリキーカラムは複数のカラムで構成できます。

connectTimeout

共通

いいえ

Tablestore コネクタが Tablestore に接続する際のタイムアウト期間です。単位:ミリ秒。デフォルト値:30000。

socketTimeout

共通

いいえ

Tablestore コネクタが Tablestore に接続する際のソケットタイムアウト期間です。単位:ミリ秒。デフォルト値:30000。

ioThreadCount

共通

いいえ

I/O スレッド数です。デフォルト値:4。

callbackThreadPoolSize

共通

いいえ

コールバックスレッドプールのサイズです。デフォルト値:4。

retryIntervalMs

共通

いいえ

リトライ間隔です。単位:ミリ秒。デフォルト値:1000。

maxRetryTimes

共通

いいえ

最大リトライ回数です。デフォルト値:10。

bufferSize

一般

いいえ

結果テーブルにデータを書き込む前にバッファーに格納できるデータレコードの最大数です。デフォルト値:5000(バッファー内のデータレコード数が 5,000 に達すると、結果テーブルにデータが書き込まれます)。

batchWriteTimeoutMs

共通

いいえ

書き込みタイムアウト期間です。単位:ミリ秒。デフォルト値:5000(バッファー内のデータレコード数が bufferSize パラメーターで指定された値に達しない場合でも、5,000 ミリ秒経過すると、バッファー内のすべてのデータが結果テーブルに書き込まれます)。

batchSize

一般

いいえ

同時に結果テーブルに書き込めるデータレコード数です。デフォルト値:100。最大値:200。

ignoreDelete

共通

いいえ

削除操作によって生成されるリアルタイムデータを無視するかどうかを指定します。デフォルト値:false(削除操作によって生成されるリアルタイムデータを無視しません)。

重要

ソーステーブルとしてデータテーブルを使用する場合は、ビジネス要件に基づいてこのパラメーターを設定できます。

autoIncrementKey

データテーブル

いいえ

結果テーブルに自動採番主キー列が含まれている場合の、自動採番主キー列の名前です。結果テーブルに自動採番主キー列がない場合は、このパラメーターを設定する必要はありません。

重要

VVR 8.0.4 以降を使用する Realtime Compute for Apache Flink のみがこのパラメーターをサポートします。

overwriteMode

共通

いいえ

データ上書きモードです。有効な値:

  • PUT:Tablestore テーブルに PUT モードでデータを書き込みます。これがデフォルト値です。

  • UPDATE:Tablestore テーブルに UPDATE モードでデータを書き込みます。

説明

動的カラムモードでは、UPDATE モードのみがサポートされます。

defaultTimestampInMillisecond

一般

いいえ

データを Tablestore テーブルに書き込む際に使用されるデフォルトのタイムスタンプです。このパラメーターを空のままにすると、現在のシステム時刻のタイムスタンプが使用されます。

dynamicColumnSink

共通

いいえ

動的カラムモードを有効にするかどうかを指定します。デフォルト値:false(動的カラムモードは無効です)。

説明
  • 動的カラムモードは、テーブルにカラムが指定されておらず、デプロイメントステータスに基づいてデータカラムがテーブルに挿入されるシナリオに適しています。テーブル作成文では、最初のいくつかのカラムをプライマリキー列として指定する必要があります。最後から 2 番目のカラムの値はカラム名変数として使用され、最後のカラムの値はそのカラム名変数の値として使用されます。また、最後から 2 番目のカラムのデータの型は String である必要があります。

  • 動的カラムモードを有効にすると、自動採番主キー列機能はサポートされず、overwriteMode パラメーターを UPDATE に設定する必要があります。

checkSinkTableMeta

共通

いいえ

結果テーブルのメタデータをチェックするかどうかを指定します。デフォルト値:true(Tablestore テーブルのプライマリキーカラムがテーブル作成ステートメントで指定されたプライマリキーカラムと同じかどうかをシステムがチェックします)。

enableRequestCompression

共通

いいえ

データ書き込み中にデータ圧縮を有効にするかどうかを指定します。デフォルト値:false(データ書き込み中にデータ圧縮は無効です)。

データ型のマッピング

Realtime Compute for Apache Flink のフィールドデータ型

Tablestore のフィールドデータ型

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

SQL ステートメントのサンプル

ソーステーブルから結果テーブルへのデータ同期
データテーブルから時系列テーブルへのデータ同期

flink_source_table という名前のデータテーブルからデータを読み取り、flink_sink_table という名前の時系列テーブルにデータを書き込みます。

SQL ステートメントのサンプル:

-- ソーステーブル用の一時テーブル tablestore_stream を作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- WITH 句のパラメーターを使用して結果テーブル用の一時テーブル tablestore_sink を作成します。
CREATE TEMPORARY TABLE tablestore_sink(
     measurement STRING,
     datasource STRING,
     tag_a STRING,
     `time` BIGINT,
     binary_value BINARY,
     bool_value BOOLEAN,
     double_value DOUBLE,
     long_value BIGINT,
     string_value STRING,
     tag_b STRING,
     tag_c STRING,
     tag_d STRING,
     tag_e STRING,
     tag_f STRING,
     PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
 ) WITH (
     'connector' = 'ots',
     'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
     'instanceName' = 'xxx',
     'tableName' = 'flink_sink_table',
     'accessId' = 'xxxxxxxxxxx',
     'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
     'storageType' = 'TIMESERIES',
     'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
 );
 
-- ソーステーブルから結果テーブルにデータを挿入します。
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
時系列テーブルからデータテーブルへのデータ同期

flink_source_table という名前の時系列テーブルからデータを読み取り、flink_sink_table という名前のデータテーブルにデータを書き込みます。

SQL ステートメントのサンプル:

-- ソーステーブル用の一時テーブル tablestore_stream を作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

-- 結果テーブル用の一時テーブル print_table を作成します。
CREATE TEMPORARY TABLE tablestore_target(
    measurement STRING,
    datasource STRING,
    tags STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY (measurement,datasource, tags, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='binary_value,bool_value,double_value,long_value,string_value'
);

-- ソーステーブルから結果テーブルにデータを挿入します。
INSERT INTO tablestore_target
SELECT
    _m_name,
    _data_source,
    _tags,
    _time,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from tablestore_stream;
ソーステーブルからデータを読み取り、Tablestore コンソールにデータを表示する

flink_source_table という名前のソーステーブルからデータをバッチで読み取ります。デプロイメントデバッグ機能 を使用して、デプロイメントの実行をシミュレートできます。デバッグ結果は SQL エディターの下部に表示されます。

SQL ステートメントのサンプル:

-- ソースデータテーブル用の一時テーブル tablestore_stream を作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- ソーステーブルからデータを読み取ります。
SELECT * FROM tablestore_stream LIMIT 100;
ソーステーブルからデータを読み取り、TaskManager ログにデータを表示する

flink_source_table という名前のソーステーブルからデータを読み取り、Print コネクタを使用して結果を TaskManager ログ に表示します。

SQL ステートメントのサンプル:

-- ソースデータテーブル用の一時テーブル tablestore_stream を作成します。
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 結果テーブル用の一時テーブル print_table を作成します。
CREATE TEMPORARY TABLE print_table(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
  'connector' = 'print',   -- Print コネクタを使用します。
  'logger' = 'true'        -- 計算結果を Realtime Compute for Apache Flink の開発コンソールに表示します。
);

-- ソーステーブルのフィールドを表示します。
INSERT INTO print_table
SELECT `order`,orderid,customerid,customername from tablestore_stream;

付録 2:VVR 依存関係の設定

  1. VVR 依存関係 をダウンロードします。

  2. VVR 依存関係をアップロードします。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. 対象のワークスペースの 操作 列で、コンソール をクリックします。

    3. 左側のナビゲーションウィンドウで、アーティファクト をクリックします。

    4. アーティファクト ページ で、アーティファクトのアップロード をクリックし、VVR 依存関係の JAR パッケージを選択します。

  3. 対象ジョブの SQL エディターの右側で、構成 タブをクリックします。追加依存関係 フィールドで、VVR 依存関係の JAR パッケージを選択します。