このトピックでは、Tablestore コネクタの使用方法について説明します。
背景情報
Tablestore は、大量の構造化データの保存に最適化された、テーブルベースの低コストサーバーレスストレージサービスです。 Tablestore を使用すると、ミリ秒単位でオンラインデータをクエリおよび取得し、保存されたデータを多次元で分析できます。 Tablestore は、大量の請求書、インスタントメッセージング(IM)、IoT、車載インターネット(IoV)、リスク管理、インテリジェントレコメンデーションなど、さまざまなシナリオに適しています。 Tablestore は、IoT アプリケーション向けに高度に最適化されたエンドツーエンドのストレージソリューションも提供します。 詳細については、「Tablestore とは」をご参照ください。
次の表に、Tablestore コネクタでサポートされている機能を示します。
項目 | 説明 |
実行モード | ストリーミングモード |
API タイプ | SQL API |
テーブルタイプ | ソーステーブル、ディメンションテーブル、およびシンクテーブル |
データ形式 | 該当なし |
メトリック |
説明 メトリックの詳細については、「メトリック」をご参照ください。 |
シンクテーブルでのデータの更新または削除 | サポートされています |
前提条件
Tablestore インスタンスが購入され、Tablestore テーブルが作成されていること。 詳細については、「Tablestore の使用」をご参照ください。
構文
シンクテーブルを作成するためのステートメント
CREATE TABLE ots_sink ( name VARCHAR, age BIGINT, birthday BIGINT, primary key(name,age) not enforced ) WITH ( 'connector'='ots', 'instanceName'='<yourInstanceName>', // Tablestore インスタンス名 'tableName'='<yourTableName>', // Tablestore テーブル名 'accessId'='${ak_id}', // AccessKey ID 'accessKey'='${ak_secret}', // AccessKey Secret 'endPoint'='<yourEndpoint>', // エンドポイント 'valueColumns'='birthday' // 挿入するカラム名 );説明Tablestore シンクテーブルにはプライマリキーを指定する必要があります。 最新の出力データは Tablestore シンクテーブルに追加され、テーブルデータが更新されます。
ディメンションテーブルを作成するためのステートメント
CREATE TABLE ots_dim ( id int, len int, content STRING ) WITH ( 'connector'='ots', 'endPoint'='<yourEndpoint>', // エンドポイント 'instanceName'='<yourInstanceName>', // Tablestore インスタンス名 'tableName'='<yourTableName>', // Tablestore テーブル名 'accessId'='${ak_id}', // AccessKey ID 'accessKey'='${ak_secret}' // AccessKey Secret );ソーステーブルを作成するためのステートメント
CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='<yourEndpoint>', // エンドポイント 'instanceName' = 'flink-source', // Tablestore インスタンス名 'tableName' ='flink_source_table', // Tablestore テーブル名 'tunnelName' = 'flinksourcestream', // Tunnel 名 'accessId' ='${ak_id}', // AccessKey ID 'accessKey' ='${ak_secret}', // AccessKey Secret 'ignoreDelete' = 'false' // 削除操作を無視するかどうか );データの消費が必要なフィールド、および Tunnel Service の戻りデータにある
OtsRecordTypeフィールドとOtsRecordTimestampフィールドは、属性列として読み書きできます。 次の表に、フィールドを示します。フィールド
Apache Flink 用 Realtime Compute でのマッピングフィールド
説明
OtsRecordType
type
データ操作タイプ。
OtsRecordTimestamp
timestamp
データ操作時刻。 単位: マイクロ秒。
説明全データを読み取る場合、OtsRecordTimestamp パラメータの値は 0 に設定されます。
OtsRecordTypeフィールドとOtsRecordTimestampフィールドを読み取る場合は、Realtime Compute for Apache Flink が提供する METADATA キーワードを使用して、Tablestore ソーステーブルから属性フィールドを取得できます。 次の例は、DDL ステートメントを示しています。CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, record_type STRING METADATA FROM 'type', // レコードタイプ record_timestamp BIGINT METADATA FROM 'timestamp' // レコードタイムスタンプ ) WITH ( ... );
WITH 句のコネクタオプション
全般
オプション
説明
データ型
必須
デフォルト値
備考
connector
テーブルのタイプ。
String
はい
デフォルト値なし
値を
otsに設定します。instanceName
Tablestore インスタンスの名前。
String
はい
デフォルト値なし
endPoint
Tablestore インスタンスのエンドポイント。
String
はい
デフォルト値なし
詳細については、「エンドポイント」をご参照ください。
tableName
テーブルの名前
String
はい
デフォルト値なし
accessId
Alibaba Cloud アカウントまたは Resource Access Management(RAM)ユーザーの AccessKey ID。
String
はい
デフォルト値なし
アカウントの AccessKey ペアを表示する方法を参照してください。
重要AccessKey ペアを保護するために、AccessKey ペアをハードコードするのではなく、変数を使用してください。
accessKey
Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。
String
はい
デフォルト値なし
connectTimeout
Tablestore コネクタが Tablestore に接続するためのタイムアウト期間。
Integer
いいえ
30000
単位: ミリ秒。
socketTimeout
Tablestore コネクタが Tablestore に接続するためのソケットタイムアウト期間。
Integer
いいえ
30000
単位: ミリ秒。
ioThreadCount
I/O スレッドの数。
Integer
いいえ
4
callbackThreadPoolSize
コールバックスレッドプールのサイズ。
Integer
いいえ
4
ソース固有
オプション
説明
データ型
必須
デフォルト値
備考
tunnelName
Tablestore ソーステーブルのトンネル名。
String
はい
デフォルト値なし
事前に Tablestore コンソールでトンネルを作成する必要があります。 トンネルを作成する際には、トンネル名とトンネルタイプを指定します。 トンネルタイプは、増分、フル、または差分です。 トンネルの作成方法の詳細については、「クイックスタート」トピックの「トンネルの作成」セクションをご参照ください。
ignoreDelete
削除操作を無視するかどうかを指定します。
Boolean
いいえ
false
有効な値:
true: 削除操作は無視されます。
false (デフォルト): 削除操作は無視されません。
skipInvalidData
ダーティデータを無視するかどうかを指定します。 ダーティデータが無視されない場合、システムがダーティデータを処理するときにエラーが報告されます。
Boolean
いいえ
false
有効な値:
true: ダーティデータは無視されます。
false (デフォルト): ダーティデータは無視されません。
説明VVR 8.0.4 以降でのみ、このオプションがサポートされています。
retryStrategy
再試行ポリシー。
Enum
いいえ
TIME
有効な値:
TIME: retryTimeoutMs パラメータで指定されたタイムアウト期間が終了するまで、システムは継続的に再試行します。
COUNT: retryCount パラメータで指定された最大再試行回数に達するまで、システムは継続的に再試行します。
retryCount
最大再試行回数。
Integer
いいえ
3
retryStrategy パラメータを COUNT に設定した場合、このパラメータを指定できます。
retryTimeoutMs
再試行のタイムアウト期間。
Integer
いいえ
180000
retryStrategy パラメータを TIME に設定した場合、このパラメータを指定できます。 単位: ミリ秒。
streamOriginColumnMapping
元のカラム名と関連する実際のカラム名の間のマッピング。
String
いいえ
デフォルト値なし
元の列名と関連する実際の列名はコロン (:) で区切ります。 複数のマッピングはコンマ (,) で区切ります。 例:
origin_col1:col1,origin_col2:col2。outputSpecificRowType
特定の行タイプを渡すかどうかを指定します。
Boolean
いいえ
false
有効な値:
false: 特定の行タイプを渡しません。 すべてのデータは INSERT タイプです。
true: 特定の行タイプを渡します。 データは INSERT、DELETE、または UPDATE_AFTER タイプです。
dataFetchTimeoutMs
パーティションからデータをフェッチする最大期間。
整数
いいえ
10000
単位: ミリ秒。
低レイテンシ要件で多数のパーティションを同期する場合、このオプション値を小さくして、全体の同期レイテンシを短縮します。
説明このオプションは、VVR 8.0.10 以降でサポートされています。
enableRequestCompression
データ圧縮を有効にするかどうかを指定します。
ブール値
いいえ
false
このオプションを有効にすると、帯域幅を節約できますが、CPU 負荷が増加します。
説明このオプションは、VVR 8.0.10 以降でサポートされています。
シンク固有
オプション
説明
データ型
必須
デフォルト値
備考
retryIntervalMs
再試行間隔。
Integer
いいえ
1000
単位: ミリ秒。
maxRetryTimes
最大再試行回数。
Integer
いいえ
10
valueColumns
挿入するカラムの名前。
String
はい
デフォルト値なし
ID フィールドや NAME フィールドなど、複数のフィールドをカンマ(,)で区切ります。
bufferSize
データがシンクテーブルに書き込まれる前にバッファに保存できるデータレコードの最大数。
Integer
いいえ
5000
batchWriteTimeoutMs
書き込みタイムアウト期間。
Integer
いいえ
5000
単位: ミリ秒。 batchWriteTimeoutMs パラメーターで指定された期間内にキャッシュデータレコード数が上限に達しない場合、すべてのキャッシュデータはシンクテーブルに書き込まれます。
batchSize
一度に書き込むことができるデータレコードの数。
Integer
いいえ
100
最大値: 200。
ignoreDelete
削除操作を無視するかどうかを指定します。
Boolean
いいえ
False
該当なし。
autoIncrementKey
自動インクリメントプライマリキー列の名前。 シンクテーブルに自動インクリメントプライマリキー列が含まれている場合、このパラメーターを構成して、自動インクリメントプライマリキー列の名前を指定できます。
String
いいえ
デフォルト値なし
シンクテーブルに自動インクリメントプライマリキー列がない場合は、このパラメーターを構成する必要はありません。
説明VVR 8.0.4 以降を使用する Apache Flink 用 Realtime Compute のみ、このパラメータをサポートしています。
overwriteMode
データ上書きモード。
Enum
いいえ
PUT
有効な値:
PUT: データは PUT モードで Tablestore テーブルに書き込まれます。
UPDATE: データは UPDATE モードで Tablestore テーブルに書き込まれます。
説明動的カラムモードでは、UPDATE モードのみがサポートされています。
defaultTimestampInMillisecond
Tablestore テーブルにデータを書き込むために使用されるデフォルトのタイムスタンプ。
Long
いいえ
-1
このパラメータを指定しない場合、現在のシステム時刻のタイムスタンプが使用されます。
dynamicColumnSink
動的カラムモードを有効にするかどうかを指定します。
Boolean
いいえ
false
動的カラムモードは、テーブルでカラムが指定されておらず、デプロイ状況に基づいてカラムがテーブルに挿入されるシナリオに適しています。 最初のいくつかのカラムは、テーブル作成ステートメントでプライマリキーとして定義されます。 最後の 2 つのカラムの最初のカラムの値はカラム名として使用され、最後のカラムの値は前のカラムの値として使用され、最後の 2 つのカラムのデータ型は STRING である必要があります。
説明動的カラムモードを有効にする場合、自動インクリメントプライマリキーカラムはサポートされておらず、overwriteMode パラメータを UPDATE に設定する必要があります。
checkSinkTableMeta
シンクテーブルのメタデータを確認するかどうかを指定します。
Boolean
いいえ
true
このパラメータを true に設定すると、システムは Tablestore テーブルのプライマリキーカラムがテーブル作成ステートメントで指定されたプライマリキーと同じかどうかを確認します。
enableRequestCompression
データ書き込み中にデータ圧縮を有効にするかどうかを指定します。
Boolean
いいえ
false
maxColumnsCount
ダウンストリームテーブルに書き込まれる列の最大数。
整数
いいえ
128
このオプションが 128 より大きい値に設定されている場合、
属性列の数が最大値を超えていますというエラーが発生します。 これを解決するには、オプション値を調整します。説明このオプションは、8.0.10 以降でサポートされています。
storageType
シンクテーブルタイプ。
文字列
いいえ
WIDE_COLUMN有効な値:
WIDE_COLUMN: シンクテーブルはワイドテーブルです。TIMESERIES: シンクテーブルは時系列テーブルです。
ディメンションテーブル固有
オプション
説明
データ型
必須
デフォルト値
備考
retryIntervalMs
再試行間隔。
Integer
いいえ
1000
単位: ミリ秒。
maxRetryTimes
最大再試行回数。
Integer
いいえ
10
該当なし。
cache
キャッシュポリシー。
String
いいえ
ALL
有効な値:
None: データはキャッシュされません。
LRU: ディメンションテーブルの特定のデータのみがキャッシュされます。 システムがデータレコードを受信するたびに、システムはキャッシュを検索します。 システムがキャッシュ内でレコードを見つけられない場合、システムは物理ディメンションテーブルでデータレコードを検索します。
このキャッシュポリシーを使用する場合は、cacheSize パラメータと cacheTTLMs パラメータを設定する必要があります。
ALL (デフォルト): ディメンションテーブルのすべてのデータがキャッシュされます。 ジョブが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。 これにより、ディメンションテーブルの後続のすべてのクエリでキャッシュが検索されます。 キーが存在しない場合、システムはキャッシュ内でデータレコードを見つけられません。 システムは、キャッシュエントリが期限切れになった後、キャッシュ内のすべてのデータをリロードします。
リモートテーブルのデータ量が少なく、多数の欠落キーが存在する場合は、このパラメータを ALL に設定することをお勧めします。 ソーステーブルとディメンションテーブルは、ON 句に基づいて関連付けることができません。 このキャッシュポリシーを使用する場合は、cacheTTLMs パラメータと cacheReloadTimeBlackList パラメータを設定する必要があります。
説明cache パラメータを ALL に設定する場合は、システムがディメンションテーブルからデータを非同期にロードするため、テーブルを結合するためのノードのメモリを増やす必要があります。 増加したメモリサイズは、リモートテーブルの 2 倍です。
cacheSize
キャッシュできるデータレコードの最大数。
Integer
いいえ
デフォルト値なし
cache パラメータを LRU に設定した場合、このパラメータを指定できます。
説明このパラメータの値は、キャッシュできるデータレコードの最大数です。
cacheTTLMs
キャッシュタイムアウト期間。
Integer
いいえ
デフォルト値なし
単位: ミリ秒。 cacheTTLMs パラメータの構成は、cache パラメータの値によって異なります。
cache パラメータを None に設定した場合、cacheTTLMs パラメータは空のままにすることができます。 これは、キャッシュエントリが期限切れにならないことを示します。
cache パラメータを LRU に設定した場合、cacheTTLMs パラメータはキャッシュのタイムアウト期間を指定します。 デフォルトでは、キャッシュエントリは期限切れになりません。
cache パラメータを ALL に設定した場合、cacheTTLMs パラメータはシステムがキャッシュを更新する間隔を指定します。 デフォルトでは、キャッシュはリロードされません。
cacheEmpty
空の結果をキャッシュするかどうかを指定します。
Boolean
いいえ
デフォルト値なし
true: 空の結果はキャッシュされます。
false: 空の結果はキャッシュされません。
cacheReloadTimeBlackList
キャッシュが更新されない期間。 このパラメータは、cache パラメータが ALL に設定されている場合に有効になります。 このパラメータに指定した期間中は、キャッシュは更新されません。 このパラメータは、ダブル 11 などの大規模なオンラインプロモーションイベントに適しています。
String
いいえ
デフォルト値なし
次の例は、値の形式を示しています。2017-10-24 14:00 -> 2017-10-24 15:00、2017-11-10 23:30 -> 2017-11-11 08:00。 次のルールに基づいて区切り文字を使用します。
複数の期間をカンマ(,)で区切ります。
各期間の開始時刻と終了時刻を、ハイフン(-)と閉じ山かっこ(>)の組み合わせである矢印(->)で区切ります。
async
非同期モードでデータ同期を有効にするかどうかを指定します。
Boolean
いいえ
false
true: 非同期モードでのデータ同期が有効になります。 デフォルトでは、非同期モードでデータを同期する場合、データはソートされません。
false (デフォルト): 非同期モードでのデータ同期は無効になります。
データ型マッピング
ソーステーブル
Tablestore のフィールドのデータ型
Apache Flink 用 Realtime Compute のフィールドのデータ型
INTEGER
BIGINT
STRING
STRING
BOOLEAN
BOOLEAN
DOUBLE
DOUBLE
BINARY
BINARY
シンクテーブル
Apache Flink 用 Realtime Compute のフィールドのデータ型
Tablestore のフィールドのデータ型
BINARY
BINARY
VARBINARY
CHAR
STRING
VARCHAR
TINYINT
INTEGER
SMALLINT
INTEGER
BIGINT
FLOAT
DOUBLE
DOUBLE
BOOLEAN
BOOLEAN
例
例 1
Tablestore からデータを読み取り、Tablestore に書き込みます。
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH
'connector'='ots',
'endPoint' ='<yourEndpoint>', // エンドポイント
'instanceName' = 'flink-source', // Tablestore インスタンス名
'tableName' ='flink_source_table', // Tablestore テーブル名
'tunnelName' = 'flinksourcestream', // Tunnel 名
'accessId' ='${ak_id}', // AccessKey ID
'accessKey' ='${ak_secret}', // AccessKey Secret
'ignoreDelete' = 'false', // 削除操作を無視するかどうか
'skipInvalidData' ='false' // 無効なデータをスキップするかどうか
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>', // エンドポイント
'instanceName'='flink-sink', // Tablestore インスタンス名
'tableName'='flink_sink_table', // Tablestore テーブル名
'accessId'='${ak_id}', // AccessKey ID
'accessKey'='${ak_secret}', // AccessKey Secret
'valueColumns'='customerid,customername', // 挿入するカラム名
'autoIncrementKey'='${auto_increment_primary_key_name}' // 自動インクリメントプライマリキーの名前
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;例 2
ワイドテーブルから時系列テーブルにデータを同期します。
CREATE TEMPORARY TABLE timeseries_source (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'iotstore-test',
'tableName' = 'test_ots_timeseries_2',
'tunnelName' = 'timeseries_source_tunnel_2',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'ignoreDelete' = 'true', -- 削除を無視します
);
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tags Map<String, String>,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'iotstore-test',
'tableName' = 'test_timeseries_sink_table_2',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'storageType' = 'TIMESERIES',
);
-- ソーステーブルからシンクテーブルにデータを挿入します
INSERT INTO timeseries_sink
select
m_name,
data_source,
MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
`time`,
cpu_sys,
cpu_user,
disk_0,
disk_1,
disk_2,
memory_used,
net_in,
net_out
from
timeseries_source;