このトピックでは、SDK for Pythonを使用して、一般的なシナリオでテーブルに対して操作を実行する方法の例を示します。
すべてのテーブルの照会
エントリオブジェクトのlist_tables()
メソッドを使用して、プロジェクト内のすべてのテーブルをクエリできます。
for table in odps.list_tables():
# Query all tables in a project.
テーブルが存在するかどうかを確認する
エントリオブジェクトのexist_table()
メソッドを使用して、テーブルが存在するかどうかを確認できます。 次に、get_table()
メソッドを使用して、テーブルに関する情報を取得します。
t = odps.get_table('table_name')
t.schema
odps.Schema {
c_int_a bigint
c_int_b bigint
c_double_a double
c_double_b double
c_string_a string
c_string_b string
c_bool_a boolean
c_bool_b boolean
c_datetime_a datetime
c_datetime_b datetime
}
t.lifecycle
-1
print(t.creation_time)
2014-05-15 14:58:43
t.is_virtual_view
False
t.size
1408
t.schema.columns
[<column c_int_a, type bigint>,
<column c_int_b, type bigint>,
<column c_double_a, type double>,
<column c_double_b, type double>,
<column c_string_a, type string>,
<column c_string_b, type string>,
<column c_bool_a, type boolean>,
<column c_bool_b, type boolean>,
<column c_datetime_a, type datetime>,
<column c_datetime_b, type datetime>]
テーブルスキーマの作成
次のいずれかの方法を使用して、テーブルスキーマを作成できます。
テーブル列とオプションのパーティションに基づいてスキーマを作成します。
from odps.models import Schema, Column, Partition columns = [ Column(name='num', type='bigint', comment='the column'), Column(name='num2', type='double', comment='the column2'), ] partitions = [Partition(name='pt', type='string', comment='the partition')] schema = Schema(columns=columns, partitions=partitions)
スキーマを作成した後、列とパーティションに関する情報を取得できます。
すべての列に関する情報を取得します。
print(schema.columns)
レスポンス例:
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
パーティションキー列に関する情報を取得します。
print(schema.partitions)
レスポンス例:
[<partition pt, type string>]
非パーティションキー列の名前を取得します。
print(schema.names)
レスポンス例:
['num', 'num2']
パーティションキー以外の列のデータ型を取得します。
print(schema.types)
レスポンス例:
[bigint, double]
schema. from_lists()
メソッドを呼び出してスキーマを作成します。 このメソッドは簡単に呼び出すことができますが、列とパーティションのコメントを直接設定することはできません。from odps.models import Schema schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string']) print(schema.columns)
レスポンス例:
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
テーブルの作成
o.create_table()
メソッドを呼び出して、テーブルスキーマを使用するか、列の名前とデータ型を指定してテーブルを作成できます。 テーブルを作成するときは、テーブル内の列のデータ型が有効であることを確認する必要があります。
テーブルスキーマを使用してテーブルを作成する
テーブルスキーマを使用してテーブルを作成する場合、テーブルを作成する前にスキーマを作成する必要があります。
# Create a table schema.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
# Create a table by using the schema that you created.
table = o.create_table('my_new_table', schema)
# Create a table only if no table with the same name exists.
table = o.create_table('my_new_table', schema, if_not_exists=True)
# Configure the lifecycle of the table.
table = o.create_table('my_new_table', schema, lifecycle=7)
print(o.exist_table('my_new_table '))
メソッドを呼び出して、テーブルが正常に作成されたかどうかを確認できます。 True
が返された場合、テーブルは正常に作成されます。
テーブルに含める列の名前とデータ型を指定してテーブルを作成します。
# Create a partitioned table named my_new_table with specified common columns and partition key columns.
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
# Create a non-partitioned table named my_new_table02.
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)
print(o.exist_table('my_new_table '))
メソッドを呼び出して、テーブルが正常に作成されたかどうかを確認できます。 True
が返された場合、テーブルは正常に作成されます。
テーブルに含める列の名前とデータ型を指定してテーブルを作成します。MaxCompute V2.0データ型エディションの新しいデータ型
既定では、テーブルを作成するときに、BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP、およびARRAYデータ型のみがサポートされます。 TINYINTやSTRUCTなどの他のデータ型を使用する必要がある場合は、options.sql.us e_odps2_extension
をTrueに設定する必要があります。 例:
from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')
テーブルの削除
delete_table()
メソッドを呼び出して、既存のテーブルを削除できます。
o.delete_table('my_table_name', if_exists=True) # Delete a table only if the table exists.
t.drop() # Call the drop() method to drop a table if the table exists.
テーブルパーティションの管理
テーブルがパーティション分割されているかどうかを確認します。
table = o.get_table('my_new_table') if table.schema.partitions: print('Table %s is partitioned.' % table.name)
テーブル内のすべてのパーティションを繰り返します。
table = o.get_table('my_new_table') for partition in table.partitions: # Iterate over all partitions. print(partition.name) # An iteration step. In this step, the partition name is displayed. for partition in table.iterate_partitions(spec='pt=test'): # Iterate over level-2 partitions in the partition named test. print(partition.name) # An iteration step. In this step, the partition name is displayed. for partition in table.iterate_partitions(spec='dt>20230119'): # Iterate over level-2 partitions in the partitions that meet the dt>20230119 condition. print(partition.name) # An iteration step. In this step, the partition name is displayed.
重要PyODPS 0.11.3以降では、上記の例の
dt>20230119
など、iterate_partitions
の論理式を指定できます。パーティションが存在するかどうかを確認します。
table = o.get_table('my_new_table') table.exist_partition('pt=test,sub=2015')
パーティションに関する情報を取得します。
table = o.get_table('my_new_table') partition = table.get_partition('pt=test') print(partition.creation_time) partition.size
パーティションを作成します。
t = o.get_table('my_new_table') t.create_partition('pt=test', if_not_exists=True) # Create a partition only if no partition with the same name exists.
既存のパーティションを削除します。
t = o.get_table('my_new_table') t.delete_partition('pt=test', if_exists=True) # Set the if_exists parameter to True. This ensures that a partition is deleted only if the partition exists. partition.drop() # Call the drop() method to drop a partition if the partition exists.
テーブルからのデータの読み取り
さまざまな方法でテーブルからデータを読み取ることができます。
headメソッドを使用して、各テーブルの最初の10,000以下のデータレコードを取得します。
from odps import ODPS t = o.get_table('dual') for record in t.head(3): # Process each record.
WITH句を使用してステートメントを実行します。
with t.open_reader(partition='pt=test') as reader: count = reader.count for record in reader[5:10] # You can execute the statement multiple times until all records are read. The number of records is specified by count. You can change the code to parallel-operation code. # Process one record.
WITH句を使用せずにステートメントを実行します。
reader = t.open_reader(partition='pt=test') count = reader.count for record in reader[5:10] # You can execute the statement multiple times until all records are read. The number of records is specified by count. You can change the code to parallel-operation code. # Process one record.
データをPandas DataFramesに直接読み取ります。
with t.open_reader(partition='pt=test') as reader: pd_df = reader.to_pandas()
テーブルへのデータの書き込み
open_reader
と同様に、テーブルオブジェクトのopen_writer
を使用して、writerを開き、テーブルにデータを書き込むことができます。
WITH句を使用してステートメントを実行します。
with t.open_writer(partition='pt=test') as writer: records = [[111, 'aaa', True], # A list can be used. [222, 'bbb', False], [333, 'ccc', True], [444, 'Chinese', False]] writer.write(records) # Records can be iterable objects. records = [t.new_record([111, 'aaa', True]), # Record objects can be used. t.new_record([222, 'bbb', False]), t.new_record([333, 'ccc', True]), t.new_record([444, 'Chinese', False])] writer.write(records)
指定されたパーティションが存在しない場合は、create_partitionパラメーターをTrueに設定してパーティションを作成します。 例:
with t.open_writer(partition='pt=test', create_partition=True) as writer: records = [[111, 'aaa', True], # A list can be used. [222, 'bbb', False], [333, 'ccc', True], [444, 'Chinese', False]] writer.write(records) # Records can be iterable objects.
より簡単な方法は、MaxComputeオブジェクトのwrite_tableメソッドを使用してデータを書き込むことです。
records = [[111, 'aaa', True], # A list can be used. [222, 'bbb', False], [333, 'ccc', True], [444, 'Chinese', False]] o.write_table('test_table', records, partition='pt=test', create_partition=True)
説明write_table
を呼び出すたびに、MaxComputeはサーバー上にファイルを生成します。 この操作には時間がかかる。 大量のファイルが生成されると、その後のクエリ操作の効率が低下します。 したがって、write_table
メソッドを使用する場合は、一度に複数のレコードを書き込むか、ジェネレータオブジェクトを提供することをお勧めします。write_table
メソッドを使用してデータを書き込むと、新しいデータが既存のデータに追加されます。 PyODPSは、既存のデータを上書きするオプションを提供しません。 上書きするデータを手動で削除する必要があります。 パーティション分割されていないテーブルの場合は、table.truncate()
を呼び出す必要があります。 パーティションテーブルの場合は、最初にパーティションを削除する必要があります。
矢印形式を使用してデータを読み書きする
Apache Arrowは、異なるプラットフォーム間のデータ交換をサポートする言語間の形式です。 2021以降、MaxComputeは矢印形式を使用したテーブルデータの読み取りをサポートしています。 PyODPS 0.11.2以降のバージョンはこの機能をサポートしています。 Python環境にpyarrowをインストールした後、open_writer
を呼び出すときにarrow=True
設定を追加できます。 この方法で、Arrow RecordBatchesを読み書きできます。
import pandas as pd
import pyarrow as pa
with t.open_writer(partition='pt=test', create_partition=True, arrow=True) as writer:
records = [[111, 'aaa', True],
[222, 'bbb', False],
[333, 'ccc', True],
[444, 'Chinese', False]]
df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
# Write a RecordBatch.
batch = pa.RecordBatch.from_pandas(df)
writer.write(batch)
# You can also use Pandas DataFrame directly.
writer.write(df)