通過Java SDK使用召回引擎(RecallEngine)
本文介紹如何使用Java SDK調用召回引擎(RecallEngine)進行資料召回和資料寫入操作。
前提條件
已建立召回引擎執行個體,並擷取到執行個體的 Endpoint、Instance ID、使用者名稱和密碼。
已在召回引擎中建立好目標表和召回服務(Service)。
安裝Java SDK
在 Maven 專案的 pom.xml 中添加以下依賴:
<dependency>
<groupId>com.aliyun.openservices.aiservice</groupId>
<artifactId>pairec-sdk</artifactId>
<version>1.0.7</version>
</dependency>初始化RecallEngine用戶端
介面
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);參數說明
參數 | 樣本 | 說明 |
endpoint | http://ep-xxxxxx.aliyuncs.com | 召回引擎服務的訪問地址。 |
username | System.getenv("RECALL_ENGINE_SERVICE_USERNAME") | 通過環境變數擷取認證使用者名稱。 |
password | System.getenv("RECALL_ENGINE_SERVICE_PASSWORD") | 通過環境變數擷取認證密碼。 |
召回引擎服務的訪問地址可以在PAI-Rec管控台 召回管理基本資料頁面查看。

可選配置方法
方法 | 說明 |
withRetryTimes(int retryTimes) | 佈建要求失敗時的重試次數,預設為0(不重試)。 |
withHttpClient(OkHttpClient httpClient) | 設定自訂的 OkHttpClient 執行個體,用於自訂串連池、逾時等參數。 |
說明
用戶端預設連線逾時為200ms,讀寫逾時為500ms,串連池最大串連數為1000。
所有可選配置方法支援鏈式調用。
樣本
import com.aliyun.openservices.pairec.recallengine.RecallEngineClient;
// 通過環境變數擷取認證資訊
String endpoint = System.getenv("RECALL_ENGINE_SERVICE_ENDPOINT");
String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");
// 建立用戶端
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
// 設定重試次數
client.withRetryTimes(2);召回資料
介面
public RecallResponse recall(RecallRequest request) throws RecallEngineException;RecallRequest參數說明
參數 | JSON屬性名稱 | 類型 | 說明 |
instanceId | instance_id | String | 召回引擎執行個體 ID。 |
service | service | String | 召回服務名稱。 |
version | version | String | 召回服務版本。 |
uid | uid | String | 使用者識別碼。 |
recalls | recalls | Map<String, RecallConf> | 召回配置集合。key 為召回名稱,value 為召回配置。 |
requestId | request_id | String | 請求 ID(可選)。 |
exposureList | exposure_list | String | 曝光列表(可選),用於過濾已曝光的內容。 |
contextParams | context_params | Map<String, Object> | 上下文參數(可選),傳遞自訂的上下文資訊。 |
debug | debug | boolean | 是否開啟偵錯模式(可選),預設為 false。 |
RecallConf參數說明
參數 | 類型 | 說明 |
trigger | String | 觸發項,即召回的 trigger 值。 |
count | int | 期望返回的召回結果數量。 |
RecallResponse說明
方法 | 傳回型別 | 說明 |
getResult() | Record | 擷取召回結果記錄集。 |
樣本
import com.aliyun.openservices.pairec.recallengine.*;
import java.util.HashMap;
import java.util.Map;
// 構造召回請求
RecallRequest request = new RecallRequest();
request.setInstanceId("your-instance-id");
request.setService("recall_test");
request.setVersion("V1");
request.setUid("123");
// 設定召回配置
Map<String, RecallConf> recalls = new HashMap<>();
recalls.put("u2i_recall", new RecallConf("123", 100));
request.setRecalls(recalls);
// 執行召回
RecallResponse resp = client.recall(request);
Record result = resp.getResult();
// 擷取結果資訊
System.out.println("Total records: " + result.size());
System.out.println("Field names: " + result.fieldNames());寫入資料
介面
public WriteResponse write(String instanceId, String table, WriteRequest request) throws RecallEngineException;參數說明
參數 | 類型 | 說明 |
instanceId | String | 召回引擎執行個體 ID。 |
table | String | 目標表名稱。 |
request | WriteRequest | 寫入請求對象。 |
WriteRequest參數說明
參數 | JSON屬性名稱 | 類型 | 說明 |
requestId | request_id | String | 請求 ID(可選)。 |
content | content | List<Map<String, Object>> | 寫入的資料內容。每個 Map 代表一行資料,key 為欄位名,value 為欄位值。 |
versionId | versionId | String | 版本 ID(可選)。 |
WriteResponse說明
WriteResponse 繼承自 Response,包含以下欄位:
方法 | 傳回型別 | 說明 |
getRequestId() | String | 請求 ID。 |
getCode() | String | 狀態代碼,成功時返回 "OK"。 |
getMessage() | String | 響應訊息。 |
getData() | Map<String, Object> | 響應資料。 |
樣本
import com.aliyun.openservices.pairec.recallengine.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
// 構造寫入請求
WriteRequest request = new WriteRequest();
request.setRequestId("write-req-123");
// 構造寫入資料
Map<String, Object> item = new HashMap<>();
item.put("user_id", "123");
item.put("item_id", "item_123");
item.put("score", 0.95);
List<Map<String, Object>> content = new ArrayList<>();
content.add(item);
request.setContent(content);
// 執行寫入
WriteResponse resp = client.write("your-instance-id", "u2i_table", request);
// 檢查寫入結果
System.out.println("Code: " + resp.getCode()); // 成功時返回 "OK"
System.out.println("Message: " + resp.getMessage());操作召回結果(Record)
RecallResponse.getResult() 返回的 Record 對象提供了一系列鏈式操作方法,用於對召回結果進行排序、過濾、截斷等處理。
介面
public Record sort(String name, boolean desc);
public Record retain(int count);
public Record filter(String name);
public Record filterByColumnValue(String name, Predicate<Object> predicate);
public Record filterByValues(Predicate<Map<String, Object>> predicate);
public Record random();
public List<Object> columnValues(String columnName);
public List<String> columnValuesString(String columnName);
public List<String> fieldNames();
public int size();方法說明
方法 | 說明 |
sort(name, desc) | 按指定列排序。desc 為 true 時降序,false 時升序。 |
retain(count) | 保留前 count 條記錄。 |
filter(name) | 按指定列進行去重。 |
filterByColumnValue(name, predicate) | 按指定列的值進行條件過濾,保留滿足 predicate 的記錄。 |
filterByValues(predicate) | 按全行資料進行條件過濾,predicate 接收每行的 Map<String, Object>。 |
random() | 將記錄隨機洗牌。 |
columnValues(columnName) | 擷取指定列的所有值。 |
columnValuesString(columnName) | 擷取指定列的所有非Null 字元串值。 |
fieldNames() | 擷取所有欄位名。 |
size() | 擷取目前記錄數量。 |
說明
sort、retain、filter、filterByColumnValue、filterByValues、random方法均返回this,支援鏈式調用。Record不是安全執行緒的,請勿在多線程環境下並行作業同一個 Record 執行個體。
樣本
RecallResponse resp = client.recall(request);
Record result = resp.getResult();
// 按 score 列降序排序,去重後取前10條
Record processed = result
.sort("score", true)
.filter("item_id")
.retain(10);
System.out.println("Processed records: " + processed.size());
// 擷取 item_id 列的所有值
List<String> itemIds = processed.columnValuesString("item_id");
System.out.println("Item IDs: " + itemIds);
// 按條件過濾:僅保留 score > 0.5 的記錄
Record filtered = result.filterByColumnValue("score", value -> {
if (value instanceof Number) {
return ((Number) value).doubleValue() > 0.5;
}
return false;
});
// 按全行條件過濾
Record filtered2 = result.filterByValues(row -> {
Object category = row.get("category");
return category != null && "video".equals(category.toString());
});完整樣本
以下是一個完整的召回和寫入操作樣本:
import com.aliyun.openservices.pairec.recallengine.*;
import java.util.*;
public class RecallEngineDemo {
public static void main(String[ ] args) throws RecallEngineException {
// 1. 初始化用戶端
String endpoint = System.getenv("RECALL_ENGINE_SERVICE_ENDPOINT");
String username = System.getenv("RECALL_ENGINE_SERVICE_USERNAME");
String password = System.getenv("RECALL_ENGINE_SERVICE_PASSWORD");
RecallEngineClient client = new RecallEngineClient(endpoint, username, password);
client.withRetryTimes(2)
String instanceId = System.getenv("INSTANCE_ID");
// 2. 寫入資料
WriteRequest writeRequest = new WriteRequest();
writeRequest.setRequestId("write-req-001");
List<Map<String, Object>> content = new ArrayList<>();
Map<String, Object> item = new HashMap<>();
item.put("user_id", "123");
item.put("item_id", "item_456");
item.put("score", 0.95);
content.add(item);
writeRequest.setContent(content);
WriteResponse writeResp = client.write(instanceId, "u2i_table", writeRequest);
System.out.println("Write result: " + writeResp.getCode());
// 3. 召回資料
RecallRequest recallRequest = new RecallRequest();
recallRequest.setInstanceId(instanceId);
recallRequest.setService("recall_test");
recallRequest.setVersion("V1");
recallRequest.setUid("123");
Map<String, RecallConf> recalls = new HashMap<>();
recalls.put("u2i_recall", new RecallConf("123", 100));
recallRequest.setRecalls(recalls);
RecallResponse recallResp = client.recall(recallRequest);
Record result = recallResp.getResult();
// 4. 處理召回結果
System.out.println("Total records: " + result.size());
System.out.println("Field names: " + result.fieldNames());
// 按 score 降序排序,取前10條
Record top10 = result.sort("score", true).retain(10);
System.out.println("Top 10 records: " + top10.toString());
// 擷取 item_id 列表
List<String> itemIds = top10.columnValuesString("item_id");
System.out.println("Top 10 item IDs: " + itemIds);
}
}異常處理
所有召回和寫入操作都可能拋出 RecallEngineException,建議在調用時進行異常捕獲處理:
try {
RecallResponse resp = client.recall(request);
// 處理結果
} catch (RecallEngineException e) {
System.err.println("Recall failed: " + e.getMessage());
}