SQLタスクなどのタスクは、MaxComputeの基本的な計算単位です。 このトピックでは、PyODPSのタスクインスタンスに対する基本的な操作について説明します。
基本操作
タスクがトリガーされると、そのタスクのMaxComputeインスタンスが作成されます。 詳細は、「インスタンス」をご参照ください。 MaxComputeインスタンスで次の操作を実行できます。
list_instances()
: プロジェクト内のすべてのインスタンスを取得します。exist_instance()
: インスタンスが存在するかどうかを確認します。get_instance()
: インスタンスを取得します。stop_instance()
: インスタンスを停止します。 実行中のインスタンスのみを停止できます。 実行されていないインスタンスを停止しようとすると、エラーが返されます。説明stop()
メソッドを呼び出してインスタンスを停止することもできます。 たとえば、o.get_instance('instance id').stop
を実行できます。
例
インスタンスのIDを取得します。
for instance in o.list_instances(): print(instance.id)
インスタンスが存在するかどうかを確認します。
print(o.exist_instance('my_instance_id'))
インスタンスのLogview URLの取得
SQLタスクインスタンスの場合、
get_logview_address
メソッドを呼び出して、インスタンスのLogview URLを取得できます。# Execute an SQL statement to obtain an instance. Then, call the get_logview_address() method to obtain the Logview URL of the instance. instance = o.run_sql('desc pyodps_iris') print(instance.get_logview_address()) # Obtain the Logview URL of an instance that has the specified ID. instance = o.get_instance('my_instance_id') print(instance.get_logview_address())
Machine Learning Platform For AI (PAI) インスタンスの場合、インスタンスのサブインスタンスを一覧表示し、サブインスタンスのLogview URLを取得する必要があります。
instance = o.run_xflow('AppendID', 'algo_public', {'inputTableName': 'input_table', 'outputTableName': 'output_table'}) for sub_inst_name, sub_inst in o.get_xflow_sub_instances(instance).items(): print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))
インスタンスのステータスの取得
インスタンスの状態は、[実行中]
、[一時停止中]
、または [終了]
です。 次の方法を使用して、インスタンスのステータスを取得できます。
status
: インスタンスのステータスを取得します。is_terminated
: 現在のインスタンスの実行が完了したかどうかを確認します。is_successful
: 現在のインスタンスの実行が成功したかどうかを確認します。 インスタンスが実行中または失敗した場合、Falseが返されます。
サンプルコード:
インスタンスのステータスを取得します。
instance=o.get_instance('my_instance_id') print(instance.status) print(instance.status.value)
レスポンス例:
Status.TERMINATED Terminated
インスタンスの実行が完了したかどうかを確認します。
instance=o.get_instance('my_instance_id') from odps.models import Instance print(instance.status == Instance.Status.TERMINATED)
True
が返された場合、実行は完了です。
wait_for_completion
メソッドを呼び出して、インスタンスの実行が完了するまでシステムがステータスを返すことができます。 wait_for_success
メソッドの効果は、wait_for_completionメソッドの効果と同様です。 ただし、wait_for_successメソッドを呼び出すと、インスタンスが失敗するとエラーが返されます。
サブインスタンスに対する操作の実行
実行中のインスタンスは、1つ以上のサブインスタンスを含むことができる。 次のメソッドを呼び出して、サブインスタンスで操作を実行できます。
get_task_names
: すべてのサブインスタンスを取得します。 このメソッドは、すべてのサブインスタンスの名前を返します。instance=o.get_instance('my_instance_id') instance.get_task_names()
レスポンス例:
['jdbc_sql_task']
get_task_result
: サブインスタンスの実行結果を取得します。 このメソッドは、各サブインスタンスの実行結果をディクショナリの形式で返します。 サンプルコード:すべてのサブインスタンスの名前を取得します。
instance=o.execute_sql('select*frompyodps_irislimit1') print(instance.get_task_names())
レスポンス例:
['AnonymousSQLTask']
サブインスタンスの実行結果を取得します。
print(instance.get_task_result('AnonymousSQLTask'))
レスポンス例:
"sepallength","sepalwidth","petallength","petalwidth","name" 4.9,3.0,1.4,0.2,"Iris-setosa"
サブインスタンスの実行結果を取得します。
print(instance.get_task_results())
レスポンス例:
OrderedDict([('AnonymousSQLTask', '"sepallength","sepalwidth","petallength","petalwidth","name"\n4.9,3.0,1.4,0.2,"Iris-setosa"\n')])
get_task_progress
: インスタンスの実行中にサブインスタンスの現在の進行状況を取得します。instance=o.get_instance('20160519101349613gzbzufck2') while not instance.is_terminated(): for task_name in instance.get_task_names(): print(instance.id, instance.get_task_progress(task_name).get_stage_progress_formatted_string()) time.sleep(10)
レスポンス例:
20160519101349613gzbzufck2 2016-05-19 18:14:03 M1_Stg1_job0:0/1/1[100%]