すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:DMSを使用したジョブのスケジュール

最終更新日:Sep 27, 2024

このトピックでは、Data Management (DMS) のジョブスケジューリング機能を使用して、ApsaraDB RDS for PostgreSQLからAnalyticDB for PostgreSQLに分析用のデータをスケジュールする方法について説明します。

概要

DMSはジョブスケジューリング機能を提供します。 この機能により、ApsaraDB RDS for PostgreSQLからObject Storage Service (OSS) にデータをロードしてから、分析のためにAnalyticDB for PostgreSQL in Serverlessモードにデータをロードできます。 DMSは、ETL (extract-transform-load) ジョブ全体をスケジュールするために使用されます。

メリット

  • データはOSSバケットに保存されるため、ストレージとアーカイブのコストが削減され、データのセキュリティが確保されます。

  • データは、高性能分析のためにApsaraDB RDS for PostgreSQLからAnalyticDB for PostgreSQL in Serverless modeにT + 1モードでロードされます。

  • DMSは、シンプルで視覚化された自動スケジューリングフレームワークに基づくジョブスケジューリングをサポートします。

使用上の注意

  • ApsaraDB RDS for PostgreSQLのデータは、日などの指定された条件に基づいて増分アーカイブをサポートする必要があります。

  • 関連するApsaraDB RDS for PostgreSQLインスタンス、AnalyticDB for PostgreSQLインスタンス、およびOSSバケットは、同じリージョン内に存在する必要があります。

前提条件

AnalyticDB for PostgreSQL

ApsaraDB RDS for PostgreSQL

  • ApsaraDB RDS for PostgreSQLインスタンスが作成されました。 詳細については、「ApsaraDB RDS for PostgreSQL インスタンスの作成」をご参照ください。

    説明

    ApsaraDB RDS for PostgreSQLインスタンスのエンジンバージョンは、PostgreSQL 9.4 PostgreSQL 13.0である必要があります。

  • 特権アカウントが作成されます。 詳細については、「アカウントの作成」をご参照ください。

OSS

  • OSSバケットが作成されます。 詳細については、「バケットの作成」をご参照ください。

  • バケットのバケット名エンドポイントが取得されます。 方法:

    1. OSS コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[バケット] をクリックします。

    3. [バケット] ページで、バケットの名前をクリックします。

      バケット名[バケット] ページで取得できます。

    4. 左側のナビゲーションウィンドウで、[概要] をクリックします。

    5. [概要] ページの [ポート] セクションで、バケットのエンドポイントを取得します。

      データアクセスには、VPC (内部ネットワーク) 経由のECSからのアクセスのエンドポイントを使用することを推奨します。

AccessKeyID と AccessKey シークレット

AccessKey IDとAccessKey secretが取得されます。 詳細については、「AccessKey の作成」をご参照ください。

サービス接続とデータの準備

ApsaraDB RDS for PostgreSQL

  1. ApsaraDB RDS for PostgreSQLインスタンスに接続します。 詳細については、「ApsaraDB RDS For PostgreSQLインスタンスへの接続」をご参照ください。

    このトピックでは、DMSを使用してインスタンスに接続します。

  2. t_srcという名前のテストテーブルを作成し、テストデータをテーブルに挿入します。

    CREATE TABLE t_src (a int, b int, c date);
    INSERT INTO t_src SELECT generate_series(1, 1000), 1, now();
  3. oss_fdwプラグインをインストールします。

    CREATE EXTENSION IF NOT EXISTS oss_fdw;
  4. OSS外部テーブルを作成します。

    CREATE SERVER ossserver FOREIGN DATA WRAPPER oss_fdw OPTIONS
         (host '<bucket_host>' , id '<access_key>', key '<secret_key>',bucket '<bucket_name>');

    下表に、各パラメーターを説明します。

    パラメーター

    説明

    host

    OSSバケットのエンドポイント

    id

    AccessKey ID。

    キー

    AccessKeyシークレット。

    バケット

    OSSバケットの名前

AnalyticDB for PostgreSQL

  1. AnalyticDB for PostgreSQLインスタンスに接続します。 詳細については、「クライアント接続」をご参照ください。

    このトピックでは、DMSを使用してインスタンスに接続します。

  2. ApsaraDB RDS for PostgreSQLインスタンスのテーブルと同じスキーマを使用するt_targetという名前のテーブルを作成します。

    CREATE TABLE t_target (a int, b int, c date);
    説明

    サーバーレスモードのAnalyticDB for PostgreSQLはプライマリキーをサポートしていません。

  3. oss_fdwプラグインをインストールします。

    CREATE EXTENSION IF NOT EXISTS oss_fdw;
  4. OSSサーバーとユーザーマッピングを作成します。

    CREATE SERVER oss_serv
        FOREIGN DATA WRAPPER oss_fdw
        OPTIONS (
            endpoint '<bucket_host>',
            bucket '<bucket_name>'
      );
    
    CREATE USER MAPPING FOR PUBLIC
        SERVER oss_serv
        OPTIONS (
            id '<access_key>',
            key '<secret_key>'
        );

    下表に、各パラメーターを説明します。

    パラメーター

    説明

    endpoint

    OSSバケットのエンドポイント

    id

    AccessKey ID。

    キー

    AccessKeyシークレット。

    バケット

    OSSバケットの名前

ETLジョブの設定

  1. DMSコンソールを使用します。

  2. 上部のナビゲーションバーで、[DTS] をクリックします。 左側のナビゲーションウィンドウで、[データ開発] > [タスクオーケストレーション] を選択します。

  3. タスクフローセクション、をクリックタスクフローの作成.

  4. タスクフローの作成の値を指定します。タスクフロー名をクリックし、OK.

    この例では、[タスクフロー名][RDSからOSSへのデータのインポート] に設定します。

  5. 次の操作を実行して、ApsaraDB RDS for PostgreSQLデータをアーカイブするタスクを設定します。

    1. RDSからOSSへのデータのインポートタブを選択します。データ処理ノード>シングルインスタンスSQL左側のナビゲーションウィンドウで、[Single Instance SQL] を右側のキャンバスにドラッグします。

    2. (オプション) 追加したタスクを選択し、DMS作业调度-重命名アイコンをクリックしてタスクの名前を変更します。

      説明的なタスク名を指定して、ETLジョブ全体を簡単に維持できるようにすることができます。 この例では、タスク名を [RDSからデータを抽出] に設定します。

    3. 追加されたタスクを選択し、DMS-编辑当前节点配置アイコンが表示されます。

    4. コードエディターの上のドロップダウンリストから、バインドするApsaraDB RDS for PostgreSQLデータベースを選択します。

      DMS作业调度-选择需要绑定的数据库

      ApsaraDB RDS for PostgreSQLデータベースの [SQL] タブに切り替えて、データベースを表示できます。

    5. コードエディターで、次のステートメントを入力します。

      DROP FOREIGN TABLE IF EXISTS oss_${mydate};
      
      CREATE FOREIGN TABLE IF NOT EXISTS oss_${mydate}
          (a int,
           b int,
           c date)
           SERVER ossserver
           OPTIONS ( dir 'rds/t3/${mydate}/', DELIMITER  '|' ,
               format 'csv', encoding 'utf8');
      
      INSERT INTO oss_${mydate} SELECT * FROM t_src WHERE c >= '${mydate}';
    6. 右側のナビゲーションウィンドウで、[変数設定] タブをクリックします。 [ノード変数] を選択します。 変数名mydataに、時間形式yyyyMMddに設定します。

      DMS作业调度-RDS变量设置

  6. [RDSからOSSへのデータのインポート] タスクフローに戻ります。 次の操作を実行して、AnalyticDB for PostgreSQLのロードタスクを設定します。

    1. On theRDSからOSSへのデータのインポートタブを選択します。データ処理ノード>シングルインスタンスSQL左側のナビゲーションウィンドウで、[Single Instance SQL] を右側のキャンバスにドラッグします。

    2. (オプション) 追加したタスクを選択し、DMS作业调度-重命名アイコンをクリックしてタスクの名前を変更します。

      説明的なタスク名を指定して、ETLジョブ全体を簡単に維持できるようにすることができます。 この例では、タスク名をLoad data to AnalyticDB for PostgreSQLに設定します。

    3. 追加されたタスクを選択し、DMS-编辑当前节点配置アイコンが表示されます。

    4. を選択します。AnalyticDB for PostgreSQLデータベースをコードエディターの上のドロップダウンリストからバインドします。

      手順5と同じ方法で、AnalyticDB for PostgreSQLデータベースを取得できます。

    5. コードエディターで、次のステートメントを入力します。

      oss _${ mydate} が存在しない場合は

      CREATE FOREIGN TABLE IF NOT EXISTS  oss_${mydate}(
           a int ,
           b int ,
           c date
      ) SERVER oss_serv
          OPTIONS (
              dir 'rds/t3/${mydate}/',
              format 'csv',
              delimiter '|',
              encoding 'utf8');
      
      INSERT INTO t_target SELECT * FROM oss_${mydate};
    6. 右側のナビゲーションウィンドウで、[変数設定] タブをクリックします。 [ノード変数] を選択します。 変数名mydataに、時間形式yyyyMMddに設定します。

      DMS作业调度-RDS变量设置

  7. 次の手順を実行してスケジューリングを設定します。 データをAnalyticDB for PostgreSQLに読み込むタスクの前に、RDSからデータを抽出するタスクを実行する必要があります。

    1. [RDSからOSSへのデータのインポート] タブで、[RDSからデータを抽出] タスクの上にポインターを移動し、右側の小さな円をクリックしたままにして、円から [Data to AnalysticDB for PostgreSQL] タスクに線を引きます。 生成されたタスクフローを次の図に示します。

      DMS作业调度-任务地图

    2. ページ下部の [タスクフロー情報] タブをクリックします。 [スケジューリング設定] セクションで、[スケジューリングの有効化] をオンにします。

    3. RDSからデータを抽出し、データをロードする両方のスケジューリングサイクルを選択します。AnalyticDB for PostgreSQLタスクを実行します。

  8. 上記の設定が完了したら、をクリックします。実行しようタブの上部にあります。

  9. タスクフローが正常に実行される場合は、公開.

関連ドキュメント