PyODPS は、MaxCompute でサポートされている基本的な SQL 文をサポートしています。このトピックでは、PyODPS で SQL 文を使用する方法について説明します。
背景情報
次の表に、PyODPS で MaxCompute SQL 文を実行するために使用できるメソッドを示します。
メソッド名 | 説明 |
execute_sql()/run_sql() | 詳細については、「SQL 文の実行」をご参照ください。 |
open_reader() | 詳細については、「SQL 文の実行結果の取得」をご参照ください。 |
MaxCompute エントリオブジェクトのメソッドを使用して、すべての SQL 文を実行できるわけではありません。MaxCompute クライアントで実行できる特定の SQL 文は、execute_sql()
メソッドと run_sql()
メソッドを使用して実行できない場合があります。DDL 文または DML 文以外の文を実行するには、他のメソッドを使用する必要があります。たとえば、GRANT 文または REVOKE 文を実行するには run_security_query
メソッドを使用し、API 操作を呼び出すには run_xflow
メソッドまたは execute_xflow
メソッドを使用する必要があります。
Python でユーザー定義関数 (UDF) を記述する場合、UDF によって参照されるリソースが動的に変更される場合は、execute_sql() メソッドで古いリソースの エイリアス を構成し、エイリアスを新しいリソースの名前として使用できます。この方法では、UDF を削除したり、別の UDF を作成したりする必要はありません。詳細については、「リソースエイリアスの構成」をご参照ください。
SQL 文の実行
このセクションでは、PyODPS の MaxCompute SQL 文について説明します。
パラメーター
statement: 実行する SQL 文。
hints: ランタイムパラメーター。hints パラメーターは DICT タイプです。
戻り値
execute_sql()
メソッドとrun_sql()
メソッドを実行すると、タスクインスタンスに関する情報が返されます。詳細については、「タスクインスタンス」をご参照ください。例
例 1
SQL 文を実行します。
o.execute_sql('select * from table_name') # 同期モードで文を実行します。SQL 文の実行が完了するまで、他のインスタンスはブロックされます。 instance = o.run_sql('select * from table_name') # 非同期モードで文を実行します。 print(instance.get_logview_address()) # インスタンスの Logview URL を取得します。 instance.wait_for_success() # SQL 文の実行が完了するまで、他のインスタンスはブロックされます。
例 2
SQL 文の hints パラメーターを構成します。
o.execute_sql('select * from pyodps_iris', hints={'odps.stage.mapper.split.size': 16})
グローバルに
sql.settings
を構成して、ランタイムパラメーターを構成することもできます。次のコードは例を示しています。ランタイムパラメーターをグローバルに構成すると、SQL 文を実行するたびにランタイムパラメーターが有効になります。グローバルに構成できるパラメーターの詳細については、「フラグパラメーター」をご参照ください。from odps import options options.sql.settings = {'odps.stage.mapper.split.size': 16} o.execute_sql('select * from pyodps_iris') # グローバル設定に基づいて hints パラメーターが自動的に構成されます。
SQL 文の実行結果の取得
open_reader()
メソッドを呼び出して、SQL 文の実行結果を読み取ることができます。戻り値は、次のシナリオによって異なります。
テーブルデータを読み取る場合、構造化データが返されます。この場合、
FOR
句を使用して各レコードを走査します。with o.execute_sql('select * from table_name').open_reader() as reader: for record in reader: # 各レコードを処理します。 print(record)
desc
などのコマンドを実行する場合、非構造化データが返されます。この場合、reader.raw
操作を呼び出して、コマンド出力を取得します。with o.execute_sql('desc table_name').open_reader() as reader: print(reader.raw)
open_reader()
メソッドを呼び出すと、PyODPS は古い Result インターフェイスを自動的に呼び出します。これにより、タイムアウトが発生したり、取得できるデータレコード数が制限されたりする可能性があります。次のいずれかの方法を使用して、PyODPS が InstanceTunnel を呼び出すように指定できます。
スクリプトに
options.tunnel.use_instance_tunnel =True
を追加します。open_reader(tunnel=True)
を構成します。次のコードは例を示しています。PyODPS V0.7.7.1 以降では、open_reader()
メソッドを使用して完全データを読み取ることができます。with o.execute_sql('select * from table_name').open_reader(tunnel=True) as reader: for record in reader: print(record)
古いバージョンの MaxCompute を使用している場合、または PyODPS が InstanceTunnel を呼び出すときにエラーが発生した場合、PyODPS はアラートを生成し、呼び出しオブジェクトを古い Result インターフェイスに自動的にスペックダウンします。アラートの情報に基づいて、問題の原因を特定できます。
MaxCompute のバージョンで古い Result インターフェイスのみがサポートされており、SQL 文のすべての実行結果を読み取る場合は、実行結果を別のテーブルに書き込み、open_reader() メソッドを使用してテーブルからデータを読み取ることができます。この操作は、MaxCompute プロジェクトの データ保護メカニズム に従います。
InstanceTunnel の詳細については、「InstanceTunnel」をご参照ください。
デフォルトでは、PyODPS はインスタンスから読み取ることができるデータ量を制限しません。ただし、プロジェクトオーナーは、インスタンスから読み取ることができるデータ量を制限するために、MaxCompute プロジェクトの保護設定を構成する場合があります。この場合、データは読み取り制限モードでのみ読み取ることができます。このモードでは、読み取ることができる行数は、プロジェクトの構成に基づいて制限されます。ほとんどの場合、最大 10,000 行を読み取ることができます。PyODPS は、インスタンスから読み取られたデータ量が制限されており、options.tunnel.limit_instance_tunnel
が構成されていないことを検出すると、読み取り制限モードを自動的に有効にします。
プロジェクトが保護されており、読み取り制限モードを手動で有効にする場合は、
open_reader(limit=True)
のように、open_reader()
メソッドに構成limit=True
を追加できます。また、構成options.tunnel.limit_instance_tunnel = True
を追加して、読み取り制限モードを有効にすることもできます。DataWorks などの特定の環境では、
options.tunnel.limit_instance_tunnel
がデフォルトで True に設定されている場合があります。この場合、インスタンスからすべてのデータを読み取る場合は、open_reader(tunnel=True, limit=False)
のように、open_reader()
メソッドに構成tunnel=True
とlimit=False
を追加する必要があります。
プロジェクトが保護されており、構成 tunnel=True
と limit=False
を使用して保護を解除できない場合は、プロジェクトオーナーに連絡して、関連する読み取り権限を付与してもらう必要があります。詳細については、「プロジェクトデータ保護」をご参照ください。
リソースエイリアスの構成
UDF によって参照されるリソースが動的に変更される場合は、古いリソースの エイリアス を構成し、エイリアスを新しいリソースの名前として使用できます。この方法では、UDF を削除したり、別の UDF を作成したりする必要はありません。
from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
def __init__(self):
self.n = int(get_cache_file('test_alias_res1').read())
def evaluate(self, arg):
return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
class_type='test_alias.Example',
resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
'test_table',
schema=Schema.from_lists(['size'], ['bigint']),
if_not_exists=True
)
data = [[1, ], ]
# 値 1 のみを格納するデータ行を書き込みます。
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
'select test_alias_func(size) from test_table').open_reader() as reader:
print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# リソース res1 のエイリアスをリソース res2 の名前として設定します。UDF またはリソースを変更する必要はありません。
with o.execute_sql(
'select test_alias_func(size) from test_table',
aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
print(reader[0][0])
特定のシナリオでは、実行する SQL 文の biz_id を指定する必要があります。指定しないと、エラーが発生します。エラーが発生した場合は、グローバル options で biz_id を構成してエラーを修正できます。
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')