このトピックでは、PyODPSノードを使用して、指定されたテーブルのレベル1パーティションからデータを読み取る方法について説明します。
前提条件
次の前提条件が満たされていることを確認します。
-
ビジネスフローはDataWorksで作成されます。 詳細については、「ビジネスフローの作成」をご参照ください。
手順
この例では、DataWorksの基本モードを使用します。 ワークスペースを作成するとき、デフォルトでData Studioのパブリックプレビューに参加は有効になっていません。この例は、Data Studioパブリックプレビューに参加しているワークスペースには適用されません。
-
テストデータを準備します。
-
テーブルを作成してデータをアップロードします。 詳細については、「テーブルの作成とデータのアップロード」をご参照ください。
この例では、次のテーブル作成ステートメントとソースデータを使用します。
-
次のステートメントは、パーティション分割テーブルuser_detailを作成します。
CREATE TABLE IF NOT EXISTS user_detail ( userid BIGINT COMMENT 'User ID', job STRING COMMENT 'Job type', education STRING COMMENT 'Education level' ) COMMENT 'User information table' PARTITIONED BY (dt STRING COMMENT 'Date',region STRING COMMENT 'Region'); -
次の文は、ソースデータテーブルuser_detail_odsを作成します。
CREATE TABLE IF NOT EXISTS user_detail_ods ( userid BIGINT COMMENT 'User ID', job STRING COMMENT 'Job type', education STRING COMMENT 'Education level', dt STRING COMMENT 'Date', region STRING COMMENT 'Region' ); -
テストデータをuser_detail.txtファイルとして保存します。 このファイルをuser_detail_odsテーブルにアップロードします。
0001,互联网,Bachelor,20190715,beijing 0002,education,junior college,20190716,beijing 0003,finance,master,20190715,shandong 0004,互联网,master,20190715,beijing
-
-
ソースデータテーブル
user_detail_odsからパーティションテーブルuser_detailにデータを書き込みます。-
DataWorks コンソールにログインします。
-
左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。
-
ターゲットワークスペースを確認します。 [操作] 列で、 を選択します。
-
ビジネスフローを右クリックし、 を選択します。
-
ノード名を入力し、[確認] をクリックします。
-
ODPS SQLノードに次のコードを入力します。
INSERT OVERWRITE TABLE user_detail PARTITION (dt, region) SELECT userid, job, education, dt, region FROM user_detail_ods; -
[実行] をクリックしてデータの書き込みを完了します。
-
-
PyODPSノードを使用して、user_detailテーブルのレベル1パーティションからデータを読み取ります。
-
にログインします。DataWorksコンソール.
-
左側のナビゲーションウィンドウで、ワークスペース.
-
ターゲットワークスペースを確認します。 [操作] 列で、 を選択します。
-
On theデータ开発ページで、作成したビジネスフローを右クリックし、.
-
ノード名を入力し、確認.
PyODPS 2ノードの構成タブで、コードエディターに次のコードを入力します。
import sys reload(sys) # Set UTF-8 as the default encoding format. sys.setdefaultencoding('utf8') # Read data from the level-1 partition in asynchronous mode. instance = o.run_sql('select * from user_detail WHERE dt=\'20190715\'') instance.wait_for_success() for record in instance.open_reader(): print record["userid"],record["job"],record["education"] # Read data from the level-1 partition in synchronous mode. with o.execute_sql('select * from user_detail WHERE dt=\'20190715\'').open_reader() as reader4: print reader4.raw for record in reader4: print record["userid"],record["job"],record["education"] # Use the PyODPS DataFrame to read data from the level-1 partition. pt_df = DataFrame(o.get_table('user_detail').get_partition('dt=20190715')) print pt_df.head(10)ツールバーの [実行] アイコンをクリックします。

PyODPS 2ノードの実行結果をログの実行タブをクリックします。

-