全部產品
Search
文件中心

E-MapReduce:向Ray叢集提交任務

更新時間:Jun 10, 2026

Ray叢集是EMR Serverless Spark工作空間提供的分散式運算架構,支援Python原生分散式運算、機器學習模型訓練與推理等情境。本文介紹如何建立、啟動Ray叢集,以及如何提交Ray作業。

建立Ray叢集

  1. 登入EMR Serverless Spark控制台,進入目標工作空間。

  2. 在左側導覽列,單擊叢集管理,然後單擊RAY叢集頁簽。

  3. 單擊建立Ray叢集,在建立面板中配置以下參數,然後單擊建立

    • 叢集名稱:輸入集群名稱。

    • 引擎版本:選擇引擎版本。當前提供預設版本err-1.0.1 (Ray 2.47.1, Python 3.12)。

    • 節點群組:如需同時使用 CPU 與 GPU 混合資源,可分別建立 CPU 節點群組和 GPU 節點群組。單擊+添加Worker節點群組,配置以下資訊。

      • 節點群組名稱:輸入節點群組名稱,不同節點群組的名稱不能重複。

      • 資源隊列:選擇資源隊列。

      • 資源規格:選擇節點規格。

      • 節點數量:設定Worker節點數量。

    • (可選)網路連接:如需從Ray叢集訪問同VPC內的服務,選擇已建立的網路連接。

    • (可選)掛載納管檔案目錄:選擇已建立的納管檔案目錄,以實現在 Ray 任務中讀寫檔案目錄下的檔案。掛載是一種常用的資料訪問方式,支援將 CPFS、NAS 和 OSS 路徑映射為本地目錄,方便程式以本地路徑的方式訪問資料。如需建立新的納管檔案目錄,詳情請參見管理納管檔案目錄

    • (可選)叢集進階配置:以JSON格式配置進階參數,詳情請參見叢集進階配置參數說明

說明

當前控制台暫不支援配置Worker節點自動調整,如需配置,可通過API方式修改,詳情請參見CreateRayCluster - 建立Ray叢集

啟動Ray叢集

  1. 在Ray叢集列表中,找到目的地組群,單擊啟動

  2. 等待叢集運行狀態變為運行中

    說明

    工作空間內初次開機Ray叢集時,系統需要建立額外組件,啟動時間約需2~3分鐘;後續啟動速度會明顯加快。

  3. 叢集啟動後,單擊調用資訊,記錄提交Ray作業所需的地址和Token資訊。

    警告

    目前的版本中,Token對同一Ray叢集永久有效,請妥善保管,避免泄露。

    單擊Dashboard,可進入Ray叢集的監控介面,查看叢集資源使用方式。

提交Ray作業

方式一:互動式開發

適合快速入門和調試。在EMR Serverless Spark控制台的Notebook環境中直接連接Ray叢集(無需額外配置網路)。如從本地串連,需確保本地機器與Ray叢集之間網路互連,且環境為Python 3.12。

  1. 安裝Ray用戶端:

    pip install ray[client]==2.47.1
  2. 在叢集的調用資訊頁面,擷取gRPC 地址Token

  3. 使用以下範例程式碼串連Ray叢集,進行互動式開發:

    import ray
    import os
    
    def get_metadata():
        headers = {"ray-token": "<yourToken>"}
        return [(key.lower(), value) for key, value in headers.items()]
    
    ray.init(address="<your_gRPC_address>", _metadata=get_metadata())
    
    import time
    @ray.remote
    def square(x):
        time.sleep(0.1)
        return x * x
    
    futures = [square.remote(i) for i in range(10)]
    
    results = ray.get(futures)
    print("平方結果:", results)

    image

方式二:SDK提交(批任務)

適合將Ray作業嵌入工程代碼中提交。本地環境要求:Python 3.12。

  1. 安裝Ray Job Submission用戶端庫:

    pip install "ray[default]==2.47.1"
  2. 在叢集的調用資訊頁面,擷取公網調用地址Token

  3. 使用以下範例程式碼串連Ray叢集並提交作業:

    import time
    from ray.job_submission import JobSubmissionClient
    
    custom_headers = {
        "ray-token": "<yourToken>"
    }
    
    client = JobSubmissionClient("<公網調用地址>", headers=custom_headers)
    
    # 或者用內網,需要保證提交端在同一region vpc下,建議內網提交,更加穩定
    # client = JobSubmissionClient("http://emr-spark-ray-gateway-cn-beijing-internal.spark.emr.aliyuncs.com", headers=custom_headers)
    
    job_id = client.submit_job(
        entrypoint="python -c 'print(\"Hello from Ray Client!\")'"
    )
    
    print(f"Submitted job with ID: {job_id}")
    
    while True:
        status = client.get_job_status(job_id)
        print(f"Job status: {status}")
        if status.is_terminal():
            break
        time.sleep(1)
    
    logs = client.get_job_logs(job_id)
    print("Job logs:")
    print(logs) 

    image

  4. (可選)如需在作業中訪問掛載目錄中的檔案,可在建立Ray叢集時掛載納管檔案目錄,然後在提交作業時通過掛載路徑引用檔案。以下樣本假設已將OSS目錄掛載到/mnt/myoss路徑下:

    import time
    from ray.job_submission import JobSubmissionClient
    
    custom_headers = {
        "ray-token": "<yourToken>"
    }
    
    client = JobSubmissionClient("<公網調用地址>", headers=custom_headers)
    
    # 通過掛載路徑直接引用OSS中的指令檔作為entrypoint
    job_id = client.submit_job(
        entrypoint="python /mnt/myoss/main.py"
    )
    
    print(f"Submitted job with ID: {job_id}")
    
    while True:
        status = client.get_job_status(job_id)
        print(f"Job status: {status}")
        if status.is_terminal():
            break
        time.sleep(1)
    
    logs = client.get_job_logs(job_id)
    print("Job logs:")
    print(logs)
    說明

    在Ray任務代碼中,也可以直接通過掛載路徑讀寫檔案。例如,讀取/mnt/myoss/test.txt或寫入檔案到/mnt/myoss/目錄下,寫入的檔案會同步到對應的OSS路徑中。

    以下樣本展示如何在Ray任務中讀寫掛載目錄中的檔案:

    # 下面是提交到ray叢集的代碼
    
    import ray
    import time
    
    ray.init()
    
    @ray.remote
    def read_from_mount_path(i):
        with open('/mnt/myoss/test.txt', 'r', encoding='utf-8') as f:
            content = f.read()
        return content
    
    @ray.remote
    def write_to_mount_path(i):
        content = "Hello world " + str(i)
        with open('/mnt/myoss/output' + str(i) + '.txt', 'w', encoding='utf-8') as f:
            f.write(content)
        return '寫入成功'
    
    print("--- 開始運行遠程任務 ---")
    start_time = time.time()
    obj_refs = [read_from_mount_path.remote(i) for i in range(4)]
    obj_refs2 = [write_to_mount_path.remote(i) for i in range(4)]
    results = ray.get(obj_refs)
    results2 = ray.get(obj_refs2)

方式三:命令列提交(批任務)

適合與外部系統整合。本地環境要求:Python 3.12。

  1. 安裝Ray Job Submission用戶端庫:

    pip install "ray[default]==2.47.1"
  2. 在叢集的調用資訊頁面,擷取公網調用地址Token

  3. 執行以下命令提交Ray作業:

    # ray job submit前必須設定ray address和headers
    export RAY_ADDRESS='<公網調用地址>'               
    export RAY_JOB_HEADERS='{"ray-token": "<yourToken>"}'
    
    ## 提交任務
    ### working dir 支援本地 & Ray core distributed sort
    ray job submit --working-dir "." -- python  test-ray-core-sort.py
    
    ### 支援OSS讀寫
    ray job submit --working-dir "." -- python test-saving-data-test.py
    
    ### 支援OSS-HDFS讀寫
    ray job submit --working-dir "." -- python test-saving-data-oss-hdfs-test.py
    
    ### 代碼支援Mount讀寫
    ray job submit --working-dir "." -- python  test-saving-data-test-mount.py

更多命令列參數,請參見Ray Job Submission CLI官方文檔

叢集進階配置參數說明

叢集進階配置以JSON格式填寫,所有參數均為可選。

參數

是否必選

說明

userDefinedFiles

可選

指定在叢集啟動時需要下載到Head節點和Worker節點的OSS檔案。支援OSS和OSS-HDFS路徑,多個路徑之間用英文逗號(,)分隔。檔案下載至各節點的/home/ray/work-dir目錄下。樣本:oss://mybucket/hello.py,oss://mybucket2/test/test.jar

userRequirementsFile

可選

指定用於初始化Head節點和Worker節點Python基礎環境的requirements.txt檔案。支援OSS和OSS-HDFS路徑,路徑格式必須為oss://<bucket>/<path>/requirements.txt。Ray叢集啟動後,系統自動執行pip install -r requirements.txt命令(非同步執行,不影響叢集啟動)。