本文為您介紹Python SDK中執行SQL命令相關的典型情境操作樣本。
注意事項
PyODPS支援MaxCompute SQL查詢,並可以讀取執行的結果,使用時有以下注意事項。
執行SQL
import os
from odps import ODPS
# 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
# 不建議直接使用 Access Key ID / Access Key Secret 字串
o = ODPS(
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
o.execute_sql('select * from table_name') # 同步的方式執行,會阻塞直到SQL執行完成。
instance = o.run_sql('select * from table_name') # 以非同步方式提交
print(instance.get_logview_address()) # 擷取logview地址
instance.wait_for_success() # 阻塞直到完成Python SDK支援執行多條SQL命令,無數量限制,並提供同步和非同步兩種執行方式。同步執行會阻塞當前線程,直到命令完成並返回結果;而非同步執行不會等待命令完成,從而提高程式的並發處理能力和效率,減少因 I/O 操作導致的延遲。
設定運行參數
在運行時如果需要設定參數,您可以通過設定hints參數來實現,參數的類型是dict。
o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})您可以對於全域配置設定sql.settings,後續每次運行時則都會添加相關的運行時參數。
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # 會根據全域配置添加hints讀取SQL執行結果
運行SQL的Instance能夠直接執行open_reader操作讀取SQL執行結果。讀取時會出現以下兩種情況:
SQL返回了結構化的資料。
with o.execute_sql('select * from table_name').open_reader() as reader: for record in reader: print(record) # 處理每一個recordSQL可能執行了
desc命令,這時可以通過reader.raw取到原始的SQL執行結果。with o.execute_sql('desc table_name').open_reader() as reader: print(reader.raw)
設定使用哪種結果介面
如果您設定了options.tunnel.use_instance_tunnel == True,在後續調用open_reader時,PyODPS會預設調用Instance Tunnel, 否則會調用舊的Result介面。如果您使用了版本較低的 MaxCompute服務,或者調用Instance Tunnel出現了問題,PyODPS會給出警示並自動降級到舊的Result介面,您可根據警示資訊判斷導致降級的原因。如果Instance Tunnel的返回結果不合預期, 您可以將該選項設為False,在調用open_reader時,也可以使用tunnel參數來指定使用何種結果介面。
使用Instance Tunnel
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader: for record in reader: print(record) # 處理每一個record使用 Results 介面
with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader: for record in reader: print(record) # 處理每一個record
設定讀取資料規模
如果您想要限制下載資料的規模,可以為open_reader增加limit選項, 或者設定 options.tunnel.limit_instance_tunnel = True 。如果未設定 options.tunnel.limit_instance_tunnel,MaxCompute會自動開啟資料量限制,此時,可下載的資料條數受到Project配置的Tunnel下載資料規模數限制, 通常該限制為10000條。
設定讀取結果為pandas DataFrame
# 直接使用 reader 的 to_pandas 方法
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
# pd_df 類型為 pandas DataFrame
pd_df = reader.to_pandas()設定讀取速度(進程數)
多進程加速僅在 PyODPS 0.11.3 及以上版本中支援。
您可以通過n_process指定使用進程數。
import multiprocessing
n_process = multiprocessing.cpu_count()
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
# n_process 指定成機器核心數
pd_df = reader.to_pandas(n_process=n_process)設定alias
在運行SQL時,如果某個UDF引用的資源是動態變化的,您可以alias舊的資源名到新的資源,這樣就可以避免重新刪除並重新建立UDF。
from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
def __init__(self):
self.n = int(get_cache_file('test_alias_res1').read())
def evaluate(self, arg):
return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
class_type='test_alias.Example',
resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
'test_table',
schema=Schema.from_lists(['size'], ['bigint']),
if_not_exists=True
)
data = [[1, ], ]
# 寫入一行資料,只包含一個值1
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
'select test_alias_func(size) from test_table').open_reader() as reader:
print(reader[0][0])res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把內容為1的資源alias成內容為2的資源,不需要修改UDF或資源
with o.execute_sql(
'select test_alias_func(size) from test_table',
aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
print(reader[0][0])在互動式環境執行 SQL
在ipython和jupyter裡支援使用SQL外掛程式的方式運行SQL,且支援參數化查詢, 詳情參考互動體驗增強文檔。
設定biz_id
在少數情形下,在提交SQL時,可能需要同時提交biz_id,否則執行會報錯。此時,您可以在全域options裡設定biz_id。
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')