このトピックでは、Mars クラスタの作成方法、Mars を使用して MaxCompute テーブルからデータを読み取る方法、および Mars UI の URL を取得する方法について説明します。
前提条件
Mars ランタイム環境が準備されていること。詳細については、「準備」をご参照ください。
Mars クラスタ操作
Mars クラスタを作成する
次のコマンドを実行して、Mars クラスタを作成します。このプロセスは完了するまでに時間がかかります。
from odps import options options.verbose = True # 上記のコマンドが DataWorks PyODPS 3 ノードで既に構成されている場合は、これら 2 つのコマンドを実行する必要はありません。 client = o.create_mars_cluster(5, 4, 16, min_worker_num=3)
パラメーター:
5: クラスタ内のワーカーノードの数。
4: 各ワーカーノードの CPU コア数。
16: 各ワーカーノードのメモリサイズ。単位: GB。
説明各ワーカーノードにリクエストするメモリサイズは 1 GB より大きくする必要があります。CPU コアとメモリサイズの最適な比率は 1:4 です。たとえば、4 つの CPU コアと 16 GB のメモリを持つワーカーノードを構成します。
最大 30 個のワーカーノードを作成できます。ワーカーノードの数が上限を超えると、イメージサーバーが過負荷になる可能性があります。30 を超えるワーカーノードが必要な場合は、DingTalk グループ(ID: 11782920)を検索して MaxCompute 開発者コミュニティに参加し、テクニカルサポートに連絡してください。
min_worker_num: システムがクライアントオブジェクトを返すために起動する必要があるワーカーノードの最小数。このパラメーターを 3 に設定すると、3 つのワーカーノードが起動した後にシステムはクライアントオブジェクトを返します。
Mars クラスタを作成するときに
options.verbose
を True に設定すると、MaxCompute インスタンスの Logview および Mars UI の URL がコマンド出力に表示されます。Mars UI を使用して Mars クラスタに接続し、クラスタとジョブのステータスをクエリできます。ジョブを送信する
Mars クラスタを作成すると、クラスタはクラスタに接続するデフォルトセッションを作成します。
.execute()
メソッドを呼び出して、ジョブをクラスタに送信し、デフォルトセッションでジョブを実行できます。import mars.dataframe as md import mars.tensor as mt md.DataFrame(mt.random.rand(10, 3)).execute() # .execute() メソッドを呼び出して、作成されたクラスタにジョブを送信します。
クラスタを停止して解放する
Mars クラスタは、作成から 3 日後に自動的に解放されます。
client.stop_server()
メソッドを呼び出して、クラスタを解放することもできます。client.stop_server()
MaxCompute テーブルに対する読み取りおよび書き込み操作
Mars は、MaxCompute テーブルから直接データを読み取り、MaxCompute テーブルにデータを書き込むことができます。このセクションでは、test_mars
テーブルおよび test_mars_persist
テーブルへの読み取りおよび書き込みを例として使用します。次のコードは、test_mars
および test_mars_persist
のテーブル作成ステートメントを示しています。
CREATE TABLE test_mars(col1 INT,col2 INT);
INSERT into test_mars(col1, col2) VALUES (1, 2);
INSERT into test_mars(col1, col2) VALUES (3, 4);
INSERT into test_mars(col1, col2) VALUES (5, 6);
CREATE TABLE test_mars_persist(col1 INT,col2 INT);
テーブルからデータを読み取る
Mars は
o.to_mars_dataframe
メソッドを呼び出して、MaxCompute テーブルからデータを読み取り、Mars DataFrame を返します。df = o.to_mars_dataframe('test_mars') df.head(3).execute()
返される結果:
col1 col2 0 1 2 1 3 4 2 5 6
テーブルにデータを書き込む
Mars は
o.persist_mars_dataframe(df, 'table_name')
メソッドを呼び出して、Mars DataFrame を MaxCompute テーブルとして保存します。df = o.to_mars_dataframe('test_mars') df2 = df + 1 o.persist_mars_dataframe(df2, 'test_mars_persist') # Mars DataFrame を MaxCompute テーブルとして保存します。 o.get_table('test_mars_persist').to_df().head(6) # PyODPS DataFrame API 操作を呼び出してデータをクエリします。
返される結果:
col1 col2 0 2 3 1 4 5 2 6 7
その他の操作
既存の Mars クラスタを使用する
インスタンス ID に基づいて Mars クラスタを再作成します。
client = o.create_mars_cluster(instance_id=**instance-id**)
Mars UI の URL を取得する
Mars クラスタを作成するときに
options.verbose
を True に設定すると、Mars UI の URL がコマンド出力に自動的に表示されます。client.endpoint
を使用して、Mars UI の URL を取得することもできます。print(client.endpoint)
インスタンスの Logview URL を取得する
Mars クラスタを作成するときに
options.verbose
を True に設定すると、Logview URL がコマンド出力に自動的に表示されます。client.get_logview_address()
を使用して、Logview URL を取得することもできます。print(client.get_logview_address())