PyODPS では、MaxCompute のテーブルおよびパーティションの作成、読み取り、書き込み、管理を行うためのメソッドが提供されています。本ページでは、実行可能なコード例を交えながら、代表的なテーブル操作について説明します。
すべてのテーブルを一覧表示
エントリーオブジェクトに対して list_tables() を呼び出すと、プロジェクト内のすべてのテーブルを反復処理できます。
for table in odps.list_tables():
# プロジェクト内のすべてのテーブルを照会します。テーブルの存在確認
エントリーオブジェクトに対して exist_table() を呼び出してテーブルの存在を確認し、get_table() を呼び出してテーブルのメタデータを取得します。
t = odps.get_table('table_name')
t.schema
odps.Schema {
c_int_a bigint
c_int_b bigint
c_double_a double
c_double_b double
c_string_a string
c_string_b string
c_bool_a boolean
c_bool_b boolean
c_datetime_a datetime
c_datetime_b datetime
}
t.lifecycle
-1
print(t.creation_time)
2014-05-15 14:58:43
t.is_virtual_view
False
t.size
1408
t.schema.columns
[<column c_int_a, type bigint>,
<column c_int_b, type bigint>,
<column c_double_a, type double>,
<column c_double_b, type double>,
<column c_string_a, type string>,
<column c_string_b, type string>,
<column c_bool_a, type boolean>,
<column c_bool_b, type boolean>,
<column c_datetime_a, type datetime>,
<column c_datetime_b, type datetime>]テーブルオブジェクトは以下のプロパティを公開しています。
プロパティ | 説明 |
| カラム定義を含む |
| ライフサイクル値( |
| テーブル作成時のタイムスタンプ |
| テーブルが仮想ビューであるかどうか |
| テーブルサイズ(バイト単位) |
| すべてのカラムオブジェクトのリスト |
テーブルスキーマの作成
テーブルスキーマを作成するには、2 つのアプローチがあります。
カラムおよびパーティションを明示的に定義
Schema、Column、Partition を odps.models からインポートし、カラムおよびパーティションのリストを Schema コンストラクターに渡します。このアプローチでは、各カラムおよびパーティションに対して comment パラメーターを指定できます。
from odps.models import Schema, Column, Partition
columns = [
Column(name='num', type='bigint', comment='the column'),
Column(name='num2', type='double', comment='the column2'),
]
partitions = [Partition(name='pt', type='string', comment='the partition')]
schema = Schema(columns=columns, partitions=partitions)スキーマを作成した後、以下のプロパティでその内容を確認できます。
すべてのカラム(パーティションキー列を含む):出力結果:
print(schema.columns)[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]パーティションキー列のみ:出力結果:
print(schema.partitions)[<partition pt, type string>]パーティション以外のカラム名:出力結果:
print(schema.names)['num', 'num2']パーティション以外のカラムのデータの型:出力結果:
print(schema.types)[bigint, double]
Schema.from_lists() を使用
Schema.from_lists() は、名前のリストとデータの型のリストを受け取る簡略化されたメソッドです。この方法はシンプルですが、カラムやパーティションに対するコメントを直接指定することはできません。
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
print(schema.columns)出力結果:
[<column num, type bigint>,
<column num2, type double>,
<partition pt, type string>]テーブルの作成
o.create_table() を呼び出してテーブルを作成します。すべてのカラムのデータの型が有効であることを確認してください。2 つのアプローチが利用可能です:1 つは Schema オブジェクトを渡す方法、もう 1 つはカラム定義を文字列として渡す方法です。
スキーマからテーブルを作成
まず Schema を構築し、それを create_table() に渡します。
# テーブルスキーマを作成します。
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
# 作成したスキーマを使用してテーブルを作成します。
table = o.create_table('my_new_table', schema)
# 同じ名前のテーブルが存在しない場合にのみテーブルを作成します。
table = o.create_table('my_new_table', schema, if_not_exists=True)
# テーブルのライフサイクルを設定します。
table = o.create_table('my_new_table', schema, lifecycle=7)テーブルが正しく作成されたかを確認します。
print(o.exist_table('my_new_table'))True が返された場合、テーブルは正常に作成されています。
カラム定義からテーブルを作成
カラム名およびデータの型を文字列またはタプルとして create_table() に渡します。
# 指定された共通カラムおよびパーティションキー列を持つパーティションテーブル my_new_table を作成します。
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
# パーティションを持たないテーブル my_new_table02 を作成します。
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)テーブルが正しく作成されたかを確認します。
print(o.exist_table('my_new_table'))True が返された場合、テーブルは正常に作成されています。
MaxCompute V2.0 拡張データの型を有効化
デフォルトでは、create_table() は BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP、ARRAY のみをサポートします。TINYINT や STRUCT などの追加のデータの型を使用するには、options.sql.use_odps2_extension を True に設定します。
from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')テーブルの削除
エントリーオブジェクトに対して delete_table() を呼び出すか、テーブルオブジェクトに対して drop() を呼び出します。
o.delete_table('my_table_name', if_exists=True) # テーブルが存在する場合にのみ削除します。
t.drop() # drop() メソッドを呼び出して、テーブルが存在する場合に削除します。テーブルパーティションの管理
テーブルがパーティション化されているかを確認
table = o.get_table('my_new_table')
if table.schema.partitions:
print('Table %s is partitioned.' % table.name)パーティションを反復処理
table = o.get_table('my_new_table')
for partition in table.partitions: # すべてのパーティションを反復処理します。
print(partition.name) # 反復処理のステップです。このステップではパーティション名が表示されます。
for partition in table.iterate_partitions(spec='pt=test'): # test という名前のパーティション内のレベル 2 のパーティションを反復処理します。
print(partition.name) # 反復処理のステップです。このステップではパーティション名が表示されます。
for partition in table.iterate_partitions(spec='dt>20230119'): # dt>20230119 条件を満たすパーティション内のレベル 2 のパーティションを反復処理します。
print(partition.name) # 反復処理のステップです。このステップではパーティション名が表示されます。iterate_partitions 内の論理式(例:dt>20230119)は、PyODPS 0.11.3 以降が必要です。
パーティションの存在確認
table = o.get_table('my_new_table')
table.exist_partition('pt=test,sub=2015')パーティション情報の取得
table = o.get_table('my_new_table')
partition = table.get_partition('pt=test')
print(partition.creation_time)
partition.sizeパーティションの作成
t = o.get_table('my_new_table')
t.create_partition('pt=test', if_not_exists=True) # 同じ名前のパーティションが存在しない場合にのみパーティションを作成します。パーティションの削除
t = o.get_table('my_new_table')
t.delete_partition('pt=test', if_exists=True) # if_exists パラメーターを True に設定します。これにより、パーティションが存在する場合にのみ削除されます。
partition.drop() # drop() メソッドを呼び出して、パーティションが存在する場合に削除します。テーブルからのデータ読み取り
head() を使用して先頭 N 件のレコードを読み取り
head() メソッドは、テーブルから最大 10,000 件までの先頭レコードを取得します。
from odps import ODPS
t = o.get_table('dual')
for record in t.head(3):
# 各レコードを処理します。open_reader() を with 文とともに使用して読み取り
with t.open_reader(partition='pt=test') as reader:
count = reader.count
for record in reader[5:10] # 全レコードを読み取るまでこの文を複数回実行できます。読み取るレコード数は count で指定されます。並列処理用のコードに変更することもできます。
# 1 件のレコードを処理します。open_reader() を with 文なしで使用して読み取り
reader = t.open_reader(partition='pt=test')
count = reader.count
for record in reader[5:10] # 全レコードを読み取るまでこの文を複数回実行できます。読み取るレコード数は count で指定されます。並列処理用のコードに変更することもできます。
# 1 件のレコードを処理します。pandas DataFrame への直接読み取り
with t.open_reader(partition='pt=test') as reader:
pd_df = reader.to_pandas()テーブルへのデータ書き込み
open_writer() を with 文とともに使用して書き込み
with t.open_writer(partition='pt=test') as writer:
records = [[111, 'aaa', True], # リストを使用できます。
[222, 'bbb', False],
[333, 'ccc', True],
[444, 'Chinese', False]]
writer.write(records) # レコードはイテラブルオブジェクトにすることができます。
records = [t.new_record([111, 'aaa', True]), # レコードオブジェクトを使用できます。
t.new_record([222, 'bbb', False]),
t.new_record([333, 'ccc', True]),
t.new_record([444, 'Chinese', False])]
writer.write(records)Record オブジェクト(t.new_record() で作成)または単純なリストのいずれかを writer.write() に渡すことができます。
書き込み時にパーティションを自動作成
パーティションが存在しない場合に自動的に作成するには、create_partition=True を設定します。
with t.open_writer(partition='pt=test', create_partition=True) as writer:
records = [[111, 'aaa', True], # リストを使用できます。
[222, 'bbb', False],
[333, 'ccc', True],
[444, 'Chinese', False]]
writer.write(records) # レコードはイテラブルなオブジェクトでなければなりません。write_table() を使用して書き込み
MaxCompute エントリーオブジェクトの write_table() メソッドは、データ書き込みのためのよりシンプルなインターフェイスを提供します。
records = [[111, 'aaa', True], # リストを使用できます。
[222, 'bbb', False],
[333, 'ccc', True],
[444, 'Chinese', False]]
o.write_table('test_table', records, partition='pt=test', create_partition=True)Apache Arrow 形式でのデータの読み取りおよび書き込み
Apache Arrow は、異なるプラットフォーム間でのデータ交換に使用されるクロスプラットフォーム対応のフォーマットです。MaxCompute では、2021 年からテーブルデータの Arrow 形式での読み取りがサポートされています。PyODPS 0.11.2 以降のバージョンでこの機能が利用可能です。
Python 環境に pyarrow をインストールした後、arrow=True を指定して open_reader() または open_writer() を呼び出すことで、Arrow RecordBatch オブジェクトの読み取りおよび書き込みが可能になります。
import pandas as pd
import pyarrow as pa
with t.open_writer(partition='pt=test', create_partition=True, arrow=True) as writer:
records = [[111, 'aaa', True],
[222, 'bbb', False],
[333, 'ccc', True],
[444, 'Chinese', False]]
df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
# RecordBatch を書き込みます。
batch = pa.RecordBatch.from_pandas(df)
writer.write(batch)
# pandas DataFrame を直接使用することもできます。
writer.write(df)