全部產品
Search
文件中心

DataHub:API參考

更新時間:Jul 13, 2024

API參考

一. 介面規範

1.公用請求Header

名字

描述

x-datahub-client-version

API版本資訊

x-datahub-security-token

Security Token,如果存在

Date

標準GMT時間,格式:EEE, dd MMM yyyy HH:mm:ss z

Authorization

簽名資訊

Content-Type

傳輸資料序列化協議

2. 公用返回Header

名字

描述

Content-Type

傳輸資料序列化協議

Content-Length

傳輸資料長度

x-datahub-request-id

全域唯一請求ID

3. 錯誤碼

名字

描述

備忘

InvalidParameter

參數錯誤

InvalidCursor

Cursor無效

NoSuchXXX

資源不存在

XXXAlreadyExist

資源已存在

Unauthorized

認證失敗

AccessKey資訊錯誤,使用者時間戳記不對等

NoPermission

鑒權失敗

RAM許可權錯誤

OperationDenied

禁止操作

操作禁止,比如刪除有topic存在的project

LimitExceeded

流控受限

服務端QPS、流量等限制

InvalidShardOperation

Shard分裂或合并,變成Sealed

MalformedRecord

資料格式不正確

OffsetReseted

點位重設

OffsetSessionChanged

SubId被其他使用者open佔用

SubscriptionOffline

訂閱下線

InternalServerError

系統內部錯誤

其他

其他非可重試異常,後續可能會被回收

4. 統一錯誤返回格式

名字

描述

ErrorCode

錯誤碼

ErrorMessage

詳細錯誤描述資訊

錯誤響應樣本:

    HTTP/1.1 403
    x-datahub-request-id: 2018050817492199d6650a00000039
    Content-Type: application/json
    Content-Length: xxx

    {
        "ErrorCode": "Unauthorized",
        "ErrorMessage": "Authroize failed"
    }

5. 限制描述

名字

描述

ProjectName

長度:[3, 32],僅包含字母、數字和'_', 以字母開頭,不區分大小寫

TopicName

長度:[3, 128],僅包含字母、數字和'_', 以字母開頭,不區分大小寫

二、Authorization欄位計算的方法

Authorization = "DATAHUB " + AccessId + ":" + Signature
Signature = base64(hmac-sha1(AccessKey,
            HTTPMethod + "\n"
            + Content-Type + "\n"
            + Date + "\n"
            + CanonicalizedDataHubHeaders + "\n"
            + CanonicalizedResource))

  • AccessKey 表示簽名所需的密鑰。

  • HTTPMethod 表示HTTP 要求的Method,主要有PUT、GET、POST、HEAD、DELETE等。

  • \n 表示分行符號。

  • Content-Type 表示請求內容的類型,一般固定為“application/json”。

  • Date 表示此次操作的時間,且必須為GMT格式,如“Sun, 22 Nov 2015 08:16:38 GMT”。

  • CanonicalizedDataHubHeaders 表示以 x-datahub- 為首碼的HTTP Header的字典序排列。

  • CanonicalizedResource 表示使用者想要訪問的DataHub資源的Url,若包含param,必須按字典序。

CanonicalizedDataHubHeaders構造方法

所有以 x-datahub- 為首碼的HTTP Header被稱為 CanonicalizedDataHubHeaders。構造方法如下:

  1. 將所有以 x-datahub- 為首碼的HTTP要求標頭的名字轉換成小寫 。如X-DATAHUB-Client-Version:1.1需要轉換成x-datahub-client-version:1.1

  2. 如果請求是以STS獲得的AccessKeyId和AccessKeySecret發送時,還需要將獲得的security-token值以x-datahub-security-token:token的形式加入到簽名字串中。

  3. 將上一步得到的所有HTTP要求標頭按照名字的字典序進行升序排列。

  4. 刪除要求標頭和內容之間分隔字元兩端出現的任何空格。如x-datahub-client-versionn : 1.1轉換成:x-datahub-client-version:1.1

  5. 將每一個頭和內容用 \n 分隔字元分隔拼成最後的CanonicalizedDataHubHeaders

CanonicalizedResource構造方法

使用者發送請求中想訪問的DataHub目標資源被稱為CanonicalizedResource。構造方法如下:

  1. CanonicalizedResource置成Null 字元串 “”;

  2. 放入要訪問的DataHub資源,比如某個topic: /projects/<ProjectName>/topics/<TopicName>

  3. 如果請求的資源套件含額外的URL參數,按照字典序,從小到大排列並以 & 為分隔字元產生參數字串。在CanonicalizedResource字串尾添加 ?和參數字串。此時的CanonicalizedResource如:/projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps?donetime

計算簽名頭規則

  • 簽名的字串必須為 UTF-8 格式。含有中文字元的簽名字串必須先進行 UTF-8 編碼,再與AccessKey計算最終簽名。

  • 簽名的方法用RFC 2104中定義的HMAC-SHA1方法,其中Key為 AccessKey

  • x-datahub-開頭的header在簽名驗證前需要符合以下規範:

  • header的名字需要變成小寫。

  • header按字典序自小到大排序。

  • 分割header name和value的冒號前後不能有空格。

  • 每個Header之後都有一個分行符號“\n”,如果沒有Header,CanonicalizedDataHubHeaders就設定為空白。

簽名樣本

請求

簽名字串計算公式

簽名字串

POST /projects/test_project/topics/test_topic HTTP/1.1 Host: https://dh-cn-hangzhou.aliyuncs.com

User-Agent: customer x-datahub-client-version: 1.1 Content-Type: application/json Date: Thu, 10 Jan 2019 07:28:29 GMT

Signature = base64(hmac-sha1(AccessKey,HTTPMethod + “\n” + Content-Type + “\n” + Date + “\n” + CanonicalizedDataHubHeaders+ CanonicalizedResource))

POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic

python計算簽名方法如下:

import base64
import hmac
import sha
h = hmac.new("****your accessKey*****",
"POST\napplication/json\nThu, 10 Jan 2019 07:28:29 GMT\nx-datahub-client-version:1.1\n/projects/test_project/topics/test_topic", sha)
Signature = base64.b64encode(h.digest())
print("Signature: %s" % Signature)

要求標頭部樣本:

Authorization值格式:DATAHUB AccessId:Signature

POST /projects/test_project/topics/test_topic HTTP/1.1
Authorization: DATAHUB AccessId:Signature
Content-Type: application/json
Date: Thu, 10 Jan 2019 07:28:29 GMT
Host: http://dh-cn-hangzhou.aliyuncs.com
User-Agent: customer
x-datahub-client-version: 1.1

Java 8 簽名參考範例

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public abstract class Authorization {
    private static final String DEFAULT_ENCODING = "UTF-8";
    private static final String DEFAULT_HASH = "HmacSHA1";
    private static final String X_DATAHUB_PREFIX = "x-datahub";
    private static final String HEADER_CONTENT_TYPE = "Content-Type";
    private static final String HEADER_DATE = "Date";

    static public String getAkAuthorization(Request request) {
        String canonicalURL = request.getUrlPath();
        String canonicalQueryString = request.getQueryStrings();
        String canonicalHeaderString = getCanonicalHeaders(
                getSortedHeadersToSign(request.getHeaders()));

        String canonicalRequest = request.getMethod().toUpperCase() + "\n" +
                canonicalHeaderString + "\n" + canonicalURL;
        if (canonicalQueryString != null && !canonicalQueryString.isEmpty()) {
            canonicalRequest += "?" + canonicalQueryString;
        }

        String signature = HMAC1Sign(request.getAccessKey(), canonicalRequest);

        return "DATAHUB " + request.getAccessId() + ":" + signature;
    }

    static private String HMAC1Sign(String accessKey, String canonicalRequest) {
        try {
            SecretKeySpec signingKey = new SecretKeySpec(accessKey.getBytes(), DEFAULT_HASH);
            Mac mac = Mac.getInstance(DEFAULT_HASH);
            mac.init(signingKey);
            return Base64.getEncoder().encodeToString(
                    mac.doFinal(canonicalRequest.getBytes(DEFAULT_ENCODING))
            ).trim();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    static private String getCanonicalHeaders(Map<String, String> headers) {
        StringBuilder sb = new StringBuilder();
        Iterator<Map.Entry<String, String>> pairs = headers.entrySet().iterator();
        while (pairs.hasNext()) {
            Map.Entry<String, String> pair = pairs.next();
            if (pair.getKey().startsWith(X_DATAHUB_PREFIX)) {
                sb.append(pair.getKey());
                sb.append(":");
                sb.append(pair.getValue());
            } else {
                sb.append(pair.getValue());
            }

            if (pairs.hasNext()) {
                sb.append("\n");
            }
        }
        return sb.toString();
    }

    static private SortedMap<String, String> getSortedHeadersToSign(Map<String, String> headers) {
        SortedMap<String, String> sortedHeaders = new TreeMap<>();

        for (Map.Entry<String, String> entry : headers.entrySet()) {
            String lowerKey = entry.getKey().toLowerCase();
            if (lowerKey.equalsIgnoreCase(HEADER_CONTENT_TYPE) ||
                    lowerKey.equalsIgnoreCase(HEADER_DATE) ||
                    lowerKey.startsWith(X_DATAHUB_PREFIX)) {
                if (!entry.getValue().isEmpty()) {
                    sortedHeaders.put(lowerKey, entry.getValue());
                }
            }
        }

        if (!sortedHeaders.containsKey(HEADER_CONTENT_TYPE.toLowerCase())) {
            sortedHeaders.put(HEADER_CONTENT_TYPE.toLowerCase(), "");
        }

        return sortedHeaders;
    }

    public static class Request {
        private String accessId;
        private String accessKey;
        private String urlPath;
        private String method;
        private Map<String, String> headers;
        private String queryStrings;

        public String getAccessId() {
            return accessId;
        }

        public Request setAccessId(String accessId) {
            this.accessId = accessId;
            return this;
        }

        public String getAccessKey() {
            return accessKey;
        }

        public Request setAccessKey(String accessKey) {
            this.accessKey = accessKey;
            return this;
        }

        public String getUrlPath() {
            return urlPath;
        }

        public Request setUrlPath(String urlPath) {
            this.urlPath = urlPath;
            return this;
        }

        public String getMethod() {
            return method;
        }

        public Request setMethod(String method) {
            this.method = method;
            return this;
        }

        public Map<String, String> getHeaders() {
            return headers;
        }

        public Request setHeaders(Map<String, String> headers) {
            this.headers = headers;
            return this;
        }

        public String getQueryStrings() {
            return queryStrings;
        }

        public Request setQueryStrings(String queryStrings) {
            this.queryStrings = queryStrings;
            return this;
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("Date", "Thu, 10 Jan 2019 07:28:29 GMT");
        headers.put("x-datahub-client-version", "1.1");
        headers.put("Content-type", "application/json");

        String accessId = "testKeyID";
        String accessKey = "testKeySecret";
        String method = "POST";
        String path = "/projects/test_project/topics/test_topic";
        String canonicalQueryString = ""; //字典序 a=x&b=y

        Authorization.Request authRequest = new Authorization.Request()
                .setAccessId(accessId)
                .setAccessKey(accessKey)
                .setMethod(method.toUpperCase())
                .setUrlPath(path)
                .setHeaders(headers)
                .setQueryStrings(canonicalQueryString);
        System.out.println(Authorization.getAkAuthorization(authRequest));
    }
}

三、介面定義

建立Project

請求

請求文法

  POST /projects/<ProjectName> HTTP/1.1

請求元素

名稱

類型

描述

Comment

String

描述資訊,限制1024位元組

響應

響應文法

  HTTP/1.1 201 Created

樣本

請求樣本

  POST /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx

  {
      "Comment": "test project"
  }

響應樣本

  HTTP/1.1 201 Created
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0

查詢Project

請求

請求文法

  GET /projects/<ProjectName> HTTP/1.1

響應

響應文法

  HTTP/1.1 200 OK

響應元素

名稱

類型

描述

CreateTime

long

建立時間,單位:秒

LastModifyTime

long

更新時間,單位:秒

Comment

String

描述資訊

樣本

請求樣本

  GET /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Content-Length: xxx

  {
      "Comment": "test project",
      "CreateTime": 1525763481,
      "LastModifyTime": 1525763481
  }

查詢Project列表

請求

請求文法

  GET /projects HTTP/1.1

響應

響應文法

  HTTP/1.1 200 OK

響應元素

名稱

類型

描述

ProjectNames

List

Project名稱列表

樣本

請求樣本

  GET /projects HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
      "ProjectNames": [
          "project1",
          "projcet2"
      ]
  }

更新Project

請求

請求文法

  PUT /projects/<ProjectName> HTTP/1.1

請求元素

名稱

類型

描述

Comment

String

描述資訊

響應

響應文法

  HTTP/1.1 200 OK

樣本

請求樣本

  PUT /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx

  {
      "Comment": "update comment"
  }

響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0

刪除Project

請求

請求文法

  DELETE /projects/<ProjectName> HTTP/1.1

響應

響應文法

  HTTP/1.1 200 OK

樣本

請求樣本

  DELETE /projects/<ProjectName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0

建立Topic

請求

請求文法

  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

請求元素

名稱

類型

描述

Action

String

操作類型

ShardCount

int

初始shard數目

Lifecycle

int

資料存放區生命週期

RecordType

String

BLOB(非結構化資料)/TUPLE(結構化資料)

RecordSchema

String

建立TUPLE類型topic時需指定schema, BLOB類型時,不傳該參數

Comment

String

描述資訊

ExpandMode

String

開啟擴充模式傳值extend,其他情況不需要該參數。

響應

響應文法

  HTTP/1.1 201 Created

樣本

請求樣本

  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx

  {
      "Action": "create",
      "ShardCount": 1,
      "Lifecycle": 1,
      "RecordType": "TUPLE",
      "RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}}",
      "Comment": "create topic",
      "ExpandMode": "extend"
  }

響應樣本

  HTTP/1.1 201 Created
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Content-Length: 0

查詢Topic

請求

請求文法

  GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

響應

響應文法

  HTTP/1.1 200 OK

響應元素

名稱

類型

描述

ShardCount

int

初始shard數目

Lifecycle

int

資料存放區生命週期

RecordType

String

BLOB(非結構化資料)/TUPLE(結構化資料)

RecordSchema

String

建立TUPLE類型topic時需指定schema, BLOB類型時,不傳該參數

Comment

String

描述資訊

CreateTime

long

建立時間

LastModifyTime

long

更新時間

樣本

請求樣本

  GET /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

響應樣本

  HTTP/1.1 201 Created
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Content-Length: xxx

  {
      "ShardCount": 1,
      "Lifecycle": 1,
      "RecordType": "TUPLE",
      "RecordSchema": "{\"fields\":[{\"name\":\"field1\",\"type\":\"STRING\"},{\"name\":\"field2\",\"type\":\"BIGINT\"}]}",
      "Comment": "create topic",
      "CreateTime": 1525763481,
      "LastModifyTime": 1525763481
  }

查詢Topic列表

請求

請求文法

  GET /projects/<ProjectName>/topics HTTP/1.1

響應

響應文法

  HTTP/1.1 200 OK

響應元素

名稱

類型

描述

TopicNames

List

Project名稱列表

樣本

請求樣本

  GET /projects/<ProjectNames>/topics HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
      "TopicNames": [
          "topic1",
          "topic2"
      ]
  }

更新Topic

請求

請求文法

  PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

請求元素

名稱

類型

描述

Comment

String

描述資訊

響應

響應文法

  HTTP/1.1 200 OK

樣本

請求樣本

  PUT /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Comment": "update comment"
  }

刪除Topic

請求

請求文法

  DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

響應

響應文法

  HTTP/1.1 200 OK

樣本

請求樣本

  DELETE /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Length: 0

擷取Shard列表

請求

請求文法

  GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1

響應

響應文法

  HTTP/1.1 200 OK

響應元素

名稱

類型

描述

ShardId

String

Shard Id

State

String

Shard目前狀態,包括OPENING,ACTIVE,CLOSED等狀態

BeginHashKey

String

起始HashKey

EndHashKey

String

終止HashKey

ParentShardIds

List

Shard分裂或合并之前的父Shard資訊

樣本

請求樣本

  GET /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
      "Shards": [
              {
                      "ShardId": "0",
                      "State": "ACTIVE",
                      "BeginHashKey":"00000000000000000000000000000000",
                      "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                      "ParentShardIds:[]
              }
      ]
  }

分裂Shard

請求

請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1

  • 請求元素

名稱

類型

描述

Action

String

Action為:split

ShardId

String

需要分裂的Shard Id

SplitKey

String

按照此Key進行Split,SplitKey一般等於 BeginHashKey + (EndHashKey - BeginHashKey) / 2

響應

響應文法

  HTTP/1.1 200 OK

響應元素

名稱

類型

描述

NewShards

List

分裂後的Shard列表

樣本

請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Action": "split",
          "ShardId": "0",
          "SplitKye": "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
  }

響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
          "NewShards": [
                  {
                          "ShardId": "1",
                          "BeginHashKey":"00000000000000000000000000000000",
                          "EndHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
                  },
                  {
                          "ShardId":"0",
                          "BeginHashKey":"7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                          "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
                  }
          ]
  }

合并Shard

請求

請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1

請求元素

名稱

類型

描述

Action

String

Action為:merge

ShardId

String

需要合并的Shard Id

AdjacentShardId

String

臨近的並且滿足合并條件的Shard Id

響應

響應文法

  HTTP/1.1 200 OK

響應元素

名稱

類型

描述

ShardId

String

合并後的Shard Id

BeginHashKey

String

起始HashKey

EndHashKey

String

終止HashKey

樣本

請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Action": "merge",
          "ShardId": "0",
          "AdjacentShardId": "1"
  }

響應樣本

  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
        Content-Type: applicaton/json
        Conent-Length: xxx
        
        {
                "ShardId":"2",
                "BeginHashKey":"00000000000000000000000000000000",
                "EndHashKey":"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
        }

查詢資料Cursor

請求

請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1

請求元素

名稱

類型

描述

Action

String

Action為:cursor

Type

String

按照何種類型擷取Cursor,包括:OLDEST, LATEST, SYSTEM_TIME, SEQUENCE

SystemTime

Int64

Type為SYSTEM_TIME時填寫,單位ms

Sequence

Int64

Type為SEQUENCE時填寫

響應

響應文法

  HTTP/1.1 200 OK

響應元素

名稱

類型

描述

Cursor

String

返回的Cursor資訊

RecordTime

Int64

資料寫入DataHub時間, 單位ms

Sequence

Int64

資料寫入的Sequence,單Shard內唯一

樣本

請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: application/json
  Content-Length: xxx
  
  {
          "Action": "cursor",
          "Type": "SEQUENCE",
          "Sequence": 1
  }

響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
          "Cursor": "30005af19b3800000000000000000000",
          "RecordTime": 1525783352873,
          "Sequence": 1
  }

寫入資料 - 不按shard寫入

請求

請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1

請求元素

名稱

類型

描述

Action

String

Action為:pub

ShardId

String

ShardId

Attributes

Map<String, String>

使用者屬性欄位

Data

如果為BLOB類型,Data為資料Base64編碼後資料;如果為TUPLE資料,為String類型數組

響應

響應文法

  HTTP/1.1 200 OK

響應元素

名稱

類型

描述

FailedRecordCount

Int

失敗條數

FailedRecords

Array

失敗詳情

樣本

請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
          "Action": "pub",
          "Records": [
                  {
                          "ShardId": "0",
                          "Attributes": {
                                  "attr1": "value1",
                                  "attr2": "value2"
                          },
                          "Data": ["A","B","3","4"]
                  }
          ]
  }

  // BLOB
  {
          "Action": "pub",
          "Record": [
                  {
                          "ShardId": "0",
                          "Attributes": {
                                  "attr1": "value1",
                                  "attr2": "value2"
                          },
                          "Data": "Base64String"
                  }
          ]
  }

  • 響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
      "FailedRecordCount": 1,
      "FailedRecords": [
                      {
                              "Index": 0,
                              "ErrorCode": "errorCode",
                              "ErrorMessage": "errormsg"
                      }
      ]
  }

讀取資料

請求

  • 請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/shards/<ShardId> HTTP/1.1

  • 請求元素

名稱

類型

描述

Action

String

Action為:sub

Cursor

String

從Cursor位置開始讀取

Limit

Int

讀取的條數

響應

  • 響應文法

  HTTP/1.1 200 OK

  • 響應元素

名稱

類型

描述

NextCursor

String

下一條資料的Cursor資訊

SystemTime

Int64

record寫入DataHub的時間,單位ms

Cursor

String

record對應的Cursor資訊

Sequence

Int64

record寫入DataHub的Sequence

Attributes

Map

使用者的屬性欄位

Data

使用者的資料欄位

樣本

  • 請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/shards HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "sub",
          "Cursor": "30005af19b3800000000000000000000",
          "Limit": 1
  }

  • 響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
      "NextCursor": "30005af19b3800000000000000090001",
      "Records": [
                      {
                              "Cursor": "30005af19b3800000000000000000000",
                              "SystemTime": 1525783352873,
                              "Sequence": 1,
                              "Attributes": {
                                      "key1": "value1",
                                      "key2": "value2"
                              },
                              "Data": ["AAA", "100"]
                      }
      ]
  }

新增Field

請求

  • 請求文法

  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1

  • 請求元素

名稱

類型

描述

Action

String

Action為:appendfield

FieldName

String

Field名稱

FieldType

String

Field類型,包括STRING, BIGINT等

響應

  • 響應文法

  HTTP/1.1 200 OK

樣本

  • 請求樣本

  POST /projects/<ProjectName>/topics/<TopicName> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "appendfield",
          "FieldName": "field1",
          "FieldType": "BIGINT"
  }

建立Connector

請求

  • 請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

  • 請求元素

名稱

類型

描述

Type

String

Connector類型,如SINK_ODPS等

ColumnFields

Array

需同步的Field列表

Config

Map

Connector相關配置

響應

  • 響應文法

  HTTP/1.1 201 Created

樣本

  • 請求樣本,以SINK_ODPS為例

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/sink_odps HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Type": "SINK_ODPS",
          "ColumnFields": ["field1", "field2"],
          "Config": {
                  "Project": "odpsProject",
                  "Topic": "odpsTopic",
                  "OdpsEndpoint": "xxx",
                  "TunnelEndpoint": "xxx",
                  "AccessId": "xxx",
                  "AccessKey": "xxx",
                  "PartitionMode": "SYSTEM_TIME",
                  "TimeRange": 60,
                  "PartitionConfig": {
                          "pt": "%Y%m%d",
                          "ct": "%H%M"
                  }
          }
  }

查詢Connector

請求

  • 請求文法

  GET /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

響應

  • 響應文法

  HTTP/1.1 200 OK

查詢Connector列表

請求

  • 請求文法

  GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1

響應

  • 響應文法

  HTTP/1.1 200 OK

樣本

  • 請求樣本

  GET /projects/<ProjectName>/topics/<TopicName>/connectors HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

  • 響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
          "Connectors": [
                  "sink_odps", "sink_ads"
          ]
  }

刪除Connector

請求

  • 請求文法

  DELETE /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

響應

  • 響應文法

  HTTP/1.1 200 OK

Reload Connector

請求

  • 請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

  • 請求元素

名稱

類型

描述

Action

String

Action為:reload

ShardId

String

不設定則Reload整個connector

響應

  • 響應文法

  HTTP/1.1 200 OK

樣本

  • 請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "reload"
  }

擷取Connector Shard狀態資訊

請求

  • 請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1

  • 請求元素

名稱

類型

描述

Action

String

Action為:status

ShardId

String

擷取此ShardId的狀態資訊

響應

  • 響應文法

  HTTP/1.1 200 OK

  • 響應元素

名稱

類型

描述

CurrentSequence

Int64

當前處理點位資訊

State

Enum

當前Shard的運行狀態

LastErrorMessage

String

當State不為CONTEXT_EXECUTING時,返回錯誤資訊

DiscardCount

Int64

從connector運行到現在丟棄的資料總條數

樣本

  • 請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "status",
          "ShardId": "0"
  }

  • 響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx

  {
          "State": "CONTEXT_EXECUTING",
          "CurrentSequence": 10,
          "DiscardCount": 0,
          "LastErrorMessage": ""
  }

Append Connector Field

請求

  • 請求文法

POST /projects//topics//connectors/ HTTP/1.1
  • 請求元素

名稱

類型

描述

Action

String

Action為:appendfield

FieldName

String

Field名稱

響應

  • 響應文法

  HTTP/1.1 200 OK

  • 請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/connectors/<ConnectorType> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "appendfiled",
          "FieldName": "field1"
  }

建立訂閱

請求

  • 請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1

  • 請求元素

名稱

類型

描述

Action

String

Action為:create

Comment

String

描述資訊

響應

  • 響應文法

  HTTP/1.1 201 Created

  • 響應元素

名稱

類型

描述

SubId

String

訂閱Id

樣本

  • 請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "create",
          "Comment": "xxxx"
  }

  • 響應樣本

  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
         Content-Type: applicaton/json
  Conent-Length: xxx

        {
          "SubId": "1542078393028fzsZx"
  }

查詢訂閱

請求

  • 請求文法

  GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1

響應

  • 響應文法

  HTTP/1.1 200 OK

  • 響應元素

名稱

類型

描述

SubId

String

訂閱Id

State

Int

0: online, 1: offline

Comment

String

描述資訊

樣本

  • 請求樣本

  GET /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT

  • 響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "SubId": "xxxx",
          "Comment": "xxxx"
          "State": 1
  }

查詢訂閱列表

請求

  • 請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1

  • 請求元素

名稱

類型

描述

Action

String

Action為:list

PageIndex

Int

分頁Index

PageSize

Int

分頁Size

響應

  • 響應文法

  HTTP/1.1 200 OK

  • 響應元素

名稱

類型

描述

TotalCount

Int

Page總數

  • 請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "list",
          "PageIndex": 1,
          "PageSize": 10
  }

  • 響應樣本

  HTTP/1.1 200 OK
  x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Subscriptions": [
                  {
                           "Comment": "xxxx",
                            "State": 1,
                            "SubId": "1542079169844gH8HM"
                  },
                  {
                          "Comment": "xxxx",
                          "State": 1,
                          "SubId": "1542078393028fzsZx"
                  }
          ],
          "TotalCount": 2
  }

刪除訂閱

請求

  • 請求文法

  DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1

樣本

  • 請求樣本

  DELETE /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString

更新訂閱狀態

請求

  • 請求文法

  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1

響應

  • 響應文法

  HTTP/1.1 200 OK

樣本

  • 請求樣本

  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId> HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "State": 0
  }

open點位session

請求

  • 請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1

  • 請求元素

名稱

類型

描述

Action

String

Action為:open

ShardIds

Array

Shard列表

響應

  • 響應文法

  HTTP/1.1 200 OK

  • 響應元素

名稱

類型

描述

Timestamp

Int64

點位時間戳記,單位ms

Sequence

Int64

點位Sequence

Version

Int64

Session VersionId

SessionId

String

SessionId

樣本

  • 請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "open",
          "ShardIds": ["0"]
  }

  • 響應樣本

  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Offsets": {
                  "0": {
                          "Timestamp": 1000,
                          "Sequence": 1,
                          "Version": 1,
                          "SessionId": "xxx"
                  }
          }
  }

查詢點位

請求

  • 請求文法

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1

  • 請求元素

名稱

類型

描述

Action

String

Action為:get

ShardIds

Array

Shard列表

響應

  • 響應文法

  HTTP/1.1 200 OK

  • 響應元素

名稱

類型

描述

Timestamp

Int64

點位時間戳記,單位ms

Sequence

Int64

點位Sequence

Version

Int64

Session VersionId

SessionId

String

SessionId

樣本

  • 請求樣本

  POST /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "get",
          "ShardIds": ["0"]
  }

  • 響應樣本

  HTTP/1.1 200 OK
        x-datahub-request-id: 2018050817492199d6650a00000039
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Offsets": {
                  "0": {
                          "Timestamp": 1000,
                          "Sequence": 1,
                          "Version": 1,
                          "SessionId": "xxx"
                  }
          }
  }

提交點位

請求

  • 請求文法

  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1

  • 請求元素

名稱

類型

描述

Action

String

Action為:commit

Timestamp

Int64

點位時間戳記,單位ms

Sequence

Int64

點位Sequence

Version

Int64

Session VersionId

SessionId

Int64

SessionId

響應

  • 響應文法

  HTTP/1.1 200 OK

樣本

  • 請求樣本

  PUT /projects/<ProjectName>/topics/<TopicName>/subscriptions/<SubId>/offsets HTTP/1.1
  x-datahub-client-version: 1.1
  Date: Tue, 08 May 2018 09:47:48 GMT
  Authorization: AuthorizationString
  Content-Type: applicaton/json
  Conent-Length: xxx
  
  {
          "Action": "commit",
          "Offsets": {
                  "0": {
                          "Timestamp": 1000,
                          "Sequence": 1,
                          "Version": 1,
                          "SessionId": 1
                  }
          }
  }