このページでは、PyODPS のインストールと使用時によくあるエラーと質問について説明します。
クイックリファレンス
| エラーまたは質問 | セクション |
|---|---|
| "Warning: XXX not installed" | インストールエラー |
| "Project Not Found" | インストールエラー |
| "Syntax Error" | インストールエラー |
| macOS での "Permission Denied" | インストールエラー |
| macOS での "Operation Not Permitted" | インストールエラー |
| "No Module Named ODPS" | インポートエラー |
| "Cannot Import Name ODPS" | インポートエラー |
| "Cannot Import Module odps" | インポートエラー |
| IPython または Jupyter Notebook での "ImportError" | インポートエラー |
o.get_table('table_name').size の意味 |
使用に関する質問 |
| Tunnel エンドポイントの設定 | 使用に関する質問 |
| サードパーティの CPython パッケージ | 使用に関する質問 |
| DataFrame のサイズ制限 | 使用に関する質問 |
max_pt DataFrame 内 |
使用に関する質問 |
open_writer() と write_table() |
使用に関する質問 |
| DataWorks ノードが返す行数が少ない | 使用に関する質問 |
| DataFrame の行数の取得 | 使用に関する質問 |
| "sourceIP is not in the white list" | 使用に関する質問 |
options.sql.settings が有効にならない |
使用に関する質問 |
| "IndexError: list index out of range" | 使用に関する質問 |
| "ODPSError: ODPS entrance should be provided" | 使用に関する質問 |
| "lifecycle is not specified in mandatory mode" | 使用に関する質問 |
| "Perhaps the datastream from server is crushed" | 使用に関する質問 |
| "Project is protected" | 使用に関する質問 |
| "ConnectionError: timed out try catch exception" | 使用に関する質問 |
| "NameError: name 'get_task_cost' is not defined" | 使用に関する質問 |
| ログで中国語の文字がエンコードされて表示される | 使用に関する質問 |
| DATETIME フィールドが STRING として返される | 使用に関する質問 |
| DataFrame で Python の機能を使用する | 使用に関する質問 |
| Pandas バックエンドを使用したローカルでのデバッグ | 使用に関する質問 |
| ネストされたループの実行が遅い | 使用に関する質問 |
| データをローカルにダウンロードしないようにする | 使用に関する質問 |
| データをローカルにダウンロードする場合 | 使用に関する質問 |
open_reader の 10,000 レコード制限 |
使用に関する質問 |
| ビルトイン演算子と UDF の比較 | 使用に関する質問 |
DataFrame().schema.partitions が空になる |
使用に関する質問 |
| DataFrame でのデカルト積 | 使用に関する質問 |
| Jieba を使用した中国語のテキスト分割 | 使用に関する質問 |
| テーブルからすべてのデータをダウンロードする | 使用に関する質問 |
| NULL 値の割合の計算 | 使用に関する質問 |
| 新しいデータ型を有効にする | 使用に関する質問 |
| "ValueError" | 使用に関する質問 |
| SQL クエリの実行が遅い | 使用に関する質問 |
インストールエラー
"Warning: XXX not installed"
pip を使用して、不足しているコンポーネントをインストールしてください。エラーメッセージの XXX の部分にコンポーネント名が示されています。
"Project Not Found"
次の 2 点を確認してください:
-
エンドポイント:エンドポイントは対象のプロジェクトを指している必要があります。詳細については、「エンドポイント」をご参照ください。
-
エントリオブジェクトのパラメーター:MaxCompute エントリオブジェクトのパラメーターが正しい位置にあることを確認してください。詳細については、「DataWorks からオンプレミス環境への PyODPS ノードの移行」をご参照ください。
"Syntax Error"
PyODPS は Python 2.5 以前をサポートしていません。Python 2.6、Python 2.7.6 以降、または Python 3.3 以降を使用してください。
macOS での "Permission Denied"
sudo を付けてインストールを実行してください:
sudo pip install pyodps
macOS での "Operation Not Permitted"
これはシステム整合性保護 (SIP) が原因です。無効にするには、次の手順を実行します:
-
お使いの Mac を再起動し、起動中に Command (⌘) + R を押し続けてリカバリーモードに入ります。
-
ターミナルを開き、以下を実行します:
csrutil disable reboot
詳細については、「Operation Not Permitted when on root - El Capitan (rootless disabled)」をご参照ください。
インポートエラー
"No Module Named ODPS"
これは、現在の作業ディレクトリに名前の競合がある場合に最もよく発生します。現在のディレクトリに odps.py または init.py という名前のファイル、あるいは odps という名前のフォルダが含まれていないか確認してください。もし存在する場合は、名前を変更してください。
その他の原因:
-
競合するパッケージ:以前に
odpsという名前のパッケージをインストールした場合は、sudo pip uninstall odpsで削除してください。 -
複数の Python バージョン:複数の Python バージョンがインストールされています。アクティブなバージョンが 1 つだけであることを確認してください。
-
PyODPS がインストールされていない:現在の Python バージョンでインストールしてください。詳細については、「PyODPS のインストール」をご参照ください。
"Cannot Import Name ODPS"
現在の作業ディレクトリに odps.py という名前のファイルが存在し、パッケージをシャドウイングしています。そのファイルの名前を変更するか移動してから、再度インポートを試みてください。
"Cannot Import Module odps"
これは通常、依存関係の問題です。PyODPS のテクニカルサポート DingTalk グループに参加し、グループ管理者に連絡して支援を求めてください。
IPython または Jupyter Notebook での "ImportError"
コードの先頭に from odps import errors を追加してください。エラーが解決しない場合は、IPython の依存関係が不足している可能性があります。Jupyter を再インストールしてください:
sudo pip install -U jupyter
使用に関する質問
o.get_table('table_name').size は何を返しますか?
size フィールドは、行数ではなく、テーブルの物理ストレージサイズを返します。
Tunnel エンドポイントを設定するにはどうすればよいですか?
options.tunnel.endpoint をエンドポイントの URL に設定します。利用可能なすべてのオプションについては、「aliyun-odps-python-sdk options reference」をご参照ください。
CPython を含むサードパーティパッケージを使用するにはどうすればよいですか?
CPython を含む WHL パッケージを生成します。例については、「MaxCompute で使用できる crcmod の作成」をご参照ください。
PyODPS DataFrame にサイズ制限はありますか?
PyODPS 自体にはテーブルサイズの制限はありません。ローカルの Pandas から作成された DataFrame の場合、制限は利用可能なローカルメモリです。
DataFrame で max_pt を使用するにはどうすればよいですか?
odps.df.func モジュールを使用して、MaxCompute のビルトイン関数を呼び出します:
from odps.df import func
df = o.get_table('your_table').to_df()
df[df.ds == func.max_pt('your_project.your_table')] # ds はパーティション列です。
open_writer() と write_table() の違いは何ですか?
write_table() を呼び出すたびに、サーバー上に新しいファイルが作成されます。小規模なデータセットで繰り返し呼び出すと、多くのファイルが生成され、クエリのパフォーマンスが低下し、メモリの問題を引き起こす可能性があります。
推奨:すべてのレコードを 1 回の呼び出しで渡すか、Generator オブジェクトを使用して、作成されるファイルの数を最小限に抑えます:
# 効率が悪い:呼び出しごとに 1 つのファイルが作成される
write_table(records_batch_1)
write_table(records_batch_2)
# 効率が良い:すべてのレコードを 1 回の呼び出しで渡す
write_table(all_records)
open_writer() はブロックに直接書き込むため、ストリーミング書き込みに適しています。
使用方法の詳細については、「テーブルへのデータの書き込み」をご参照ください。
なぜ DataWorks PyODPS ノードはローカル実行よりも少ない行数を返すのですか?
DataWorks はデフォルトで Instance Tunnel を有効にしていません。これがない場合、instance.open_reader は Result インターフェイスを使用し、これは 10,000 レコードに制限されています。
すべてのレコードを取得するには、Instance Tunnel を有効にして制限を解除します:
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False # 10,000 レコードの上限を解除します。
with instance.open_reader() as reader:
for record in reader:
...
Instance Tunnel を有効にした後、reader.count を使用して合計レコード数を取得します。
DataFrame の行数を取得するにはどうすればよいですか?
DataFrame は遅延実行を使用します。つまり、明示的にトリガーするまで操作は実行されません。すぐにカウントを取得するには、次のようにします:
iris = DataFrame(o.get_table('pyodps_iris'))
iris.count().execute()
execute() なしで count() を呼び出すと、数値ではなく遅延式が返されます。遅延実行の詳細については、「実行」および「集計」をご参照ください。
"sourceIP is not in the white list"
MaxCompute プロジェクトで IP ホワイトリスト保護が有効になっています。プロジェクトオーナーに連絡して、ご利用の IP アドレスをホワイトリストに追加してもらってください。詳細については、「IP アドレスホワイトリストの管理」をご参照ください。
options.sql.settings が有効にならない
パラメーター名はクライアントと PyODPS で異なります。クライアントは odps.stage.mapper.split.size を使用しますが、PyODPS は odps.sql.mapper.split.size を使用します。これらは同じパラメーターではありません。
PyODPS で正しいパラメーター名を使用してください:
from odps import options
options.sql.settings = {'odps.stage.mapper.split.size': 32}
head() 呼び出し時の "IndexError: list index out of range"
DataFrame に行がないか、要求されたインデックスが利用可能な行数を超えています。head() を呼び出す前に DataFrame が空でないことを確認してください。
Pandas DataFrame のアップロード時の "ODPSError: ODPS entrance should be provided"
PyODPS がグローバルな MaxCompute エントリオブジェクトを見つけられません。次のいずれかの方法で修正してください:
-
%enterマジックコマンド (Room メカニズム) を使用して、グローバルエントリを自動的に設定します。 -
MaxCompute エントリオブジェクトで
to_global()を呼び出します。 -
エントリオブジェクトを明示的に渡します:
DataFrame(pd_df).persist('your_table', odps=odps)。
"lifecycle is not specified in mandatory mode"
このプロジェクトでは、すべてのテーブルにライフサイクルの値を指定する必要があります。書き込む前に設定してください:
from odps import options
options.lifecycle = 7 # 日数。整数である必要があります。
"Perhaps the datastream from server is crushed"
これはダーティデータを示しています。データの列数がターゲットテーブルのスキーマと一致していることを確認してください。
"Project is protected"
プロジェクトのセキュリティポリシーにより、直接のデータ読み取りが制限されています。
-
すべてのデータにアクセスする場合:プロジェクトオーナーに例外ルールを追加してもらうか、DataWorks または別のマスキングツールを使用して、保護されていないプロジェクトにデータをエクスポートしてください。
-
データをプレビューする場合:
o.execute_sql('select * from <table_name>').open_reader()またはo.get_table('<table_name>').to_df()を使用してください。
"ConnectionError: timed out try catch exception"
デフォルトの接続タイムアウトは 5 秒です。これは、ネットワークやサーバーのレイテンシーが高い場合に断続的な障害を引き起こす最も一般的な原因です。スクリプトの先頭でタイムアウトを増やしてください:
from odps import options
options.connect_timeout = 30
特定のマシンでエラーが発生する場合、サンドボックスのネットワーク制限がアクセスをブロックしている可能性があります。それらのタスクを実行するために専用のリソースグループを使用してください。
"NameError: name 'get_task_cost' is not defined"
関数名 get_sql_task_cost は無効です。代わりに execute_sql_cost を使用してください。
ログで中国語の文字がエンコードされた文字列として表示される
これは Python 2 にのみ影響します。中国語の文字を含む文字列を出力する際は、% フォーマット演算子を使用してください:
print("我叫 %s" % ('abc'))
open_reader を使用すると DATETIME フィールドが STRING として返される
options.tunnel.use_instance_tunnel = False の場合、PyODPS はレガシーな Result インターフェイスを呼び出します。これはデータを CSV 形式で返すため、DATETIME 値は文字列として返されます。
正しく型付けされたデータを取得するには、Instance Tunnel を有効にします:
options.tunnel.use_instance_tunnel = True
PyODPS DataFrame で Python の言語機能を使用するにはどうすればよいですか?
PyODPS DataFrame は、標準の Python 関数および制御フローと互換性があります。
関数の定義と再利用:
def euclidean_distance(from_x, from_y, to_x, to_y):
return ((from_x - to_x) ** 2 + (from_y - to_y) ** 2).sqrt()
def manhattan_distance(from_x, from_y, to_x, to_y):
return (from_x - to_x).abs() + (from_y - to_y).abs()
# DataFrame に適用
euclidean_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance')
ループと `reduce` を使用したテーブルの結合:
30 テーブルの UNION ALL SQL ステートメントを書く代わりに、Python の reduce を使用します:
table_names = ['table1', ..., 'tableN']
dfs = [o.get_table(tn).to_df() for tn in table_names]
result = reduce(lambda x, y: x.union(y), dfs)
Pandas バックエンドを使用して PyODPS をローカルでデバッグするにはどうすればよいですか?
DEBUG フラグを使用して、他のコードを変更することなく、ローカルの Pandas 実行と MaxCompute 実行を切り替えます:
df = o.get_table('movielens_ratings').to_df()
DEBUG = True
if DEBUG:
df = df[:100].to_pandas(wrap=True)
# 以降のすべてのコードは変更されません。DEBUG=True の場合、ローカルで実行されます。
DEBUG = False に設定すると、MaxCompute 上で完全なジョブが実行されます。より高度なローカルデバッグ体験には、MaxCompute Studio を使用してください。
ネストされたループの実行が遅い
最も一般的な原因は、外側のループ内に df = XXX を配置することです。これにより、反復ごとに新しい DataFrame オブジェクトが作成され、コストが高くなります。代わりに、ループ内で結果を dict に収集し、ループが完了した後に一度だけ DataFrame を構築します。
データをローカルにダウンロードしないようにするにはどうすればよいですか?
詳細については、「PyODPS ノードを使用してデータをローカルディレクトリにダウンロードして処理するか、オンラインでデータを処理する」をご参照ください。
データをローカルにダウンロードするのが適切なのはいつですか?
データ量が少ない場合は、ローカル処理のためにデータをダウンロードします。
大規模な操作、特に 1 行を複数行に展開したり、Python 関数を行ごとに適用したりする場合は、PyODPS DataFrame を使用して計算を MaxCompute 上で実行し続けます。たとえば、JSON 文字列をキーと値の行に展開するには、次のようにします:
from odps.df import output
@output(['k', 'v'], ['string', 'int'])
def h(row):
import json
for k, v in json.loads(row.json).items():
yield k, v
df.apply(h, axis=1)
open_reader が 10,000 レコードしか返さない場合、どうすればもっと取得できますか?
SQL の結果をテーブルとして保存し、そのテーブルから読み取ります:
o.execute_sql('create table result_table as select * from your_table')
o.get_table('result_table').open_reader()
なぜ UDF ではなくビルトイン演算子を使用するのですか?
ビルトイン演算子は、ユーザー定義関数 (UDF) よりも大幅に高速に実行されます。数百万行を処理するジョブの場合、UDF は実行時間を 7 秒から 27 秒に増加させる可能性があります。より大きなデータセットの場合、その差はさらに大きくなります。
パーティションテーブルで DataFrame().schema.partitions が空になるのはなぜですか?
DataFrame はパーティション列を通常の列と同じように扱います。それらを区別しません。パーティション列でフィルタリングするには、直接クエリを実行します:
df = o.get_table('your_table').to_df()
print(df[df.ds == ''].execute())
パーティションとパーティションベースの読み取りの詳細については、「テーブル」をご参照ください。
PyODPS DataFrame でデカルト積を実行するにはどうすればよいですか?
詳細については、「PyODPS DataFrame でのデカルト積の処理」をご参照ください。
PyODPS ノードで Jieba を使用して中国語のテキストを分割するにはどうすればよいですか?
詳細については、「PyODPS ノードを使用して Jieba に基づいて中国語のテキストを分割する」をご参照ください。
テーブルからすべてのデータをダウンロードするにはどうすればよいですか?
デフォルトでは、PyODPS はインスタンスから読み取れるデータ量を制限しません。ただし、options.tunnel.limit_instance_tunnel を指定しない場合、制限が自動的に有効になり、ダウンロードできるレコード数はプロジェクトの構成に基づいて上限が設定されます。ほとんどの場合、一度に最大 10,000 レコードです。すべてのデータをダウンロードするには、Instance Tunnel を有効にして制限を無効にします:
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False # レコードの上限を解除します。
with instance.open_reader() as reader:
for record in reader:
...
execute_sql または DataFrame を使用して NULL 値の割合を計算できますか?
どちらも機能しますが、この種の計算には一般的に DataFrame の集約操作の方が高速です。
PyODPS で新しいデータ型を有効にするにはどうすればよいですか?
単一のクエリの場合、execute_sql で設定をヒントとして渡します:
o.execute_sql(
'set odps.sql.type.system.odps2=true; select * from your_table',
hints={"odps.sql.submit.mode": "script"}
)
単一の DataFrame ジョブの場合 (persist、execute、または to_pandas)、その呼び出しにヒントを渡します:
from odps.df import DataFrame
users = DataFrame(o.get_table('odps2_test'))
users.persist('copy_test', hints={'odps.sql.type.system.odps2': 'true'})
セッション内のすべての DataFrame ジョブの場合、グローバルオプションを設定します:
options.sql.use_odps2_extension = True
PyODPS 使用時の "ValueError"
SDK を V0.8.4 以降にアップグレードしてください。アップグレードできない場合は、スクリプトに以下を追加してください:
from odps.types import Decimal
Decimal._max_precision = 38
PyODPS を介した SQL クエリの実行が遅い
SQL の実行が遅いのは、通常 PyODPS 自体が原因ではありません。次の手順で問題をご確認ください:
1. ネットワークとサーバーのレイテンシーを確認する
プロキシサーバーまたはネットワークリンクが遅延を追加していないか、またサーバー側のタスクキューが滞っていないかを確認します。
2. タスクの送信とデータの読み取りを分離する
送信と読み取りを 1 回の呼び出しで組み合わせると、どこで遅延が発生しているかを判断するのが難しくなります。各フェーズを個別に測定するために、それらを分割します:
# 変更前:送信と読み取りを組み合わせる
with o.execute_sql('select * from your_table').open_reader() as reader:
for row in reader:
print(row)
# 変更後:別々のステップに分割する
inst = o.run_sql('select * from your_table')
inst.wait_for_success()
with inst.open_reader() as reader:
for row in reader:
print(row)
3. Logview がないか確認する (DataWorks のみ)
DataWorks を介して送信されたジョブの場合、SQL タスクが Logview リンクを生成していることを確認します。0.11.6 未満の PyODPS バージョンで execute_sql または run_sql で送信されたタスクは、Logview の生成に失敗する可能性があります。
4. デバッグログを有効にする
デバッグログが有効になっている場合、PyODPS はすべてのリクエストとレスポンスをログに記録します。これにより、リクエストの各ステージの正確なタイムスタンプが表示されます:
import datetime
import logging
from odps import ODPS
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
o = ODPS(...) # アカウントの認証情報を入力します。
print("Check time:", datetime.datetime.now())
inst = o.run_sql("select * from your_table")
ログ出力には、各フェーズがいつ開始され、どれくらいの時間がかかったかが示され、遅延が発生している場所を特定するのに役立ちます。