すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:はじめに

最終更新日:Nov 09, 2025

MaxFrame は、データ処理のための Pandas と互換性のある API を提供します。これらの API には、フィルタリング、射影、連結、集約のための基本的な API と、ユーザー定義関数 (UDF) を呼び出すための transform や apply などの高度な API が含まれます。高度な API は、特定のビジネスロジックやデータ操作を実装するために使用でき、標準のオペレーターが使用できない複雑なシナリオに適しています。MaxFrame は、ビッグデータ処理の要件を満たすための特定の API を提供します。たとえば、read_odps_table API を使用して MaxCompute テーブルからデータを読み取り、to_odps_table API を使用して MaxCompute テーブルにデータを書き込み、execute API を使用して遅延コンピューティングを実行できます。これらの API を使用して、オンプレミスの計算リソースに制限されることなく、ビッグデータ環境のデータを効率的に分析できます。

データ準備

このトピックでは、MaxCompute のパブリックデータセットの maxframe_ml_100k_users テーブルを使用して、MaxFrame の使用方法を説明します。テストデータは、パブリック MaxCompute プロジェクト BIGDATA_PUBLIC_DATASET の data_science スキーマに保存されています。このデータは直接使用できます。

セッションの初期化

MaxFrame ジョブを実行する前に、MaxFrame セッションを初期化する必要があります。コードのエントリーポイントで、new_session API 操作を呼び出してジョブを初期化できます。その後のすべてのデータ処理は、作成されたセッションオブジェクトを介して実行され、バックエンドサービスと対話します。次のコードに例を示します。

import os
from maxframe import new_session
from odps import ODPS

# MaxFrame アカウントを使用して MaxCompute を初期化します。
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

# MaxFrame セッションを初期化します。
new_session(odps_entry=o)

new_session API 操作の詳細については、「new_session」をご参照ください。

DataFrame オブジェクトの作成

read_odps_table および read_odps_query API 操作を使用すると、MaxCompute テーブルから DataFrame オブジェクトを作成できます。これらの DataFrame オブジェクトは、Pandas スタイルのデータ操作をサポートします。MaxFrame は、ローカルデータを使用した DataFrame オブジェクトの初期化もサポートしています。この機能は、コードを迅速にテストおよび開発するのに役立ちます。

  • MaxCompute テーブルの使用

    read_odps_table API 操作を使用して、次の操作を実行できます。

    • read_odps_table 関数を使用して MaxCompute テーブルからデータを読み取り、それを MaxFrame DataFrame オブジェクトに変換できます。

      import maxframe.dataframe as md
      
      # MaxCompute テーブルからデータを読み取り、DataFrame オブジェクトを作成します。
      df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users')
    • index_col パラメーターを設定して、テーブル内の列を DataFrame オブジェクトのインデックスとして指定します。

      import maxframe.dataframe as md
      
      # データベースの user_id 列を DataFrame オブジェクトのインデックスとして使用します。
      df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col="user_id")
    • columns パラメーターを設定して、テーブル内の列を指定して DataFrame オブジェクトを作成します。

      import maxframe.dataframe as md
      
      # テーブル内の列を選択して DataFrame オブジェクトを作成します。
      df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', columns=['user_id', 'age', 'sex'])

      read_odps_table API の詳細については、「read_odps_table」をご参照ください。

  • MaxCompute SQL のクエリ結果の使用

    テーブルから DataFrame を作成することに加えて、MaxFrame では read_odps_query API 操作で SQL クエリを実行し、その結果を DataFrame のデータ入力として使用できます。

    import maxframe.dataframe as md
    
    df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`')

    index_col パラメーターを使用して、DataFrame オブジェクトのインデックスを指定します。

    import maxframe.dataframe as md
    
    df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id')

    read_odps_query API の詳細については、「read_odps_query」をご参照ください。

  • オンプレミスデータの使用

    MaxFrame は Pandas と同様の使用体験を提供します。オンプレミスデータに基づいて MaxFrame DataFrame オブジェクトを直接作成できます。サンプルコード:

    import maxframe.dataframe as md
    
    d = {'col1': [1, 2], 'col2': [3, 4]}
    df = md.DataFrame(data=d)
    print(df.execute().fetch())
    # 返された結果を取得します。
       col1  col2
    0     1     3
    1     2     4
    
    df = md.DataFrame(data=d, dtype=np.int8)
    print(df.execute().fetch())
    # 返された結果を取得します。
       col1  col2
    0     1     3
    1     2     4
    
    d = {'col1': [0, 1, 2, 3], 'col2': pd.Series([2, 3], index=[2, 3])}
    df = md.DataFrame(data=d, index=[0, 1, 2, 3])
    print(df.execute().fetch())
    # 返された結果を取得します。
       col1  col2
    0     0   NaN
    1     1   NaN
    2     2   2.0
    3     3   3.0
    
    data = np.array([(1, 2, 3), (4, 5, 6), (7, 8, 9)],
                    dtype=[("a", "i4"), ("b", "i4"), ("c", "i4")])
    df = md.DataFrame(data, columns=['c', 'a'])
    df.execute().fetch()
    # 返された結果を取得します。
       c  a
    0  3  1
    1  6  4
    2  9  7

    詳細については、「DataFrame」をご参照ください。

データ処理

MaxFrame は、Pandas と互換性のある一連の API を提供します。これらの API を使用して、データ計算、射影、フィルタリング、ソートなどのさまざまな操作を実行できます。MaxFrame は、一般的なデータ処理のための幅広いオペレーターもサポートしています。MaxFrame は、データ処理と分析の要件を満たすために UDF もサポートしています。

算術演算

加算、減算、乗算、除算などのさまざまな算術演算を DataFrame オブジェクトで直接実行できます。次の例は、MaxFrame を使用して基本的な算術演算を実行する方法を示しています。

  • 例 1: 単純なデータ加算を実行します。

    import maxframe.dataframe as md
    
    df = md.DataFrame({'angles': [0, 3, 4],
                       'degrees': [360, 180, 360]},
                      index=['circle', 'triangle', 'rectangle'])
    print(df.execute().fetch())
    # 返された結果を取得します。
               angles  degrees
    circle          0      360
    triangle        3      180
    rectangle       4      360
    
    df = df + 1
    print(df.execute().fetch())
    # 返された結果を取得します。
               angles  degrees
    circle          1      361
    triangle        4      181
    rectangle       5      361
  • 例 2: DataFrame オブジェクト間で乗算を実行します。

    import maxframe.dataframe as md
    
    df = md.DataFrame({'angles': [0, 3, 4],
                       'degrees': [360, 180, 360]},
                      index=['circle', 'triangle', 'rectangle'])
    
    other = md.DataFrame({'angles': [0, 3, 4]},
                         index=['circle', 'triangle', 'rectangle'])
    
    print(df.mul(other, fill_value=0).execute())
    
    # 返された結果を取得します。
               angles  degrees
    circle          0      0.0
    triangle        9      0.0
    rectangle      16      0.0

算術演算に関連する API の詳細については、「二項演算子関数」および「計算/記述統計」をご参照ください。

フィルタリング、射影、サンプリング

フィルタリング

フィルタリングを使用すると、特定の基準に基づいて DataFrame オブジェクトからデータを選択または除外できます。この操作は、大規模なデータセットの処理と分析に不可欠であり、最も関連性の高い情報に集中するのに役立ちます。

  • 例 1: データの最初の数行を表示します。

    import maxframe.dataframe as md
    
    df = md.DataFrame({'animal': ['alligator', 'bee', 'falcon', 'lion',
                       'monkey', 'parrot', 'shark', 'whale', 'zebra']})
    print(df.head().execute().fetch())
    
    # 返された結果を取得します。
          animal 
    0  alligator 
    1        bee 
    2     falcon 
    3       lion 
    4     monkey 
  • 例 2: 指定した列を削除します。

    import maxframe.dataframe as md                                    
    
    df = md.DataFrame(np.arange(12).reshape(3, 4), columns=['A', 'B', 'C', 'D'])
    print(df.drop(['B', 'C'], axis=1).execute().fetch())
    
    # 返された結果を取得します。
       A   D
    0  0   3
    1  4   7
    2  8  11

射影

射影を使用すると、表示する列を調整したり、列の順序を再配置したりするなど、DataFrame オブジェクトの構造を再形成できます。射影操作を実行して、データの簡略化されたビューを作成したり、特定の分析要件を満たすようにデータの表示方法を調整したりできます。

例: 列名を変更します。

import maxframe.dataframe as md
df = md.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
print(df.set_axis(['I', 'II'], axis='columns').execute().fetch())

# 返された結果を取得します。
   I  II 
0  1   4 
1  2   5 
2  3   6 

サンプリング

サンプリングは、DataFrame オブジェクトからランダムなサンプルを選択するプロセスです。サンプリングは、大規模なデータセットを処理し、データの統計的特性を推定するために不可欠です。

例: ランダムサンプリングを実行します。

import maxframe.dataframe as md

df = md.DataFrame({'num_legs': [2, 4, 8, 0],
                        'num_wings': [2, 0, 0, 0],
                        'num_specimen_seen': [10, 2, 1, 8]},
                       index=['falcon', 'dog', 'spider', 'fish'])
print(df['num_legs'].sample(n=3, random_state=1).execute())

# 返された結果を取得します。
falcon    2
fish      0
dog       4
Name: num_legs, dtype: int64

フィルタリング、射影、サンプリング操作の詳細については、「インデックス再作成/選択/ラベル操作」をご参照ください。

ソート

ソートを使用すると、1 つ以上の列の値に基づいて DataFrame オブジェクト内の行の順序を再配置できます。ほとんどの場合、ソートはデータ分析で使用されます。ソート操作を実行して、データのパターン、傾向、例外を簡単に観察できます。

例: 1 つ以上の列の値をソートします。

import maxframe.dataframe as md
import numpy as np
df = md.DataFrame({                            
    'col1': ['A', 'A', 'B', np.nan, 'D', 'C'], 
    'col2': [2, 1, 9, 8, 7, 4],                
    'col3': [0, 1, 9, 4, 2, 3],                
})            
res = df.sort_values(by=['col1']).execute()
print(res.fetch())

# 返された結果を取得します。
   col1  col2  col3
0     A     2     0
1     A     1     1
2     B     9     9
5     C     4     3
4     D     7     2
3  None     8     4


res =  df.sort_values(by=['col1', 'col2']).execute()
print(res.fetch())

# 返された結果を取得します。
     col1 col2 col3
 1   A    1    1   
 0   A    2    0   
 2   B    9    9   
 5   C    4    3   
 4   D    7    2   
 3   None  8    4   

ソート操作の詳細については、「再形成/ソート/転置」をご参照ください。

結合、マージ、連結

結合、マージ、連結は、基本的かつ強力なデータ処理操作です。これらの操作により、特定のパブリックフィールドまたはインデックスに基づいて、さまざまなデータセットを水平または垂直に組み合わせることができます。MaxFrame は、データセットを簡単に結合するのに役立つ、結合、マージ、連結に関連する API を提供します。

例: データを水平に結合します。

import maxframe.dataframe as md
df = md.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
                        'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})

other = md.DataFrame({'key': ['K0', 'K1', 'K2'],
                       'B': ['B0', 'B1', 'B2']})

print(df.join(other, lsuffix='_caller', rsuffix='_other').execute().fetch())

# 返された結果を取得します。
  key_caller   A key_other     B
0         K0  A0        K0    B0
1         K1  A1        K1    B1
2         K2  A2        K2    B2
3         K3  A3      None  None
4         K4  A4      None  None
5         K5  A5      None  None

結合、マージ、連結操作の詳細については、「結合/マージ/連結」をご参照ください。

集約と UDF

集約

集約は、値のグループを単一の値に変換するプロセスです。集約操作を実行して、データ分析中にデータの統計的特性を要約および発見できます。

例: 複数タイプの集約を実行します。

import maxframe.dataframe as md

df = md.DataFrame([[1, 2, 3],
                   [4, 5, 6],
                   [7, 8, 9],
                   [np.nan, np.nan, np.nan]],
                  columns=['A', 'B', 'C'])
print(df.agg(['sum', 'min']).execute().fetch())

# 返された結果を取得します。
        A     B     C
min   1.0   2.0   3.0
sum  12.0  15.0  18.0

df.agg({'A' : ['sum', 'min'], 'B' : ['min', 'max']}).execute().fetch()

# 返された結果を取得します。
        A    B
max   NaN  8.0
min   1.0  2.0
sum  12.0  NaN

UDF

標準のデータ処理オペレーターに加えて、MaxFrame は UDF をサポートし、DataFrame オブジェクトで UDF 処理を実行します。これにより、データ処理の柔軟性が向上し、カスタムロジックを使用してデータセットでより多くの操作を実行できます。

重要

UDF を実行する前に、`new_session` を呼び出す前に config.options.sql.settings パラメーターを使用して共通イメージを宣言する必要があります。

  • 例 1: transform メソッドを使用して UDF を呼び出します。

    import maxframe.dataframe as md
    
    from maxframe import config
    config.options.sql.settings = {
        "odps.session.image": "common",
        "odps.sql.type.system.odps2": "true"
    }
    session = new_session(o)
    
    df = md.DataFrame({'A': range(3), 'B': range(1, 4)})
    print(df.transform(lambda x: x + 1).execute().fetch())
    
    # 返された結果を取得します。
       A  B
    0  1  2
    1  2  3
    2  3  4
  • 例 2: apply メソッドを使用して UDF を呼び出します。

    UDF の実行前後で列数を変更したい場合は、apply メソッドを使用して UDF を呼び出すことができます。

    import maxframe.dataframe as md
    import numpy as np
    
    from maxframe import config
    config.options.sql.settings = {
        "odps.session.image": "common",
        "odps.sql.type.system.odps2": "true"
    }
    session = new_session(o)
    
    def simple(row):
        row['is_man'] = row['sex'] == "man"
        return row
    
    df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users')
    new_dtypes = df.dtypes.copy()
    new_dtypes["is_man"] = np.dtype(np.bool_)
    df.apply(
                simple,
                axis=1,
                result_type="expand",
                output_type="dataframe",
                dtypes=new_dtypes
            ).execute().fetch()
    
    # 返された結果を取得します。
         user_id  age sex     occupation zip_code  is_man
    0          1   24   M     technician    85711   False
    1          2   53   F          other    94043   False
    2          3   23   M         writer    32067   False
    3          4   24   M     technician    43537   False
    4          5   33   F          other    15213   False
    ..       ...  ...  ..            ...      ...     ...
    938      939   26   F        student    33319   False
    939      940   32   M  administrator    02215   False
    940      941   20   M        student    97229   False
    941      942   48   F      librarian    78209   False
    942      943   22   M        student    77841   False
    
    [943 rows x 6 columns]

集約と UDF 操作、および集計関数の選択方法の詳細については、「関数適用/GroupBy/ウィンドウ」をご参照ください。

結果の保存

データセットが変換された後、to_odps_table API 操作を使用して結果を MaxCompute テーブルに保存できます。

  • 例 1: 処理されたデータを MaxCompute テーブルに書き込みます。

    # 処理されたデータを MaxCompute テーブルに書き込みます。
    filtered_df.to_odps_table('<table_name>')
  • 例 2: ストレージ期間を指定します。

    lifecycle パラメーターを設定して、結果テーブルのライフサイクルを指定できます。

    # 処理されたデータを MaxCompute テーブルに書き込みます。
    filtered_df.to_odps_table('<table_name>', lifecycle = 7)

パラメーター

table_name: 宛先の MaxCompute テーブルの名前。テーブルが MaxCompute に存在しない場合、テーブルは自動的に作成されます。テーブルが既に存在する場合、データはデフォルトで追加されます。

overwrite パラメーターを使用して、既存のデータを上書きするかどうかを指定できます。ストレージ操作の詳細については、「to_odps_table」をご参照ください。

タスクの実行と実行結果の表示

execute() メソッドを使用してデータ処理タスクをトリガーし、fetch() メソッドを使用して結果データの一部を取得できます。

例: 実行結果を取得して表示します。

execute()fetch() メソッドを追加してデータ処理フローを完了し、結果を表示できます。Pandas と比較して、MaxFrame はその遅延評価モデルを使用して、大規模なデータセットを効率的に処理し、不要なデータ転送を削減します。

# 実行結果の部分的なデータを取得します。
data = filtered_df.execute().fetch()
print(data)

タスクの実行と結果の取得の詳細については、「execute」および「fetch」をご参照ください。