すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:SQL

最終更新日:Mar 27, 2025

PyODPS は、MaxCompute でサポートされている基本的な SQL 文をサポートしています。このトピックでは、PyODPS で SQL 文を使用する方法について説明します。

背景情報

次の表に、PyODPS で MaxCompute SQL 文を実行するために使用できるメソッドを示します。

メソッド名

説明

execute_sql()/run_sql()

詳細については、「SQL 文の実行」をご参照ください。

open_reader()

詳細については、「SQL 文の実行結果の取得」をご参照ください。

説明

MaxCompute エントリオブジェクトのメソッドを使用して、すべての SQL 文を実行できるわけではありません。MaxCompute クライアントで実行できる特定の SQL 文は、execute_sql() メソッドと run_sql() メソッドを使用して実行できない場合があります。DDL 文または DML 文以外の文を実行するには、他のメソッドを使用する必要があります。たとえば、GRANT 文または REVOKE 文を実行するには run_security_query メソッドを使用し、API 操作を呼び出すには run_xflow メソッドまたは execute_xflow メソッドを使用する必要があります。

Python でユーザー定義関数 (UDF) を記述する場合、UDF によって参照されるリソースが動的に変更される場合は、execute_sql() メソッドで古いリソースの エイリアス を構成し、エイリアスを新しいリソースの名前として使用できます。この方法では、UDF を削除したり、別の UDF を作成したりする必要はありません。詳細については、「リソースエイリアスの構成」をご参照ください。

SQL 文の実行

このセクションでは、PyODPS の MaxCompute SQL 文について説明します。

  • パラメーター

    • statement: 実行する SQL 文。

    • hints: ランタイムパラメーター。hints パラメーターは DICT タイプです。

  • 戻り値

    execute_sql() メソッドと run_sql() メソッドを実行すると、タスクインスタンスに関する情報が返されます。詳細については、「タスクインスタンス」をご参照ください。

    • 例 1

      SQL 文を実行します。

      o.execute_sql('select * from table_name')  # 同期モードで文を実行します。SQL 文の実行が完了するまで、他のインスタンスはブロックされます。
      instance = o.run_sql('select * from table_name')  # 非同期モードで文を実行します。
      print(instance.get_logview_address())  # インスタンスの Logview URL を取得します。
      instance.wait_for_success()  # SQL 文の実行が完了するまで、他のインスタンスはブロックされます。

    • 例 2

      SQL 文の hints パラメーターを構成します。

      o.execute_sql('select * from pyodps_iris', hints={'odps.stage.mapper.split.size': 16})

      グローバルに sql.settings を構成して、ランタイムパラメーターを構成することもできます。次のコードは例を示しています。ランタイムパラメーターをグローバルに構成すると、SQL 文を実行するたびにランタイムパラメーターが有効になります。グローバルに構成できるパラメーターの詳細については、「フラグパラメーター」をご参照ください。

      from odps import options
      options.sql.settings = {'odps.stage.mapper.split.size': 16}
      o.execute_sql('select * from pyodps_iris')  # グローバル設定に基づいて hints パラメーターが自動的に構成されます。

SQL 文の実行結果の取得

open_reader() メソッドを呼び出して、SQL 文の実行結果を読み取ることができます。戻り値は、次のシナリオによって異なります。

  • テーブルデータを読み取る場合、構造化データが返されます。この場合、FOR 句を使用して各レコードを走査します。

    with o.execute_sql('select * from table_name').open_reader() as reader:
        for record in reader: # 各レコードを処理します。
            print(record)
  • desc などのコマンドを実行する場合、非構造化データが返されます。この場合、reader.raw 操作を呼び出して、コマンド出力を取得します。

    with o.execute_sql('desc table_name').open_reader() as reader:
        print(reader.raw)

open_reader() メソッドを呼び出すと、PyODPS は古い Result インターフェイスを自動的に呼び出します。これにより、タイムアウトが発生したり、取得できるデータレコード数が制限されたりする可能性があります。次のいずれかの方法を使用して、PyODPS が InstanceTunnel を呼び出すように指定できます。

  • スクリプトに options.tunnel.use_instance_tunnel =True を追加します。

  • open_reader(tunnel=True) を構成します。次のコードは例を示しています。PyODPS V0.7.7.1 以降では、open_reader() メソッドを使用して完全データを読み取ることができます。

    with o.execute_sql('select * from table_name').open_reader(tunnel=True) as reader:
        for record in reader:
            print(record)
説明
  • 古いバージョンの MaxCompute を使用している場合、または PyODPS が InstanceTunnel を呼び出すときにエラーが発生した場合、PyODPS はアラートを生成し、呼び出しオブジェクトを古い Result インターフェイスに自動的にスペックダウンします。アラートの情報に基づいて、問題の原因を特定できます。

  • MaxCompute のバージョンで古い Result インターフェイスのみがサポートされており、SQL 文のすべての実行結果を読み取る場合は、実行結果を別のテーブルに書き込み、open_reader() メソッドを使用してテーブルからデータを読み取ることができます。この操作は、MaxCompute プロジェクトの データ保護メカニズム に従います。

  • InstanceTunnel の詳細については、「InstanceTunnel」をご参照ください。

デフォルトでは、PyODPS はインスタンスから読み取ることができるデータ量を制限しません。ただし、プロジェクトオーナーは、インスタンスから読み取ることができるデータ量を制限するために、MaxCompute プロジェクトの保護設定を構成する場合があります。この場合、データは読み取り制限モードでのみ読み取ることができます。このモードでは、読み取ることができる行数は、プロジェクトの構成に基づいて制限されます。ほとんどの場合、最大 10,000 行を読み取ることができます。PyODPS は、インスタンスから読み取られたデータ量が制限されており、options.tunnel.limit_instance_tunnel が構成されていないことを検出すると、読み取り制限モードを自動的に有効にします。

  • プロジェクトが保護されており、読み取り制限モードを手動で有効にする場合は、open_reader(limit=True) のように、open_reader() メソッドに構成 limit=True を追加できます。また、構成 options.tunnel.limit_instance_tunnel = True を追加して、読み取り制限モードを有効にすることもできます。

  • DataWorks などの特定の環境では、options.tunnel.limit_instance_tunnel がデフォルトで True に設定されている場合があります。この場合、インスタンスからすべてのデータを読み取る場合は、open_reader(tunnel=True, limit=False) のように、open_reader() メソッドに構成 tunnel=Truelimit=False を追加する必要があります。

  • 重要

    プロジェクトが保護されており、構成 tunnel=Truelimit=False を使用して保護を解除できない場合は、プロジェクトオーナーに連絡して、関連する読み取り権限を付与してもらう必要があります。詳細については、「プロジェクトデータ保護」をご参照ください。

リソースエイリアスの構成

UDF によって参照されるリソースが動的に変更される場合は、古いリソースの エイリアス を構成し、エイリアスを新しいリソースの名前として使用できます。この方法では、UDF を削除したり、別の UDF を作成したりする必要はありません。

from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    schema=Schema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# 値 1 のみを格納するデータ行を書き込みます。
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# リソース res1 のエイリアスをリソース res2 の名前として設定します。UDF またはリソースを変更する必要はありません。
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])

特定のシナリオでは、実行する SQL 文の biz_id を指定する必要があります。指定しないと、エラーが発生します。エラーが発生した場合は、グローバル optionsbiz_id を構成してエラーを修正できます。

from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')