このトピックでは、Realtime Compute for Apache Flink を使用して Tablestore データを計算する方法について説明します。Tablestore 内のデータテーブルまたは時系列テーブルは、Realtime Compute for Apache Flink を使用したデータ処理のソーステーブルまたは結果テーブルとして使用できます。
前提条件
Tablestore が有効化され、インスタンスが作成されていること。詳細については、「Tablestore を有効化し、インスタンスを作成する」をご参照ください。
ソーステーブルと結果テーブルが作成されていること。ソーステーブル用にトンネルが作成されていること。詳細については、「データテーブルの操作」、「時系列テーブルの操作」、および「トンネルを作成する」をご参照ください。
Realtime Compute for Apache Flink ワークスペースが作成されます。詳細については、「Realtime Compute for Apache Flink をアクティブ化する」をご参照ください。
重要Realtime Compute for Apache Flink ワークスペースと Tablestore インスタンスは、同じリージョンに存在する必要があります。Realtime Compute for Apache Flink でサポートされているリージョンについては、「リージョン」をご参照ください。
AccessKey ペアが取得されます。
重要セキュリティ上の理由から、Tablestore の機能は Resource Access Management (RAM) ユーザーとして使用することをお勧めします。詳細については、「RAM ユーザーの AccessKey ペアを使用して Tablestore にアクセスする」をご参照ください。
リアルタイムコンピューティングジョブを開発する
ステップ 1:SQL ドラフトを作成する
ドラフト作成ページに移動します。
Realtime Compute for Apache Flink コンソール にログインします。
管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
[新規] をクリックします。[新規ドラフト] ダイアログボックスで、[空のストリームドラフト] を選択し、[次へ] をクリックします。
説明Realtime Compute for Apache Flink はさまざまなコード テンプレートを提供し、データ同期をサポートしています。各コード テンプレートは特定のシナリオに適しており、コード サンプルと手順を提供します。テンプレートをクリックすると、Realtime Compute for Apache Flink の機能と関連構文について学習し、ビジネス ロジックを実装できます。詳細については、「コード テンプレート」および「データ同期テンプレート」をご参照ください。
ドラフトのパラメータを構成します。 次の表にパラメータを示します。
パラメータ
説明
例
名前
作成するドラフトの名前。
説明ドラフト名は現在のプロジェクト内で一意である必要があります。
flink-test
場所
ドラフトのコードファイルが保存されるフォルダ。
既存のフォルダの右側にある
アイコンをクリックして、サブフォルダを作成することもできます。ドラフト
エンジンバージョン
現在のドラフトで使用させる Flink エンジンのバージョン。エンジンバージョンの詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。
vvr-8.0.10-flink-1.17
[作成] をクリックします。
ステップ 2:ドラフトのコードを記述する
このステップの例では、データテーブルから別のデータテーブルにデータを同期するためのコードが記述されています。その他のサンプル SQL 文については、「サンプル SQL 文」をご参照ください。
ソーステーブルと結果テーブルのための一時テーブルを作成します。
詳細については、「付録 1:Tablestore コネクタ」をご参照ください。
-- ソーステーブル用に tablestore_stream という名前の一時テーブルを作成します。 CREATE TEMPORARY TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector' = 'ots', -- ソーステーブルのコネクタタイプを指定します。値は ots で、変更できません。 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tablestore インスタンスの VPC エンドポイントを指定します。 'instanceName' = 'xxx', -- Tablestore インスタンスの名前を指定します。 'tableName' = 'flink_source_table', -- ソーステーブルの名前を指定します。 'tunnelName' = 'flink_source_tunnel', -- ソーステーブル用に作成されたトンネルの名前を指定します。 'accessId' = 'xxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。 'ignoreDelete' = 'false' -- 削除操作によって生成されたリアルタイムデータを無視するかどうかを指定します。この例では、このパラメータは false に設定されています。 ); -- 結果テーブル用に tablestore_sink という名前の一時テーブルを作成します。 CREATE TEMPORARY TABLE tablestore_sink( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, PRIMARY KEY (`order`,orderid) NOT ENFORCED -- プライマリキーを指定します。 ) WITH ( 'connector' = 'ots', -- 結果テーブルのコネクタタイプを指定します。値は ots で、変更できません。 'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tablestore インスタンスの VPC エンドポイントを指定します。 'instanceName' = 'xxx', -- Tablestore インスタンスの名前を指定します。 'tableName' = 'flink_sink_table', -- 結果テーブルの名前を指定します。 'accessId' = 'xxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。 'valueColumns'='customerid,customername' -- 結果テーブルに挿入する列の名前を指定します。 );ドラフトロジックを記述します。
次のサンプル SQL 文は、ソーステーブルから結果テーブルにデータを挿入する方法の例を示しています。
-- ソーステーブルから結果テーブルにデータを挿入します。 INSERT INTO tablestore_sink SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
ステップ 3:(オプション)構成情報を表示する
SQL エディタの右側のタブで、構成を表示したり、パラメータを構成したりできます。次の表にパラメータを示します。
タブ名 | 説明 |
[構成] |
|
[構造] |
|
[バージョン] | ドラフトのエンジンバージョンを表示できます。「ドラフトバージョン」パネルの [アクション] 列で実行できる操作の詳細については、「デプロイバージョンを管理する」をご参照ください。 |
ステップ 4:(オプション)構文チェックを実行する
ドラフトの SQL セマンティクス、ネットワーク接続、およびドラフトで使用されるテーブルのメタデータ情報を確認します。計算結果の [SQL アドバイス] をクリックして、SQL リスクと関連する最適化の提案に関する情報を表示することもできます。
SQL エディタの右上隅にある [検証] をクリックします。
[検証] ダイアログボックスで、[確認] をクリックします。
ステップ 5:(オプション)ドラフトをデバッグする
デバッグ機能を使用して、デプロイ実行をシミュレートし、出力を確認し、SELECT 文と INSERT 文のビジネスロジックを検証できます。この機能は開発効率を向上させ、データ品質低下のリスクを軽減します。
SQL エディタの右上隅にある [デバッグ] をクリックします。
[デバッグ] ダイアログボックスで、デバッグするクラスタを選択し、[次へ] をクリックします。
使用可能なクラスタがない場合は、セッションクラスタを作成します。セッションクラスタが SQL ドラフトと同じエンジンバージョンを使用し、セッションクラスタが実行されていることを確認します。詳細については、「セッションクラスタを作成する」をご参照ください。
デバッグデータを構成します。
オンラインデータを使用する場合は、この操作をスキップします。
デバッグデータを使用する場合は、[モックデータテンプレートをダウンロード] をクリックし、テンプレートにデバッグデータを入力してから、[モックデータをアップロード] をクリックしてデバッグデータをアップロードします。詳細については、「ドラフトをデバッグする」をご参照ください。
[確認] をクリックします。
ステップ 6:ドラフトをデプロイする
SQL エディターの右上隅にある、[デプロイ] をクリックします。[ドラフトのデプロイ] ダイアログボックスで、関連パラメーターを構成し、[確認] をクリックします。
セッション クラスターは、開発環境やテスト環境などの非運用環境に適しています。セッション クラスターにドラフトをデプロイまたはデバッグして、JobManager のリソース使用率を向上させ、デプロイメントの起動を高速化できます。ただし、運用環境のドラフトをセッション クラスターにデプロイすることはお勧めしません。そうしないと、安定性の問題が発生する可能性があります。
ステップ 7:ドラフトのデプロイを開始し、計算結果を表示する
左側のナビゲーションウィンドウで、 を選択します。
開始するジョブを見つけ、[アクション] 列の [開始] をクリックします。
[ジョブの開始] パネルで、[初期モード] を選択し、[開始] をクリックします。デプロイ ステータスが [実行中] に変わると、デプロイは想定どおりに実行されます。デプロイの開始時に構成する必要があるパラメーターの詳細については、「デプロイメントの開始」をご参照ください。
説明Realtime Compute for Apache Flink の各 TaskManager に 2 つの CPU コアと 4 GB のメモリを構成して、各 TaskManager の計算能力を最大限に活用することをお勧めします。1 つの TaskManager は 1 秒あたり 10,000 行を書き込むことができます。
ソーステーブルのパーティション数が多い場合は、Realtime Compute for Apache Flink で並列度を 16 未満に設定することをお勧めします。書き込み速度は並列度に比例して線形的に増加します。
「デプロイメント」ページで、計算結果を表示します。
左側のナビゲーションウィンドウで、 を選択します。「デプロイメント」ページで、管理するデプロイの名前をクリックします。
[ログ] タブで、[実行中のタスクマネージャ] タブをクリックし、[パス、ID] 列の値をクリックします。
[ログ] をクリックします。「ログ」タブで、関連するログ情報を表示します。
(オプション)デプロイをキャンセルします。
デプロイの SQL コードを変更したり、WITH 句にパラメータを追加または削除したり、デプロイのバージョンを変更したりする場合は、デプロイのドラフトをデプロイし、デプロイをキャンセルしてから、変更を有効にするためにデプロイを開始する必要があります。デプロイが失敗し、状態データを再利用して回復できない場合、または動的に有効にならないパラメータ設定を更新する場合は、デプロイをキャンセルしてから再起動する必要があります。デプロイのキャンセル方法の詳細については、「デプロイをキャンセルする」をご参照ください。