すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:SDK for Python: tablesの使用例

最終更新日:Jan 17, 2025

このトピックでは、SDK for Pythonを使用して、一般的なシナリオでテーブルに対して操作を実行する方法の例を示します。

すべてのテーブルの照会

エントリオブジェクトのlist_tables() メソッドを使用して、プロジェクト内のすべてのテーブルをクエリできます。

for table in odps.list_tables():
    # Query all tables in a project.

テーブルが存在するかどうかを確認する

エントリオブジェクトのexist_table() メソッドを使用して、テーブルが存在するかどうかを確認できます。 次に、get_table() メソッドを使用して、テーブルに関する情報を取得します。

t = odps.get_table('table_name')
t.schema
odps.Schema {
  c_int_a                 bigint
  c_int_b                 bigint
  c_double_a              double
  c_double_b              double
  c_string_a              string
  c_string_b              string
  c_bool_a                boolean
  c_bool_b                boolean
  c_datetime_a            datetime
  c_datetime_b            datetime
}
t.lifecycle
-1
print(t.creation_time)
2014-05-15 14:58:43
t.is_virtual_view
False
t.size
1408
t.schema.columns
[<column c_int_a, type bigint>,
 <column c_int_b, type bigint>,
 <column c_double_a, type double>,
 <column c_double_b, type double>,
 <column c_string_a, type string>,
 <column c_string_b, type string>,
 <column c_bool_a, type boolean>,
 <column c_bool_b, type boolean>,
 <column c_datetime_a, type datetime>,
 <column c_datetime_b, type datetime>]            

テーブルスキーマの作成

次のいずれかの方法を使用して、テーブルスキーマを作成できます。

  • テーブル列とオプションのパーティションに基づいてスキーマを作成します。

    from odps.models import Schema, Column, Partition
    columns = [
        Column(name='num', type='bigint', comment='the column'),
        Column(name='num2', type='double', comment='the column2'),
    ]
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)

    スキーマを作成した後、列とパーティションに関する情報を取得できます。

    • すべての列に関する情報を取得します。

      print(schema.columns)

      レスポンス例:

      [<column num, type bigint>,
       <column num2, type double>,
       <partition pt, type string>]
    • パーティションキー列に関する情報を取得します。

      print(schema.partitions)

      レスポンス例:

      [<partition pt, type string>]
    • 非パーティションキー列の名前を取得します。

      print(schema.names)

      レスポンス例:

      ['num', 'num2']
    • パーティションキー以外の列のデータ型を取得します。

      print(schema.types)

      レスポンス例:

      [bigint, double]
  • schema. from_lists() メソッドを呼び出してスキーマを作成します。 このメソッドは簡単に呼び出すことができますが、列とパーティションのコメントを直接設定することはできません。

    from odps.models import Schema
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    print(schema.columns)

    レスポンス例:

    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

テーブルの作成

o.create_table() メソッドを呼び出して、テーブルスキーマを使用するか、列の名前とデータ型を指定してテーブルを作成できます。 テーブルを作成するときは、テーブル内の列のデータ型が有効であることを確認する必要があります。

テーブルスキーマを使用してテーブルを作成する

テーブルスキーマを使用してテーブルを作成する場合、テーブルを作成する前にスキーマを作成する必要があります。

# Create a table schema.
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

# Create a table by using the schema that you created.
table = o.create_table('my_new_table', schema)

# Create a table only if no table with the same name exists. 
table = o.create_table('my_new_table', schema, if_not_exists=True)

# Configure the lifecycle of the table. 
table = o.create_table('my_new_table', schema, lifecycle=7)

print(o.exist_table('my_new_table ')) メソッドを呼び出して、テーブルが正常に作成されたかどうかを確認できます。 Trueが返された場合、テーブルは正常に作成されます。

テーブルに含める列の名前とデータ型を指定してテーブルを作成します。

# Create a partitioned table named my_new_table with specified common columns and partition key columns. 
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

# Create a non-partitioned table named my_new_table02. 
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

print(o.exist_table('my_new_table ')) メソッドを呼び出して、テーブルが正常に作成されたかどうかを確認できます。 Trueが返された場合、テーブルは正常に作成されます。

テーブルに含める列の名前とデータ型を指定してテーブルを作成します。MaxCompute V2.0データ型エディションの新しいデータ型

既定では、テーブルを作成するときに、BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP、およびARRAYデータ型のみがサポートされます。 TINYINTやSTRUCTなどの他のデータ型を使用する必要がある場合は、options.sql.us e_odps2_extensionをTrueに設定する必要があります。 例:

from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

テーブルの削除

delete_table() メソッドを呼び出して、既存のテーブルを削除できます。

o.delete_table('my_table_name', if_exists=True)  # Delete a table only if the table exists. 
t.drop() # Call the drop() method to drop a table if the table exists.

テーブルパーティションの管理

  • テーブルがパーティション分割されているかどうかを確認します。

    table = o.get_table('my_new_table')
    if table.schema.partitions:
        print('Table %s is partitioned.' % table.name)
  • テーブル内のすべてのパーティションを繰り返します。

    table = o.get_table('my_new_table')
    for partition in table.partitions:  # Iterate over all partitions.
        print(partition.name)  # An iteration step. In this step, the partition name is displayed.
    for partition in table.iterate_partitions(spec='pt=test'):  # Iterate over level-2 partitions in the partition named test.
        print(partition.name)  # An iteration step. In this step, the partition name is displayed.
    for partition in table.iterate_partitions(spec='dt>20230119'):  # Iterate over level-2 partitions in the partitions that meet the dt>20230119 condition.
        print(partition.name)  # An iteration step. In this step, the partition name is displayed.
    重要

    PyODPS 0.11.3以降では、上記の例のdt>20230119など、iterate_partitionsの論理式を指定できます。

  • パーティションが存在するかどうかを確認します。

    table = o.get_table('my_new_table')
    table.exist_partition('pt=test,sub=2015')
  • パーティションに関する情報を取得します。

    table = o.get_table('my_new_table')
    partition = table.get_partition('pt=test')
    print(partition.creation_time)
    partition.size
  • パーティションを作成します。

    t = o.get_table('my_new_table')
    t.create_partition('pt=test', if_not_exists=True)  # Create a partition only if no partition with the same name exists.

  • 既存のパーティションを削除します。

    t = o.get_table('my_new_table')
    t.delete_partition('pt=test', if_exists=True)  # Set the if_exists parameter to True. This ensures that a partition is deleted only if the partition exists. 
    partition.drop()  # Call the drop() method to drop a partition if the partition exists.

テーブルからのデータの読み取り

さまざまな方法でテーブルからデータを読み取ることができます。

  • headメソッドを使用して、各テーブルの最初の10,000以下のデータレコードを取得します。

    from odps import ODPS
    t = o.get_table('dual')
    for record in t.head(3):
        # Process each record.
  • WITH句を使用してステートメントを実行します。

    with t.open_reader(partition='pt=test') as reader:
    count = reader.count
    for record in reader[5:10]  # You can execute the statement multiple times until all records are read. The number of records is specified by count. You can change the code to parallel-operation code.
        # Process one record.
  • WITH句を使用せずにステートメントを実行します。

    reader = t.open_reader(partition='pt=test')
    count = reader.count
    for record in reader[5:10]  # You can execute the statement multiple times until all records are read. The number of records is specified by count. You can change the code to parallel-operation code.
        # Process one record.
  • データをPandas DataFramesに直接読み取ります。

    with t.open_reader(partition='pt=test') as reader:
    pd_df = reader.to_pandas()

テーブルへのデータの書き込み

open_readerと同様に、テーブルオブジェクトのopen_writerを使用して、writerを開き、テーブルにデータを書き込むことができます。

  • WITH句を使用してステートメントを実行します。

    with t.open_writer(partition='pt=test') as writer:
    	  records = [[111, 'aaa', True],                 # A list can be used.
    	             [222, 'bbb', False],
    	             [333, 'ccc', True],
    	             [444, 'Chinese', False]]
        writer.write(records)  # Records can be iterable objects.
    
    records = [t.new_record([111, 'aaa', True]),   # Record objects can be used.
               t.new_record([222, 'bbb', False]),
               t.new_record([333, 'ccc', True]),
               t.new_record([444, 'Chinese', False])]
    writer.write(records)
  • 指定されたパーティションが存在しない場合は、create_partitionパラメーターをTrueに設定してパーティションを作成します。 例:

    with t.open_writer(partition='pt=test', create_partition=True) as writer:
        records = [[111, 'aaa', True],                 # A list can be used.
                   [222, 'bbb', False],
                   [333, 'ccc', True],
                   [444, 'Chinese', False]]
        writer.write(records)  # Records can be iterable objects.
  • より簡単な方法は、MaxComputeオブジェクトのwrite_tableメソッドを使用してデータを書き込むことです。

    records = [[111, 'aaa', True],                 # A list can be used.
               [222, 'bbb', False],
               [333, 'ccc', True],
               [444, 'Chinese', False]]
    o.write_table('test_table', records, partition='pt=test', create_partition=True)
    説明
    • write_tableを呼び出すたびに、MaxComputeはサーバー上にファイルを生成します。 この操作には時間がかかる。 大量のファイルが生成されると、その後のクエリ操作の効率が低下します。 したがって、write_tableメソッドを使用する場合は、一度に複数のレコードを書き込むか、ジェネレータオブジェクトを提供することをお勧めします。

    • write_tableメソッドを使用してデータを書き込むと、新しいデータが既存のデータに追加されます。 PyODPSは、既存のデータを上書きするオプションを提供しません。 上書きするデータを手動で削除する必要があります。 パーティション分割されていないテーブルの場合は、table.truncate() を呼び出す必要があります。 パーティションテーブルの場合は、最初にパーティションを削除する必要があります。

矢印形式を使用してデータを読み書きする

Apache Arrowは、異なるプラットフォーム間のデータ交換をサポートする言語間の形式です。 2021以降、MaxComputeは矢印形式を使用したテーブルデータの読み取りをサポートしています。 PyODPS 0.11.2以降のバージョンはこの機能をサポートしています。 Python環境にpyarrowをインストールした後、open_writerを呼び出すときにarrow=True設定を追加できます。 この方法で、Arrow RecordBatchesを読み書きできます。

import pandas as pd
import pyarrow as pa
with t.open_writer(partition='pt=test', create_partition=True, arrow=True) as writer:
    records = [[111, 'aaa', True],
               [222, 'bbb', False],
               [333, 'ccc', True],
               [444, 'Chinese', False]]
    df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
    # Write a RecordBatch.
    batch = pa.RecordBatch.from_pandas(df)
    writer.write(batch)
    # You can also use Pandas DataFrame directly.
    writer.write(df)