Ray叢集是EMR Serverless Spark工作空間提供的分散式運算架構,支援Python原生分散式運算、機器學習模型訓練與推理等情境。本文介紹如何建立、啟動Ray叢集,以及如何提交Ray作業。
建立Ray叢集
登入EMR Serverless Spark控制台,進入目標工作空間。
在左側導覽列,單擊叢集管理,然後單擊RAY叢集頁簽。
單擊建立Ray叢集,在建立面板中配置以下參數,然後單擊建立。
叢集名稱:輸入集群名稱。
引擎版本:選擇引擎版本。當前提供預設版本err-1.0.1 (Ray 2.47.1, Python 3.12)。
節點群組:如需同時使用 CPU 與 GPU 混合資源,可分別建立 CPU 節點群組和 GPU 節點群組。單擊+添加Worker節點群組,配置以下資訊。
節點群組名稱:輸入節點群組名稱,不同節點群組的名稱不能重複。
資源隊列:選擇資源隊列。
資源規格:選擇節點規格。
節點數量:設定Worker節點數量。
(可選)網路連接:如需從Ray叢集訪問同VPC內的服務,選擇已建立的網路連接。
(可選)掛載納管檔案目錄:選擇已建立的納管檔案目錄,以實現在 Ray 任務中讀寫檔案目錄下的檔案。掛載是一種常用的資料訪問方式,支援將 CPFS、NAS 和 OSS 路徑映射為本地目錄,方便程式以本地路徑的方式訪問資料。如需建立新的納管檔案目錄,詳情請參見管理納管檔案目錄。
(可選)叢集進階配置:以JSON格式配置進階參數,詳情請參見叢集進階配置參數說明。
當前控制台暫不支援配置Worker節點自動調整,如需配置,可通過API方式修改,詳情請參見CreateRayCluster - 建立Ray叢集。
啟動Ray叢集
在Ray叢集列表中,找到目的地組群,單擊啟動。
等待叢集運行狀態變為運行中。
說明工作空間內初次開機Ray叢集時,系統需要建立額外組件,啟動時間約需2~3分鐘;後續啟動速度會明顯加快。
叢集啟動後,單擊調用資訊,記錄提交Ray作業所需的地址和Token資訊。
警告目前的版本中,Token對同一Ray叢集永久有效,請妥善保管,避免泄露。
單擊Dashboard,可進入Ray叢集的監控介面,查看叢集資源使用方式。
提交Ray作業
方式一:互動式開發
適合快速入門和調試。在EMR Serverless Spark控制台的Notebook環境中直接連接Ray叢集(無需額外配置網路)。如從本地串連,需確保本地機器與Ray叢集之間網路互連,且環境為Python 3.12。
安裝Ray用戶端:
pip install ray[client]==2.47.1在叢集的調用資訊頁面,擷取gRPC 地址和Token。
使用以下範例程式碼串連Ray叢集,進行互動式開發:
import ray import os def get_metadata(): headers = {"ray-token": "<yourToken>"} return [(key.lower(), value) for key, value in headers.items()] ray.init(address="<your_gRPC_address>", _metadata=get_metadata()) import time @ray.remote def square(x): time.sleep(0.1) return x * x futures = [square.remote(i) for i in range(10)] results = ray.get(futures) print("平方結果:", results)
方式二:SDK提交(批任務)
適合將Ray作業嵌入工程代碼中提交。本地環境要求:Python 3.12。
安裝Ray Job Submission用戶端庫:
pip install "ray[default]==2.47.1"在叢集的調用資訊頁面,擷取公網調用地址和Token。
使用以下範例程式碼串連Ray叢集並提交作業:
import time from ray.job_submission import JobSubmissionClient custom_headers = { "ray-token": "<yourToken>" } client = JobSubmissionClient("<公網調用地址>", headers=custom_headers) # 或者用內網,需要保證提交端在同一region vpc下,建議內網提交,更加穩定 # client = JobSubmissionClient("http://emr-spark-ray-gateway-cn-beijing-internal.spark.emr.aliyuncs.com", headers=custom_headers) job_id = client.submit_job( entrypoint="python -c 'print(\"Hello from Ray Client!\")'" ) print(f"Submitted job with ID: {job_id}") while True: status = client.get_job_status(job_id) print(f"Job status: {status}") if status.is_terminal(): break time.sleep(1) logs = client.get_job_logs(job_id) print("Job logs:") print(logs)
(可選)如需在作業中訪問掛載目錄中的檔案,可在建立Ray叢集時掛載納管檔案目錄,然後在提交作業時通過掛載路徑引用檔案。以下樣本假設已將OSS目錄掛載到
/mnt/myoss路徑下:import time from ray.job_submission import JobSubmissionClient custom_headers = { "ray-token": "<yourToken>" } client = JobSubmissionClient("<公網調用地址>", headers=custom_headers) # 通過掛載路徑直接引用OSS中的指令檔作為entrypoint job_id = client.submit_job( entrypoint="python /mnt/myoss/main.py" ) print(f"Submitted job with ID: {job_id}") while True: status = client.get_job_status(job_id) print(f"Job status: {status}") if status.is_terminal(): break time.sleep(1) logs = client.get_job_logs(job_id) print("Job logs:") print(logs)說明在Ray任務代碼中,也可以直接通過掛載路徑讀寫檔案。例如,讀取
/mnt/myoss/test.txt或寫入檔案到/mnt/myoss/目錄下,寫入的檔案會同步到對應的OSS路徑中。以下樣本展示如何在Ray任務中讀寫掛載目錄中的檔案:
# 下面是提交到ray叢集的代碼 import ray import time ray.init() @ray.remote def read_from_mount_path(i): with open('/mnt/myoss/test.txt', 'r', encoding='utf-8') as f: content = f.read() return content @ray.remote def write_to_mount_path(i): content = "Hello world " + str(i) with open('/mnt/myoss/output' + str(i) + '.txt', 'w', encoding='utf-8') as f: f.write(content) return '寫入成功' print("--- 開始運行遠程任務 ---") start_time = time.time() obj_refs = [read_from_mount_path.remote(i) for i in range(4)] obj_refs2 = [write_to_mount_path.remote(i) for i in range(4)] results = ray.get(obj_refs) results2 = ray.get(obj_refs2)
方式三:命令列提交(批任務)
適合與外部系統整合。本地環境要求:Python 3.12。
安裝Ray Job Submission用戶端庫:
pip install "ray[default]==2.47.1"在叢集的調用資訊頁面,擷取公網調用地址和Token。
執行以下命令提交Ray作業:
# ray job submit前必須設定ray address和headers export RAY_ADDRESS='<公網調用地址>' export RAY_JOB_HEADERS='{"ray-token": "<yourToken>"}' ## 提交任務 ### working dir 支援本地 & Ray core distributed sort ray job submit --working-dir "." -- python test-ray-core-sort.py ### 支援OSS讀寫 ray job submit --working-dir "." -- python test-saving-data-test.py ### 支援OSS-HDFS讀寫 ray job submit --working-dir "." -- python test-saving-data-oss-hdfs-test.py ### 代碼支援Mount讀寫 ray job submit --working-dir "." -- python test-saving-data-test-mount.py
更多命令列參數,請參見Ray Job Submission CLI官方文檔。
叢集進階配置參數說明
叢集進階配置以JSON格式填寫,所有參數均為可選。
參數 | 是否必選 | 說明 |
| 可選 | 指定在叢集啟動時需要下載到Head節點和Worker節點的OSS檔案。支援OSS和OSS-HDFS路徑,多個路徑之間用英文逗號(,)分隔。檔案下載至各節點的 |
| 可選 | 指定用於初始化Head節點和Worker節點Python基礎環境的requirements.txt檔案。支援OSS和OSS-HDFS路徑,路徑格式必須為 |