API參考
一. 介面規範
1.公用請求Header
名字 | 描述 |
x-datahub-client-version | API版本資訊 |
x-datahub-security-token | Security Token,如果存在 |
Date | 標準GMT時間,格式:EEE, dd MMM yyyy HH:mm:ss z |
Authorization | 簽名資訊 |
Content-Type | 傳輸資料序列化協議 |
2. 公用返回Header
名字 | 描述 |
Content-Type | 傳輸資料序列化協議 |
Content-Length | 傳輸資料長度 |
x-datahub-request-id | 全域唯一請求ID |
3. 錯誤碼
名字 | 描述 | 備忘 |
InvalidParameter | 參數錯誤 | |
InvalidCursor | Cursor無效 | |
NoSuchXXX | 資源不存在 | |
XXXAlreadyExist | 資源已存在 | |
Unauthorized | 認證失敗 | AccessKey資訊錯誤,使用者時間戳記不對等 |
NoPermission | 鑒權失敗 | RAM許可權錯誤 |
OperationDenied | 禁止操作 | 操作禁止,比如刪除有topic存在的project |
LimitExceeded | 流控受限 | 服務端QPS、流量等限制 |
InvalidShardOperation | Shard分裂或合并,變成Sealed | |
MalformedRecord | 資料格式不正確 | |
OffsetReseted | 點位重設 | |
OffsetSessionChanged | SubId被其他使用者open佔用 | |
SubscriptionOffline | 訂閱下線 | |
InternalServerError | 系統內部錯誤 | |
其他 | 其他非可重試異常,後續可能會被回收 |
4. 統一錯誤返回格式
名字 | 描述 |
ErrorCode | 錯誤碼 |
ErrorMessage | 詳細錯誤描述資訊 |
錯誤響應樣本:
HTTP/1.1 403
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: application/json
Content-Length: xxx
{
"ErrorCode": "Unauthorized",
"ErrorMessage": "Authroize failed"
}5. 限制描述
名字 | 描述 |
ProjectName | 長度:[3, 32],僅包含字母、數字和'_', 以字母開頭,不區分大小寫 |
TopicName | 長度:[3, 128],僅包含字母、數字和'_', 以字母開頭,不區分大小寫 |
二、Authorization欄位計算的方法
Authorization = "DATAHUB " + AccessId + ":" + Signature
Signature = base64(hmac-sha1(AccessKey,
HTTPMethod + "\n"
+ Content-Type + "\n"
+ Date + "\n"
+ CanonicalizedDataHubHeaders + "\n"
+ CanonicalizedResource))AccessKey表示簽名所需的密鑰。HTTPMethod表示HTTP 要求的Method,主要有PUT、GET、POST、HEAD、DELETE等。\n表示分行符號。Content-Type表示請求內容的類型,一般固定為“application/json”。Date表示此次操作的時間,且必須為GMT格式,如“Sun, 22 Nov 2015 08:16:38 GMT”。CanonicalizedDataHubHeaders表示以x-datahub-為首碼的HTTP Header的字典序排列。CanonicalizedResource表示使用者想要訪問的DataHub資源的Url,若包含param,必須按字典序。
CanonicalizedDataHubHeaders構造方法
所有以 x-datahub- 為首碼的HTTP Header被稱為 CanonicalizedDataHubHeaders。構造方法如下:
將所有以
x-datahub-為首碼的HTTP要求標頭的名字轉換成小寫 。如X-DATAHUB-Client-Version:1.1需要轉換成x-datahub-client-version:1.1。如果請求是以STS獲得的AccessKeyId和AccessKeySecret發送時,還需要將獲得的security-token值以
x-datahub-security-token:token的形式加入到簽名字串中。將上一步得到的所有HTTP要求標頭按照名字的字典序進行升序排列。
刪除要求標頭和內容之間分隔字元兩端出現的任何空格。如
x-datahub-client-versionn : 1.1轉換成:x-datahub-client-version:1.1。將每一個頭和內容用
\n分隔字元分隔拼成最後的CanonicalizedDataHubHeaders。
CanonicalizedResource構造方法
使用者發送請求中想訪問的DataHub目標資源被稱為CanonicalizedResource。構造方法如下:
將
CanonicalizedResource置成Null 字元串 “”;放入要訪問的DataHub資源,比如某個topic:
/projects/<ProjectName>/topics/<TopicName>如果請求的資源套件含額外的URL參數,按照字典序,從小到大排列並以 & 為分隔字元產生參數字串。在CanonicalizedResource字串尾添加 ?和參數字串。此時的CanonicalizedResource如:
/projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps?donetime
計算簽名頭規則
簽名的字串必須為 UTF-8 格式。含有中文字元的簽名字串必須先進行 UTF-8 編碼,再與
AccessKey計算最終簽名。簽名的方法用RFC 2104中定義的HMAC-SHA1方法,其中Key為
AccessKey。以
x-datahub-開頭的header在簽名驗證前需要符合以下規範:header的名字需要變成小寫。
header按字典序自小到大排序。
分割header name和value的冒號前後不能有空格。
每個Header之後都有一個分行符號“\n”,如果沒有Header,CanonicalizedDataHubHeaders就設定為空白。
簽名樣本:
請求 | 簽名字串計算公式 | 簽名字串 |
POST /projects/test_project/topics/test_topic HTTP/1.1 Host: https://dh-cn-hangzhou.aliyuncs.com User-Agent: customer x-datahub-client-version: 1.1 Content-Type: application/json Date: Thu, 10 Jan 2019 07:28:29 GMT | Signature = base64(hmac-sha1(AccessKey,HTTPMethod + “\n” + Content-Type + “\n” + Date + “\n” + CanonicalizedDataHubHeaders+ CanonicalizedResource)) | POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic |
python計算簽名方法如下:
import base64
import hmac
import sha
h = hmac.new("****your accessKey*****",
"POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic", sha)
Signature = base64.b64encode(h.digest())
print("Signature: %s" % Signature)要求標頭部樣本:
Authorization值格式:DATAHUB AccessId:Signature
POST /projects/test_project/topics/test_topic HTTP/1.1
Authorization: DATAHUB AccessId:Signature
Content-Type: application/json
Date: Thu, 10 Jan 2019 07:28:29 GMT
Host: http://dh-cn-hangzhou.aliyuncs.com
User-Agent: customer
x-datahub-client-version: 1.1Java 8 簽名參考範例
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
public abstract class Authorization {
private static final String DEFAULT_ENCODING = "UTF-8";
private static final String DEFAULT_HASH = "HmacSHA1";
private static final String X_DATAHUB_PREFIX = "x-datahub";
private static final String HEADER_CONTENT_TYPE = "Content-Type";
private static final String HEADER_DATE = "Date";
static public String getAkAuthorization(Request request) {
String canonicalURL = request.getUrlPath();
String canonicalQueryString = request.getQueryStrings();
String canonicalHeaderString = getCanonicalHeaders(
getSortedHeadersToSign(request.getHeaders()));
String canonicalRequest = request.getMethod().toUpperCase() + "\n" +
canonicalHeaderString + "\n" + canonicalURL;
if (canonicalQueryString != null && !canonicalQueryString.isEmpty()) {
canonicalRequest += "?" + canonicalQueryString;
}
String signature = HMAC1Sign(request.getAccessKey(), canonicalRequest);
return "DATAHUB " + request.getAccessId() + ":" + signature;
}
static private String HMAC1Sign(String accessKey, String canonicalRequest) {
try {
SecretKeySpec signingKey = new SecretKeySpec(accessKey.getBytes(), DEFAULT_HASH);
Mac mac = Mac.getInstance(DEFAULT_HASH);
mac.init(signingKey);
return Base64.getEncoder().encodeToString(
mac.doFinal(canonicalRequest.getBytes(DEFAULT_ENCODING))
).trim();
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
static private String getCanonicalHeaders(Map<String, String> headers) {
StringBuilder sb = new StringBuilder();
Iterator<Map.Entry<String, String>> pairs = headers.entrySet().iterator();
while (pairs.hasNext()) {
Map.Entry<String, String> pair = pairs.next();
if (pair.getKey().startsWith(X_DATAHUB_PREFIX)) {
sb.append(pair.getKey());
sb.append(":");
sb.append(pair.getValue());
} else {
sb.append(pair.getValue());
}
if (pairs.hasNext()) {
sb.append("\n");
}
}
return sb.toString();
}
static private SortedMap<String, String> getSortedHeadersToSign(Map<String, String> headers) {
SortedMap<String, String> sortedHeaders = new TreeMap<>();
for (Map.Entry<String, String> entry : headers.entrySet()) {
String lowerKey = entry.getKey().toLowerCase();
if (lowerKey.equalsIgnoreCase(HEADER_CONTENT_TYPE) ||
lowerKey.equalsIgnoreCase(HEADER_DATE) ||
lowerKey.startsWith(X_DATAHUB_PREFIX)) {
if (!entry.getValue().isEmpty()) {
sortedHeaders.put(lowerKey, entry.getValue());
}
}
}
if (!sortedHeaders.containsKey(HEADER_CONTENT_TYPE.toLowerCase())) {
sortedHeaders.put(HEADER_CONTENT_TYPE.toLowerCase(), "");
}
return sortedHeaders;
}
public static class Request {
private String accessId;
private String accessKey;
private String urlPath;
private String method;
private Map<String, String> headers;
private String queryStrings;
public String getAccessId() {
return accessId;
}
public Request setAccessId(String accessId) {
this.accessId = accessId;
return this;
}
public String getAccessKey() {
return accessKey;
}
public Request setAccessKey(String accessKey) {
this.accessKey = accessKey;
return this;
}
public String getUrlPath() {
return urlPath;
}
public Request setUrlPath(String urlPath) {
this.urlPath = urlPath;
return this;
}
public String getMethod() {
return method;
}
public Request setMethod(String method) {
this.method = method;
return this;
}
public Map<String, String> getHeaders() {
return headers;
}
public Request setHeaders(Map<String, String> headers) {
this.headers = headers;
return this;
}
public String getQueryStrings() {
return queryStrings;
}
public Request setQueryStrings(String queryStrings) {
this.queryStrings = queryStrings;
return this;
}
}
public static void main(String[] args) {
Map<String, String> headers = new HashMap<>();
headers.put("Date", "Thu, 10 Jan 2019 07:28:29 GMT");
headers.put("x-datahub-client-version", "1.1");
headers.put("Content-type", "application/json");
String accessId = "testKeyID";
String accessKey = "testKeySecret";
String method = "POST";
String path = "/projects/test_project/topics/test_topic";
String canonicalQueryString = ""; //字典序 a=x&b=y
Authorization.Request authRequest = new Authorization.Request()
.setAccessId(accessId)
.setAccessKey(accessKey)
.setMethod(method.toUpperCase())
.setUrlPath(path)
.setHeaders(headers)
.setQueryStrings(canonicalQueryString);
System.out.println(Authorization.getAkAuthorization(authRequest));
}
}三、介面定義
建立Project
請求
請求文法
POST /projects/<ProjectName> HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Comment | String | 描述資訊,限制1024位元組 |
響應
響應文法
HTTP/1.1 201 Created樣本
請求樣本
POST /projects/<ProjectName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Comment": "test project"
}響應樣本
HTTP/1.1 201 Created
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Length: 0查詢Project
請求
請求文法
GET /projects/<ProjectName> HTTP/1.1響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
CreateTime | long | 建立時間,單位:秒 |
LastModifyTime | long | 更新時間,單位:秒 |
Comment | String | 描述資訊 |
樣本
請求樣本
GET /projects/<ProjectName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Content-Length: xxx
{
"Comment": "test project",
"CreateTime": 1525763481,
"LastModifyTime": 1525763481
}查詢Project列表
請求
請求文法
GET /projects HTTP/1.1響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
ProjectNames | List | Project名稱列表 |
樣本
請求樣本
GET /projects HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"ProjectNames": [
"project1",
"projcet2"
]
}更新Project
請求
請求文法
PUT /projects/<ProjectName> HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Comment | String | 描述資訊 |
響應
響應文法
HTTP/1.1 200 OK樣本
請求樣本
PUT /projects/<ProjectName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Comment": "update comment"
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Length: 0刪除Project
請求
請求文法
DELETE /projects/<ProjectName> HTTP/1.1響應
響應文法
HTTP/1.1 200 OK樣本
請求樣本
DELETE /projects/<ProjectName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Length: 0建立Topic
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | 操作類型 |
ShardCount | int | 初始shard數目 |
Lifecycle | int | 資料存放區生命週期 |
RecordType | String | BLOB(非結構化資料)/TUPLE(結構化資料) |
RecordSchema | String | 建立TUPLE類型topic時需指定schema, BLOB類型時,不傳該參數 |
Comment | String | 描述資訊 |
ExpandMode | String | 開啟擴充模式傳值extend,其他情況不需要該參數。 |
響應
響應文法
HTTP/1.1 201 Created樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Action": "create",
"ShardCount": 1,
"Lifecycle": 1,
"RecordType": "TUPLE",
"RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}}",
"Comment": "create topic",
"ExpandMode": "extend"
}響應樣本
HTTP/1.1 201 Created
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Content-Length: 0查詢Topic
請求
請求文法
GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
ShardCount | int | 初始shard數目 |
Lifecycle | int | 資料存放區生命週期 |
RecordType | String | BLOB(非結構化資料)/TUPLE(結構化資料) |
RecordSchema | String | 建立TUPLE類型topic時需指定schema, BLOB類型時,不傳該參數 |
Comment | String | 描述資訊 |
CreateTime | long | 建立時間 |
LastModifyTime | long | 更新時間 |
樣本
請求樣本
GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString響應樣本
HTTP/1.1 201 Created
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Content-Length: xxx
{
"ShardCount": 1,
"Lifecycle": 1,
"RecordType": "TUPLE",
"RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}",
"Comment": "create topic",
"CreateTime": 1525763481,
"LastModifyTime": 1525763481
}查詢Topic列表
請求
請求文法
GET /projects/<ProjectName>/topics HTTP/1.1響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
TopicNames | List | Project名稱列表 |
樣本
請求樣本
GET /projects/<ProjectNames>/topics HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"TopicNames": [
"topic1",
"topic2"
]
}更新Topic
請求
請求文法
PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Comment | String | 描述資訊 |
響應
響應文法
HTTP/1.1 200 OK樣本
請求樣本
PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Comment": "update comment"
}刪除Topic
請求
請求文法
DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1響應
響應文法
HTTP/1.1 200 OK樣本
請求樣本
DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Length: 0擷取Shard列表
請求
請求文法
GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
ShardId | String | Shard Id |
State | String | Shard目前狀態,包括OPENING,ACTIVE,CLOSED等狀態 |
BeginHashKey | String | 起始HashKey |
EndHashKey | String | 終止HashKey |
ParentShardIds | List | Shard分裂或合并之前的父Shard資訊 |
樣本
請求樣本
GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Shards": [
{
"ShardId": "0",
"State": "ACTIVE",
"BeginHashKey":"00000000000000000000000000000000",
"EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
"ParentShardIds:[]
}
]
}分裂Shard
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:split |
ShardId | String | 需要分裂的Shard Id |
SplitKey | String | 按照此Key進行Split,SplitKey一般等於 BeginHashKey + (EndHashKey - BeginHashKey) / 2 |
響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
NewShards | List | 分裂後的Shard列表 |
樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Action": "split",
"ShardId": "0",
"SplitKye": "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"NewShards": [
{
"ShardId": "1",
"BeginHashKey":"00000000000000000000000000000000",
"EndHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
},
{
"ShardId":"0",
"BeginHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
"EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
}
]
}合并Shard
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:merge |
ShardId | String | 需要合并的Shard Id |
AdjacentShardId | String | 臨近的並且滿足合并條件的Shard Id |
響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
ShardId | String | 合并後的Shard Id |
BeginHashKey | String | 起始HashKey |
EndHashKey | String | 終止HashKey |
樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Action": "merge",
"ShardId": "0",
"AdjacentShardId": "1"
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"ShardId":"2",
"BeginHashKey":"00000000000000000000000000000000",
"EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
}查詢資料Cursor
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:cursor |
Type | String | 按照何種類型擷取Cursor,包括:OLDEST, LATEST, SYSTEM_TIME, SEQUENCE |
SystemTime | Int64 | Type為SYSTEM_TIME時填寫,單位ms |
Sequence | Int64 | Type為SEQUENCE時填寫 |
響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
Cursor | String | 返回的Cursor資訊 |
RecordTime | Int64 | 資料寫入DataHub時間, 單位ms |
Sequence | Int64 | 資料寫入的Sequence,單Shard內唯一 |
樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: application/json
Content-Length: xxx
{
"Action": "cursor",
"Type": "SEQUENCE",
"Sequence": 1
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Cursor": "30005af19b3800000000000000000000",
"RecordTime": 1525783352873,
"Sequence": 1
}寫入資料 - 不按shard寫入
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:pub |
ShardId | String | ShardId |
Attributes | Map<String, String> | 使用者屬性欄位 |
Data | 如果為BLOB類型,Data為資料Base64編碼後資料;如果為TUPLE資料,為String類型數組 |
響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
FailedRecordCount | Int | 失敗條數 |
FailedRecords | Array | 失敗詳情 |
樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "pub",
"Records": [
{
"ShardId": "0",
"Attributes": {
"attr1": "value1",
"attr2": "value2"
},
"Data": ["A","B","3","4"]
}
]
} // BLOB
{
"Action": "pub",
"Record": [
{
"ShardId": "0",
"Attributes": {
"attr1": "value1",
"attr2": "value2"
},
"Data": "Base64String"
}
]
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"FailedRecordCount": 1,
"FailedRecords": [
{
"Index": 0,
"ErrorCode": "errorCode",
"ErrorMessage": "errormsg"
}
]
}讀取資料
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:sub |
Cursor | String | 從Cursor位置開始讀取 |
Limit | Int | 讀取的條數 |
響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
NextCursor | String | 下一條資料的Cursor資訊 |
SystemTime | Int64 | record寫入DataHub的時間,單位ms |
Cursor | String | record對應的Cursor資訊 |
Sequence | Int64 | record寫入DataHub的Sequence |
Attributes | Map | 使用者的屬性欄位 |
Data | 使用者的資料欄位 |
樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "sub",
"Cursor": "30005af19b3800000000000000000000",
"Limit": 1
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"NextCursor": "30005af19b3800000000000000090001",
"Records": [
{
"Cursor": "30005af19b3800000000000000000000",
"SystemTime": 1525783352873,
"Sequence": 1,
"Attributes": {
"key1": "value1",
"key2": "value2"
},
"Data": ["AAA", "100"]
}
]
}新增Field
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:appendfield |
FieldName | String | Field名稱 |
FieldType | String | Field類型,包括STRING, BIGINT等 |
響應
響應文法
HTTP/1.1 200 OK樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "appendfield",
"FieldName": "field1",
"FieldType": "BIGINT"
}建立Connector
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Type | String | Connector類型,如SINK_ODPS等 |
ColumnFields | Array | 需同步的Field列表 |
Config | Map | Connector相關配置 |
響應
響應文法
HTTP/1.1 201 Created樣本
請求樣本,以SINK_ODPS為例
POST /projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Type": "SINK_ODPS",
"ColumnFields": ["field1", "field2"],
"Config": {
"Project": "odpsProject",
"Topic": "odpsTopic",
"OdpsEndpoint": "xxx",
"TunnelEndpoint": "xxx",
"AccessId": "xxx",
"AccessKey": "xxx",
"PartitionMode": "SYSTEM_TIME",
"TimeRange": 60,
"PartitionConfig": {
"pt": "%Y%m%d",
"ct": "%H%M"
}
}
}查詢Connector
請求
請求文法
GET /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1響應
響應文法
HTTP/1.1 200 OK查詢Connector列表
請求
請求文法
GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1響應
響應文法
HTTP/1.1 200 OK樣本
請求樣本
GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Connectors": [
"sink_odps", "sink_ads"
]
}刪除Connector
請求
請求文法
DELETE /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1響應
響應文法
HTTP/1.1 200 OKReload Connector
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:reload |
ShardId | String | 不設定則Reload整個connector |
響應
響應文法
HTTP/1.1 200 OK樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "reload"
}擷取Connector Shard狀態資訊
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:status |
ShardId | String | 擷取此ShardId的狀態資訊 |
響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
CurrentSequence | Int64 | 當前處理點位資訊 |
State | Enum | 當前Shard的運行狀態 |
LastErrorMessage | String | 當State不為CONTEXT_EXECUTING時,返回錯誤資訊 |
DiscardCount | Int64 | 從connector運行到現在丟棄的資料總條數 |
樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "status",
"ShardId": "0"
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"State": "CONTEXT_EXECUTING",
"CurrentSequence": 10,
"DiscardCount": 0,
"LastErrorMessage": ""
}Append Connector Field
請求
請求文法
POST /projects//topics//connectors/ HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:appendfield |
FieldName | String | Field名稱 |
響應
響應文法
HTTP/1.1 200 OK請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "appendfiled",
"FieldName": "field1"
}建立訂閱
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:create |
Comment | String | 描述資訊 |
響應
響應文法
HTTP/1.1 201 Created響應元素
名稱 | 類型 | 描述 |
SubId | String | 訂閱Id |
樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "create",
"Comment": "xxxx"
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"SubId": "1542078393028fzsZx"
}查詢訂閱
請求
請求文法
GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
SubId | String | 訂閱Id |
State | Int | 0: online, 1: offline |
Comment | String | 描述資訊 |
樣本
請求樣本
GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"SubId": "xxxx",
"Comment": "xxxx"
"State": 1
}查詢訂閱列表
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:list |
PageIndex | Int | 分頁Index |
PageSize | Int | 分頁Size |
響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
TotalCount | Int | Page總數 |
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "list",
"PageIndex": 1,
"PageSize": 10
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Subscriptions": [
{
"Comment": "xxxx",
"State": 1,
"SubId": "1542079169844gH8HM"
},
{
"Comment": "xxxx",
"State": 1,
"SubId": "1542078393028fzsZx"
}
],
"TotalCount": 2
}刪除訂閱
請求
請求文法
DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1樣本
請求樣本
DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString更新訂閱狀態
請求
請求文法
PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1響應
響應文法
HTTP/1.1 200 OK樣本
請求樣本
PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"State": 0
}open點位session
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:open |
ShardIds | Array | Shard列表 |
響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
Timestamp | Int64 | 點位時間戳記,單位ms |
Sequence | Int64 | 點位Sequence |
Version | Int64 | Session VersionId |
SessionId | String | SessionId |
樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "open",
"ShardIds": ["0"]
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Offsets": {
"0": {
"Timestamp": 1000,
"Sequence": 1,
"Version": 1,
"SessionId": "xxx"
}
}
}查詢點位
請求
請求文法
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:get |
ShardIds | Array | Shard列表 |
響應
響應文法
HTTP/1.1 200 OK響應元素
名稱 | 類型 | 描述 |
Timestamp | Int64 | 點位時間戳記,單位ms |
Sequence | Int64 | 點位Sequence |
Version | Int64 | Session VersionId |
SessionId | String | SessionId |
樣本
請求樣本
POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "get",
"ShardIds": ["0"]
}響應樣本
HTTP/1.1 200 OK
x-datahub-request-id: 2018050817492199d6650a00000039
Content-Type: applicaton/json
Conent-Length: xxx
{
"Offsets": {
"0": {
"Timestamp": 1000,
"Sequence": 1,
"Version": 1,
"SessionId": "xxx"
}
}
}提交點位
請求
請求文法
PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1請求元素
名稱 | 類型 | 描述 |
Action | String | Action為:commit |
Timestamp | Int64 | 點位時間戳記,單位ms |
Sequence | Int64 | 點位Sequence |
Version | Int64 | Session VersionId |
SessionId | Int64 | SessionId |
響應
響應文法
HTTP/1.1 200 OK樣本
請求樣本
PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
x-datahub-client-version: 1.1
Date: Tue, 08 May 2018 09:47:48 GMT
Authorization: AuthorizationString
Content-Type: applicaton/json
Conent-Length: xxx
{
"Action": "commit",
"Offsets": {
"0": {
"Timestamp": 1000,
"Sequence": 1,
"Version": 1,
"SessionId": 1
}
}
}