PyODPS supports MaxCompute SQL queries in both synchronous and asynchronous modes, and provides methods to read query results into records, raw output, or pandas DataFrames. Use this guide to submit SQL from Python code and retrieve results efficiently.
Usage notes

- Use execute_sql('statement') for synchronous execution and run_sql('statement') for asynchronous execution. Both methods return a task instance.
- MaxCompute does not support reading instance results in the Arrow format.
- Some SQL statements that work in the MaxCompute console cannot be executed in PyODPS. For statements other than DDL (Data Definition Language) and DML (Data Manipulation Language), use the appropriate method:
  - run_security_query: execute GRANT and REVOKE statements.
  - run_xflow or execute_xflow: execute Machine Learning Platform for AI (PAI) statements.
- SQL jobs are billed based on the number of jobs submitted. See Billing overview for details.
Execute SQL statements
Both execute_sql() and run_sql() submit SQL to MaxCompute. The difference is whether your code waits for the result.
import os
from odps import ODPS
# Store credentials in environment variables rather than hardcoding them
o = ODPS(
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='<your-project>',
endpoint='<your-endpoint>',
)
# Synchronous: blocks until the SQL job completes
o.execute_sql('select * from table_name')
# Asynchronous: returns immediately; the job runs in the background
instance = o.run_sql('select * from table_name')
print(instance.get_logview_address()) # Get the Logview URL for monitoring
instance.wait_for_success() # Block until the job finishes
The synchronous method blocks the current thread until the job completes. The asynchronous method returns a task instance immediately, letting you submit multiple jobs in parallel and wait for them selectively.
When running asynchronous queries in parallel, make sure you understand the dependency order between queries before submitting them. Isolate long-running async tasks into separate connections when the tasks need to outlive the current session.
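For independent queries, the submit-all-then-wait pattern can be sketched as a small helper. This is a sketch, not part of PyODPS: run_in_parallel is a hypothetical name, and it assumes the statements have no dependencies on one another.

```python
def run_in_parallel(o, statements):
    """Submit independent SQL statements asynchronously, then wait for all.

    Assumes the statements are independent; if one statement reads a
    table that another writes, wait for the writer before submitting
    the reader instead of using this helper.
    """
    # All jobs start on the server before the client blocks on any of them
    instances = [o.run_sql(stmt) for stmt in statements]
    for inst in instances:
        inst.wait_for_success()  # blocks; raises if the job failed
    return instances
```

Each returned instance can then be opened with open_reader() as usual.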
Set runtime parameters
Pass a hints parameter as a dict to override session settings for a single execution:
o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
To apply the same settings to every execution in the session, configure options.sql.settings globally:
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # Picks up the global hints automatically
Read query results
Use open_reader to access results after a query completes. Choose the approach based on what the query returns and how much data you need.
| Scenario | Approach |
|---|---|
| Structured data (small to medium result set) | Iterate over records with open_reader() |
| Raw text output (for example, DESC command) | Read reader.raw |
| Load all results into memory | reader.to_pandas() |
| Large result set with parallel download | reader.to_pandas(n_process=N) |
Structured data — iterate over records:
with o.execute_sql('select * from table_name').open_reader() as reader:
for record in reader:
print(record)
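Each record supports access both by position and by column name, so a single column can be pulled out directly. A minimal sketch (column_values is a hypothetical helper, not part of PyODPS; the column name is whatever your table defines):

```python
def column_values(reader, column):
    """Collect one column from an open result reader.

    `column` may be a positional index (record[0]) or a column name
    (record['col']); PyODPS records support both access styles.
    """
    return [record[column] for record in reader]
```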
Raw output (for example, from a DESC command) — read reader.raw:
with o.execute_sql('desc table_name').open_reader() as reader:
print(reader.raw)
Choose between Instance Tunnel and the Result interface
open_reader supports two backends:
| Backend | When to use | How to select |
|---|---|---|
| Instance Tunnel | Opt-in; supports large result sets | Set options.tunnel.use_instance_tunnel = True, then use tunnel=True |
| Result interface | Default, or fallback for older MaxCompute versions or when Instance Tunnel errors | tunnel=False (or set options.tunnel.use_instance_tunnel = False) |
PyODPS automatically falls back to the Result interface if Instance Tunnel fails and logs a warning that explains the reason. To switch backends manually:
# Use Instance Tunnel explicitly
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
for record in reader:
print(record)
# Use the Result interface explicitly
with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
for record in reader:
print(record)
Limit the number of records downloaded
To cap the number of records returned, pass a limit option to open_reader, or set options.tunnel.limit_instance_tunnel = True. Without explicit configuration, MaxCompute applies the project-level Tunnel limit — typically 10,000 records per download.
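As a sketch, either setting achieves the cap. This is a configuration fragment that assumes a connected ODPS object `o`; the limit keyword on open_reader is inferred from the description above, so verify it against your PyODPS version:

```python
from odps import options

# Session-wide: apply the Tunnel record limit to every instance read
options.tunnel.limit_instance_tunnel = True

# Per-read: request a limited download for a single result set
with o.execute_sql('select * from table_name').open_reader(limit=True) as reader:
    for record in reader:
        print(record)
```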
Read results into a pandas DataFrame
Call to_pandas() on the reader to load all results into a pandas DataFrame:
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
pd_df = reader.to_pandas() # Returns a pandas DataFrame
Speed up reading with multiple processes
Multi-process reading requires PyODPS 0.11.3 or later.
Pass n_process to to_pandas() to distribute the download across multiple processes. The SDK splits the result set into batches and downloads them in parallel.
import multiprocessing
n_process = multiprocessing.cpu_count()
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
pd_df = reader.to_pandas(n_process=n_process)
Configure resource aliases for UDFs
When SQL runs a user-defined function (UDF) that references a resource file, and the resource content changes, you would normally have to delete and recreate the UDF. Instead, use the aliases parameter to map the old resource name to the new one — no UDF changes needed.
The following example sets up a UDF that reads an integer from a resource file and adds it to the input value:
from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
def __init__(self):
self.n = int(get_cache_file('test_alias_res1').read())
def evaluate(self, arg):
return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
class_type='test_alias.Example',
resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
'test_table',
schema=Schema.from_lists(['size'], ['bigint']),
if_not_exists=True
)
data = [[1, ], ]
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
'select test_alias_func(size) from test_table').open_reader() as reader:
print(reader[0][0])
To run the same query with a different resource — without modifying the UDF — pass aliases:
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# Map the old resource name to the new one at query time
with o.execute_sql(
'select test_alias_func(size) from test_table',
aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
print(reader[0][0])
Run SQL in interactive environments
PyODPS includes SQL plug-ins for IPython and Jupyter that support parameterized queries. See the user experience enhancement documentation for setup instructions.
Set biz_id
Some environments require a biz_id to be submitted together with SQL statements; without it, execution fails. Set it globally so that it applies to all executions in the session:
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')