PyODPS ノードを使用したパーティションテーブルからのデータ読み取り
日付やリージョンなどのパーティションキーで整理された大規模な MaxCompute データセットを操作する場合、テーブル全体をスキャンするのではなく、関連するパーティションのみを読み取る方法が必要です。DataWorks の PyODPS ノードを使用すると、PyODPS SDK を使用して特定のパーティションを直接クエリできるため、スキャンされるデータが削減され、Pythonベースのパイプラインが簡素化されます。
前提条件
開始する前に、以下を確認してください。
アクティブ化された MaxCompute サービス。詳細については、「MaxCompute をアクティブ化する」をご参照ください。
アクティブ化された DataWorks サービス。詳細については、「DataWorks をアクティブ化する」をご参照ください。
DataWorks コンソールで作成されたワークフロー。この例では、基本モードで実行されている DataWorks ワークスペース内のワークフローを使用します。詳細については、「ワークフローを作成する」をご参照ください。
ステップ 1: テストデータの準備
パーティションテーブルを作成し、ソースデータをステージングテーブルにロードしてから、ODPS SQL ノードを使用してデータをパーティションテーブルに挿入します。
パーティションテーブルとソーステーブルを作成し、ソーステーブルにデータをインポートします。詳細については、「テーブルを作成してデータをアップロードする」をご参照ください。次の DDL 文とサンプルデータを使用します。
user_detailという名前のパーティションテーブルを作成します。create table if not exists user_detail ( userid BIGINT comment 'ユーザー ID', job STRING comment 'ジョブタイプ', education STRING comment '教育レベル' ) comment 'ユーザー情報テーブル' partitioned by (dt STRING comment '日付',region STRING comment 'リージョン');user_detail_odsという名前のソーステーブルを作成します。create table if not exists user_detail_ods ( userid BIGINT comment 'ユーザー ID', job STRING comment 'ジョブタイプ', education STRING comment '教育レベル', dt STRING comment '日付', region STRING comment 'リージョン' );user_detail.txtという名前のファイルを次の内容で作成し、user_detail_odsテーブルにインポートします。0001,インターネット,学士,20190715,beijing 0002,教育,短期大学,20190716,beijing 0003,金融,修士,20190715,shandong 0004,インターネット,修士,20190715,beijing
ワークフローを右クリックし、[ノードの作成] > [MaxCompute] > [ODPS SQL] を選択します。
「ノードの作成」ダイアログボックスで、[名前] を指定し、[確認] をクリックします。
ノード構成タブのコードエディタで、次の SQL ステートメントを入力します。
insert overwrite table user_detail partition (dt,region) select userid,job,education,dt,region from user_detail_ods;ツールバーの [実行] アイコンをクリックして、
user_detail_odsからuser_detailパーティションテーブルにデータを挿入します。
ステップ 2: PyODPS ノードを使用したパーティションテーブルからのデータ読み取り
DataWorks コンソール にログインします。
左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。
ワークスペースを見つけ、[ショートカット] > [データ開発] を [操作] 列で選択します。
「DataStudio」ページで、ワークフローを右クリックし、「ノードの作成」→「MaxCompute」→「PyODPS 2」を選択します。
[作成ノード] ダイアログボックスで、[名前] を指定し、[確認] をクリックします。
ノード構成タブのコードエディタで、次のコードを入力します。
メソッド 使用方法 推奨用途 メソッド 1: open_reader()とwithリーダーをコンテキストマネージャーでラップします リソースの自動解放 メソッド 2: open_reader()を直接使用リーダーを開き、直接イテレートします withブロックを使用せずに、列名による Record アクセスメソッド 3: read_table()read_table()をODPSオブジェクト上で呼び出します簡潔な 1 行読み取り import sys from odps import ODPS reload(sys) print('dt=' + args['dt']) # UTF-8 をデフォルトのエンコード形式として設定します。 sys.setdefaultencoding('utf8') # パーティションテーブルを取得します。 t = o.get_table('user_detail') # 特定のパーティションが存在するかどうかを確認します。 print t.exist_partition('dt=20190715,region=beijing') # テーブル内のすべてのパーティションをリストします。 for partition in t.partitions: print partition.name # メソッド 1: open_reader() をコンテキストマネージャーとして使用する。 # with ブロックが終了するとリーダーは自動的に閉じられ、適切なリソースクリーンアップが保証されます。 with t.open_reader(partition='dt=20190715,region=beijing') as reader1: count = reader1.count print("メソッド 1 を使用してパーティションテーブル内のデータをクエリします。") for record in reader1: print record[0],record[1],record[2] # メソッド 2: open_reader() をコンテキストマネージャーなしで使用する。 # インデックス位置ではなく、列名でレコードにアクセスします。 print("メソッド 2 を使用してパーティションテーブル内のデータをクエリします。") reader2 = t.open_reader(partition='dt=20190715,region=beijing') for record in reader2: print record["userid"],record["job"],record["education"] # メソッド 3: ODPS オブジェクトで read_table() を直接使用する。 # これは、シンプルな読み取りに最も簡潔なオプションです。 print("メソッド 3 を使用してパーティションテーブル内のデータをクエリします。") for record in o.read_table('user_detail', partition='dt=20190715,region=beijing'): print record["userid"],record["job"],record["education"]partition引数は、各パーティション列に対してkey=valueの形式をコンマで区切って使用し、読み取る正確なパーティションを指定します。これら 3 つのメソッドはすべて、一度に 1 つのパーティションを読み取ります。ツールバーの[パラメーター付きで実行] アイコンをクリックします。

[パラメーター] ダイアログボックスで、次のパラメーターを設定し、[実行] をクリックします。
パラメーター 説明 リソースグループ名 [共通スケジューラリソースグループ] を選択します。 dt dt=20190715に設定します。
結果は、[実行ログ] タブで表示できます。
