概要
Java 高レベル SDK は、一般的にクライアントライブラリと呼ばれ、主に Producer と Consumer コンポーネントに分かれています。このドキュメントでは、両方のコンポーネントに関連するパラメーターと一般的な使用パターンについて説明します。
認証
AccessKey ペアは Alibaba Cloud によって提供され、API 操作を呼び出す際に ID 認証を完了するために使用されます。各 AccessKey ペアは、AccessKey ID と AccessKey シークレットで構成されます。AccessKey ペアは機密に保つ必要があります。AccessKey ペアが漏洩すると、アカウント内のすべてのリソースのセキュリティが脅かされます。Alibaba Cloud OpenAPI にアクセスする際、コードに AccessKey ペアをハードコーディングすると、不適切なリポジトリの権限管理により AccessKey が漏洩する可能性があります。
Alibaba Cloud Credentials は、Alibaba Cloud が開発者向けに提供する ID 認証情報管理ツールです。Credentials のデフォルトの認証情報チェーンを構成すると、Alibaba Cloud OpenAPI にアクセスする際にコードに AccessKey ペアをハードコーディングする必要がなくなり、アカウント内のクラウドリソースのセキュリティを効果的に確保できます。
前提条件
Resource Access Management (RAM) ユーザーの AccessKey ID と AccessKey シークレットを取得済みであること。詳細については、「RAM ユーザーの AccessKey 情報を表示する」をご参照ください。
Cloud SDK Credentials 依存関係を追加済みであること。(Credentials の 最新バージョン の使用を推奨します):
構成オプション
このドキュメントでは、環境変数を使用して AccessKey 情報を取得します。詳細については、「アクセス認証情報を管理する」をご参照ください。
構成ファイルオプションを使用する場合、環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET がシステムに存在しないことを確認してください。存在する場合、構成ファイルは有効になりません。
Alibaba Cloud SDK は、環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET を定義することで、デフォルトのアクセス認証情報を作成することをサポートしています。API 操作を呼び出すと、システムはデフォルトの認証情報から AccessKey ペアを読み取り、その AccessKey ペアを使用して認証を完了します。
構成方法
環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET を構成します。
Linux および macOS の構成方法
次のコマンドを実行します:
export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret><access_key_id> を準備した AccessKey ID に、<access_key_secret> を AccessKey シークレットに置き換えます。
Windows の構成方法
環境変数ファイルを作成し、環境変数
ALIBABA_CLOUD_ACCESS_KEY_IDとALIBABA_CLOUD_ACCESS_KEY_SECRETを追加して、準備した AccessKey ID と AccessKey シークレットを書き込みます。Windows を再起動します。
コード例
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();Producer の概要
制限事項
Producer はスレッドセーフです。理論的には、同じプロセス内の 1 つの Topic には 1 つの Producer のみが必要です。
パラメーターの説明
Producer のパラメーターは ProducerConfig を通じて設定されます。たとえば、maxAsyncThreadNum パラメーターを設定するには、ProducerConfig の setMaxAsyncThreadNum メソッドを呼び出す必要があります。
パラメーター名 | タイプ | 必須 | デフォルト値 | 説明 |
maxAsyncThreadNum | 16 | データ送信用のスレッドプールサイズ | ||
userAgent | String | いいえ | dcl-xxx | |
maxRetryCount | int | いいえ | 1 | 最大リトライ回数 |
maxRetryIntervalMs | int | いいえ | 1000 | リトライ可能なエラーのリトライ間隔 (速度制限エラーを除く) |
maxRetryIntervalMsForLimit | int | いいえ | 100 | 書き込み速度制限後のリトライ間隔 |
ProducerInterceptor | Object | いいえ | - | データ書き込み時、追加の属性情報を加えるなどの追加処理を行うためのインターセプター |
HttpConfig | Object | いいえ | - | HTTP 関連のデフォルト構成は多数あるため、直接コードを確認することをお勧めします |
maxAsyncBufferRecords | int | いいえ | INT_MAX | 非同期モードでのバッチ送信の最大レコード数。通常はサイズで制御されるため、デフォルト値は INT_MAX です |
maxAsyncBufferTimeMs | long | いいえ | 10000 | 非同期送信の最大キャッシュ時間 |
maxAsyncBufferSize | long | いいえ | 4 * 1024 * 1024 | 非同期送信の最大バッチサイズ |
maxAsyncQueueSize | long | いいえ | 16 | 非同期モードでバッチ処理された後に送信されるリクエストの数。これを超えると送信インターフェイスがブロックされます。主にメモリ不足を防ぐためです |
useTTFormat | | | | |
enableHeartbeat | bool | いいえ | false | ハートビートパケットを送信するかどうか。通常は不要です |
heartbeatGenerator | Object | いいえ | DefaultBlobHeartbeatGenerator | ハートビート送信が有効な場合、ユーザー設定の heartbeatGenerator が構成されていればそれが優先的に使用され、そうでなければデフォルトで DefaultBlobHeartbeatGenerator が使用されます |
Producer の例
依存関係
<!-- ゼロトラスト認証情報関連 -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>credentials-java</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>datahub-client-library</artifactId>
<version>1.4.11</version>
</dependency>非同期書き込み (推奨)
非同期書き込みの利点は、自分でデータをバッチ処理する必要がなく、バッチ処理方法をパラメーターで構成できることです。最適化については、上記のパラメーターの説明をご参照ください。
public static void main(String[] args) throws InterruptedException {
// 環境変数を通じて AccessKey 情報を取得
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint ="https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
// デフォルト構成で Producer を初期化
ProducerConfig config = new ProducerConfig(endpoint, provider);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = producer.getTopicSchema();
// 多バージョン スキーマが有効な場合、特定バージョンのスキーマも取得可能
// RecordSchema schema = producer.getTopicSchema(3);
// 非同期書き込みの場合、必要に応じてコールバック関数を登録可能
WriteCallback callback = new WriteCallback() {
@Override
public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
System.out.println("write success"); // 書き込み成功
}
@Override
public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
System.out.println("write failed"); // 書き込み失敗
}
};
for (int i = 0; i < 10000; ++i) {
try {
// スキーマに従ってデータを生成
TupleRecordData data = new TupleRecordData(schema);
data.setField("field1", "hello");
data.setField("field2", 1234);
RecordEntry recordEntry = new RecordEntry();
recordEntry.setRecordData(data);
producer.sendAsync(recordEntry, callback);
// データが正常に送信されたかを知る必要がない場合、コールバックを登録する必要はありません
// producer.sendAsync(recordEntry, null);
} catch (DatahubClientException e) {
// TODO 例外を処理します。通常はリトライ不可能なエラーまたはリトライ回数超過です
Thread.sleep(1000);
}
}
// 終了する前にすべてのデータが送信されることを確認
producer.flush(true);
producer.close();
}ハッシュ書き込み
データを順序付ける必要がある場合は、特定の情報に基づいてハッシュ化する必要があります。同じハッシュ値を持つデータは同じシャードに書き込まれます。単一のシャード内のデータは順序を保証できます。ハッシュ書き込みは非同期で実装することをお勧めします。
public static void main(String[] args) throws InterruptedException {
// 環境変数を通じて AccessKey 情報を取得
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
// デフォルト構成で Producer を初期化
ProducerConfig config = new ProducerConfig(endpoint, provider);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = producer.getTopicSchema();
// 多バージョン スキーマが有効な場合、特定バージョンのスキーマも取得可能
// RecordSchema schema = producer.getTopicSchema(3);
// 非同期書き込みの場合、コールバック関数を登録可能
WriteCallback callback = new WriteCallback() {
@Override
public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
System.out.println("write success"); // 書き込み成功
}
@Override
public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
System.out.println("write failed"); // 書き込み失敗
}
};
for (int i = 0; i < 10000; ++i) {
try {
// スキーマに従ってデータを生成
TupleRecordData data = new TupleRecordData(schema);
data.setField("field1", "hello");
data.setField("field2", 1234);
RecordEntry recordEntry = new RecordEntry();
recordEntry.setRecordData(data);
// 各レコードにハッシュコンテンツを設定
recordEntry.setHashKey("test" + i);
producer.sendAsync(recordEntry, callback, DefaultRecordPartitioner.INSTANCE);
// データが正常に送信されたかを知る必要がない場合、コールバックを登録する必要はありません
// producer.sendAsync(recordEntry, null, DefaultRecordPartitioner.INSTANCE);
} catch (DatahubClientException e) {
// TODO 例外を処理します。通常はリトライ不可能なエラーまたはリトライ回数超過です
Thread.sleep(1000);
}
}
// 終了する前にすべてのデータが送信されることを確認
producer.flush(true);
producer.close();
}同期書き込み
バッチ処理方法を自分で制御したい場合は、同期書き込みを使用できます。
public static void main(String[] args) throws InterruptedException {
// 環境変数を通じて AccessKey 情報を取得
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
// デフォルト構成で Producer を初期化
ProducerConfig config = new ProducerConfig(endpoint, provider);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = producer.getTopicSchema();
// 多バージョン スキーマが有効な場合、特定バージョンのスキーマも取得可能
// RecordSchema schema = producer.getTopicSchema(3);
List<RecordEntry> recordEntryList = new ArrayList<>();
for (int i = 0; i < 1000; ++i) {
// スキーマに従ってデータを生成
TupleRecordData data = new TupleRecordData(schema);
data.setField("field1", "hello");
data.setField("field2", 1234);
RecordEntry recordEntry = new RecordEntry();
recordEntry.setRecordData(data);
recordEntryList.add(recordEntry);
}
// 書き込みに失敗すると例外がスローされます。通常はリトライ不可能なエラー、またはリトライ回数を超えたリトライ可能なエラーです
try {
String shardId = producer.send(recordEntryList);
System.out.println("write success, shardId: " + shardId); // 書き込み成功、shardId:
} catch (DatahubClientException e) {
// TODO 例外を処理します。通常はリトライ不可能なエラーまたはリトライ回数超過です
}
producer.close();
}Consumer の概要
Consumer はデータ消費に使用され、シャードを自動的に割り当てることができます。これは一般的に協調コンシュームと呼ばれます。詳細については、「協調コンシューム」をご参照ください。
Consumer は実際にはデータをバッチで読み取り、ローカルにキャッシュしてから、インターフェイスレベルでデータを 1 つずつ返します。
チェックポイントのメンテナンス
Consumer はチェックポイント情報を自動的に維持できます。起動時に、サーバーに保存されているチェックポイントを自動的に取得し、最後に保存されたチェックポイントから消費を続行します。消費中、クライアントのデータチェックポイントを定期的 (デフォルトは 10 秒) にサーバーに送信します。具体的な実装ロジックは次のとおりです:
各データポイントは RecordKey オブジェクトに対応します。データを消費した後、RecordKey に対して ack 操作を実行できます。ack 後、このデータが消費され、チェックポイントが更新可能であることを示します。自動 ack を選択することもできます。クライアントがデータを読み取った後、各データに対応する RecordKey をキュー内で順番に維持します。バックグラウンドには、チェックポイントをサーバーに送信する定期的なタスクがあります。毎回キューをチェックし、キューの先頭にある RecordKey が ack されていれば、キューからポップされます。この処理は、キューの先頭にある RecordKey が ack されなくなるまで続きます。そして、現在の先頭の RecordKey の前の位置のチェックポイントが、今回サーバーに送信する必要のあるチェックポイントになります。
よくある質問
1. クライアントがデータを消費したが、チェックポイントが時間内にサーバーに送信されなかった場合、このデータは繰り返し消費されますか?
はい、ただし、これは通常、異常終了の状況でのみ発生します。close を呼び出して正常に終了すると、現在のチェックポイントが送信されることが保証されます。
2. チェックポイントが 1〜3 の 3 つのデータがあり、何らかの理由で 2 が ack されず、1 と 3 が ack された場合、チェックポイントは何に更新されますか?
チェックポイントは 1 に更新されます。1 は ack されているため、2 がキューの先頭になり、ポップされません。そのため、チェックポイントは 1 でスタックしたままになります。
3. データが read を通じて読み取られたが、一度も ack されなかった場合、Consumer はこのデータを再度読み取りますか?
いいえ、チェックポイントもスタックしたままで更新されません。そのため、ユーザーは読み取ったすべてのデータが ack されることを保証する必要があります。データが特定の時間 (デフォルトは 60s) を超えて ack されない場合、read の呼び出しを続けると例外がスローされます。
制限事項
Consumer はスレッドセーフです。1 つのプロセス内の各 Topic には、1 つの Consumer オブジェクトのみが必要です。
Consumer の数は、通常、シャードの数を超えないようにしてください。Consumer の数がシャードの数より多い場合、一部の Consumer はシャードを割り当てられないためアイドル状態になりますが、他の Consumer が終了してシャードが利用可能になると正常に実行を開始します。
指定されたシャードのリストを消費する場合、同じサブスクリプション ID を持つ異なる Consumer は同じシャードを消費できません。
パラメーターの説明
パラメーター名 | タイプ | 必須 | デフォルト値 | 説明 |
maxAsyncThreadNum | 16 | データ読み取り用のスレッドプールサイズ | ||
userAgent | String | いいえ | dcl-xxx | |
maxRetryCount | int | いいえ | 1 | 最大リトライ回数 |
maxRetryIntervalMs | int | いいえ | 1000 | リトライ可能なエラーのリトライ間隔 (速度制限エラーを除く) |
maxRetryIntervalMsForLimit | int | いいえ | 100 | 読み取り速度制限後のリトライ間隔 |
ProducerInterceptor | Object | いいえ | - | データ読み取り時、機密情報をフィルタリングするなどの追加処理を行うためのインターセプター |
HttpConfig | Object | いいえ | - | HTTP 関連のデフォルト構成は多数あるため、直接コードを確認することをお勧めします |
balanceRead | bool | いいえ | false | true は、現在のコンシューマーが消費するシャードに順次読み取りリクエストを送信することを意味します。false は、現在のコンシューマーが消費するシャードの中で最も古いチェックポイントを持つシャードを選択して読み取りリクエストを送信することを意味します。主にデータスキューのシナリオで大きなチェックポイントのギャップを防ぐためです |
autoCommit | bool | いいえ | true | データを自動的に ack するかどうか: true は、データが読み取られた後に自動的に ack されることを意味します。false は、データが読み取られた後、手動で RecordEntry.getKey().ack() を呼び出す必要があることを意味します。そうしないと、チェックポイントは進みません |
sessionTimeoutMs | long | いいえ | 60000 | 最大コンシューマーセッション時間。コンシューマーはアクティブな状態を維持するために、サーバーに継続的にハートビートを送信する必要があります。この時間より長くハートビートが送信されない場合、サーバーによってコンシューマーグループから退出したと見なされ、そのシャードは他のコンシューマーに割り当てられます |
heartbeatRetryCount | int | いいえ | 1 | コンシューマーがアクティブな状態を維持するためのハートビート送信に失敗した場合のリトライ回数。 |
fetchNumber | int | いいえ | 500 | 1 回のリクエストで読み取る最大レコード数 |
maxBufferRecords | int | いいえ | 500 | ローカルにキャッシュされるレコード数。不足している場合、リクエストがサーバーに送信されます。これを高く設定しすぎると、メモリ不足になる可能性があります。 |
Consumer の例
協調コンシューム (推奨)
協調コンシュームは Consumer Group とも呼ばれます。サーバーは、消費のために各ノードにシャードを動的に割り当てます。チェックポイントのメンテナンスやシャードの割り当てを気にすることなく、データ処理に集中するだけで済みます。
自動 ack 消費
各レコードは読み取り後に自動的に ack され、チェックポイントが更新可能であることを示します。これにより、場合によってはデータ損失が発生する可能性があります。
public static void main(String[] args) throws InterruptedException {
// 環境変数を通じて AccessKey 情報を取得
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
String subId = "1747966903774M787N";
ConsumerConfig config = new ConsumerConfig(endpoint, provider);
DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
while (true) {
RecordEntry recordEntry = null;
try {
recordEntry = consumer.read(5000);
if (recordEntry != null) {
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
// データを処理
System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2")); // レコードを読み取り:
}
} catch (DatahubClientException e) {
// TODO 例外を処理します。通常はリトライ不可能なエラーまたはリトライ回数超過です
}
}
}手動 ack 消費
チェックポイントを送信する前に各レコードを完全に消費する必要がある場合は、autoCommit を無効にして、各レコードを手動で ack することをお勧めします。
public static void main(String[] args) throws InterruptedException {
// 環境変数を通じて AccessKey 情報を取得
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
String subId = "1747966903774M787N";
ConsumerConfig config = new ConsumerConfig(endpoint, provider);
// データ消費が成功した後に手動で ack を設定
config.setAutoCommit(false);
DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
while (true) {
RecordEntry recordEntry = null;
try {
recordEntry = consumer.read(5000);
if (recordEntry != null) {
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
// データを処理
System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2")); // レコードを読み取り:
}
} catch (DatahubClientException e) {
// TODO 例外を処理します。通常はリトライ不可能なエラーまたはリトライ回数超過です
} finally {
if (recordEntry != null) {
// 各レコードは処理後に ack する必要があります。そうしないと、チェックポイントが進みません
recordEntry.getKey().ack();
}
}
}
}指定シャード消費
指定シャード消費では、シャードの割り当てを自分で維持する必要があります。同じサブスクリプション ID を持つ異なる Consumer は同じシャードを消費できません。そうしないと、消費は失敗します。以下は自動 ack の例です。手動 ack については、上記の例をご参照ください。
public static void main(String[] args) throws InterruptedException {
// 環境変数を通じて AccessKey 情報を取得
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
String subId = "1747966903774M787N";
List<String> shardIds = Arrays.asList("0", "1");
ConsumerConfig config = new ConsumerConfig(endpoint, provider);
// クライアントが消費するシャードのリストを指定
DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, shardIds, config);
while (true) {
RecordEntry recordEntry = null;
try {
recordEntry = consumer.read(5000);
if (recordEntry != null) {
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
// データを処理
System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2")); // レコードを読み取り:
}
} catch (DatahubClientException e) {
// TODO 例外を処理します。通常はリトライ不可能なエラーまたはリトライ回数超過です
}
}
}