All Products
Search
Document Center

MaxCompute:Tables

Last Updated:Jun 02, 2026

PyODPS lets you create, read, write, and delete MaxCompute tables programmatically. You can also manage schemas, partitions, and bulk data transfers through MaxCompute Tunnel.

Prerequisites

Before you begin, ensure that you have:

  • A MaxCompute project

  • PyODPS installed and configured

  • An AccessKey ID and AccessKey secret

  • An initialized MaxCompute entry object (o)

Quick start

Create a table, write data, and read it back:

from odps.models import Schema

# Define columns
schema = Schema.from_lists(['id', 'name'], ['bigint', 'string'])

# Create the table
table = o.create_table('my_table', schema, if_not_exists=True)

# Write records
o.write_table('my_table', [[1, 'Alice'], [2, 'Bob']])

# Read records
for record in o.read_table('my_table'):
    print(record)

API method summary

Operation

Method

Key parameters

Description

List tables

o.list_tables()

prefix, type, extended

List all tables in a project

Check existence

o.exist_table()

table_name

Check whether a table exists

Get table

o.get_table()

table_name, project

Get a table object

Create table

o.create_table()

schema, if_not_exists, lifecycle

Create a table

Write data

o.write_table()

partition, create_partition

Append records to a table

Read data

o.read_table()

partition

Read records from a table

Delete table

o.delete_table()

if_exists

Delete a table

Convert to DataFrame

table.to_df()

Convert a table to a PyODPS DataFrame

Sync metadata

table.reload()

Refresh the local table object from the server

For the full PyODPS method reference, see Method descriptions.

List tables

List all tables in a project:

for table in o.list_tables():
    print(table)

Filter by name prefix:

for table in o.list_tables(prefix="table_prefix"):
    print(table.name)

By default, list_tables() returns only table names. Accessing properties such as table_schema or creation_time triggers additional requests and increases latency. In PyODPS 0.11.5 and later, pass extended=True to retrieve these properties in a single call:

for table in o.list_tables(extended=True):
    print(table.name, table.creation_time)

Filter by table type:

# Valid types: managed_table, external_table, virtual_view, materialized_view
managed_tables = list(o.list_tables(type="managed_table"))
external_tables = list(o.list_tables(type="external_table"))
virtual_views = list(o.list_tables(type="virtual_view"))
materialized_views = list(o.list_tables(type="materialized_view"))

Check whether a table exists

print(o.exist_table('pyodps_iris'))
# Returns True if the table exists

Get table information

Get a table object with get_table():

t = o.get_table('pyodps_iris')

Print the table schema:

print(t.schema)

Output:

odps.Schema {
  sepallength           double      # Sepal length (cm)
  sepalwidth            double      # Sepal width (cm)
  petallength           double      # Petal length (cm)
  petalwidth            double      # Petal width (cm)
  name                  string      # Type
}

Access column details:

# All columns
print(t.schema.columns)

# A specific column
print(t.schema['sepallength'])

# Column comment
print(t.schema['sepallength'].comment)

Access table properties:

print(t.lifecycle)        # Table lifecycle
print(t.creation_time)    # Creation time
print(t.is_virtual_view)  # Whether the table is a view
print(t.size)             # Table size in bytes
print(t.comment)          # Table comment

Access tables across projects

Pass the project parameter to get a table from another project:

t = o.get_table('table_name', project='other_project')

Create a table schema

Two methods are available for creating schemas.

Method 1: Column and Partition objects

Use Column and Partition objects from odps.models when you need full control over column definitions, including comments:

from odps.models import Schema, Column, Partition

columns = [
    Column(name='num', type='bigint', comment='the column'),
    Column(name='num2', type='double', comment='the column2'),
]
partitions = [Partition(name='pt', type='string', comment='the partition')]
schema = Schema(columns=columns, partitions=partitions)

Access schema properties:

# All columns including partition columns
print(schema.columns)

# Partition columns only
print(schema.partitions)

# Non-partition column names
print(schema.names)

# Non-partition column types
print(schema.types)

Method 2: Schema.from_lists()

Schema.from_lists() is simpler but does not support column comments:

from odps.models import Schema

schema = Schema.from_lists(
    ['num', 'num2'],           # Column names
    ['bigint', 'double'],      # Column types
    ['pt'],                    # Partition names
    ['string']                 # Partition types
)
print(schema.columns)

Create a table

From a schema object

from odps.models import Schema

schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

# Create a table
table = o.create_table('my_new_table', schema)

# Skip creation if the table already exists
table = o.create_table('my_new_table', schema, if_not_exists=True)

# Set the lifecycle (days before auto-deletion)
table = o.create_table('my_new_table', schema, lifecycle=7)

From column definitions as strings

# Partitioned table (columns, partition columns)
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

# Non-partitioned table
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

Enable extended data types

By default, only these data types are supported: BIGINT, DOUBLE, DECIMAL, STRING, DATETIME, BOOLEAN, MAP, and ARRAY.

To use extended types such as TINYINT and STRUCT, enable the MaxCompute V2.0 data type extension:

from odps import options

options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

Synchronize table updates

When another program modifies a table, call reload() to refresh the local object with the latest metadata from the server:

from odps.models import Schema

schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
table = o.create_table('my_new_table', schema)

# Fetch the latest table metadata from the server
table.reload()

Write data to a table

write_table()

Use write_table() for simple, one-shot writes where you have all records ready. It appends to the table and handles partitions in a single call.

records = [
    [111, 1.0],
    [222, 2.0],
    [333, 3.0],
    [444, 4.0]
]

# Write to a partition; create the partition if it does not exist
o.write_table('my_new_table', records, partition='pt=test', create_partition=True)
Important

Each call to write_table() creates a file on the server. This operation is time-consuming, and too many small files degrade query performance. Write multiple records per call, or pass a generator object.

write_table() always appends data. To replace existing data:

  • Non-partitioned tables: Call table.truncate().

  • Partitioned tables: Delete and recreate the partition.

open_writer()

Use open_writer() for streaming writes or to write records incrementally within a managed session.

t = o.get_table('my_new_table')

with t.open_writer(partition='pt=test02', create_partition=True) as writer:
    records = [
        [1, 1.0],
        [2, 2.0],
        [3, 3.0],
        [4, 4.0]
    ]
    writer.write(records)  # Accepts any iterable

Write to multi-level partitions:

t = o.get_table('test_table')

with t.open_writer(partition='pt1=test1,pt2=test2') as writer:
    records = [
        t.new_record([111, 'aaa', True]),
        t.new_record([222, 'bbb', False]),
        t.new_record([333, 'ccc', True]),
        t.new_record([444, 'Chinese', False])
    ]
    writer.write(records)

Multi-process parallel writing

Multiple processes can write to the same table concurrently by sharing a session ID and writing to separate blocks. Each block maps to a file on the server. The main process commits after all workers finish.

import random
from multiprocessing import Pool
from odps.tunnel import TableTunnel

def write_records(tunnel, table, session_id, block_id):
    # Reuse the existing session
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    # Write to this process's block
    with local_session.open_record_writer(block_id) as writer:
        for i in range(5):
            record = table.new_record([random.randint(1, 100), random.random()])
            writer.write(record)

if __name__ == '__main__':
    N_WORKERS = 3

    table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session(table.name)

    # Share the session ID across processes
    session_id = upload_session.id

    pool = Pool(processes=N_WORKERS)
    futures = []
    block_ids = []
    for i in range(N_WORKERS):
        futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
        block_ids.append(i)
    [f.get() for f in futures]

    # Commit all blocks
    upload_session.commit(block_ids)

Read data from a table

read_table()

Use read_table() to iterate over all records in a table or partition:

for record in o.read_table('my_new_table', partition='pt=test'):
    print(record)

head()

Preview up to 10,000 records without a full table scan:

t = o.get_table('my_new_table')

for record in t.head(3):
    print(record)

open_reader()

Use open_reader() when you need sliced access or a record count before reading. It exposes a count attribute and supports index slicing:

t = o.get_table('my_new_table')

with t.open_reader(partition='pt=test') as reader:
    count = reader.count
    for record in reader[5:10]:  # Read a slice of records
        print(record)

Without a with block:

reader = t.open_reader(partition='pt=test')
count = reader.count
for record in reader[5:10]:
    print(record)

Delete a table

# Delete only if the table exists
o.delete_table('my_table_name', if_exists=True)

# Or call drop() on a table object
t.drop()

Convert a table to a DataFrame

to_df() converts a table to a PyODPS DataFrame. For details, see DataFrame (not recommended).

table = o.get_table('my_table_name')
df = table.to_df()

Manage partitions

Check whether a table is partitioned

table = o.get_table('my_new_table')
if table.schema.partitions:
    print('Table %s is partitioned.' % table.name)

Iterate over partitions

table = o.get_table('my_new_table')

# All partitions
for partition in table.partitions:
    print(partition.name)

# Sub-partitions under pt=test
for partition in table.iterate_partitions(spec='pt=test'):
    print(partition.name)

# Partitions matching a condition (PyODPS 0.11.3 and later)
for partition in table.iterate_partitions(spec='dt>20230119'):
    print(partition.name)

Starting from PyODPS 0.11.3, iterate_partitions() accepts logical expressions such as dt>20230119.

Check whether a partition exists

table = o.get_table('my_new_table')
table.exist_partition('pt=test,sub=2015')

Get partition information

table = o.get_table('my_new_table')
partition = table.get_partition('pt=test')
print(partition.creation_time)
print(partition.size)

Create a partition

t = o.get_table('my_new_table')
t.create_partition('pt=test', if_not_exists=True)

Delete a partition

t = o.get_table('my_new_table')
t.delete_partition('pt=test', if_exists=True)

# Or call drop() on a partition object
partition.drop()

Records and data type mappings

A record represents a single row in a MaxCompute table. All four I/O methods — open_reader(), open_writer(), open_record_reader(), and open_record_writer() — use records.

Create a record by calling new_record() on a table object.

Given this table schema:

odps.Schema {
  c_int_a                 bigint
  c_string_a              string
  c_bool_a                boolean
  c_datetime_a            datetime
  c_array_a               array<string>
  c_map_a                 map<bigint,string>
  c_struct_a              struct<a:bigint,b:string>
}

Create and manipulate records:

import datetime

t = o.get_table('mytable')  # o is the MaxCompute entry object

# Create a record with initial values
# The number of values must match the number of fields in the schema
r = t.new_record([1024, 'val1', False, datetime.datetime.now(), None, None])

# Create an empty record
r2 = t.new_record()

# Set values by index
r2[0] = 1024

# Set values by field name
r2['c_string_a'] = 'val1'

# Set values by attribute
r2.c_string_a = 'val1'

# Set ARRAY value
r2.c_array_a = ['val1', 'val2']

# Set MAP value
r2.c_map_a = {1: 'val1'}

# Set STRUCT value (PyODPS 0.11.5 and later)
r2.c_struct_a = (1, 'val1')             # tuple
r2.c_struct_a = {"a": 1, "b": 'val1'}   # dict

# Get values
print(r[0])                          # By index
print(r['c_string_a'])               # By field name
print(r.c_string_a)                  # By attribute
print(r[0: 3])                       # Slice
print(r[0, 2, 3])                    # Multiple indices
print(r['c_int_a', 'c_double_a'])    # Multiple field names

Data type mappings

MaxCompute type

Python type

TINYINT, SMALLINT, INT, BIGINT

int

FLOAT, DOUBLE

float

STRING

str

BINARY

bytes

DATETIME

datetime.datetime

DATE

datetime.date

BOOLEAN

bool

DECIMAL

decimal.Decimal

MAP

dict

ARRAY

list

STRUCT

tuple / namedtuple

TIMESTAMP

pandas.Timestamp

TIMESTAMP_NTZ

pandas.Timestamp

INTERVAL_DAY_TIME

pandas.Timedelta

STRING handling

By default, STRING maps to Unicode strings (str in Python 3, unicode in Python 2). To store binary data in a STRING column, set options.tunnel.string_as_binary = True.

Time zone behavior

PyODPS uses the local time zone by default. MaxCompute does not store time zone values — it converts datetime values to UNIX timestamps for storage.

To use UTC:

options.local_timezone = False

To use a specific time zone:

options.local_timezone = 'Asia/Shanghai'

DECIMAL in Python 2

When the cdecimal package is installed, PyODPS uses cdecimal.Decimal instead of decimal.Decimal in Python 2.

STRUCT type behavior

Before PyODPS 0.11.5, STRUCT maps to dict. Starting from PyODPS 0.11.5, STRUCT maps to namedtuple by default.

To restore the previous dict behavior:

options.struct_as_dict = True
In DataWorks environments, struct_as_dict defaults to False for historical compatibility. PyODPS 0.11.5 and later accept both dict and tuple for STRUCT values. Earlier versions accept only dict.

MaxCompute Tunnel

MaxCompute Tunnel is the low-level data channel for bulk uploads and downloads. For most use cases, write_table() and read_table() are simpler. Use Tunnel when you need multi-process writes or fine-grained session control.

PyODPS does not support uploading data through external tables (for example, tables backed by OSS or Tablestore).

If CPython is installed, PyODPS compiles C code during installation to accelerate Tunnel-based transfers.

Upload data

from odps.tunnel import TableTunnel

table = o.get_table('my_table')

tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')

with upload_session.open_record_writer(0) as writer:
    record = table.new_record()
    record[0] = 'test1'
    record[1] = 'id1'
    writer.write(record)

    record = table.new_record(['test2', 'id2'])
    writer.write(record)

# Commit outside the with block. Committing before all data is written causes an error.
upload_session.commit([0])

Download data

from odps.tunnel import TableTunnel

tunnel = TableTunnel(o)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')

with download_session.open_record_reader(0, download_session.count) as reader:
    for record in reader:
        print(record)