すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Tablestore コネクタ

最終更新日:Jul 04, 2025

このトピックでは、Tablestore コネクタの使用方法について説明します。

背景情報

Tablestore は、大量の構造化データの保存に最適化された、テーブルベースの低コストサーバーレスストレージサービスです。 Tablestore を使用すると、ミリ秒単位でオンラインデータをクエリおよび取得し、保存されたデータを多次元で分析できます。 Tablestore は、大量の請求書、インスタントメッセージング(IM)、IoT、車載インターネット(IoV)、リスク管理、インテリジェントレコメンデーションなど、さまざまなシナリオに適しています。 Tablestore は、IoT アプリケーション向けに高度に最適化されたエンドツーエンドのストレージソリューションも提供します。 詳細については、「Tablestore とは」をご参照ください。

次の表に、Tablestore コネクタでサポートされている機能を示します。

項目

説明

実行モード

ストリーミングモード

API タイプ

SQL API

テーブルタイプ

ソーステーブル、ディメンションテーブル、およびシンクテーブル

データ形式

該当なし

メトリック

  • ソーステーブルのメトリック: なし

  • ディメンションテーブルのメトリック: なし

  • シンクテーブルのメトリック:

    • numBytesOut

    • numBytesOutPerSecond

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

説明

メトリックの詳細については、「メトリック」をご参照ください。

シンクテーブルでのデータの更新または削除

サポートされています

前提条件

Tablestore インスタンスが購入され、Tablestore テーブルが作成されていること。 詳細については、「Tablestore の使用」をご参照ください。

構文

  • シンクテーブルを作成するためのステートメント

    CREATE TABLE ots_sink (
      name VARCHAR,
      age BIGINT,
      birthday BIGINT,
      primary key(name,age) not enforced
    ) WITH (
      'connector'='ots',
      'instanceName'='<yourInstanceName>',  // Tablestore インスタンス名
      'tableName'='<yourTableName>', // Tablestore テーブル名
      'accessId'='${ak_id}', // AccessKey ID
      'accessKey'='${ak_secret}', // AccessKey Secret
      'endPoint'='<yourEndpoint>', // エンドポイント
      'valueColumns'='birthday' // 挿入するカラム名
    );
    説明

    Tablestore シンクテーブルにはプライマリキーを指定する必要があります。 最新の出力データは Tablestore シンクテーブルに追加され、テーブルデータが更新されます。

  • ディメンションテーブルを作成するためのステートメント

    CREATE TABLE ots_dim (
      id int,
      len int,
      content STRING
    ) WITH (
      'connector'='ots',
      'endPoint'='<yourEndpoint>', // エンドポイント
      'instanceName'='<yourInstanceName>', // Tablestore インスタンス名
      'tableName'='<yourTableName>', // Tablestore テーブル名
      'accessId'='${ak_id}', // AccessKey ID
      'accessKey'='${ak_secret}' // AccessKey Secret
    );
  • ソーステーブルを作成するためのステートメント

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR
    ) WITH (
      'connector'='ots',
      'endPoint' ='<yourEndpoint>', // エンドポイント
      'instanceName' = 'flink-source', // Tablestore インスタンス名
      'tableName' ='flink_source_table', // Tablestore テーブル名
      'tunnelName' = 'flinksourcestream', // Tunnel 名
      'accessId' ='${ak_id}', // AccessKey ID
      'accessKey' ='${ak_secret}', // AccessKey Secret
      'ignoreDelete' = 'false' // 削除操作を無視するかどうか
    );

    データの消費が必要なフィールド、および Tunnel Service の戻りデータにある OtsRecordType フィールドと OtsRecordTimestamp フィールドは、属性列として読み書きできます。 次の表に、フィールドを示します。

    フィールド

    Apache Flink 用 Realtime Compute でのマッピングフィールド

    説明

    OtsRecordType

    type

    データ操作タイプ。

    OtsRecordTimestamp

    timestamp

    データ操作時刻。 単位: マイクロ秒。

    説明

    全データを読み取る場合、OtsRecordTimestamp パラメータの値は 0 に設定されます。

    OtsRecordType フィールドと OtsRecordTimestamp フィールドを読み取る場合は、Realtime Compute for Apache Flink が提供する METADATA キーワードを使用して、Tablestore ソーステーブルから属性フィールドを取得できます。 次の例は、DDL ステートメントを示しています。

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR,
      record_type STRING METADATA FROM 'type',  // レコードタイプ
      record_timestamp BIGINT METADATA FROM 'timestamp' // レコードタイムスタンプ
    ) WITH (
      ...
    );

WITH 句のコネクタオプション

  • 全般

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    String

    はい

    デフォルト値なし

    値を ots に設定します。

    instanceName

    Tablestore インスタンスの名前。

    String

    はい

    デフォルト値なし

    endPoint

    Tablestore インスタンスのエンドポイント。

    String

    はい

    デフォルト値なし

    詳細については、「エンドポイント」をご参照ください。

    tableName

    テーブルの名前

    String

    はい

    デフォルト値なし

    accessId

    Alibaba Cloud アカウントまたは Resource Access Management(RAM)ユーザーの AccessKey ID。

    String

    はい

    デフォルト値なし

    アカウントの AccessKey ペアを表示する方法を参照してください。

    重要

    AccessKey ペアを保護するために、AccessKey ペアをハードコードするのではなく、変数を使用してください。

    accessKey

    Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。

    String

    はい

    デフォルト値なし

    connectTimeout

    Tablestore コネクタが Tablestore に接続するためのタイムアウト期間。

    Integer

    いいえ

    30000

    単位: ミリ秒。

    socketTimeout

    Tablestore コネクタが Tablestore に接続するためのソケットタイムアウト期間。

    Integer

    いいえ

    30000

    単位: ミリ秒。

    ioThreadCount

    I/O スレッドの数。

    Integer

    いいえ

    4

    callbackThreadPoolSize

    コールバックスレッドプールのサイズ。

    Integer

    いいえ

    4

  • ソース固有

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    tunnelName

    Tablestore ソーステーブルのトンネル名。

    String

    はい

    デフォルト値なし

    事前に Tablestore コンソールでトンネルを作成する必要があります。 トンネルを作成する際には、トンネル名とトンネルタイプを指定します。 トンネルタイプは、増分、フル、または差分です。 トンネルの作成方法の詳細については、「クイックスタート」トピックの「トンネルの作成」セクションをご参照ください。

    ignoreDelete

    削除操作を無視するかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true: 削除操作は無視されます。

    • false (デフォルト): 削除操作は無視されません。

    skipInvalidData

    ダーティデータを無視するかどうかを指定します。 ダーティデータが無視されない場合、システムがダーティデータを処理するときにエラーが報告されます。

    Boolean

    いいえ

    false

    有効な値:

    • true: ダーティデータは無視されます。

    • false (デフォルト): ダーティデータは無視されません。

    説明

    VVR 8.0.4 以降でのみ、このオプションがサポートされています。

    retryStrategy

    再試行ポリシー。

    Enum

    いいえ

    TIME

    有効な値:

    • TIME: retryTimeoutMs パラメータで指定されたタイムアウト期間が終了するまで、システムは継続的に再試行します。

    • COUNT: retryCount パラメータで指定された最大再試行回数に達するまで、システムは継続的に再試行します。

    retryCount

    最大再試行回数。

    Integer

    いいえ

    3

    retryStrategy パラメータを COUNT に設定した場合、このパラメータを指定できます。

    retryTimeoutMs

    再試行のタイムアウト期間。

    Integer

    いいえ

    180000

    retryStrategy パラメータを TIME に設定した場合、このパラメータを指定できます。 単位: ミリ秒。

    streamOriginColumnMapping

    元のカラム名と関連する実際のカラム名の間のマッピング。

    String

    いいえ

    デフォルト値なし

    元の列名と関連する実際の列名はコロン (:) で区切ります。 複数のマッピングはコンマ (,) で区切ります。 例: origin_col1:col1,origin_col2:col2

    outputSpecificRowType

    特定の行タイプを渡すかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • false: 特定の行タイプを渡しません。 すべてのデータは INSERT タイプです。

    • true: 特定の行タイプを渡します。 データは INSERT、DELETE、または UPDATE_AFTER タイプです。

    dataFetchTimeoutMs

    パーティションからデータをフェッチする最大期間。

    整数

    いいえ

    10000

    単位: ミリ秒。

    低レイテンシ要件で多数のパーティションを同期する場合、このオプション値を小さくして、全体の同期レイテンシを短縮します。

    説明

    このオプションは、VVR 8.0.10 以降でサポートされています。

    enableRequestCompression

    データ圧縮を有効にするかどうかを指定します。

    ブール値

    いいえ

    false

    このオプションを有効にすると、帯域幅を節約できますが、CPU 負荷が増加します。

    説明

    このオプションは、VVR 8.0.10 以降でサポートされています。

  • シンク固有

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    retryIntervalMs

    再試行間隔。

    Integer

    いいえ

    1000

    単位: ミリ秒。

    maxRetryTimes

    最大再試行回数。

    Integer

    いいえ

    10

    valueColumns

    挿入するカラムの名前。

    String

    はい

    デフォルト値なし

    ID フィールドや NAME フィールドなど、複数のフィールドをカンマ(,)で区切ります。

    bufferSize

    データがシンクテーブルに書き込まれる前にバッファに保存できるデータレコードの最大数。

    Integer

    いいえ

    5000

    batchWriteTimeoutMs

    書き込みタイムアウト期間。

    Integer

    いいえ

    5000

    単位: ミリ秒。 batchWriteTimeoutMs パラメーターで指定された期間内にキャッシュデータレコード数が上限に達しない場合、すべてのキャッシュデータはシンクテーブルに書き込まれます。

    batchSize

    一度に書き込むことができるデータレコードの数。

    Integer

    いいえ

    100

    最大値: 200。

    ignoreDelete

    削除操作を無視するかどうかを指定します。

    Boolean

    いいえ

    False

    該当なし。

    autoIncrementKey

    自動インクリメントプライマリキー列の名前。 シンクテーブルに自動インクリメントプライマリキー列が含まれている場合、このパラメーターを構成して、自動インクリメントプライマリキー列の名前を指定できます。

    String

    いいえ

    デフォルト値なし

    シンクテーブルに自動インクリメントプライマリキー列がない場合は、このパラメーターを構成する必要はありません。

    説明

    VVR 8.0.4 以降を使用する Apache Flink 用 Realtime Compute のみ、このパラメータをサポートしています。

    overwriteMode

    データ上書きモード。

    Enum

    いいえ

    PUT

    有効な値:

    • PUT: データは PUT モードで Tablestore テーブルに書き込まれます。

    • UPDATE: データは UPDATE モードで Tablestore テーブルに書き込まれます。

    説明

    動的カラムモードでは、UPDATE モードのみがサポートされています。

    defaultTimestampInMillisecond

    Tablestore テーブルにデータを書き込むために使用されるデフォルトのタイムスタンプ。

    Long

    いいえ

    -1

    このパラメータを指定しない場合、現在のシステム時刻のタイムスタンプが使用されます。

    dynamicColumnSink

    動的カラムモードを有効にするかどうかを指定します。

    Boolean

    いいえ

    false

    動的カラムモードは、テーブルでカラムが指定されておらず、デプロイ状況に基づいてカラムがテーブルに挿入されるシナリオに適しています。 最初のいくつかのカラムは、テーブル作成ステートメントでプライマリキーとして定義されます。 最後の 2 つのカラムの最初のカラムの値はカラム名として使用され、最後のカラムの値は前のカラムの値として使用され、最後の 2 つのカラムのデータ型は STRING である必要があります。

    説明

    動的カラムモードを有効にする場合、自動インクリメントプライマリキーカラムはサポートされておらず、overwriteMode パラメータを UPDATE に設定する必要があります。

    checkSinkTableMeta

    シンクテーブルのメタデータを確認するかどうかを指定します。

    Boolean

    いいえ

    true

    このパラメータを true に設定すると、システムは Tablestore テーブルのプライマリキーカラムがテーブル作成ステートメントで指定されたプライマリキーと同じかどうかを確認します。

    enableRequestCompression

    データ書き込み中にデータ圧縮を有効にするかどうかを指定します。

    Boolean

    いいえ

    false

    maxColumnsCount

    ダウンストリームテーブルに書き込まれる列の最大数。

    整数

    いいえ

    128

    このオプションが 128 より大きい値に設定されている場合、属性列の数が最大値を超えています というエラーが発生します。 これを解決するには、オプション値を調整します。

    説明

    このオプションは、8.0.10 以降でサポートされています。

    storageType

    シンクテーブルタイプ。

    文字列

    いいえ

    WIDE_COLUMN

    有効な値:

    • WIDE_COLUMN: シンクテーブルはワイドテーブルです。

    • TIMESERIES: シンクテーブルは時系列テーブルです。

  • ディメンションテーブル固有

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    retryIntervalMs

    再試行間隔。

    Integer

    いいえ

    1000

    単位: ミリ秒。

    maxRetryTimes

    最大再試行回数。

    Integer

    いいえ

    10

    該当なし。

    cache

    キャッシュポリシー。

    String

    いいえ

    ALL

    有効な値:

    • None: データはキャッシュされません。

    • LRU: ディメンションテーブルの特定のデータのみがキャッシュされます。 システムがデータレコードを受信するたびに、システムはキャッシュを検索します。 システムがキャッシュ内でレコードを見つけられない場合、システムは物理ディメンションテーブルでデータレコードを検索します。

      このキャッシュポリシーを使用する場合は、cacheSize パラメータと cacheTTLMs パラメータを設定する必要があります。

    • ALL (デフォルト): ディメンションテーブルのすべてのデータがキャッシュされます。 ジョブが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。 これにより、ディメンションテーブルの後続のすべてのクエリでキャッシュが検索されます。 キーが存在しない場合、システムはキャッシュ内でデータレコードを見つけられません。 システムは、キャッシュエントリが期限切れになった後、キャッシュ内のすべてのデータをリロードします。

      リモートテーブルのデータ量が少なく、多数の欠落キーが存在する場合は、このパラメータを ALL に設定することをお勧めします。 ソーステーブルとディメンションテーブルは、ON 句に基づいて関連付けることができません。 このキャッシュポリシーを使用する場合は、cacheTTLMs パラメータと cacheReloadTimeBlackList パラメータを設定する必要があります。

      説明

      cache パラメータを ALL に設定する場合は、システムがディメンションテーブルからデータを非同期にロードするため、テーブルを結合するためのノードのメモリを増やす必要があります。 増加したメモリサイズは、リモートテーブルの 2 倍です。

    cacheSize

    キャッシュできるデータレコードの最大数。

    Integer

    いいえ

    デフォルト値なし

    cache パラメータを LRU に設定した場合、このパラメータを指定できます。

    説明

    このパラメータの値は、キャッシュできるデータレコードの最大数です。

    cacheTTLMs

    キャッシュタイムアウト期間。

    Integer

    いいえ

    デフォルト値なし

    単位: ミリ秒。 cacheTTLMs パラメータの構成は、cache パラメータの値によって異なります。

    • cache パラメータを None に設定した場合、cacheTTLMs パラメータは空のままにすることができます。 これは、キャッシュエントリが期限切れにならないことを示します。

    • cache パラメータを LRU に設定した場合、cacheTTLMs パラメータはキャッシュのタイムアウト期間を指定します。 デフォルトでは、キャッシュエントリは期限切れになりません。

    • cache パラメータを ALL に設定した場合、cacheTTLMs パラメータはシステムがキャッシュを更新する間隔を指定します。 デフォルトでは、キャッシュはリロードされません。

    cacheEmpty

    空の結果をキャッシュするかどうかを指定します。

    Boolean

    いいえ

    デフォルト値なし

    • true: 空の結果はキャッシュされます。

    • false: 空の結果はキャッシュされません。

    cacheReloadTimeBlackList

    キャッシュが更新されない期間。 このパラメータは、cache パラメータが ALL に設定されている場合に有効になります。 このパラメータに指定した期間中は、キャッシュは更新されません。 このパラメータは、ダブル 11 などの大規模なオンラインプロモーションイベントに適しています。

    String

    いいえ

    デフォルト値なし

    次の例は、値の形式を示しています。2017-10-24 14:00 -> 2017-10-24 15:002017-11-10 23:30 -> 2017-11-11 08:00。 次のルールに基づいて区切り文字を使用します。

    • 複数の期間をカンマ(,)で区切ります。

    • 各期間の開始時刻と終了時刻を、ハイフン(-)と閉じ山かっこ(>)の組み合わせである矢印(->)で区切ります。

    async

    非同期モードでデータ同期を有効にするかどうかを指定します。

    Boolean

    いいえ

    false

    • true: 非同期モードでのデータ同期が有効になります。 デフォルトでは、非同期モードでデータを同期する場合、データはソートされません。

    • false (デフォルト): 非同期モードでのデータ同期は無効になります。

データ型マッピング

  • ソーステーブル

    Tablestore のフィールドのデータ型

    Apache Flink 用 Realtime Compute のフィールドのデータ型

    INTEGER

    BIGINT

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    DOUBLE

    DOUBLE

    BINARY

    BINARY

  • シンクテーブル

    Apache Flink 用 Realtime Compute のフィールドのデータ型

    Tablestore のフィールドのデータ型

    BINARY

    BINARY

    VARBINARY

    CHAR

    STRING

    VARCHAR

    TINYINT

    INTEGER

    SMALLINT

    INTEGER

    BIGINT

    FLOAT

    DOUBLE

    DOUBLE

    BOOLEAN

    BOOLEAN

例 1

Tablestore からデータを読み取り、Tablestore に書き込みます。

CREATE TEMPORARY TABLE tablestore_stream(
 `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH 
  'connector'='ots',
  'endPoint' ='<yourEndpoint>', // エンドポイント
  'instanceName' = 'flink-source', // Tablestore インスタンス名
  'tableName' ='flink_source_table', // Tablestore テーブル名
  'tunnelName' = 'flinksourcestream', // Tunnel 名
  'accessId' ='${ak_id}', // AccessKey ID
  'accessKey' ='${ak_secret}', // AccessKey Secret
  'ignoreDelete' = 'false', // 削除操作を無視するかどうか
  'skipInvalidData' ='false' // 無効なデータをスキップするかどうか
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>', // エンドポイント
  'instanceName'='flink-sink', // Tablestore インスタンス名
  'tableName'='flink_sink_table', // Tablestore テーブル名
  'accessId'='${ak_id}', // AccessKey ID
  'accessKey'='${ak_secret}', // AccessKey Secret
  'valueColumns'='customerid,customername', // 挿入するカラム名
  'autoIncrementKey'='${auto_increment_primary_key_name}' // 自動インクリメントプライマリキーの名前
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

例 2

ワイドテーブルから時系列テーブルにデータを同期します。

CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'iotstore-test',
    'tableName' = 'test_ots_timeseries_2',
    'tunnelName' = 'timeseries_source_tunnel_2',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'ignoreDelete' = 'true', -- 削除を無視します
);
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'iotstore-test',
    'tableName' = 'test_timeseries_sink_table_2',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'storageType' = 'TIMESERIES',
);

-- ソーステーブルからシンクテーブルにデータを挿入します
INSERT INTO timeseries_sink
    select 
        m_name,
        data_source,
        MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
        `time`,
        cpu_sys,
        cpu_user,
        disk_0,
        disk_1,
        disk_2,
        memory_used,
        net_in,
        net_out 
    from
        timeseries_source;