このトピックでは、Realtime Compute for Apache Flink を使用して Tablestore データを処理する方法について説明します。Tablestore のデータテーブルまたは時系列テーブルを、Realtime Compute for Apache Flink によるデータ処理のソーステーブルまたは結果テーブルとして使用できます。
前提条件
Tablestore が有効化され、インスタンスが作成されていること。詳細については、「Tablestore の有効化とインスタンスの作成」をご参照ください。
ソーステーブル、結果テーブル、およびソーステーブル用のトンネルが作成済みであること。詳細については、「データテーブルの操作」、「時系列テーブルの操作」、および「トンネルの作成」をご参照ください。
Realtime Compute for Apache Flink のワークスペースが作成済みであること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
重要Realtime Compute for Apache Flink のワークスペースと Tablestore インスタンスは、同じリージョンに配置されている必要があります。Realtime Compute for Apache Flink がサポートしているリージョンについては、「リージョン」をご参照ください。
AccessKey ペアを取得済みであること。
重要セキュリティ上の理由から、Resource Access Management (RAM) ユーザーとして Tablestore 機能を使用することを推奨します。詳細については、「RAM ユーザーの AccessKey ペアを使用して Tablestore にアクセスする」をご参照ください。
リアルタイムコンピューティングジョブの開発
ステップ 1:SQL ドラフトの作成
ドラフト作成ページに移動します。
Realtime Compute for Apache Flink コンソール にログインします。
対象のワークスペースの 操作 列で、コンソール をクリックします。
左側のナビゲーションウィンドウで、開発 > ETL をクリックします。
新規作成 をクリックします。新規ドラフト ダイアログボックスで、空のストリームドラフト を選択し、次へ をクリックします。
説明Realtime Compute for Apache Flink には、さまざまなコードテンプレートが用意されており、データ同期もサポートしています。各コードテンプレートは特定のシナリオに適しており、コードサンプルと Realtime Compute for Apache Flink の機能および関連構文に関する説明が提供されています。テンプレートをクリックして、ビジネスロジックを実装できます。詳細については、「コードテンプレート」および「データ同期テンプレート」をご参照ください。
ジョブ情報 を入力します。
パラメーター
説明
例
ファイル名
作成するドラフトの名前です。
説明ドラフト名は現在のプロジェクト内で一意である必要があります。
flink-test
保存場所
ドラフトのコードファイルを保存するフォルダです。
既存のフォルダの右側にある
アイコンをクリックして、サブフォルダを作成することもできます。Draft
エンジンバージョン
現在のドラフトで使用する Flink エンジンのバージョンです。エンジンバージョンの詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。
vvr-8.0.10-flink-1.17
作成 をクリックします。
ステップ 2:SQL ドラフトの記述
このステップの例では、データテーブルから別のデータテーブルへのデータ同期を行うコードを記述します。その他の SQL ステートメントのサンプルについては、「SQL ステートメントのサンプル」をご参照ください。
ソーステーブルおよび結果テーブル用の一時テーブルを作成します。
詳細については、「付録 1:Tablestore コネクタ」をご参照ください。
-- ソーステーブル用の一時テーブル tablestore_stream を作成します。 CREATE TEMPORARY TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector' = 'ots', -- ソーステーブルのコネクタタイプを指定します。値は ots で、変更できません。 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tablestore インスタンスの VPC エンドポイントを指定します。 'instanceName' = 'xxx', -- Tablestore インスタンスの名前を指定します。 'tableName' = 'flink_source_table', -- ソーステーブルの名前を指定します。 'tunnelName' = 'flink_source_tunnel', -- ソーステーブル用に作成されたトンネルの名前を指定します。 'accessId' = 'xxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret を指定します。 'ignoreDelete' = 'false' -- 削除操作によって生成されるリアルタイムデータを無視するかどうかを指定します。この例では false に設定されています。 ); -- 結果テーブル用の一時テーブル tablestore_sink を作成します。 CREATE TEMPORARY TABLE tablestore_sink( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, PRIMARY KEY (`order`,orderid) NOT ENFORCED -- プライマリキーを指定します。 ) WITH ( 'connector' = 'ots', -- 結果テーブルのコネクタタイプを指定します。値は ots で、変更できません。 'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Tablestore インスタンスの VPC エンドポイントを指定します。 'instanceName' = 'xxx', -- Tablestore インスタンスの名前を指定します。 'tableName' = 'flink_sink_table', -- 結果テーブルの名前を指定します。 'accessId' = 'xxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret を指定します。 'valueColumns'='customerid,customername' -- 結果テーブルに挿入する列の名前を指定します。 );ドラフトのロジックを記述します。
次の SQL ステートメントのサンプルは、ソーステーブルから結果テーブルにデータを挿入する方法の例です。
-- ソーステーブルから結果テーブルにデータを挿入します。 INSERT INTO tablestore_sink SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
ステップ 3:(オプション)構成情報の確認
SQL エディターの右側のタブで、構成を確認またはパラメーターを設定できます。次の表にパラメーターを示します。
タブ名 | 説明 |
構成 |
|
構造 |
|
バージョン | ドラフトのバージョン履歴を表示します。操作 列の機能の詳細については、「ジョブバージョンの管理」をご参照ください。 |
ステップ 4:(オプション)構文チェックの実行
検証により、ジョブの SQL セマンティクス、ネットワーク接続、およびテーブルメタデータをチェックします。結果エリアで SQL アドバイス をクリックすると、潜在的な SQL リスクおよび最適化の提案を確認できます。
SQL エディターの右上隅で、検証 をクリックします。
検証 ダイアログボックスで、確認 をクリックします。
ステップ 5:(オプション)ドラフトのデバッグ
デバッグ機能を使用して、デプロイメントの実行をシミュレートし、出力を確認して SELECT 文および INSERT 文のビジネスロジックを検証できます。これにより、開発効率が向上し、データ品質の低下リスクが軽減されます。
SQL エディターの右上隅で、デバッグ をクリックします。
デバッグ ダイアログボックスで、セッションクラスターを選択し、次へ をクリックします。
利用可能なクラスターがない場合は、セッションクラスターを作成してください。セッションクラスターが SQL ドラフトと同じエンジンバージョンを使用しており、実行中であることを確認してください。詳細については、「セッションクラスターの作成」をご参照ください。
デバッグデータを設定します。
オンラインデータを使用する場合は、この操作をスキップします。
デバッグデータを使用する場合は、デバッグデータテンプレートのダウンロード をクリックして、テンプレートにデータを入力し、ファイルをアップロードします。詳細については、「ジョブのデバッグ」をご参照ください。
データを設定したら、OK をクリックします。
ステップ 6:ドラフトのデプロイ
SQL エディターの右上隅で、デプロイ をクリックします。新規バージョンのデプロイ ダイアログボックスで、デプロイパラメーターを設定し、OK をクリックします。
セッションクラスターは、開発環境やテスト環境など、本番環境以外の環境に適しています。セッションクラスターでドラフトをデプロイまたはデバッグすることで、JobManager のリソース利用率が向上し、デプロイの起動が高速化されます。ただし、本番環境用のドラフトをセッションクラスターにデプロイしないことを推奨します。そうしないと、安定性の問題が発生する可能性があります。
ステップ 7:ドラフトのデプロイメントを開始し、計算結果を確認する
左側のナビゲーションウィンドウで、O&M > デプロイメント をクリックします。
対象のデプロイメントの 操作 列で、開始 をクリックします。
状態なしで開始 を選択し、開始 をクリックします。実行中 のステータスは、デプロイメントが正常に動作していることを示します。起動パラメーターの詳細については、「ジョブの開始」をご参照ください。
説明Realtime Compute for Apache Flink の各 TaskManager の計算能力を最大限に活用するため、各 TaskManager に CPU コア 2 個とメモリ 4 GB を割り当てることを推奨します。TaskManager は 1 秒あたり 10,000 行を書き込むことができます。
ソーステーブルのパーティション数が多い場合は、Realtime Compute for Apache Flink の同時実行数を 16 未満に設定することを推奨します。書き込みレートは同時実行数に比例して線形に増加します。
デプロイメントページで、計算結果を確認します。
O&M > デプロイメント ページで、対象のデプロイメントの名前をクリックします。
ジョブログ タブで、実行中のタスクマネージャー タブをクリックし、パス,ID 列で対象のタスクをクリックします。
ログ をクリックして、ログ情報を確認します。
(オプション) デプロイメントのキャンセル
デプロイメントの SQL コードを変更したり、WITH 句にパラメーターを追加または削除したり、デプロイメントのバージョンを変更したりする場合は、デプロイメントのドラフトを再デプロイし、デプロイメントをキャンセルしてから再度開始する必要があります。デプロイメントが失敗し、状態データを再利用して復旧できない場合や、動的に適用されないパラメーター設定を更新したい場合も、デプロイメントをキャンセルしてから再開する必要があります。デプロイメントのキャンセル方法の詳細については、「デプロイメントのキャンセル」をご参照ください。