このトピックでは、DataWorks で PyODPS ノードを使用してパラメーターを渡す方法について説明します。
前提条件
DataWorks でビジネスフローが作成されている。詳細については、「ワークフローを作成する」をご参照ください。
手順
この例では、DataWorks の基本モードを使用します。ワークスペースを作成する際、デフォルトでは [data Studio パブリックプレビューに参加] は有効になっていません。この例は、Data Studio パブリックプレビューに参加しているワークスペースには適用されません。
テストデータを準備します。
テーブルを作成し、データをアップロードします。詳細については、「テーブルを作成してデータをアップロードする」をご参照ください。
この例では、次のテーブル作成ステートメントとソースデータを使用します。
次のステートメントは、パーティションテーブル user_detail を作成します。
CREATE TABLE IF NOT EXISTS user_detail ( userid BIGINT COMMENT 'User ID', job STRING COMMENT 'Job type', education STRING COMMENT 'Education level' ) COMMENT 'User information table' PARTITIONED BY (dt STRING COMMENT 'Date',region STRING COMMENT 'Region');
次のステートメントは、ソースデータテーブル user_detail_ods を作成します。
CREATE TABLE IF NOT EXISTS user_detail_ods ( userid BIGINT COMMENT 'User ID', job STRING COMMENT 'Job type', education STRING COMMENT 'Education level', dt STRING COMMENT 'Date', region STRING COMMENT 'Region' );
テストデータを user_detail.txt ファイルとして保存します。このファイルを user_detail_ods テーブルにアップロードします。
0001,Internet,Bachelor,20190715,beijing 0002,Education,junior college,20190716,beijing 0003,Finance,master,20190715,shandong 0004,Internet,master,20190715,beijing
ソースデータテーブル
user_detail_ods
からパーティションテーブルuser_detail
にデータを書き込みます。DataWorks コンソール にログインします。
左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。
ターゲットワークスペースを見つけ、
[アクション] 列で を選択します。ビジネスフローを右クリックし、
を選択します。ノード名を入力し、[確認] をクリックします。
ODPS SQL ノードに次のコードを入力します。
INSERT OVERWRITE TABLE user_detail PARTITION (dt, region) SELECT userid, job, education, dt, region FROM user_detail_ods;
[実行] をクリックして、データの書き込みを完了します。
PyODPS を使用してパラメーターを渡します。
DataWorks コンソール にログインします。
左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。
ターゲットワークスペースを見つけ、 [アクション] 列で を選択します。
[データ開発] ページで、作成したビジネスフローを右クリックし、 を選択します。
ノード名を入力し、[確認] をクリックします。
PyODPS 2 ノードに次のコードを入力して、パラメーターを渡します。
import sys reload(sys) print('dt=' + args['dt']) # デフォルトのエンコード形式を UTF-8 に変更します。 sys.setdefaultencoding('utf8') # user_detail テーブルを取得します。 t = o.get_table('user_detail') # 渡されたパーティションフィールドを受け取ります。 with t.open_reader(partition='dt=' + args['dt'] + ',region=beijing') as reader1: count = reader1.count print("パーティションテーブルのデータのクエリ:") for record in reader1: print record[0],record[1],record[2]
[パラメーター付きで実行] をクリックします。
[パラメーター] ダイアログボックスで、パラメーターを設定し、[実行] をクリックします。
次のパラメーターを設定します。
[リソースグループ名]: [デフォルトリソースグループ] を選択します。
dt: dt=20190715 に設定します。
[操作ログ] で操作結果を表示します。