このトピックでは、MaxFrameに関するよくある質問に対する回答を提供します。
質問1: エラーメッセージ「関数UDF定義に無効な型INT、odps.sql.type.system.odps2=trueを設定する必要があります。使用するには」
原因: MaxCompute V2.0データ型エディションがジョブで有効になっていませんが、MaxCompute V2.0データ型エディションでサポートされているデータ型がジョブで使用されています。
解決策: 関連するフラグパラメーターを設定して、MaxCompute 2.0データ型エディションを有効にします。 サンプルコード:
from maxframe import config # Add the following code before new_session. config.options.sql.settings = { "odps.sql.type.system.odps2": "true" }
質問2: エラーメッセージ「UDF: 'cloudpickle' という名前のモジュールがありません」
原因: 必要なcloudpickleパッケージが使用されていません。
解決策: MaxComputeベースイメージを参照します。 サンプルコード:
from maxframe import config # Add the following code before new_session. config.options.sql.settings = { "odps.session.image": "common", }
質問3: DataFrameに送信されたUDFのリソースを再利用する方法
一部のUDFシナリオでは、プログラムは、データベース接続の初期化やモデルの読み込みなど、多くのリソースの作成と破棄のアクティビティを実行します。 リソースの再利用を実現するために、デフォルト値が1回だけ初期化されるPython関数パラメーターの機能を使用できます。 たとえば、次のUDFコードは、モデルが1回だけ読み込まれることを示しています。
def predict(s, _ctx={}):
from ultralytics import YOLO
# The initial value of _ctx is an empty dictionary, which is initialized only once during the execution of the Python program.
# When using the model, you can first check if the model exists in _ctx. If it does not exist, load it and then store it in the dictionary.
if not _ctx.get("model", None):
model = YOLO(os.path.join("./", "yolo11n.pt"))
_ctx["model"] = model
model = _ctx["model"]
# Call the relevant interfaces of the model.
次のUDFコードは、カスタムクラスMyConnector
を使用してデータベース接続の作成と終了を処理することを示しています。
class MyConnector:
def __init__(self):
# create a database connection in __init__ function.
self.conn = create_connection()
def __del__(self):
# close a database connection in __del__ function.
try:
self.conn.close()
except:
pass
def process(s, connector=MyConnector()):
# Directly call the database connection within the connector, without creating and closing the connection again inside the UDF.
connector.conn.execute("xxxxx")
説明
初期化が実行される実際の回数は、UDFを実行しているワーカーの数によって異なります。 各Workerは、別々のPython環境でUDFを実行します。 たとえば、UDF呼び出しが100,000行のデータを処理する必要があり、10のUDF Workerに分散され、各Workerが10,000行を処理する場合、合計10回の初期化が実行されます。 ただし、各ワーカーは、10,000のデータ行の処理中に初期化プロセスを1回だけ実行します。