全部产品
Search
文档中心

MaxCompute:Contoh penggunaan SDK untuk Python: Pernyataan SQL

更新时间:Jun 19, 2025

Topik ini memberikan contoh cara mengeksekusi pernyataan SQL dalam skenario umum menggunakan SDK untuk Python.

Peringatan

PyODPS mendukung kueri MaxCompute SQL dan menyediakan metode untuk membaca hasil kueri. Saat mengeksekusi pernyataan MaxCompute SQL, perhatikan poin-poin berikut:

  • Gunakan metode execute_sql('statement') atau run_sql('statement') untuk mengeksekusi pernyataan SQL pada objek entri. Nilai yang dikembalikan adalah instance yang sedang berjalan. Untuk informasi lebih lanjut tentang nilai yang dikembalikan, lihat Instance Tugas.

  • MaxCompute tidak mengizinkan Anda membaca hasil instance dalam format Arrow.

  • Beberapa pernyataan SQL yang dapat dieksekusi di konsol MaxCompute tidak dapat dieksekusi di PyODPS. Jika ingin mengeksekusi pernyataan selain DDL dan DML, gunakan metode lain. Contohnya:

    • Gunakan metode run_security_query untuk mengeksekusi pernyataan GRANT dan REVOKE.

    • Gunakan metode run_xflow atau execute_xflow untuk mengeksekusi pernyataan Platform Pembelajaran Mesin untuk AI (PAI).

  • Jika memanggil mesin SQL untuk mengeksekusi pernyataan SQL, Anda akan dikenakan biaya berdasarkan jumlah pekerjaan SQL. Untuk informasi lebih lanjut tentang penagihan, lihat Ikhtisar.

Eksekusi pernyataan SQL

import os
from odps import ODPS
# Setel variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID ke ID AccessKey Anda.
# Setel variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET ke Rahasia AccessKey Anda.
# Kami menyarankan agar Anda tidak langsung menggunakan ID AccessKey atau Rahasia AccessKey Anda.
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='proyek-default-anda',
    endpoint='titik-akhir-anda',
)
o.execute_sql('select * from nama_tabel')  # Eksekusi pernyataan untuk beberapa instance dalam mode sinkron. Eksekusi pernyataan untuk instance lainnya diblokir hingga eksekusi pernyataan untuk instance yang ditentukan selesai.
instance = o.run_sql ('run_select * from nama_tabel') # Eksekusi pernyataan untuk beberapa instance dalam mode asinkron.
print(instance.get_logview_address()) # Dapatkan URL Logview dari sebuah instance.
instance.wait_for_success()  # Eksekusi pernyataan untuk instance lainnya diblokir hingga eksekusi pernyataan untuk instance yang ditentukan selesai.

SDK Python mendukung eksekusi beberapa perintah SQL tanpa batasan jumlah dan menawarkan mode eksekusi sinkron serta asinkron. Eksekusi sinkron memblokir thread saat ini hingga perintah selesai dan mengembalikan hasilnya. Sebaliknya, eksekusi asinkron tidak menunggu perintah selesai, sehingga meningkatkan kemampuan pemrosesan konkuren program dan efisiensi sambil mengurangi latensi akibat operasi I/O.

Tentukan parameter waktu proses

Anda dapat menggunakan parameter hints untuk menetapkan parameter waktu proses. Nilai parameter hints adalah tipe DICT.

o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

Anda dapat mengonfigurasi parameter sql.settings secara global. Parameter waktu proses yang relevan secara otomatis ditambahkan selama setiap eksekusi.

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # Hints secara otomatis dikonfigurasi berdasarkan konfigurasi global.

Dapatkan hasil eksekusi pernyataan SQL

Anda dapat memanggil metode open_reader untuk mendapatkan hasil eksekusi pernyataan SQL. Saat hasil kueri sedang dibaca, situasi berikut mungkin terjadi:

  • Pernyataan SQL mengembalikan data terstruktur.

    with o.execute_sql('select * from nama_tabel').open_reader() as reader:
        for record in reader:
            print(record)  # Setiap rekaman diproses.
  • Jika perintah DESC dijalankan, Anda dapat menggunakan metode reader.raw untuk mendapatkan hasil eksekusi SQL asli.

    with o.execute_sql('desc nama_tabel').open_reader() as reader:
        print(reader.raw)

Tentukan antarmuka Hasil yang digunakan

Jika Anda menetapkan options.tunnel.use_instance_tunnel ke True saat menggunakan metode open_reader, PyODPS secara otomatis memanggil Instance Tunnel. Jika Anda menetapkan options.tunnel.use_instance_tunnel ke False, PyODPS memanggil antarmuka Hasil lama. Namun, jika menggunakan versi MaxCompute yang lebih lama atau terjadi kesalahan saat PyODPS memanggil Instance Tunnel, PyODPS menghasilkan peringatan dan secara otomatis menurunkan panggilan objek ke antarmuka Hasil lama. Anda dapat mengidentifikasi penyebab masalah berdasarkan informasi peringatan. Jika hasil yang dikembalikan oleh Instance Tunnel tidak sesuai harapan, Anda dapat mengubah nilai parameter options.tunnel.use_instance_tunnel menjadi False. Dengan cara ini, Anda juga dapat mengonfigurasi parameter tunnel untuk menentukan antarmuka Hasil yang ingin digunakan saat metode open_reader dipanggil.

  • Gunakan Instance Tunnel.

    with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
        for record in reader:
            print(record)  # Setiap rekaman diproses.
  • Gunakan antarmuka Hasil.

    with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
        for record in reader:
            print(record)  # Setiap rekaman diproses.

Batasi jumlah maksimum catatan data yang dapat diunduh

Untuk membatasi jumlah maksimum catatan data yang dapat diunduh, tambahkan opsi limit ke metode open_reader atau tetapkan options.tunnel.limit_instance_tunnel ke True. Jika tidak mengonfigurasi options.tunnel.limit_instance_tunnel, MaxCompute secara otomatis mengaktifkan limit. Dalam hal ini, jumlah maksimum catatan data yang dapat diunduh bergantung pada jumlah catatan data yang dapat diunduh menggunakan perintah Tunnel yang dikonfigurasi dalam proyek. Secara umum, maksimal 10.000 catatan data dapat diunduh sekaligus.

PyODPS memungkinkan Anda membaca data ke dalam pandas DataFrames.

# Gunakan langsung metode to_pandas dari pembaca.
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    # Tipe pd_df adalah pandas DataFrame.
    pd_df = reader.to_pandas()

Tentukan kecepatan pembacaan data (jumlah proses)

null

Anda dapat menggunakan beberapa proses secara bersamaan untuk mempercepat pembacaan data hanya di PyODPS 0.11.3 dan versi yang lebih baru.

Gunakan parameter n_process untuk menentukan jumlah proses yang dapat digunakan.

import multiprocessing
n_process = multiprocessing.cpu_count()
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    # Tetapkan n_process ke jumlah proses yang akan digunakan.
    pd_df = reader.to_pandas(n_process=n_process)

Konfigurasikan alias sumber daya

Saat pernyataan SQL dieksekusi, sumber daya yang dirujuk oleh fungsi yang ditentukan pengguna (UDF) mungkin berubah secara dinamis. Untuk menghindari penghapusan dan pembuatan ulang UDF, gunakan opsi alias untuk mengonfigurasi nama sumber daya lama sebagai alias sumber daya baru.

from odps.models import Schema

myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    schema=Schema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# Tulis baris data yang hanya berisi satu nilai 1.
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# Gunakan nama sumber daya yang isinya 1 sebagai alias sumber daya yang isinya 2. Anda tidak perlu memodifikasi UDF atau sumber daya.
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])

Eksekusi pernyataan SQL dalam lingkungan interaktif

Anda dapat menggunakan plug-in SQL untuk mengeksekusi pernyataan SQL dan mengimplementasikan kueri berparameter di IPython dan Jupyter. Untuk informasi lebih lanjut, lihat dokumentasi peningkatan pengalaman pengguna.

Tetapkan biz_id

Dalam beberapa kasus, Anda mungkin perlu mengirimkan biz_id saat mengirimkan pernyataan SQL. Jika tidak, terjadi kesalahan saat pernyataan SQL dieksekusi. Anda dapat menetapkan biz_id dalam options secara global.

from odps import options

options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')