このトピックでは、Data Management (DMS) のジョブスケジューリング機能を使用して、ApsaraDB RDS for PostgreSQLからAnalyticDB for PostgreSQLに分析用のデータをスケジュールする方法について説明します。
概要
DMSはジョブスケジューリング機能を提供します。 この機能により、ApsaraDB RDS for PostgreSQLからObject Storage Service (OSS) にデータをロードしてから、分析のためにAnalyticDB for PostgreSQL in Serverlessモードにデータをロードできます。 DMSは、ETL (extract-transform-load) ジョブ全体をスケジュールするために使用されます。
メリット
データはOSSバケットに保存されるため、ストレージとアーカイブのコストが削減され、データのセキュリティが確保されます。
データは、高性能分析のためにApsaraDB RDS for PostgreSQLからAnalyticDB for PostgreSQL in Serverless modeにT + 1モードでロードされます。
DMSは、シンプルで視覚化された自動スケジューリングフレームワークに基づくジョブスケジューリングをサポートします。
使用上の注意
ApsaraDB RDS for PostgreSQLのデータは、日などの指定された条件に基づいて増分アーカイブをサポートする必要があります。
関連するApsaraDB RDS for PostgreSQLインスタンス、AnalyticDB for PostgreSQLインスタンス、およびOSSバケットは、同じリージョン内に存在する必要があります。
前提条件
AnalyticDB for PostgreSQL
サーバーレスモードのAnalyticDB for PostgreSQLインスタンスが作成されます。 詳細は、インスタンスの作成をご参照ください。
特権アカウントが作成されます。 詳細については、「データベースアカウントの作成」をご参照ください。
ApsaraDB RDS for PostgreSQL
ApsaraDB RDS for PostgreSQLインスタンスが作成されました。 詳細については、「ApsaraDB RDS for PostgreSQL インスタンスの作成」をご参照ください。
説明ApsaraDB RDS for PostgreSQLインスタンスのエンジンバージョンは、PostgreSQL 9.4 PostgreSQL 13.0である必要があります。
特権アカウントが作成されます。 詳細については、「アカウントの作成」をご参照ください。
OSS
OSSバケットが作成されます。 詳細については、「バケットの作成」をご参照ください。
バケットのバケット名とエンドポイントが取得されます。 方法:
OSS コンソールにログインします。
左側のナビゲーションウィンドウで、[バケット] をクリックします。
[バケット] ページで、バケットの名前をクリックします。
バケット名は [バケット] ページで取得できます。
左側のナビゲーションウィンドウで、[概要] をクリックします。
[概要] ページの [ポート] セクションで、バケットのエンドポイントを取得します。
データアクセスには、VPC (内部ネットワーク) 経由のECSからのアクセスのエンドポイントを使用することを推奨します。
AccessKeyID と AccessKey シークレット
AccessKey IDとAccessKey secretが取得されます。 詳細については、「AccessKey の作成」をご参照ください。
サービス接続とデータの準備
ApsaraDB RDS for PostgreSQL
ApsaraDB RDS for PostgreSQLインスタンスに接続します。 詳細については、「ApsaraDB RDS For PostgreSQLインスタンスへの接続」をご参照ください。
このトピックでは、DMSを使用してインスタンスに接続します。
t_srcという名前のテストテーブルを作成し、テストデータをテーブルに挿入します。
CREATE TABLE t_src (a int, b int, c date); INSERT INTO t_src SELECT generate_series(1, 1000), 1, now();
oss_fdwプラグインをインストールします。
CREATE EXTENSION IF NOT EXISTS oss_fdw;
OSS外部テーブルを作成します。
CREATE SERVER ossserver FOREIGN DATA WRAPPER oss_fdw OPTIONS (host '<bucket_host>' , id '<access_key>', key '<secret_key>',bucket '<bucket_name>');
下表に、各パラメーターを説明します。
パラメーター
説明
host
OSSバケットのエンドポイント。
id
AccessKey ID。
キー
AccessKeyシークレット。
バケット
OSSバケットの名前。
AnalyticDB for PostgreSQL
AnalyticDB for PostgreSQLインスタンスに接続します。 詳細については、「クライアント接続」をご参照ください。
このトピックでは、DMSを使用してインスタンスに接続します。
ApsaraDB RDS for PostgreSQLインスタンスのテーブルと同じスキーマを使用するt_targetという名前のテーブルを作成します。
CREATE TABLE t_target (a int, b int, c date);
説明サーバーレスモードのAnalyticDB for PostgreSQLはプライマリキーをサポートしていません。
oss_fdwプラグインをインストールします。
CREATE EXTENSION IF NOT EXISTS oss_fdw;
OSSサーバーとユーザーマッピングを作成します。
CREATE SERVER oss_serv FOREIGN DATA WRAPPER oss_fdw OPTIONS ( endpoint '<bucket_host>', bucket '<bucket_name>' ); CREATE USER MAPPING FOR PUBLIC SERVER oss_serv OPTIONS ( id '<access_key>', key '<secret_key>' );
下表に、各パラメーターを説明します。
パラメーター
説明
endpoint
OSSバケットのエンドポイント。
id
AccessKey ID。
キー
AccessKeyシークレット。
バケット
OSSバケットの名前。
ETLジョブの設定
DMSコンソールを使用します。
上部のナビゲーションバーで、[DTS] をクリックします。 左側のナビゲーションウィンドウで、 を選択します。
タスクフローセクション、をクリックタスクフローの作成.
タスクフローの作成の値を指定します。タスクフロー名をクリックし、OK.
この例では、[タスクフロー名] を [RDSからOSSへのデータのインポート] に設定します。
次の操作を実行して、ApsaraDB RDS for PostgreSQLデータをアーカイブするタスクを設定します。
RDSからOSSへのデータのインポートタブを選択します。データ処理ノード>シングルインスタンスSQL左側のナビゲーションウィンドウで、[Single Instance SQL] を右側のキャンバスにドラッグします。
(オプション) 追加したタスクを選択し、
アイコンをクリックしてタスクの名前を変更します。
説明的なタスク名を指定して、ETLジョブ全体を簡単に維持できるようにすることができます。 この例では、タスク名を [RDSからデータを抽出] に設定します。
追加されたタスクを選択し、
アイコンが表示されます。
コードエディターの上のドロップダウンリストから、バインドするApsaraDB RDS for PostgreSQLデータベースを選択します。
ApsaraDB RDS for PostgreSQLデータベースの [SQL] タブに切り替えて、データベースを表示できます。
コードエディターで、次のステートメントを入力します。
DROP FOREIGN TABLE IF EXISTS oss_${mydate}; CREATE FOREIGN TABLE IF NOT EXISTS oss_${mydate} (a int, b int, c date) SERVER ossserver OPTIONS ( dir 'rds/t3/${mydate}/', DELIMITER '|' , format 'csv', encoding 'utf8'); INSERT INTO oss_${mydate} SELECT * FROM t_src WHERE c >= '${mydate}';
右側のナビゲーションウィンドウで、[変数設定] タブをクリックします。 [ノード変数] を選択します。 変数名を
mydata
に、時間形式をyyyyMMdd
に設定します。
[RDSからOSSへのデータのインポート] タスクフローに戻ります。 次の操作を実行して、AnalyticDB for PostgreSQLのロードタスクを設定します。
On theRDSからOSSへのデータのインポートタブを選択します。データ処理ノード>シングルインスタンスSQL左側のナビゲーションウィンドウで、[Single Instance SQL] を右側のキャンバスにドラッグします。
(オプション) 追加したタスクを選択し、
アイコンをクリックしてタスクの名前を変更します。
説明的なタスク名を指定して、ETLジョブ全体を簡単に維持できるようにすることができます。 この例では、タスク名をLoad data to AnalyticDB for PostgreSQLに設定します。
追加されたタスクを選択し、
アイコンが表示されます。
を選択します。AnalyticDB for PostgreSQLデータベースをコードエディターの上のドロップダウンリストからバインドします。
手順5と同じ方法で、AnalyticDB for PostgreSQLデータベースを取得できます。
コードエディターで、次のステートメントを入力します。
oss _${ mydate} が存在しない場合は
CREATE FOREIGN TABLE IF NOT EXISTS oss_${mydate}( a int , b int , c date ) SERVER oss_serv OPTIONS ( dir 'rds/t3/${mydate}/', format 'csv', delimiter '|', encoding 'utf8'); INSERT INTO t_target SELECT * FROM oss_${mydate};
右側のナビゲーションウィンドウで、[変数設定] タブをクリックします。 [ノード変数] を選択します。 変数名を
mydata
に、時間形式をyyyyMMdd
に設定します。
次の手順を実行してスケジューリングを設定します。 データをAnalyticDB for PostgreSQLに読み込むタスクの前に、RDSからデータを抽出するタスクを実行する必要があります。
[RDSからOSSへのデータのインポート] タブで、[RDSからデータを抽出] タスクの上にポインターを移動し、右側の小さな円をクリックしたままにして、円から [Data to AnalysticDB for PostgreSQL] タスクに線を引きます。 生成されたタスクフローを次の図に示します。
ページ下部の [タスクフロー情報] タブをクリックします。 [スケジューリング設定] セクションで、[スケジューリングの有効化] をオンにします。
RDSからデータを抽出し、データをロードする両方のスケジューリングサイクルを選択します。AnalyticDB for PostgreSQLタスクを実行します。
上記の設定が完了したら、をクリックします。実行しようタブの上部にあります。
タスクフローが正常に実行される場合は、公開.