全部產品
Search
文件中心

Platform For AI:通過關聯服務發現Nacos調用

更新時間:Jan 10, 2025

對於高QPS的大Cellular Data Package(例如映像服務),開啟VPC高速直連,可大幅提高訪問效能、降低訪問延時。但是對於多執行個體情況,不具備負載平衡能力。如果您當前已有微服務在使用Nacos,可以利用Nacos實現存取控制。本文介紹如何通過掛載Nacos執行個體進行服務調用。

前提條件

  • 已建立Virtual Private Cloud、交換器和安全性群組,確保交換器的IP位址區段有足夠的IP,EAS會將您部署推理服務中的執行個體IP在彈性網卡中註冊。具體操作,請參見建立和管理專用網路管理安全性群組

    重要

    安全性群組控制ECS執行個體的出入流量,使用者的ECS執行個體和EAS服務執行個體之間的網路互連也受安全性群組的配置控制。預設普通安全性群組內,執行個體之間是內網互連的,您可以在配置VPC高速直連時,選擇需要訪問EAS服務的ECS執行個體所在安全性群組,從而支援執行個體之間網路互連。當需要配置使用不同的安全性群組時,請設定安全性群組的規則支援ECS執行個體之間能夠互連。

  • EAS已開通VPC高速直連功能,詳情請參見配置網路連通

  • 已有Nacos執行個體,詳情請參見建立執行個體

  • 確保Nacos所在的Virtual Private Cloud和交換器與您在高速直連的配置一致。

調用原理

  1. 您在指定的Virtual Private Cloud和交換器下建立了一個Nacos執行個體(與EAS開通VPC高速直連的Virtual Private Cloud和交換器相同);

  2. 您在部署EAS推理服務時,指定掛載的Nacos;

  3. EAS將推理服務對應的Pod註冊到您的Nacos執行個體下;

  4. 用戶端啟動了一個Nacos服務註冊監聽;

  5. Nacos將EAS服務的IP列表變化推送到用戶端;

  6. 用戶端使用SDK來選擇一個執行個體(SDK封裝了負載平衡調度演算法:加權輪詢WRR);

  7. 使用SDK返回的服務執行個體的IP:Port,查看調用資訊通過IP:Port發起調用;

配置Nacos掛載

建立或更新服務,在服務配置添加Nacos配置,配置支援數組,即如果添加多個Nacos,會將服務同時註冊到多個Nacos中,可供使用者容災等情境使用。涉及高速直連和Nacos掛載的具體配置如下:

{
    "cloud": {
        "networking": {
            "security_group_id": "sg-*****",
            "vpc_id": "vpc-***",
            "vswitch_id": "vsw-****"
        }
    },
    "networking": {
        "nacos": [
            {
                "nacos_id": "mse_regserverless_cn-****"
            }
        ]
    }
}

參數

描述

cloud

networking

vpc_id

通過配置VPC、交換器和安全性群組來啟用VPC高速直連。

重要
  • 請使用與Nacos執行個體一致的專用網路。

  • 請確保交換器的IP位址區段有足夠的IP。

vswitch_id

security_group_id

networking

nacos

id

表示已建立的Nacos執行個體ID。

檢查Nacos服務註冊

服務部署成功後,EAS會將服務註冊到您的Nacos執行個體上,具體的註冊資訊如下:

  • cluster:預設的DEFAULT叢集

  • namespace: 預設的公用命名空間

  • serviceName: 你的服務名稱

  • groupName: 系統產生固定值,pai-eas

檢查建立的服務資訊是否正確:

掛載Nacos

檢查服務內的執行個體數是否正確,執行個體狀態是否正常:

Nacos執行個體狀態

用戶端發起調用

Go

 package main

import (
	"fmt"
	"github.com/nacos-group/nacos-sdk-go/v2/clients"
	"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
	"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
	"github.com/nacos-group/nacos-sdk-go/v2/model"
	"github.com/nacos-group/nacos-sdk-go/v2/util"
	"github.com/nacos-group/nacos-sdk-go/v2/vo"
	"strconv"
	"testing"
	"time"
)

var Clients = make(map[string]NacosClientManager)

type NacosClientManager struct {
	userId   string
	endPoint string
	client   naming_client.INamingClient
}

func NewAndGetNacosClient(userId string, endPoint string) (*NacosClientManager, error) {
	key := generateKey(userId, endPoint)
	client, exists := Clients[key]

	if exists {
		return &client, nil
	}

	client, exists = Clients[key]
	if exists {
		return &client, nil
	}

	newClient, err := clients.NewNamingClient(
		vo.NacosClientParam{
			ClientConfig: constant.NewClientConfig(
				constant.WithNamespaceId(""),
				constant.WithTimeoutMs(5000),
				constant.WithNotLoadCacheAtStart(true),
				constant.WithLogDir("/tmp/nacos/log"),
				constant.WithCacheDir("/tmp/nacos/cache"),
				constant.WithLogLevel("debug"),
			),
			ServerConfigs: []constant.ServerConfig{
				*constant.NewServerConfig(endPoint, 8848, constant.WithContextPath("/nacos")),
			},
		},
	)
	if err != nil {
		return nil, err
	}
	nacosClient := NacosClientManager{
		userId:   userId,
		endPoint: endPoint,
		client:   newClient,
	}
	Clients[key] = nacosClient
	return &nacosClient, nil
}

func (p *NacosClientManager) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
	instance, err := p.client.SelectOneHealthyInstance(param)
	if err != nil {
		return nil, fmt.Errorf("SelectOneHealthyInstance failed: %v", err)
	}
	fmt.Printf("SelectOneHealthyInstance success, param: %+v\n", param)
	return instance, nil
}

func (p *NacosClientManager) Subscribe(service string, group string) error {
	subscribeParam := &vo.SubscribeParam{
		ServiceName: service,
		GroupName:   group,
		SubscribeCallback: func(services []model.Instance, err error) {
			fmt.Printf("callback return services:%s \n\n", util.ToJsonString(services))
		},
	}
	return p.client.Subscribe(subscribeParam)
}

func generateKey(userId string, endPoint string) string {
	return userId + fmt.Sprintf("-%s", endPoint)
}

func Test(t *testing.T) {

	nacosClient, err := NewAndGetNacosClient("yourAliyunUid", "yourNacosEndpoint")
	if err != nil {
		panic(err)
	}

	nacosClient.Subscribe("your_service", "pai-eas")

	params := vo.SelectOneHealthInstanceParam{
		ServiceName: "your_service",
		GroupName:   "pai-eas",
	}
	
	instance, err := nacosClient.SelectOneHealthyInstance(params)
	fmt.Println(instance)

    // check your own invoke info on console and replace host by ip:port
	url := fmt.Sprintf("http://%s:%s/api/predict/xxxxxxx", instance.Ip, strconv.FormatUint(instance.Port, 10))
	fmt.Println(url)

    //todo invoke service by url
}    

Python

說明

Nacos官方的Python SDK沒有負載平衡演算法,需要自行實現WRR負載平衡。

安裝依賴

pip install nacos-sdk-python

發起調用

# -*- coding: utf8 -*-
import nacos

# Nacos server configuration
SERVER_ADDRESSES = "yourNacosEndpoint"
NAMESPACE = ""  # Use default namespace if empty
SERVICE_NAME = "your_service"
GROUP_NAME = "pai-eas"

# Create Nacos client
client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)


def callback(args):
    print(args)


if __name__ == '__main__':
    # Correctly pass the callback function as a list
    client.add_config_watchers(SERVICE_NAME, "pai-eas", [callback])

    # Fetch and print naming instance details
    b = client.list_naming_instance(SERVICE_NAME, None, None, GROUP_NAME, True)
    print(b)
    if b"hosts":
        print("ip", b["hosts"][0]["ip"])
        ip = b["hosts"][0]["ip"]
        port = b["hosts"][0]["port"]
        # check your own invoke info on console and replace host by ip:port
        url = f"http://{ip}:{port}/api/predict/xxxxxxx"
        print(url)
     #todo invoke service by url        

Java

添加Maven依賴

<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>2.3.2</version>
</dependency>

發起調用

package test;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;



import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;

public class NacosTest {
    public static void main(String[] args) {
        try {
            String serverAddr = "yourNacosEndpoint:8848";
            NamingService namingService = NacosFactory.createNamingService(serverAddr);

            ExecutorService executorService = Executors.newFixedThreadPool(1);
            EventListener serviceListener = new AbstractEventListener() {
                @Override
                public void onEvent(Event event) {
                    if (event instanceof NamingEvent) {
                        System.out.println(((NamingEvent) event).getServiceName());
                        System.out.println(((NamingEvent) event).getGroupName());
                    }
                }

                @Override
                public Executor getExecutor() {
                    return executorService;
                }
            };
            namingService.subscribe("your_service", "pai-eas", serviceListener);
            Instance instance = namingService.selectOneHealthyInstance("your_service", "pai-eas");
            System.out.println(instance.getIp());
            System.out.println(instance.getPort());
            // check your own invoke info on console and replace host by ip:port
            String url = String.format("http://%s:%d/api/predict/xxxxxxx", instance.getIp(), instance.getPort());
            System.out.println(url);
            
            //todo invoke service by url
            
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }
}