對於高QPS的大Cellular Data Package(例如映像服務),開啟VPC高速直連,可大幅提高訪問效能、降低訪問延時。但是對於多執行個體情況,不具備負載平衡能力。如果您當前已有微服務在使用Nacos,可以利用Nacos實現存取控制。本文介紹如何通過掛載Nacos執行個體進行服務調用。
前提條件
已建立Virtual Private Cloud、交換器和安全性群組,確保交換器的IP位址區段有足夠的IP,EAS會將您部署推理服務中的執行個體IP在彈性網卡中註冊。具體操作,請參見建立和管理專用網路和管理安全性群組。
重要安全性群組控制ECS執行個體的出入流量,使用者的ECS執行個體和EAS服務執行個體之間的網路互連也受安全性群組的配置控制。預設普通安全性群組內,執行個體之間是內網互連的,您可以在配置VPC高速直連時,選擇需要訪問EAS服務的ECS執行個體所在安全性群組,從而支援執行個體之間網路互連。當需要配置使用不同的安全性群組時,請設定安全性群組的規則支援ECS執行個體之間能夠互連。
EAS已開通VPC高速直連功能,詳情請參見配置網路連通。
已有Nacos執行個體,詳情請參見建立執行個體。
確保Nacos所在的Virtual Private Cloud和交換器與您在高速直連的配置一致。
調用原理
您在指定的Virtual Private Cloud和交換器下建立了一個Nacos執行個體(與EAS開通VPC高速直連的Virtual Private Cloud和交換器相同);
您在部署EAS推理服務時,指定掛載的Nacos;
EAS將推理服務對應的Pod註冊到您的Nacos執行個體下;
用戶端啟動了一個Nacos服務註冊監聽;
Nacos將EAS服務的IP列表變化推送到用戶端;
用戶端使用SDK來選擇一個執行個體(SDK封裝了負載平衡調度演算法:加權輪詢WRR);
使用SDK返回的服務執行個體的IP:Port,查看調用資訊通過IP:Port發起調用;
配置Nacos掛載
建立或更新服務,在服務配置添加Nacos配置,配置支援數組,即如果添加多個Nacos,會將服務同時註冊到多個Nacos中,可供使用者容災等情境使用。涉及高速直連和Nacos掛載的具體配置如下:
{
"cloud": {
"networking": {
"security_group_id": "sg-*****",
"vpc_id": "vpc-***",
"vswitch_id": "vsw-****"
}
},
"networking": {
"nacos": [
{
"nacos_id": "mse_regserverless_cn-****"
}
]
}
}參數 | 描述 | ||
cloud | networking | vpc_id | 通過配置VPC、交換器和安全性群組來啟用VPC高速直連。 重要
|
vswitch_id | |||
security_group_id | |||
networking | nacos | id | 表示已建立的Nacos執行個體ID。 |
檢查Nacos服務註冊
服務部署成功後,EAS會將服務註冊到您的Nacos執行個體上,具體的註冊資訊如下:
cluster:預設的DEFAULT叢集
namespace: 預設的公用命名空間
serviceName: 你的服務名稱
groupName: 系統產生固定值,pai-eas
檢查建立的服務資訊是否正確:

檢查服務內的執行個體數是否正確,執行個體狀態是否正常:

用戶端發起調用
Go
package main
import (
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/util"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"strconv"
"testing"
"time"
)
var Clients = make(map[string]NacosClientManager)
type NacosClientManager struct {
userId string
endPoint string
client naming_client.INamingClient
}
func NewAndGetNacosClient(userId string, endPoint string) (*NacosClientManager, error) {
key := generateKey(userId, endPoint)
client, exists := Clients[key]
if exists {
return &client, nil
}
client, exists = Clients[key]
if exists {
return &client, nil
}
newClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: constant.NewClientConfig(
constant.WithNamespaceId(""),
constant.WithTimeoutMs(5000),
constant.WithNotLoadCacheAtStart(true),
constant.WithLogDir("/tmp/nacos/log"),
constant.WithCacheDir("/tmp/nacos/cache"),
constant.WithLogLevel("debug"),
),
ServerConfigs: []constant.ServerConfig{
*constant.NewServerConfig(endPoint, 8848, constant.WithContextPath("/nacos")),
},
},
)
if err != nil {
return nil, err
}
nacosClient := NacosClientManager{
userId: userId,
endPoint: endPoint,
client: newClient,
}
Clients[key] = nacosClient
return &nacosClient, nil
}
func (p *NacosClientManager) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
instance, err := p.client.SelectOneHealthyInstance(param)
if err != nil {
return nil, fmt.Errorf("SelectOneHealthyInstance failed: %v", err)
}
fmt.Printf("SelectOneHealthyInstance success, param: %+v\n", param)
return instance, nil
}
func (p *NacosClientManager) Subscribe(service string, group string) error {
subscribeParam := &vo.SubscribeParam{
ServiceName: service,
GroupName: group,
SubscribeCallback: func(services []model.Instance, err error) {
fmt.Printf("callback return services:%s \n\n", util.ToJsonString(services))
},
}
return p.client.Subscribe(subscribeParam)
}
func generateKey(userId string, endPoint string) string {
return userId + fmt.Sprintf("-%s", endPoint)
}
func Test(t *testing.T) {
nacosClient, err := NewAndGetNacosClient("yourAliyunUid", "yourNacosEndpoint")
if err != nil {
panic(err)
}
nacosClient.Subscribe("your_service", "pai-eas")
params := vo.SelectOneHealthInstanceParam{
ServiceName: "your_service",
GroupName: "pai-eas",
}
instance, err := nacosClient.SelectOneHealthyInstance(params)
fmt.Println(instance)
// check your own invoke info on console and replace host by ip:port
url := fmt.Sprintf("http://%s:%s/api/predict/xxxxxxx", instance.Ip, strconv.FormatUint(instance.Port, 10))
fmt.Println(url)
//todo invoke service by url
} Python
Nacos官方的Python SDK沒有負載平衡演算法,需要自行實現WRR負載平衡。
安裝依賴
pip install nacos-sdk-python發起調用
# -*- coding: utf8 -*-
import nacos
# Nacos server configuration
SERVER_ADDRESSES = "yourNacosEndpoint"
NAMESPACE = "" # Use default namespace if empty
SERVICE_NAME = "your_service"
GROUP_NAME = "pai-eas"
# Create Nacos client
client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)
def callback(args):
print(args)
if __name__ == '__main__':
# Correctly pass the callback function as a list
client.add_config_watchers(SERVICE_NAME, "pai-eas", [callback])
# Fetch and print naming instance details
b = client.list_naming_instance(SERVICE_NAME, None, None, GROUP_NAME, True)
print(b)
if b"hosts":
print("ip", b["hosts"][0]["ip"])
ip = b["hosts"][0]["ip"]
port = b["hosts"][0]["port"]
# check your own invoke info on console and replace host by ip:port
url = f"http://{ip}:{port}/api/predict/xxxxxxx"
print(url)
#todo invoke service by url Java
添加Maven依賴
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>2.3.2</version>
</dependency>發起調用
package test;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
public class NacosTest {
public static void main(String[] args) {
try {
String serverAddr = "yourNacosEndpoint:8848";
NamingService namingService = NacosFactory.createNamingService(serverAddr);
ExecutorService executorService = Executors.newFixedThreadPool(1);
EventListener serviceListener = new AbstractEventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
System.out.println(((NamingEvent) event).getServiceName());
System.out.println(((NamingEvent) event).getGroupName());
}
}
@Override
public Executor getExecutor() {
return executorService;
}
};
namingService.subscribe("your_service", "pai-eas", serviceListener);
Instance instance = namingService.selectOneHealthyInstance("your_service", "pai-eas");
System.out.println(instance.getIp());
System.out.println(instance.getPort());
// check your own invoke info on console and replace host by ip:port
String url = String.format("http://%s:%d/api/predict/xxxxxxx", instance.getIp(), instance.getPort());
System.out.println(url);
//todo invoke service by url
} catch (NacosException e) {
e.printStackTrace();
}
}
}