全部產品
Search
文件中心

DataHub:Java High-Level SDK

更新時間:Jul 17, 2025

介紹

Java 的 High-Level sdk 一般 稱為 client-library,主要分為 Producer 和 Consumer,下面會介紹 Producer 和 Consumer 的相關參數和一些常見的用法。

身份認證

AccessKey(簡稱AK)是阿里雲提供給阿里雲使用者的存取金鑰,用於訪問阿里雲OpenAPI時的身分識別驗證。AccessKey包括AccessKey ID和AccessKey Secret,需妥善保管。AK如果泄露,會威脅該帳號下所有資源的安全。訪問阿里雲OpenAPI時,如果在代碼中寫入程式碼明文AK,容易因代碼倉庫許可權管理不當造成AK泄露。

Alibaba Cloud Credentials是阿里雲為阿里雲開發人員使用者提供的身份憑證管理工具。配置了Credentials預設憑據鏈後,訪問阿里雲OpenAPI時,您無需在代碼中寫入程式碼明文AK,可有效保證您帳號下雲資源的安全。

前提條件

配置方案

本文樣本的全部通採用配置環境變數方式擷取 AK 資訊,更多方式請訪問管理訪問憑據

重要

使用設定檔的方案時,請確保您系統中不存在環境變數ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。否則,設定檔將不生效。

阿里雲SDK支援通過定義ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET環境變數來建立預設的訪問憑證。調用介面時,程式直接存取憑證,讀取您的存取金鑰(即AccessKey)並自動完成鑒權。

配置方法

配置環境變數ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

Linux和macOS系統配置方法

執行以下命令:

export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>

<access_key_id>需替換為已準備好的AccessKey ID,<access_key_secret>替換為AccessKey Secret。

Windows系統配置方法

  1. 建立環境變數檔案,添加環境變數ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,並寫入已準備好的AccessKey ID和AccessKey Secret。

  2. 重啟Windows系統。

程式碼範例

EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

Producer 介紹

限制說明

  • Producer 是安全執行緒的,理論上同一個進程內一個 topic 只需要有一個 Producer 即可

參數介紹

Producer 的參數都通過 ProducerConfig 來設定,如果要設定參數maxAsyncThreadNum,需要調用 ProducerConfig 的 setMaxAsyncThreadNum。

參數名稱

類型

是否必須

預設值

描述

maxAsyncThreadNum

16

發送資料的線程池大小

userAgent

String

dcl-xxx

maxRetryCount

int

1

最大重試次數

maxRetryIntervalMs

int

1000

可重試錯誤的稍候再試,不包含限流報錯

maxRetryIntervalMsForLimit

int

100

寫資料被限流後的稍候再試

ProducerInterceptor

Object

-

寫入資料的時候可以添加攔截器做額外的處理,例如:添加額外的 attribute 資訊

HttpConfig

Object

-

Http 相關預設值較多,建議直接查看代碼

maxAsyncBufferRecords

int

INT_MAX

非同步發送時,最大攢批的資料條數,一般通過 size 控制,所以這裡預設值為 INT_MAX

maxAsyncBufferTimeMs

long

10000

非同步發送時,最長緩衝時間

maxAsyncBufferSize

long

4 * 1024 * 1024

非同步發送時,最大攢批 size

maxAsyncQueueSize

long

16

非同步發送時,攢批完成正在發送的請求數,超過會阻塞發送介面,主要是防止 OOM

useTTFormat

enableHeartbeat

bool

false

是否發送心跳包,一般情況下都不需要開啟

heartbeatGenerator

Object

DefaultBlobHeartbeatGenerator

如果開啟發送心跳,如果使用者佈建了那麼會優先使用使用者佈建的heartbeatGenerator,沒有設定預設會使用DefaultBlobHeartbeatGenerator,

Producer 樣本

相關依賴

<!-- 零信任憑證相關 -->
<dependency>
	<groupId>com.aliyun</groupId>
	<artifactId>credentials-java</artifactId>
	<version>1.0.2</version>
</dependency>

<dependency>
	<groupId>com.aliyun.datahub</groupId>
	<artifactId>datahub-client-library</artifactId>
	<version>1.4.11</version>
</dependency>

非同步寫入(推薦)

非同步寫入的好處是不需要使用者攢批,並且攢批的方式可以通過參數進行設定,可以參考上面參數介紹進行調優。

public static void main(String[] args) throws InterruptedException {
	// 通過環境變數擷取AK資訊
	EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

	String endpoint ="https://dh-cn-hangzhou.aliyuncs.com";
	String projectName = "test_project";
	String topicName = "test_topic";

	// 初始化Producer,這裡直接使用預設配置
	ProducerConfig config = new ProducerConfig(endpoint, provider);
	DatahubProducer producer = new DatahubProducer(projectName, topicName, config);

	RecordSchema schema = producer.getTopicSchema();
	// 如果開啟了多version schema,這裡也可以擷取指定version的schema
	// RecordSchema schema = producer.getTopicSchema(3);

	// 對於非同步寫入,可以根據需要來選擇是否註冊回呼函數
	WriteCallback callback = new WriteCallback() {
		@Override
		public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
			System.out.println("write success");
		}

		@Override
		public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
			System.out.println("write failed");
		}
	};

	for (int i = 0; i < 10000; ++i) {
		try {
            // generate data by schema
            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "hello");
            data.setField("field2", 1234);
            RecordEntry recordEntry = new RecordEntry();
            recordEntry.setRecordData(data);

            producer.sendAsync(recordEntry, callback);
            // 如果不需要關心資料是否發送成功,那麼就不需要註冊回調,直接發送
            // producer.sendAsync(recordEntry, null);
        } catch (DatahubClientException e) {
            // TODO 處理異常,一般是不可重試錯誤或者超過重試次數;
            Thread.sleep(1000);
        }
	}

	// 保證退出前,資料全部被發送完
	producer.flush(true);
	producer.close();
}

Hash 寫入

如果資料有保序的需求,那麼需要根據一些資訊進行 hash,相同 hash 值的資料會寫入到同一個 shard,單個 shard 的資料是可以保證順序的,一般 hash 寫入建議使用非同步方式寫入。

public static void main(String[] args) throws InterruptedException {
    // 通過環境變數擷取AK資訊
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";

    // 初始化Producer,這裡直接使用預設配置
    ProducerConfig config = new ProducerConfig(endpoint, provider);
    DatahubProducer producer = new DatahubProducer(projectName, topicName, config);


    RecordSchema schema = producer.getTopicSchema();
    // 如果開啟了多version schema,這裡也可以擷取指定version的schema
    // RecordSchema schema = producer.getTopicSchema(3);

    // 對於非同步寫入,可以註冊回呼函數
    WriteCallback callback = new WriteCallback() {
        @Override
        public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
            System.out.println("write success");
        }

        @Override
        public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
            System.out.println("write failed");
        }
    };

    for (int i = 0; i < 10000; ++i) {
        try {
            // generate data by schema
            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "hello");
            data.setField("field2", 1234);
            RecordEntry recordEntry = new RecordEntry();
            recordEntry.setRecordData(data);
            // 給每條資料設定hash的內容
            recordEntry.setHashKey("test" + i);

            producer.sendAsync(recordEntry, callback, DefaultRecordPartitioner.INSTANCE);
            // 如果不需要關心資料是否發送成功,那麼就不需要註冊回調,直接發送
            // producer.sendAsync(recordEntry, null, DefaultRecordPartitioner.INSTANCE);
        } catch (DatahubClientException e) {
            // TODO 處理異常,一般是不可重試錯誤或者超過重試次數;
            Thread.sleep(1000);
        }
    }

    // 保證退出前,資料全部被發送完
    producer.flush(true);
    producer.close();
}

同步寫入

如果想自己控制攢批的方式,那可以採用同步寫入的方式。

public static void main(String[] args) throws InterruptedException {
    // 通過環境變數擷取AK資訊
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";

    // 初始化Producer,這裡直接使用預設配置
    ProducerConfig config = new ProducerConfig(endpoint, provider);
    DatahubProducer producer = new DatahubProducer(projectName, topicName, config);


    RecordSchema schema = producer.getTopicSchema();
    // 如果開啟了多version schema,這裡也可以擷取指定version的schema
    // RecordSchema schema = producer.getTopicSchema(3);

    List<RecordEntry> recordEntryList = new ArrayList<>();
    for (int i = 0; i < 1000; ++i) {
        // generate data by schema
        TupleRecordData data = new TupleRecordData(schema);
        data.setField("field1", "hello");
        data.setField("field2", 1234);
        RecordEntry recordEntry = new RecordEntry();
        recordEntry.setRecordData(data);
        recordEntryList.add(recordEntry);
    }

    // 寫入失敗會拋異常,一般是不可重試錯誤,或者是可重試錯誤超過了重試次數
    try {
        String shardId = producer.send(recordEntryList);
        System.out.println("write success, shardId: " + shardId);
    } catch (DatahubClientException e) {
        // TODO 處理異常,一般是不可重試錯誤或者超過重試次數;
    }

    producer.close();
}

Consumer 介紹

Consumer 用於資料的消費,可以自動分配 shard,一般稱為協同消費,具體介紹可以參考 協同消費

Consumer 實際請求是批量讀取,緩衝到本地,然後介面層面是單條資料返回的。

點位維護

Consumer 可以自動維護點位資訊,在啟動時會自動擷取服務端儲存的點位,然後從上次儲存的點位開始繼續消費,在消費的過程中,會周期性的(預設是 10 秒)把用戶端的資料點位提交到服務端。具體實現邏輯如下

每一條資料的點位都會對應一個 RecordKey 對象,消費完一條資料後,可以對 RecordKey 進行 ack 操作,ack 後表示這條資料已經消費完,可以更新點位,也可以選擇自動 ack。用戶端讀到資料以後,會把每一條資料的對應的RecordKey按順序維護到一個 queue 中。後台有一個提交點位到服務端的周期任務,每次會檢查隊列,如果隊首RecordKey 已經 ack,那麼會被彈出隊列,一直到隊首RecordKey 沒有被 ack 為止,那麼當前隊首的點位上一個點位就是本次需要提交到服務端的點位。

常見問題說明

1、 如果用戶端消費了某條資料,但是點位沒有來得及提交到服務端,這條資料是否會重複消費?

會,但是一般只會發生在異常退出的情況下,正常的調用 close 退出,是可以保證當前點位被提交的。

2、如果有三條資料,點位分別是 1~3,2 因為某些原因沒有 ack,但是 1 和 3 已經 ack,這個時候點位會更新到多少?

點位更新到 1,1 已經 ack,2 此時位於隊首不會被彈出,所以此時 點位會一直卡在 1。

3、如果某一條資料已經通過 read 讀出來,但是一直沒有 ack,這個時候 Consumer 是否會再次 read 到這條資料?

不會,並且位點也會一直卡住不更新,所以使用者必須保證每一條 read 的資料一定被 ack。如果某一條資料超過一定時間(預設是 60s),繼續調用 read 會拋出異常。

限制說明

  • Consumer 是安全執行緒的,一個進程每個 topic 只需要有一個 Consumer 對象即可

  • Consumer 數量一般不要超過 shard 數量,如果 Consumer 多於 shard 數,那麼會有 Consumer 因為分配不到 shard 一直在空跑,但是有其他 Consumer 退出以後,空跑的 Consumer 就因為可以拿到 shard 開始正常運行。

  • 指定 shard 列表消費,使用同一個訂閱 id 的不同 Consumer 不能消費同一個 shard

參數介紹

參數名稱

類型

是否必須

預設值

描述

maxAsyncThreadNum

16

讀取資料的線程池大小

userAgent

String

dcl-xxx

maxRetryCount

int

1

最大重試次數

maxRetryIntervalMs

int

1000

可重試錯誤的稍候再試,不包含限流報錯

maxRetryIntervalMsForLimit

int

100

讀資料被限流後的稍候再試

ProducerInterceptor

Object

-

讀取資料的時候可以添加攔截器做額外的處理,例如:過濾掉一些敏感資訊

HttpConfig

Object

-

Http 相關預設值較多,建議直接查看代碼

balanceRead

bool

false

true 表示在當前 consumer 所消費的 shard 中依次發送讀請求;false 表示在當前 consumer 所消費的 shard 中選擇點位最老的 shard 發送讀請求,主要是防止在資料扭曲情境下 shard 點位差距過大

autoCommit

bool

true

是否自動 ack 資料:true 表示資料 read 到以後自動 ack;false 表示資料 read 到以後,需要手動調用一下 RecordEntry.getKey().ack(),否則點位不會往前推動

sessionTimeoutMs

long

60000

consumer 會話最大時間,consumer 需要和服務端一直發送心跳來保證活躍,超過這個時間沒有發送心跳,會被服務端視為退出 consumer group,這個 consumer 的 shard 會被分配給其他 consumer

heartbeatRetryCount

int

1

Consumer 發送心跳來保證活躍,心跳發送失敗時的重試次數。

fetchNumber

int

500

單次請求讀取的最巨量資料條數

maxBufferRecords

int

500

本機快取資料條數,不足會向服務端發起請求,設定過大有可能會導致 OOM。

Consumer 樣本

協同消費(推薦)

協同消費就是 Consumer Group,服務端會給每個節點動態分配需要消費的 shard,使用者只需要關心資料處理即可,不需要關心點位維護和 shard 分配等事項。

自動 ack 消費

每條資料 read 到以後,就會自動 ack 確認,即表示可以更新點位,有一定可能造成資料丟失。

public static void main(String[] args) throws InterruptedException {
    // 通過環境變數擷取AK資訊
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";
    String subId = "1747966903774M787N";

    ConsumerConfig config = new ConsumerConfig(endpoint, provider);
    DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);

    while (true) {
        RecordEntry recordEntry = null;
        try {
            recordEntry = consumer.read(5000);
            if (recordEntry != null) {
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                // handle data
                System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
            }
        } catch (DatahubClientException e) {
            // TODO 處理異常,一般是不可重試錯誤或者超過重試次數;
        }
    }
}

手動 ack 消費

如果每一條資料都要求必須消費完了,才能提交點位,那麼推薦關閉 autoCommit,每條資料手動 ack。

public static void main(String[] args) throws InterruptedException {
  // 通過環境變數擷取AK資訊
  EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

  String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
  String projectName = "test_project";
  String topicName = "test_topic";
  String subId = "1747966903774M787N";

  ConsumerConfig config = new ConsumerConfig(endpoint, provider);
  // 設定資料消費成功後手動ack
  config.setAutoCommit(false);
  DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);

  while (true) {
    RecordEntry recordEntry = null;
    try {
      recordEntry = consumer.read(5000);
      if (recordEntry != null) {
        TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
        // handle data
        System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
      }
    } catch (DatahubClientException e) {
      // TODO 處理異常,一般是不可重試錯誤或者超過重試次數;
    } finally {
      if (recordEntry != null) {
        // 處理完每條資料都要進行ack,否則點位無法推進
        recordEntry.getKey().ack();
      }
    }
  }
}

指定 shard 消費

指定 shard 消費,是需要使用者自己維護 shard 的分配,使用同一個訂閱 id 的不同 Consumer 不能消費同一個 shard ,否則會無法消費。這裡只給出一個自動 ack 的樣本,手動 ack 可以參考上面的樣本。

public static void main(String[] args) throws InterruptedException {
    // 通過環境變數擷取AK資訊
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";
    String subId = "1747966903774M787N";
    List<String> shardIds = Arrays.asList("0", "1");

    ConsumerConfig config = new ConsumerConfig(endpoint, provider);
    // 用戶端指定好需要消費的shard列表
    DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, shardIds, config);

    while (true) {
        RecordEntry recordEntry = null;
        try {
            recordEntry = consumer.read(5000);
            if (recordEntry != null) {
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                // handle data
                System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
            }
        } catch (DatahubClientException e) {
            // TODO 處理異常,一般是不可重試錯誤或者超過重試次數;
        }
    }
}