このトピックでは、PyODPS を使用してレベル 1 パーティションをクエリする方法について説明します。
前提条件
事前に、次の操作を完了しておいてください。
-
MaxCompute を有効化済みであること。
-
DataWorks を有効化済みであること。
-
DataWorks でワークフローが作成済みであること。 詳細については、「ワークフローの作成」をご参照ください。
操作手順
この例では、DataWorks の 基本モードを使用します。 ワークスペースを作成する際、DataStudio のパブリックプレビューに参加 オプションはデフォルトで選択解除されています。 この例は、パブリックプレビュー中のワークスペースには適用されません。
-
テストデータを準備します。
-
テーブルを作成し、データをアップロードします。 操作方法については、「テーブルの作成とデータのアップロード」をご参照ください。
テーブル構造とソースデータ情報は次のとおりです。
-
パーティションテーブル user_detail の CREATE TABLE 文は次のとおりです。
CREATE TABLE IF NOT EXISTS user_detail ( userid BIGINT COMMENT 'ユーザー ID', job STRING COMMENT 'ジョブタイプ', education STRING COMMENT '学歴' ) COMMENT 'ユーザー情報テーブル' PARTITIONED BY (dt STRING COMMENT '日付',region STRING COMMENT 'リージョン'); -
ソースデータテーブル user_detail_ods の CREATE TABLE 文は次のとおりです。
CREATE TABLE IF NOT EXISTS user_detail_ods ( userid BIGINT COMMENT 'ユーザー ID', job STRING COMMENT 'ジョブタイプ', education STRING COMMENT '学歴', dt STRING COMMENT '日付', region STRING COMMENT 'リージョン' ); -
テストデータを user_detail.txt ファイルとして保存します。 このファイルをテーブル user_detail_ods にアップロードします。
0001,互联网,本科,20190715,beijing 0002,教育,大专,20190716,beijing 0003,金融,硕士,20190715,shandong 0004,互联网,硕士,20190715,beijing
-
-
ソースデータテーブル
user_detail_odsのデータをパーティションテーブルuser_detailに書き込みます。-
DataWorks コンソールにログインします。
-
左側のナビゲーションペインで、Workspace をクリックします。
-
対象のワークスペースを確認し、操作 列で を選択します。
-
ビジネスフローを右クリックし、 を選択します。
-
ノード名を入力し、確認 をクリックします。
-
ODPS SQL ノードに次のコードを入力します。
INSERT OVERWRITE TABLE user_detail PARTITION (dt, region) SELECT userid, job, education, dt, region FROM user_detail_ods; -
実行 をクリックして、データの書き込みを完了します。
-
-
PyODPS を使用してレベル 1 パーティションをクエリします。
-
DataWorks コンソールにログインします。
-
左側のナビゲーションペインで、Workspace をクリックします。
-
対象のワークスペースを確認し、操作 列で を選択します。
-
データ開発 ページで、作成済みのビジネスフローを右クリックし、 を選択します。
-
ノード名を入力し、確認 をクリックします。
次のコードを入力して、非同期、同期、および DataFrame を使用する 3 つの方法でレベル 1 パーティションをクエリします。
import sys reload(sys) # デフォルトのシステムエンコーディングを utf8 に設定します。 sys.setdefaultencoding('utf8') # レベル 1 パーティションを非同期で読み取ります。 instance = o.run_sql('select * from user_detail WHERE dt=\'20190715\'') instance.wait_for_success() for record in instance.open_reader(): print record["userid"],record["job"],record["education"] # レベル 1 パーティションを同期的に読み取ります。 with o.execute_sql('select * from user_detail WHERE dt=\'20190715\'').open_reader() as reader4: print reader4.raw for record in reader4: print record["userid"],record["job"],record["education"] # PyODPS DataFrame を使用してレベル 1 パーティションを読み取ります。 pt_df = DataFrame(o.get_table('user_detail').get_partition('dt=20190715')) print pt_df.head(10)実行 をクリックします。
Runtime Log で結果を表示します。
4 Internet master 1 Internet bachelor 3 finance master "userid","job","education","dt","region" 4,"Internet","master","20190715","beijing" 1,"Internet","bachelor","20190715","beijing" 3,"finance","master","20190715","shandong" 4 Internet master 1 Internet bachelor 3 finance master Try to fetch data from tunnel userid job education 0 4 Internet master 1 1 Internet bachelor 2 3 finance master
-