PyODPS mendukung pernyataan SQL dasar yang tersedia di MaxCompute. Topik ini menjelaskan cara menggunakan pernyataan SQL dalam PyODPS.
Informasi latar belakang
Tabel berikut menggambarkan metode yang dapat digunakan untuk mengeksekusi pernyataan SQL MaxCompute di PyODPS.
Nama Metode | Deskripsi |
execute_sql()/run_sql() | Untuk informasi lebih lanjut, lihat Eksekusi pernyataan SQL. |
open_reader() | Untuk informasi lebih lanjut, lihat Memperoleh hasil eksekusi pernyataan SQL. |
Tidak semua pernyataan SQL dapat dieksekusi menggunakan metode objek entri MaxCompute. Beberapa pernyataan SQL yang dapat dieksekusi pada klien MaxCompute mungkin gagal dieksekusi menggunakan metode execute_sql() dan run_sql(). Anda harus menggunakan metode lain untuk mengeksekusi pernyataan non-DDL atau non-DML. Sebagai contoh, gunakan metode run_security_query untuk mengeksekusi pernyataan GRANT atau REVOKE, serta metode run_xflow atau execute_xflow untuk memanggil Operasi API.
Saat menulis fungsi yang ditentukan pengguna (UDF) dalam Python, jika sumber daya yang dirujuk oleh UDF berubah secara dinamis, Anda dapat mengonfigurasi alias untuk sumber daya lama dalam metode execute_sql() dan menggunakan alias sebagai nama sumber daya baru. Dengan cara ini, Anda tidak perlu menghapus UDF atau membuat UDF lain. Untuk informasi lebih lanjut, lihat Mengonfigurasi alias sumber daya.
Eksekusi pernyataan SQL
Bagian ini menjelaskan pernyataan SQL MaxCompute dalam PyODPS:
Parameter
statement: Pernyataan SQL yang ingin dieksekusi.
hints: Parameter waktu proses. Parameter hints bertipe DICT.
Nilai Kembali
Informasi tentang instance tugas dikembalikan setelah mengeksekusi metode
execute_sql()danrun_sql(). Untuk informasi lebih lanjut, lihat Instance Tugas.Contoh
Contoh 1
Eksekusi pernyataan SQL.
o.execute_sql('select * from table_name') # Eksekusi pernyataan dalam mode sinkron. Instance lain diblokir hingga eksekusi pernyataan SQL selesai. instance = o.run_sql('select * from table_name') # Eksekusi pernyataan dalam mode asinkron. print(instance.get_logview_address()) # Dapatkan URL Logview dari instance. instance.wait_for_success() # Instance lain diblokir hingga eksekusi pernyataan SQL selesai.Contoh 2
Konfigurasikan parameter hints untuk pernyataan SQL.
o.execute_sql('select * from pyodps_iris', hints={'odps.stage.mapper.split.size': 16})Anda juga dapat mengonfigurasi global
sql.settingsuntuk mengatur parameter waktu proses. Kode berikut menunjukkan sebuah contoh. Setelah mengonfigurasi parameter waktu proses secara global, parameter tersebut akan berlaku setiap kali Anda mengeksekusi pernyataan SQL. Untuk informasi lebih lanjut tentang parameter yang dapat dikonfigurasi secara global, lihat Parameter Bendera.from odps import options options.sql.settings = {'odps.stage.mapper.split.size': 16} o.execute_sql('select * from pyodps_iris') # Parameter hints dikonfigurasi secara otomatis berdasarkan pengaturan global.
Memperoleh hasil eksekusi pernyataan SQL
Anda dapat memanggil metode open_reader() untuk membaca hasil eksekusi pernyataan SQL. Nilai kembali bervariasi berdasarkan skenario berikut:
Jika data tabel dibaca, data terstruktur dikembalikan. Dalam hal ini, gunakan klausa
FORuntuk melintasi setiap rekaman.with o.execute_sql('select * from table_name').open_reader() as reader: for record in reader: # Proses setiap rekaman. print(record)Jika Anda menjalankan perintah seperti
desc, data tidak terstruktur dikembalikan. Dalam hal ini, panggil operasireader.rawuntuk mendapatkan keluaran perintah.with o.execute_sql('desc table_name').open_reader() as reader: print(reader.raw)
Jika Anda memanggil metode open_reader(), PyODPS secara otomatis memanggil antarmuka Result lama. Ini dapat menyebabkan timeout atau membatasi jumlah rekaman data yang dapat diperoleh. Anda dapat menggunakan salah satu metode berikut untuk menentukan PyODPS memanggil InstanceTunnel:
Tambahkan
options.tunnel.use_instance_tunnel = Trueke dalam skrip.Konfigurasikan
open_reader(tunnel=True). Kode berikut menunjukkan sebuah contoh. Untuk PyODPS V0.7.7.1 dan versi lebih tinggi, Anda dapat menggunakan metodeopen_reader()untuk membaca data lengkap.with o.execute_sql('select * from table_name').open_reader(tunnel=True) as reader: for record in reader: print(record)
Jika Anda menggunakan versi MaxCompute yang lebih lama atau terjadi kesalahan saat PyODPS memanggil InstanceTunnel, PyODPS menghasilkan peringatan dan secara otomatis menurunkan objek panggilan ke antarmuka Result lama. Anda dapat mengidentifikasi penyebab masalah berdasarkan informasi peringatan.
Jika versi MaxCompute Anda hanya mendukung antarmuka Result lama dan Anda ingin membaca semua hasil eksekusi pernyataan SQL, Anda dapat menulis hasil eksekusi ke tabel lain dan kemudian menggunakan metode open_reader() untuk membaca data dari tabel tersebut. Operasi ini tunduk pada mekanisme perlindungan data proyek MaxCompute Anda.
Untuk informasi lebih lanjut tentang InstanceTunnel, lihat InstanceTunnel.
Secara default, PyODPS tidak membatasi jumlah data yang dapat dibaca dari instance. Namun, pemilik proyek dapat mengonfigurasi pengaturan perlindungan untuk proyek MaxCompute untuk membatasi jumlah data yang dapat dibaca dari instance. Dalam hal ini, data hanya dapat dibaca dalam mode pembatasan baca. Dalam mode ini, jumlah baris yang dapat dibaca dibatasi berdasarkan konfigurasi proyek. Dalam kebanyakan kasus, maksimum 10.000 baris dapat dibaca. Jika PyODPS mendeteksi bahwa jumlah data yang dibaca dari instance dibatasi dan options.tunnel.limit_instance_tunnel tidak dikonfigurasi, PyODPS secara otomatis mengaktifkan mode pembatasan baca.
Jika proyek Anda dilindungi dan Anda ingin secara manual mengaktifkan mode pembatasan baca, Anda dapat menambahkan konfigurasi
limit=Trueke metodeopen_reader(), sepertiopen_reader(limit=True). Anda juga dapat menambahkan konfigurasioptions.tunnel.limit_instance_tunnel = Trueuntuk mengaktifkan mode pembatasan baca.Dalam lingkungan tertentu, seperti DataWorks,
options.tunnel.limit_instance_tunnelmungkin diatur ke True secara default. Dalam hal ini, jika Anda ingin membaca semua data dari instance, Anda harus menambahkan konfigurasitunnel=Truedanlimit=Falseuntuk metodeopen_reader(), sepertiopen_reader(tunnel=True, limit=False).
Jika proyek Anda dilindungi dan konfigurasi tunnel=True dan limit=False tidak dapat digunakan untuk menghapus perlindungan, Anda harus menghubungi pemilik proyek untuk memberikan izin baca terkait. Untuk informasi lebih lanjut, lihat Perlindungan Data Proyek.
Mengonfigurasi alias sumber daya
Jika sumber daya yang dirujuk oleh UDF berubah secara dinamis, Anda dapat mengonfigurasi alias untuk sumber daya lama dan menggunakan alias sebagai nama sumber daya baru. Dengan cara ini, Anda tidak perlu menghapus UDF atau membuat UDF lain.
from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
def __init__(self):
self.n = int(get_cache_file('test_alias_res1').read())
def evaluate(self, arg):
return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
class_type='test_alias.Example',
resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
'test_table',
schema=Schema.from_lists(['size'], ['bigint']),
if_not_exists=True
)
data = [[1, ], ]
# Tulis baris data yang hanya berisi satu nilai: 1.
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
'select test_alias_func(size) from test_table').open_reader() as reader:
print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# Tetapkan alias sumber daya res1 sebagai nama sumber daya res2. Anda tidak perlu memodifikasi UDF atau sumber daya.
with o.execute_sql(
'select test_alias_func(size) from test_table',
aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
print(reader[0][0])Dalam skenario tertentu, Anda harus menentukan biz_id untuk pernyataan SQL agar dapat dieksekusi. Jika tidak, terjadi kesalahan. Jika terjadi kesalahan, Anda dapat mengonfigurasi biz_id dalam options global untuk memperbaiki kesalahan.
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')