Topik ini memberikan contoh cara melakukan operasi pada tabel dalam skenario tipikal menggunakan SDK untuk Python.
Kueri semua tabel
Gunakan metode list_tables() dari objek entri untuk menanyakan semua tabel dalam sebuah Proyek.
for table in odps.list_tables():
# Kueri semua tabel dalam sebuah proyek.Periksa apakah sebuah tabel ada
Gunakan metode exist_table() dari objek entri untuk memeriksa keberadaan tabel. Kemudian, gunakan metode get_table() untuk mendapatkan informasi tentang tabel tersebut.
t = odps.get_table('table_name')
t.schema
odps.Schema {
c_int_a bigint
c_int_b bigint
c_double_a double
c_double_b double
c_string_a string
c_string_b string
c_bool_a boolean
c_bool_b boolean
c_datetime_a datetime
c_datetime_b datetime
}
t.lifecycle
-1
print(t.creation_time)
2014-05-15 14:58:43
t.is_virtual_view
False
t.size
1408
t.schema.columns
[<column c_int_a, type bigint>,
<column c_int_b, type bigint>,
<column c_double_a, type double>,
<column c_double_b, type double>,
<column c_string_a, type string>,
<column c_string_b, type string>,
<column c_bool_a, type boolean>,
<column c_bool_b, type boolean>,
<column c_datetime_a, type datetime>,
<column c_datetime_b, type datetime>] Buat skema tabel
Berikut adalah dua metode untuk membuat skema tabel:
Buat skema berdasarkan kolom tabel dan partisi opsional.
from odps.models import Schema, Column, Partition columns = [ Column(name='num', type='bigint', comment='kolom ini'), Column(name='num2', type='double', comment='kolom ini 2'), ] partitions = [Partition(name='pt', type='string', comment='partisi ini')] schema = Schema(columns=columns, partitions=partitions)Setelah membuat skema, Anda dapat mengambil informasi kolom dan partisi.
Dapatkan informasi semua kolom.
print(schema.columns)Contoh respons:
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]Dapatkan informasi kolom kunci partisi.
print(schema.partitions)Contoh respons:
[<partition pt, type string>]Dapatkan nama kolom non-partisi.
print(schema.names)Contoh respons:
['num', 'num2']Dapatkan tipe data kolom non-partisi.
print(schema.types)Contoh respons:
[bigint, double]
Buat skema dengan memanggil metode
Schema.from_lists(). Metode ini lebih mudah digunakan, tetapi tidak memungkinkan pengaturan langsung komentar untuk kolom dan partisi.from odps.models import Schema schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string']) print(schema.columns)Contoh respons:
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
Buat tabel
Gunakan metode o.create_table() untuk membuat tabel dengan menggunakan skema tabel atau dengan menentukan nama dan tipe data kolom. Pastikan tipe data kolom valid saat membuat tabel.
Gunakan skema tabel untuk membuat tabel
Saat menggunakan skema tabel, buat skema terlebih dahulu sebelum membuat tabel.
# Buat skema tabel.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
# Buat tabel dengan menggunakan skema yang telah dibuat.
table = o.create_table('my_new_table', schema)
# Buat tabel hanya jika tidak ada tabel dengan nama yang sama.
table = o.create_table('my_new_table', schema, if_not_exists=True)
# Konfigurasikan siklus hidup tabel.
table = o.create_table('my_new_table', schema, lifecycle=7)Gunakan metode print(o.exist_table('my_new_table')) untuk memeriksa apakah tabel berhasil dibuat. Jika True dikembalikan, tabel berhasil dibuat.
Buat tabel dengan menentukan nama dan tipe data kolom yang akan disertakan dalam tabel
# Buat tabel partisi bernama my_new_table dengan kolom umum dan kolom kunci partisi yang ditentukan.
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
# Buat tabel non-partisi bernama my_new_table02.
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)Gunakan metode print(o.exist_table('my_new_table')) untuk memeriksa apakah tabel berhasil dibuat. Jika True dikembalikan, tabel berhasil dibuat.
Buat tabel dengan menentukan nama dan tipe data kolom yang akan disertakan dalam tabel: tipe data baru di edisi tipe data MaxCompute V2.0
Secara default, hanya tipe data BIGINT, DOUBLE, DECIMAL, STRING, DATETIME, BOOLEAN, MAP, dan ARRAY yang didukung saat membuat tabel. Untuk menggunakan tipe data lain seperti TINYINT dan STRUCT, atur options.sql.use_odps2_extension ke True. Contoh:
from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')Hapus tabel
Gunakan metode delete_table() untuk menghapus tabel yang ada.
o.delete_table('my_table_name', if_exists=True) # Hapus tabel hanya jika tabel tersebut ada.
t.drop() # Panggil metode drop() untuk menghapus tabel jika tabel tersebut ada.
Kelola partisi tabel
Periksa apakah tabel memiliki partisi.
table = o.get_table('my_new_table') if table.schema.partitions: print('Tabel %s memiliki partisi.' % table.name)Iterasikan semua partisi dalam tabel.
table = o.get_table('my_new_table') for partition in table.partitions: # Iterasikan semua partisi. print(partition.name) # Langkah iterasi. Pada langkah ini, nama partisi ditampilkan. for partition in table.iterate_partitions(spec='pt=test'): # Iterasikan partisi level-2 dalam partisi bernama test. print(partition.name) # Langkah iterasi. Pada langkah ini, nama partisi ditampilkan. for partition in table.iterate_partitions(spec='dt>20230119'): # Iterasikan partisi level-2 dalam partisi yang memenuhi kondisi dt>20230119. print(partition.name) # Langkah iterasi. Pada langkah ini, nama partisi ditampilkan.nullMulai PyODPS 0.11.3, Anda dapat menentukan ekspresi logis untuk
iterate_partitions, sepertidt>20230119dalam contoh sebelumnya.Periksa keberadaan partisi.
table = o.get_table('my_new_table') table.exist_partition('pt=test,sub=2015')Dapatkan informasi tentang partisi.
table = o.get_table('my_new_table') partition = table.get_partition('pt=test') print(partition.creation_time) partition.sizeBuat partisi.
t = o.get_table('my_new_table') t.create_partition('pt=test', if_not_exists=True) # Buat partisi hanya jika tidak ada partisi dengan nama yang sama.Hapus partisi yang ada.
t = o.get_table('my_new_table') t.delete_partition('pt=test', if_exists=True) # Set parameter if_exists ke True. Ini memastikan bahwa partisi dihapus hanya jika partisi tersebut ada. partition.drop() # Panggil metode drop() untuk menghapus partisi jika partisi tersebut ada.
Baca data dari tabel
Anda dapat membaca data dari tabel dengan beberapa cara:
Gunakan metode head untuk mengambil hingga 10.000 catatan pertama di setiap tabel.
from odps import ODPS t = o.get_table('dual') for record in t.head(3): # Proses setiap rekaman.Eksekusi pernyataan dengan klausa WITH.
with t.open_reader(partition='pt=test') as reader: count = reader.count for record in reader[5:10] # Anda dapat menjalankan pernyataan beberapa kali hingga semua rekaman dibaca. Jumlah rekaman ditentukan oleh count. Anda dapat mengubah kode menjadi kode operasi paralel. # Proses satu rekaman.Eksekusi pernyataan tanpa klausa WITH.
reader = t.open_reader(partition='pt=test') count = reader.count for record in reader[5:10] # Anda dapat menjalankan pernyataan beberapa kali hingga semua rekaman dibaca. Jumlah rekaman ditentukan oleh count. Anda dapat mengubah kode menjadi kode operasi paralel. # Proses satu rekaman.Baca data langsung ke Pandas DataFrames.
with t.open_reader(partition='pt=test') as reader: pd_df = reader.to_pandas()
Tulis data ke tabel
Mirip dengan open_reader, gunakan open_writer dari objek tabel untuk membuka penulis dan menulis data ke tabel.
Eksekusi pernyataan dengan klausa WITH.
with t.open_writer(partition='pt=test') as writer: records = [[111, 'aaa', True], # Daftar dapat digunakan. [222, 'bbb', False], [333, 'ccc', True], [444, 'Chinese', False]] writer.write(records) # Rekaman dapat berupa objek iterable. records = [t.new_record([111, 'aaa', True]), # Objek rekaman dapat digunakan. t.new_record([222, 'bbb', False]), t.new_record([333, 'ccc', True]), t.new_record([444, 'Chinese', False])] writer.write(records)Jika partisi yang ditentukan tidak ada, atur parameter create_partition ke True untuk membuat partisi. Contoh:
with t.open_writer(partition='pt=test', create_partition=True) as writer: records = [[111, 'aaa', True], # Daftar dapat digunakan. [222, 'bbb', False], [333, 'ccc', True], [444, 'Chinese', False]] writer.write(records) # Rekaman dapat berupa objek iterable.Cara yang lebih sederhana adalah menggunakan metode write_table dari objek MaxCompute untuk menulis data.
records = [[111, 'aaa', True], # Daftar dapat digunakan. [222, 'bbb', False], [333, 'ccc', True], [444, 'Chinese', False]] o.write_table('test_table', records, partition='pt=test', create_partition=True)nullSetiap kali Anda memanggil
write_table, MaxCompute menghasilkan file di server. Operasi ini memakan waktu. Jika banyak file dihasilkan, efisiensi operasi kueri berikutnya berkurang. Oleh karena itu, disarankan untuk menulis beberapa rekaman sekaligus atau menyediakan objek generator saat menggunakan metodewrite_table.Saat menggunakan metode
write_tableuntuk menulis data, data baru ditambahkan ke data yang ada. PyODPS tidak menyediakan opsi untuk menimpa data yang ada. Anda perlu secara manual menghapus data yang ingin Anda timpa. Untuk tabel non-partisi, gunakantable.truncate(). Untuk tabel partisi, hapus partisi terlebih dahulu.
Gunakan format Arrow untuk membaca dan menulis data
Apache Arrow adalah format lintas bahasa yang mendukung pertukaran data antar platform. Sejak 2021, MaxCompute mendukung pembacaan data tabel menggunakan format Arrow. Versi PyODPS 0.11.2 dan yang lebih baru mendukung fitur ini. Setelah menginstal pyarrow di lingkungan Python Anda, tambahkan konfigurasi arrow=True saat memanggil open_writer. Dengan cara ini, Anda dapat membaca atau menulis Arrow RecordBatches.
import pandas as pd
import pyarrow as pa
with t.open_writer(partition='pt=test', create_partition=True, arrow=True) as writer:
records = [[111, 'aaa', True],
[222, 'bbb', False],
[333, 'ccc', True],
[444, 'Chinese', False]]
df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
# Tulis RecordBatch.
batch = pa.RecordBatch.from_pandas(df)
writer.write(batch)
# Anda juga dapat menggunakan Pandas DataFrame secara langsung.
writer.write(df)