この記事では、DataWorks で PyODPS の引数を渡す方法について説明します。
前提条件
事前に、次の操作を完了しておいてください。
-
MaxCompute が有効化されていること。
-
DataWorks が有効化されていること。
-
DataWorks でワークフローが作成されていること。詳細については、「Create a workflow」をご参照ください。
操作手順
この例では、DataWorks のシンプルモードを使用します。ワークスペースを作成する際に、デフォルトで [参加数据开发(Data Studio)公测] をオフのままにしておきます。パブリックプレビューのワークスペースはこの例には適用されません。
-
テストデータを準備します。
-
テーブルを作成し、データをアップロードします。操作方法については、「Create a table and upload data」をご参照ください。
テーブル構造とソースデータの情報は次のとおりです:
-
パーティションテーブル user_detail のテーブル作成文は次のとおりです。
CREATE TABLE IF NOT EXISTS user_detail ( userid BIGINT COMMENT 'ユーザー ID', job STRING COMMENT '職種', education STRING COMMENT '学歴' ) COMMENT 'ユーザー情報テーブル' PARTITIONED BY (dt STRING COMMENT '日付', region STRING COMMENT 'リージョン'); -
ソースデータテーブル user_detail_ods のテーブル作成文は以下のとおりです。
CREATE TABLE IF NOT EXISTS user_detail_ods ( userid BIGINT COMMENT 'ユーザー ID', job STRING COMMENT '職種', education STRING COMMENT '学歴', dt STRING COMMENT '日付', region STRING COMMENT 'リージョン' ); -
テストデータを user_detail.txt ファイルとして保存し、テーブル user_detail_ods にアップロードします。
0001,互联网,本科,20190715,beijing 0002,教育,大专,20190716,beijing 0003,金融,硕士,20190715,shandong 0004,互联网,硕士,20190715,beijing
-
-
ソースデータテーブル
user_detail_odsのデータをパーティションテーブルuser_detailに書き込みます。-
DataWorks コンソールにログインします。
-
左側メニューで、[Workspace] をクリックします。
-
対象のワークスペースを確認し、[操作] 列の を選択します。
-
ワークフローを右クリックし、 を選択します。
-
ノード名を入力し、[確認] をクリックします。
-
ODPS SQL ノードに次のコードを入力します。
INSERT OVERWRITE TABLE user_detail PARTITION (dt, region) SELECT userid, job, education, dt, region FROM user_detail_ods; -
[実行] をクリックして、データの書き込みを完了します。
-
-
-
PyODPS を使用して引数を渡します。
-
DataWorks コンソールにログインします。
-
左側メニューで、[Workspace] をクリックします。
-
対象のワークスペースを確認し、[操作] 列の を選択します。
-
[データ開発] ページで、作成済みのワークフローを右クリックし、 を選択します。
-
ノード名を入力し、[確認] をクリックします。
-
PyODPS 2 ノードに次のコードを入力して、引数渡しを実装します。
import sys reload(sys) print('dt=' + args['dt']) # システムのデフォルトエンコーディングを変更します。 sys.setdefaultencoding('utf8') # テーブルを取得します。 t = o.get_table('user_detail') # 渡されたパーティション引数を使用します。 with t.open_reader(partition='dt=' + args['dt'] + ',region=beijing') as reader1: count = reader1.count print("查询分区表数据:") for record in reader1: print record[0],record[1],record[2] -
[Run with Parameters] をクリックします。
import sys reload(sys) print('dt=' + args['dt']) # システムのデフォルトエンコーディングを変更します。 sys.setdefaultencoding('utf8') # テーブルを取得します。 t = o.get_table('user_detail') # 渡されたパーティション引数を使用します。 with t.open_reader(partition='dt=' + args['dt'] + ',region=beijing') as reader1: count = reader1.count print("查询分区表数据:") for record in reader1: print record[0],record[1],record[2] -
[Parameters] ダイアログボックスに設定パラメーターを入力し、[実行] をクリックします。
設定パラメーターの説明は次のとおりです:
-
[リソースグループ名]: [Shared Resource Group] を選択します。
-
[dt]:
20190715に設定します。
-
-
[Runtime Log] で実行結果を確認します。
Executing user script with PyODPS 0.8.0 dt=20190715 查询分区表数据: 4 互联网 硕士 1 互联网 本科 xxx xxx xxx xxx INFO ===================================================================
-