DataHub SDK for Java
1. Maven 依存関係と JDK
Maven POM
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.19.0-public</version>
</dependency> JDK バージョン jdk: >= 1.8
2. 使用上の注意
現在の SDK バージョンが V2.9 から更新された場合、setTimestampInms メソッドは別のメソッドに置き換えられます。新しいバージョンのタイムスタンプ値は、V2.9 の値に 1,000 を掛けた値であることに注意してください。
一般的に、
putRecords または putRecordsByShardメソッドとgetRecordsメソッドは、データの読み取りと書き込みのために最も頻繁に呼び出されます。getTopic、getCursor、listShardなどの他のメソッドは、初期化中にのみ呼び出されます。1 つのプロジェクトで 1 つ以上の DataHub クライアントを初期化できます。複数の DataHub クライアントを同時に使用できます。
異なるパッケージに、同じ名前だが異なるディレクトリにあるクラスが含まれている場合があります。DataHub SDK for Java V2.12 は com.aliyun.datahub.client パッケージ内のクラスを使用しますが、他のパッケージ内の同じ名前のクラスは V2.12 より前のバージョン用に提供されています。例:
com.aliyun.datahub.client.model.RecordSchema パッケージは、DataHub SDK for Java V2.12 に使用されます。
com.aliyun.datahub.common.data.RecordSchema パッケージには、バージョンが V2.12 より前の DataHub SDK for Java のコードが含まれています。SDK バージョンを V2.12 以降に更新してもコードを変更しない場合、パッケージ内のコードは引き続き使用できます。
「Parse body failed, Offset: 0」エラーが発生した場合は、enableBinary パラメーターを false に設定できます。
3. DataHub SDK for Java の使用
初期化
Alibaba Cloud アカウントを使用して DataHub にアクセスできます。DataHub にアクセスするには、AccessKey ID と AccessKey シークレット、および DataHub へのアクセスに使用するエンドポイントを指定する必要があります。次のサンプルコードは、DataHub エンドポイントを使用して DataHub クライアントを作成する方法の例を示しています。
// この例では、中国 (杭州) リージョンのエンドポイントが使用されています。必要に応じて、別のリージョンのエンドポイントも使用できます。
String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
String accessId = "<YourAccessKeyId>";
String accessKey = "<YourAccessKeySecret>";
// DataHub クライアントを作成します。
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
// バイナリデータ転送を有効にするかどうかを指定します。DataHub SDK for Java V2.12 以降では、サーバーはバイナリデータ転送をサポートしています。
new AliyunAccount(accessId, accessKey), true))
// Apsara Stack DataHub でエラーが発生した場合は、このパラメーターを false に設定します。
// HttpConfig パラメーターはオプションです。HttpConfig パラメーターを設定しない場合は、デフォルト値が使用されます。
.setHttpConfig(new HttpConfig()
.setCompressType(HttpConfig.CompressType.LZ4) // DataHub からデータを読み取ったり、DataHub にデータを書き込んだりする場合は、データ転送に LZ4 圧縮アルゴリズムを使用することをお勧めします。
.setConnTimeout(10000))
.build();構成の説明: DatahubConfig
パラメーター | 説明 |
endpoint | DataHub にアクセスするために使用されるエンドポイント。 |
account | Alibaba Cloud アカウントに関する情報。 |
enableBinary | バイナリデータ転送を実行するかどうかを指定します。DataHub SDK for Java V2.12 以降では、サーバーはバイナリデータ転送をサポートしています。SDK バージョンが V2.12 より前の場合は、このパラメーターを false に設定します。Apsara Stack DataHub で「Parse body failed, Offset:0」エラーが発生した場合は、このパラメーターを false に設定します。 |
HttpConfig
パラメーター | 説明 |
readTimeout | ソケットの読み取り/書き込みのタイムアウト期間。単位:秒。デフォルト値:10。 |
connTimeout | TCP 接続のタイムアウト期間。単位:秒。デフォルト値:10。 |
maxRetryCount | リクエスト失敗後の最大リトライ回数。デフォルト値:1。値を変更しないことをお勧めします。上位のビジネスレイヤーがリトライを実行します。 |
debugRequest | リクエストログを表示するかどうかを指定します。デフォルト値:false。 |
compressType | データ転送の圧縮モード。デフォルトでは、圧縮モードは使用されません。LZ4 および deflate 圧縮モードがサポートされています。 |
proxyUri | プロキシホストの URI。 |
proxyUsername | プロキシサーバーによって検証されるユーザー名。 |
proxyPassword | プロキシサーバーによって検証されるパスワード。 |
SDK 統計 DataHub SDK for Java を使用して、1 秒あたりに開始されるクエリなど、データの読み取り/書き込みリクエストの統計を収集できます。統計を収集するには、次のメソッドを呼び出すことができます。
ClientMetrics.startMetrics();デフォルトでは、メトリックの統計はログファイルに表示されます。この場合、Simple Logging Facade for Java (SLF4J) を構成する必要があります。次のメトリックパッケージが使用されます:com.aliyun.datahub.client.metrics。
DataHub へのデータの書き込み
次の例では、DataHub のタプル Topic にデータが書き込まれます。
// タプルレコードを書き込みます。
public static void tupleExample(String project,String topic,int retryTimes) {
// スキーマを取得します。
RecordSchema recordSchema = datahubClient.getTopic(project,topic ).getRecordSchema();
// 10 レコードを生成します。
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// 各レコードに追加の属性 (サーバーの IP アドレスやマシン名など) を指定できます。追加の属性を指定しない場合、データの書き込みは影響を受けません。
recordEntry.addAttribute("key1", "value1");
TupleRecordData data = new TupleRecordData(recordSchema);
data.setField("field1", "HelloWorld");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntries.add(recordEntry);
}
try {
PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
int i = result.getFailedRecordCount();
if (i > 0) {
retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
}
} catch (DatahubClientException e) {
System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
}
}
// リトライメカニズム。
public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
boolean suc = false;
while (retryTimes != 0) {
retryTimes = retryTimes - 1;
PutRecordsResult recordsResult = client.putRecords(project, topic, records);
if (recordsResult.getFailedRecordCount() > 0) {
retry(client,recordsResult.getFailedRecords(),retryTimes,project,topic);
}
suc = true;
break;
}
if (!suc) {
System.out.println("retryFailure");
}
}DataHub データを消費するためのサブスクリプションの作成
// 次のサンプルコードは、保存されたオフセットからデータを消費し、消費中にオフセットを送信する方法の例を示しています。
public static void example() {
String shardId = "0";
List<String> shardIds = Arrays.asList("0", "1");
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1. 現在のオフセットにあるレコードのカーソルを取得します。レコードの有効期限が切れているか、消費されていない場合は、Topic の Time To Live (TTL) 内の最初のレコードのカーソルを取得します。
String cursor = null;
// シーケンス番号が 0 より小さい場合、レコードは消費されていません。
if (subscriptionOffset.getSequence() < 0) {
// Topic の TTL 内の最初のレコードのカーソルを取得します。
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// 次のレコードのカーソルを取得します。
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
// シーケンス番号に基づいてカーソルを取得した後に SeekOutOfRange エラーが返された場合、現在のカーソルのレコードの有効期限が切れています。
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// Topic の TTL 内の最初のレコードのカーソルを取得します。
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2. レコードを読み取り、オフセットを保存します。たとえば、タプルレコードを読み取り、1,000 レコードを読み取るたびにオフセットを保存します。
long recordCount = 0L;
// 一度に 10 レコードを読み取ります。
int fetchNum = 10;
while (true) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// 読み取ることができるレコードがない場合は、スレッドを 1,000 ミリ秒間一時停止し、レコードの読み取りを続行します。
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
// データを消費します。
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// データが消費された後にオフセットを保存します。
++recordCount;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
if (recordCount % 1000 == 0) {
// オフセットを送信します。
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
// サブスクリプションセッションが終了しました。オフライン:サブスクリプションはオフラインです。SubscriptionSessionInvalid:サブスクリプションは他のクライアントでも使用されています。
break;
} catch (SubscriptionOffsetResetException e) {
// オフセットがリセットされます。サブスクリプションのオフセット情報を再度取得する必要があります。この例では、シーケンス番号がリセットされます。
// タイムスタンプがリセットされた場合は、CursorType.SYSTEM_TIME パラメーターを使用してカーソルを取得する必要があります。
subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
long nextSequence = subscriptionOffset.getSequence() + 1;
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (DatahubClientException e) {
// TODO: エラーが発生した場合に終了するかどうかを指定します。
} catch (Exception e) {
break;
}
}
}4. エラータイプ
このセクションでは、DataHub SDK for Java V2.12 以降に関連するエラーのタイプについて説明します。try-catch メカニズムを構成してエラータイプをキャッチし、エラーを処理できます。リトライで解決できるエラーは、DatahubClientException と LimitExceededException のみです。サーバーがビジー状態または使用不可であることが原因で発生するエラーなど、一部の DatahubClientException エラーは、リトライで解決できます。 DatahubClientException エラーと LimitExceededException エラーのコードにリトライロジックを追加することをお勧めします。ただし、リトライ回数は制限する必要があります。次の表に、DataHub SDK for Java V2.12 以降に関連するエラーのタイプを示します。エラーファイルは、次のパッケージに格納されています:com.aliyun.datahub.client.exception。
エラータイプ | エラーメッセージ | 説明 |
InvalidParameterException | InvalidParameter, InvalidCursor | 指定されたパラメーターが無効であるために返されるエラーメッセージ。 |
ResourceNotFoundException | ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo | アクセスしようとするリソースが存在しないため返されるエラーメッセージ。シャードを分割またはマージした後にすぐに別のリクエストを送信すると、このエラーメッセージが返されます。 |
ResourceAlreadyExistException | ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist | リソースが既に存在するために返されるエラーメッセージ。作成しようとするリソースが既に存在する場合、このエラーメッセージが返されます。 |
SeekOutOfRangeException | SeekOutOfRange | カーソルの取得時に、指定されたシーケンス番号が無効であるか、指定されたタイムスタンプが現在のタイムスタンプより後であるために返されるエラーメッセージ。カーソルのレコードの有効期限が切れているため、シーケンス番号が無効になる場合があります。 |
AuthorizationFailureException | Unauthorized | 認証署名の解析中にエラーが発生したために返されるエラーメッセージ。AccessKey ペアが有効かどうかを確認してください。 |
NoPermissionException | NoPermission, OperationDenied | 権限がないために返されるエラーメッセージ。RAM 構成が有効かどうか、または RAM ユーザーが承認されているかどうかを確認してください。 |
ShardSealedException | InvalidShardOperation | シャードが閉じられており、シャードからデータを読み取ったり、シャードにデータを書き込んだりできないために返されるエラーメッセージ。シャードにデータを書き込み続けたり、シャードから最後のデータレコードを読み取った後にデータの読み取りを続けたりすると、このエラーメッセージが返されます。 |
LimitExceededException | LimitExceeded | DataHub SDK for Java の制限を超えたために返されるエラーメッセージ。詳細については、「制限」をご参照ください。 |
SubscriptionOfflineException | SubscriptionOffline | サブスクリプションがオフラインで使用できないために返されるエラーメッセージ。 |
SubscriptionSessionInvalidException | OffsetSessionChanged, OffsetSessionClosed | サブスクリプションセッションが異常であるために返されるエラーメッセージ。サブスクリプションが使用されている場合、オフセットを送信するためのセッションが確立されます。サブスクリプションが別のクライアントでも使用されている場合、このエラーメッセージが返されます。 |
SubscriptionOffsetResetException | OffsetReseted | サブスクリプションのオフセットがリセットされたために返されるエラーメッセージ。 |
MalformedRecordException | MalformedRecord,ShardNotReady | レコード形式が無効であるために返されるエラーメッセージ。これは、スキーマが無効であるか、非 UTF-8 文字が存在するか、クライアントが Protocol Buffer (PB) プロトコルを使用しているが、サーバーが PB プロトコルをサポートしていないことが原因である可能性があります。 |
DatahubClientException | その他すべてのエラー。このエラータイプは、すべてのエラーの基本クラスです。 | エラーが上記のエラータイプに該当しないために返されるエラーメッセージ。このタイプのエラーは、リトライで解決できます。ただし、リトライ回数は制限する必要があります。 |
5. メソッド
プロジェクトの管理
プロジェクトは、DataHub でデータを管理するための基本単位です。1 つのプロジェクトには複数の Topic が含まれています。DataHub のプロジェクトは、MaxCompute のプロジェクトとは独立しています。DataHub で MaxCompute プロジェクトを再利用することはできません。DataHub でプロジェクトを作成する必要があります。
プロジェクトの作成
構文:CreateProjectResult createProject(String projectName, String comment)
プロジェクトを作成するときは、プロジェクト名を設定し、プロジェクトの説明を入力する必要があります。プロジェクト名は 3 ~ 32 文字の長さで、文字、数字、アンダースコア (_) を使用できます。プロジェクト名は文字で始める必要があり、大文字と小文字は区別されません。
パラメーター
projectName:プロジェクトの名前。
comment:プロジェクトに関するコメント。
エラー
DatahubClientException
サンプルコード
public static void createProject(String projectName,String projectComment) {
try {
datahubClient.createProject(projectName, projectComment);
System.out.println("create project successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}プロジェクトの削除
構文:DeleteProjectResult deleteProject(String projectName)。プロジェクトを削除する前に、プロジェクトに Topic が含まれていないことを確認してください。パラメーター:projectName:プロジェクトの名前。
エラー
DatahubClientException
NoPermissionException:プロジェクトに Topic が含まれている場合、このエラーが返されます。
サンプルコード
public static void deleteProject(String projectName) {
try {
datahubClient.deleteProject(projectName);
System.out.println("delete project successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}プロジェクトの更新
構文:UpdateProjectResult updateProject(String projectName, String comment)。プロジェクトのコメントのみを更新できます。パラメーター:projectName:プロジェクトの名前。comment:プロジェクトに関するコメント。
エラー
DatahubClientException
サンプルコード
public static void updateProject(String projectName,String newComment) {
try {
datahubClient.updateProject(projectName, newComment);
System.out.println("update project successful");
} catch (DatahubClientException e) {
System.out.println("other error");
}
}プロジェクトの一覧表示
構文:ListProjectResult listProject()。listProject メソッドの戻り値は ListProjectResult オブジェクトで、プロジェクト名のリストが含まれています。
パラメーター:なし
エラー
DatahubClientException
サンプルコード
public static void listProject() {
try {
ListProjectResult listProjectResult = datahubClient.listProject();
if (listProjectResult.getProjectNames().size() > 0) {
for (String pName : listProjectResult.getProjectNames()) {
System.out.println(pName);
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}プロジェクトのクエリ
構文:GetProjectResult getProject(String projectName)。getProject メソッドを呼び出して、現在のプロジェクトの属性情報を表示できます。パラメーター:projectName:プロジェクトの名前。
エラー
DatahubClientException
サンプルコード
public static void getProject(String projectName) {
try {
GetProjectResult getProjectResult = datahubClient.getProject(projectName );
System.out.println(getProjectResult.getCreateTime() + "\t"
+ getProjectResult.getLastModifyTime() + "\t"
+ getProjectResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}トピックの管理
トピックは、DataHub でデータのサブスクライブとパブリッシュを行う最小単位です。トピックを使用して、さまざまなタイプのストリーミングデータを区別できます。タプルと BLOB の 2 種類のトピックがサポートされています。
バイナリデータのブロックをレコードとして BLOB トピックに書き込むことができます。
タプルトピックには、データベースのデータレコードに似たレコードが含まれています。各レコードには複数の列が含まれています。タプルトピックのデータはネットワーク経由で文字列として転送されるため、タプルトピックのレコードスキーマを指定する必要があります。そのため、データ型の変換にはスキーマが必要です。次の表に、サポートされているデータ型を示します。
タイプ | 説明 | 値の範囲 |
BIGINT | 8 バイトの符号付き整数。 | -9223372036854775807 ~ 9223372036854775807。 |
DOUBLE | 倍精度浮動小数点数。長さは 8 バイトです。 | -1.0 _10^308 ~ 1.0 _10^308。 |
BOOLEAN | ブール型。 | True と False、true と false、または 0 と 1。 |
TIMESTAMP | タイムスタンプのタイプ。 | マイクロ秒単位のタイムスタンプ。 |
STRING | 文字列。UTF-8 エンコーディングのみがサポートされています。 | STRING 型の列のすべての値のサイズは 2 MB を超えることはできません。 |
TINYINT | 1 バイトの整数。 | -128 ~ 127。 |
SMALLINT | 2 バイトの整数。 | -32768 ~ 32767。 |
INTEGER | 4 バイトの整数。 | -2147483648 ~ 2147483647。 |
FLOAT | 4 バイトの単精度浮動小数点数。 | -3.40292347_10^38 ~ 3.40292347_10^38。 |
DataHub SDK for Java V2.16.1-public 以降では、TINYINT、SMALLINT、INTEGER、FLOAT がサポートされています。
タプルトピックの作成
構文:CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String comment)
パラメーター
projectName:トピックを作成するプロジェクトの名前。
topicName:トピックの名前。
shardCount:トピックの初期シャード数。
lifeCycle:データの TTL。単位:日。その時間より前に書き込まれたデータにはアクセスできません。
recordType:書き込むレコードのタイプ。有効な値:TUPLE および BLOB。
recordSchema:トピックのレコードスキーマ。
comment:トピックに関するコメント。
エラー
DatahubClientException
サンプルコード
public static void createTupleTopic(String projectName, String topicName, int shardCount, int lifeCycle, String topicComment) {
RecordSchema schema = new RecordSchema();
schema.addField(new Field("bigint_field", FieldType.BIGINT));
schema.addField(new Field("double_field", FieldType.DOUBLE));
schema.addField(new Field("boolean_field", FieldType.BOOLEAN));
schema.addField(new Field("timestamp_field", FieldType.TIMESTAMP));
schema.addField(new Field("tinyint_field", FieldType.TINYINT));
schema.addField(new Field("smallint_field", FieldType.SMALLINT));
schema.addField(new Field("integer_field", FieldType.INTEGER));
schema.addField(new Field("floar_field", FieldType.FLOAT));
schema.addField(new Field("decimal_field", FieldType.DECIMAL));
schema.addField(new Field("string_field", FieldType.STRING));
try {
datahubClient.createTopic(projectName,topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicComment);
System.out.println("create topic successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}BLOB トピックの作成
構文:CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, String comment)
パラメーター
projectName:トピックを作成するプロジェクトの名前。
topicName:トピックの名前。
shardCount:トピックの初期シャード数。
lifeCycle:データの TTL。単位:日。その時間より前に書き込まれたデータにはアクセスできません。
recordType:書き込むレコードのタイプ。有効な値:TUPLE および BLOB。
comment:トピックに関するコメント。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
ResourceAlreadyExistException
サンプルコード
public static void createBlobTopic(String projectName, String topicName, int shardCount, int lifeCycle, String topicComment) {
try { datahubClient.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.BLOB, topicComment);
System.out.println("create topic successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}トピックの削除
トピックを削除する前に、トピックにサブスクリプションまたは DataConnector が含まれていないことを確認してください。そうでない場合、NoPermission エラーが報告されます。
構文:DeleteTopicResult deleteTopic(String projectName, String topicName)
パラメーター
projectName:トピックを削除するプロジェクトの名前。
topicName:トピックの名前。
エラー
DatahubClientException
NoPermissionException:トピックにサブスクリプションまたは DataConnector が含まれている場合、このエラーが返されます。
サンプルコード
public static void deleteTopic(String projectName, String topicName) {
try {
datahubClient.deleteTopic(projectName, topicName);
System.out.println("delete topic successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}トピックの一覧表示
構文:ListTopicResult listTopic(String projectName)
パラメーター
projectName:プロジェクトを一覧表示するプロジェクトの名前。
サンプルコード
public static void listTopic(String projectName ) {
try {
ListTopicResult listTopicResult = datahubClient.listTopic(projectName);
if (listTopicResult.getTopicNames().size() > 0) {
for (String tName : listTopicResult.getTopicNames()) {
System.out.println(tName);
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}トピックの更新
トピックのコメントと TTL を更新できます。
構文:UpdateTopicResult updateTopic(String projectName, String topicName, int lifeCycle, String comment)
パラメーター
projectName:トピックを更新するプロジェクトの名前。
topicName:トピックの名前。
comment:更新するコメント。
lifeCycle:トピックの TTL。
エラー
DatahubClientException
サンプルコード
public static void updateTopic(String projectName, String topicName, int lifeCycle, String comment) {
try {
comment = "新しいトピックのコメント";
lifeCycle = 1;
datahubClient.updateTopic(projectName, topicName,lifeCycle, comment);
System.out.println("update topic successful");
// 更新された結果を表示します。
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}トピックのクエリ
構文:GetTopicResult getTopic(String projectName, String topicName)。getTopic メソッドを呼び出して、トピックに関する属性情報を取得できます。
パラメーター
projectName:トピックをクエリするプロジェクトの名前。
topicName:トピックの名前。
エラー
DatahubClientException
サンプルコード
public static void getTopic(String projectName, String topicName) {
try {
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getShardCount() + "\t"
+ getTopicResult.getLifeCycle() + "\t"
+ getTopicResult.getRecordType() + "\t"
+ getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}タプルトピックへのフィールドの追加
単一のフィールドを追加するか、一度に複数のフィールドを追加できます。
構文:AppendFieldResult appendField(String projectName, String topicName, Field field)
パラメーター
projectName:フィールドを追加するトピックが存在するプロジェクトの名前。
topicName:トピックの名前。
fields:追加するフィールド。すべてのフィールドを null に設定できます。
エラー
DatahubClientException
サンプルコード
public static void appendNewField(String projectName,String topicName) {
try {
Field newField = new Field("newField", FieldType.STRING, true,"comment");
datahubClient.appendField(projectName, topicName, newField);
System.out.println("append field successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}AppendFieldResult appendField(String projectName, String topicName, List fields);
パラメーター
projectName:フィールドを追加するトピックが存在するプロジェクトの名前。
topicName:トピックの名前。
fields:追加するフィールド。すべてのフィールドを null に設定できます。
エラー
DatahubClientException
サンプルコード
public static void appendNewField(String projectName,String topicName) {
try {
List<Field> list = new ArrayList<>();
Field newField1 = new Field("newField1", FieldType.STRING, true,"comment");
list.add(newField1);
datahubClient.appendField(projectName, topicName, list);
System.out.println("append field successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}シャードの管理
シャードは、トピックでのデータ転送に使用される同時トンネルです。各シャードには ID があります。シャードはさまざまな状態になる可能性があります。Opening:シャードが開始されています。Active:シャードが開始され、サービスを提供するために使用できます。アクティブな各シャードはサーバーリソースを消費します。必要に応じてシャードを作成することをお勧めします。
シャードの一覧表示
構文:ListShardResult listShard(String projectName, String topicName)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
エラー
DatahubClientException
サンプルコード
public static void listShard(String projectName, String topicName) {
try {
ListShardResult listShardResult = datahubClient.listShard(projectName, topicName);
if (listShardResult.getShards().size() > 0) {
for (ShardEntry entry : listShardResult.getShards()) {
System.out.println(entry.getShardId() + "\t"
+ entry.getState() + "\t"
+ entry.getLeftShardId() + "\t"
+ entry.getRightShardId());
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}シャードの分割
指定されたトピックのアクティブなシャードを分割できます。シャードが分割されると、2 つの新しいアクティブなシャードが生成され、元のシャードは閉じられます。閉じたシャードからはデータを読み取ることのみが可能で、書き込むことはできません。デフォルトの分割キーを使用するか、分割キーを指定してシャードを分割できます。
構文:SplitShardResult splitShard(String projectName, String topicName, String shardId)、または SplitShardResult splitShard(String projectName, String topicName, String shardId, String splitKey)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
shardId:分割するシャードの ID。
splitKey:シャードの分割に使用する分割キー。
エラー
DatahubClientException
サンプルコード
public static void splitShard(String projectName, String topicName, String shardId) {
try {
shardId = "0";
SplitShardResult splitShardResult = datahubClient.splitShard(projectName, topicName, shardId);
for (ShardEntry entry : splitShardResult.getNewShards()) {
System.out.println(entry.getShardId());
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}シャードのマージ
トピック内でマージされる 2 つのアクティブなシャードは、互いに隣接している必要があります。シャードの 2 つの隣接シャードの詳細については、listShard メソッドによって返される結果を参照してください。
構文:MergeShardResult mergeShard(String projectName, String topicName, String shardId, String adjacentShardId)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
shardId:マージするシャードの ID。
adjacentShardId:指定されたシャードに隣接するシャードの ID。
エラー
DatahubClientException
サンプルコード
public static void mergeShard() {
try {
String shardId = "7";
// adjacentShardId パラメーターと shardId パラメーターの値は隣接している必要があります。シャードの隣接シャードの詳細については、listShard メソッドによって返される結果を参照してください。
String adjacentShardId = "8";
MergeShardResult mergeShardResult = datahubClient.mergeShard(Constant.projectName, Constant.topicName, shardId, adjacentShardId);
System.out.println("merge successful");
System.out.println(mergeShardResult.getShardId());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}シャードの拡張
拡張するシャードの数は、元のシャードの数以上である必要があります。
構文:ExtendShardResult extendShard(String projectName, String topicName, int shardCount)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
shardCount:拡張するシャードの数。
adjacentShardId:指定されたシャードに隣接するシャードの ID。
エラー
DatahubClientException
サンプルコード
public static void extendTopic(String projectName, String topicName, int shardCount) { try { ExtendShardResult extendShardResult = datahubClient.extendShard(projectName, topicName, shardCount); } catch (DatahubClientException e) { System.out.println(e.getErrorMessage()); } }
データの読み取りと書き込み
アクティブなシャードと閉じたシャードからデータを読み取ることができます。ただし、アクティブなシャードにのみデータを書き込むことができます。
データの読み取り
データを読み取るには、まずカーソルを取得し、getRecords メソッドにカーソル値を渡す必要があります。または、DataHub のサブスクリプション機能を使用して、サブスクリプションを直接関連付けてデータを消費することもできます。この場合、サーバーは消費オフセットを自動的に保存します。データをサンプリングしてデータの品質を確認する場合は、データを読み取ることができます。
カーソルの取得
トピックからデータを読み取るには、シャードと、データの読み取りを開始するカーソルを指定します。カーソルは、OLDEST、LATEST、SEQUENCE、SYSTEM_TIME のメソッドを使用して取得できます。
OLDEST:指定されたシャードの最も古い有効なレコードを指すカーソル。
LATEST:指定されたシャードの最新のレコードを指すカーソル。
SEQUENCE:指定されたシーケンス番号のレコードを指すカーソル。
SYSTEM_TIME:タイムスタンプ値が指定されたタイムスタンプ値以上である最初のレコードを指すカーソル。
カーソルを取得するメソッドの選択
読み取るデータは有効である必要があります。つまり、データは TTL 内である必要があります。そうでない場合、エラーが報告されます。
シナリオ 1:シャードの先頭からデータを読み取ります。この場合、OLDEST メソッドを使用することをお勧めします。シャード内のすべてのデータが有効な場合、最初のレコードからデータの読み取りが開始されます。
シナリオ 2:データをサンプリングして、タイムスタンプ値が指定されたタイムスタンプ値より大きいデータが有効かどうかを確認します。この場合、SYSTEM_TIME メソッドを使用することをお勧めします。指定されたタイムスタンプのレコードの後にある最初のレコードからデータの読み取りが開始されます。
シナリオ 3:最新のデータ情報を表示します。この場合、LATEST メソッドを使用することをお勧めします。このメソッドを使用して、最新のレコードまたは最新の N レコードを読み取ることができます。最新の N レコードを取得するには、まず最新のレコードのシーケンス番号を取得する必要があります。次に、最新のレコードの前の N レコードを特定します。前の N レコードの最初のシーケンス番号は、最新のレコードのシーケンス番号から N を引いたものです。
構文:GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type)、または GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
shardId:シャードの ID。
CursorType:カーソルのタイプ。
エラー
DatahubClientException
SeekOutOfRangeException
サンプルコード
データをサンプリングする場合は、時間をタイムスタンプに変換します。次に、カーソルを取得します。
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
// 時間をタイムスタンプに変換します。
String time = "2019-07-01 10:00:00";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long timestamp = 0L;
try {
Date date = simpleDateFormat.parse(time);
timestamp = date.getTime(); // 時間に対応するタイムスタンプを取得します。
//System.out.println(timestamp);
}
// タイムスタンプの後でデータの読み取りを開始するカーソルを取得します。
String timeCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
} catch (ParseException e) {
System.out.println(e.getErrorOffset());
}
}指定されたシャードの最も古いレコードからデータを読み取ります。
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
/* OLDEST メソッドを使用します。*/
String oldestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}指定されたシャードに書き込まれた最新のデータを読み取ります。これには、次の 2 つのシナリオが含まれます。
指定されたシャードに書き込まれた最新のレコードを読み取ります。
指定されたシャードに書き込まれた最新の N レコードを読み取ります。
まず最新のレコードのシーケンス番号を取得し、次にカーソルを取得する必要があります。
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
/* LATEST メソッドを使用します。*/
String latestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getCursor();
/* SEQUENCE メソッドを使用します。*/
// 最新のレコードのシーケンス番号を取得します。
long seq = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getSequence();
// 最新の 10 レコードを読み取るためのカーソルを取得します。
String seqCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
}
catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}データ読み取りメソッド
構文:GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit)、または GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
shardId:シャードの ID。
schema:タプルトピックからレコードを読み取るときに必要なスキーマ。
cursor:データの読み取りを開始するカーソル。
limit:読み取るレコードの最大数。
エラー
DatahubClientException
サンプルコード
タプルトピックからレコードを読み取る
public static void example(String projectName,String topicName) {
// 一度に読み取るレコードの最大数。
int recordLimit = 1000;
String shardId = "7";
// 最も古い有効なレコードのカーソルを取得します。
// 注:通常、getCursor メソッドは初期化中にのみ呼び出します。その後、getNextCursor メソッドを呼び出してデータの消費を続行できます。
String cursor = "";
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, recordSchema, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// 読み取ることができるレコードがない場合は、スレッドを 10,000 ミリ秒間一時停止し、レコードの読み取りを続行します。
Thread.sleep(10000);
continue;
}
for (RecordEntry entry : result.getRecords()) {
TupleRecordData data = (TupleRecordData) entry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
}
// 次のカーソルを取得します。
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// カーソルが無効であるか、有効期限が切れています。別のカーソルを指定して消費を開始します。
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}BLOB トピックからレコードを読み取る
public static void example(String projectName,String topicName) {
// 一度に読み取るレコードの最大数。
int recordLimit = 1000;
String shardId = "7";
// 最も古い有効なレコードのカーソルを取得します。
// 注:通常、getCursor メソッドは初期化中にのみ呼び出します。その後、getNextCursor メソッドを呼び出してデータの消費を続行できます。
String cursor = "";
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, recordSchema, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// 読み取ることができるレコードがない場合は、スレッドを 10,000 ミリ秒間一時停止し、レコードの読み取りを続行します。
Thread.sleep(10000);
continue;
}
/* データを消費します。*/
for (RecordEntry record: result.getRecords()){
BlobRecordData data = (BlobRecordData) record.getRecordData();
System.out.println(new String(data.getData()));
}
// 次のカーソルを取得します。
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// カーソルが無効であるか、有効期限が切れています。別のカーソルを指定して消費を開始します。
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}データの書き込み
DataHub SDK for Java V2.12 以降では、サーバーは PutRecordsByShardResult メソッドをサポートしています。V2.12 より前のバージョンでは、サーバーは putRecords メソッドをサポートしています。putRecordsByShard メソッドを呼び出すには、データを書き込むシャードを指定する必要があります。そうでない場合、データはデフォルトで最初のアクティブなシャードに書き込まれます。上記の 2 つのメソッドの入力パラメーターは、タプル型や BLOB 型など、同じ型のレコードのリストです。DataHub SDK for Java では、putRecordsByShard メソッドを呼び出すことによってシャードごとにデータを書き込むか、putRecords メソッドを呼び出すことによってハイブリッドモードでデータを書き込むことができます。DataHub SDK for Java V2.12 以降では、シャードごとに DataHub にデータを書き込むことができます。putRecords メソッドを呼び出して DataHub にデータを書き込む場合は、戻り値をチェックして、データが DataHub に書き込まれたかどうかを確認する必要があります。putRecordsByShard メソッドを呼び出して DataHub にデータを書き込もうとしたときにデータの書き込みに失敗した場合は、エラーが報告されます。サーバーが putRecordsByShard メソッドをサポートしている場合は、putRecordsByShard メソッドを使用することをお勧めします。
構文:PutRecordsResult putRecords(String projectName, String topicName, List records)、または PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
shardId:シャードの ID。
records:DataHub に書き込むレコードのリスト。
エラー
DatahubClientException
タプルトピックにレコードを書き込む
// タプルレコードを書き込みます。
public static void tupleExample(String project,String topic,int retryTimes) {
// スキーマを取得します。
RecordSchema recordSchema = datahubClient.getTopic(project,topic ).getRecordSchema();
// 10 レコードを生成します。
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// 各レコードに追加の属性 (サーバーの IP アドレスやマシン名など) を指定できます。追加の属性を指定しない場合、データの書き込みは影響を受けません。
recordEntry.addAttribute("key1", "value1");
TupleRecordData data = new TupleRecordData(recordSchema);
data.setField("field1", "HelloWorld");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntries.add(recordEntry);
}
try {
PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
int i = result.getFailedRecordCount();
if (i > 0) {
retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
}
} catch (DatahubClientException e) {
System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
}
}
// リトライメカニズム。
public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
boolean suc = false;
while (retryTimes != 0) {
retryTimes = retryTimes - 1;
PutRecordsResult recordsResult = client.putRecords(project, topic, records);
if (recordsResult.getFailedRecordCount() > 0) {
retry(client,recordsResult.getFailedRecords(),retryTimes,project,topic);
}
suc = true;
break;
}
if (!suc) {
System.out.println("retryFailure");
}
}
'''Java
<br />
<br />** BLOB トピックにレコードを書き込む**<br />
'''Java
// BLOB レコードを書き込みます。
public static void blobExample() {
// 10 レコードを生成します。
List<RecordEntry> recordEntries = new ArrayList<>();
String shardId = "4";
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// 各レコードに追加の属性を指定します。
recordEntry.addAttribute("key1", "value1");
BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
recordEntry.setRecordData(data);
recordEntry.setShardId(shardId);
recordEntries.add(recordEntry);
recordEntry.setShardId("0");
}
while (true) {
try {
// DataHub SDK for Java V2.12 以降では、サーバーは PutRecordsByShardResult メソッドをサポートしています。SDK バージョンが V2.12 より前の場合は、putRecords メソッドを使用します。
//datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
System.out.println("write data successful");
break;
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}さまざまなモードでのレコードの書き込み
SDK バージョンが V2.12 より前の場合、putRecords メソッドを呼び出すことによってのみレコードを書き込むことができます。RecordEntry クラスには、shardId、partitionKey、hashKey の 3 つの属性が含まれています。上記の属性の値を指定して、レコードを書き込むシャードを決定できます。
DataHub SDK for Java V2.12 以降では、putRecordsByShard メソッドを呼び出してレコードを書き込むことをお勧めします。これにより、サーバーでの再パーティション化によって発生するパフォーマンスの低下を防ぎます。
シャード ID でレコードを書き込みます。このモードをお勧めします。サンプルコード:
RecordEntry entry = new RecordEntry();
entry.setShardId("0");ハッシュキーでレコードを書き込みます。このモードでは、128 ビットの Message-Digest Algorithm 5 (MD5) 値を指定します。ハッシュキーでレコードを書き込む場合、BeginHashKey パラメーターと EndHashKey パラメーターの値を使用して、レコードを書き込むシャードが決定されます。サンプルコード:
RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");パーティションキーでレコードを書き込みます。このモードでは、パーティションキーとして文字列を指定します。次に、文字列の MD5 値と、BeginHashKey パラメーターと EndHashKey パラメーターの値に基づいて、レコードを書き込むシャードが決定されます。サンプルコード:
RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");メータリング情報のクエリ
メータリング情報のクエリ
構文:GetMeterInfoResult getMeterInfo(String projectName, String topicName, String shardId)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
shardId:シャードの ID。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void getMeter(String projectName,String topicName) {
String shardId = "5";
try {
GetMeterInfoResult getMeterInfoResult = datahubClient.getMeterInfo(projectName, topicName, shardId);
System.out.println("get meter successful");
System.out.println(getMeterInfoResult.getActiveTime() + "\t" + getMeterInfoResult.getStorage());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}サブスクリプションの管理
DataHub では、サーバーがサブスクリプションの消費オフセットを保存できます。単純な構成を実行することで、可用性の高いオフセットストレージサービスを取得できます。
サブスクリプションの作成
構文:CreateSubscriptionResult createSubscription(String projectName, String topicName, String comment)
サブスクリプションのコメントは、{"application":"アプリケーション","description":"説明"} の形式です。
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
comment:サブスクリプションに関するコメント。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void createSubscription(String projectName,String topicName) {
try {
CreateSubscriptionResult createSubscriptionResult = datahubClient.createSubscription(projectName, topicName, Constant.subscribtionComment);
System.out.println("create subscription successful");
System.out.println(createSubscriptionResult.getSubId());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}サブスクリプションの削除
構文:DeleteSubscriptionResult deleteSubscription(String projectName, String topicName, String subId)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
subId:サブスクリプションの ID。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void deleteSubscription(String projectName,String topicName,String subId) {
try {
datahubClient.deleteSubscription(projectName, topicName, subId);
System.out.println("delete subscription successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}サブスクリプションの更新
既存のサブスクリプションのコメントのみを更新できます。
構文:UpdateSubscriptionResult updateSubscription(String projectName, String topicName, String subId, String comment)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
subId:サブスクリプションの ID。
comment:更新するコメント。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void updateSubscription(String projectName, String topicName, String subId, String comment){
try {
datahubClient.updateSubscription(projectName,topicName,subId,comment);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}サブスクリプションの一覧表示
listSubscription メソッドの pageNum パラメーターと pageSize パラメーターは、一覧表示するサブスクリプションの範囲を指定します。たとえば、pageNum パラメーターと pageSize パラメーターを 1 と 10 に設定して、最初の 10 個のサブスクリプションを一覧表示できます。別の例として、pageNum パラメーターと pageSize パラメーターを 2 と 5 に設定して、6 番目から 10 番目のサブスクリプションを一覧表示できます。
構文:ListSubscriptionResult listSubscription(String projectName, String topicName, int pageNum, int pageSize)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
pageNum:返すページ番号。
pageSize:各ページに返すエントリの数。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
サンプルコード
public static void listSubscription(String projectName, String topicName, int pageNum, int pageSize) {
try {
ListSubscriptionResult listSubscriptionResult = datahubClient.listSubscription(projectName, topicName, pageNum, pageSize);
if (listSubscriptionResult.getSubscriptions().size() > 0) {
System.out.println(listSubscriptionResult.getTotalCount());
System.out.println(listSubscriptionResult.getSubscriptions().size());
for (SubscriptionEntry entry : listSubscriptionResult.getSubscriptions()) {
System.out.println(entry.getSubId() + "\t"
+ entry.getState() + "\t"
+ entry.getType() + "\t"
+ entry.getComment());
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}サブスクリプションのクエリ
構文:GetSubscriptionResult getSubscription(String projectName, String topicName, String subId)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
subId:サブスクリプションの ID。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
public static void getSubscription(String projectName, String topicName, String subId) {
try {
GetSubscriptionResult getSubscriptionResult = datahubClient.getSubscription(projectName, topicName, subId);
System.out.println(getSubscriptionResult.getSubId() + "\t"
+ getSubscriptionResult.getState() + "\t"
+ getSubscriptionResult.getType() + "\t"
+ getSubscriptionResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}サブスクリプションのステータスの更新
サブスクリプションは、OFFLINE または ONLINE の状態にすることができます。これは、オフラインまたはオンラインのサブスクリプションを示します。
構文:UpdateSubscriptionStateResult updateSubscriptionState(String projectName, String topicName, String subId, SubscriptionState state)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
subId:サブスクリプションの ID。
state:更新する状態。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void updateSubscriptionState(String projectName, String topicName,String subId) {
try {
datahubClient.updateSubscriptionState(projectName, topicName, subId, SubscriptionState.ONLINE);
System.out.println("update subscription state successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}オフセットの管理
サブスクリプションが作成されると、最初は未消費です。サブスクリプションのオフセットストレージ機能を使用するには、オフセットに対して次の操作を実行します。
オフセットの初期化
オフセットを初期化するには、openSubscriptionSession メソッドを 1 回だけ呼び出す必要があります。このメソッドを 2 回目に呼び出すと、新しい消費セッション ID が生成されます。この場合、前のセッションは無効になり、オフセットを送信できなくなります。
構文:OpenSubscriptionSessionResult openSubscriptionSession(String projectName, String topicName, String subId, List shardIds)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
subId:サブスクリプションの ID。
shardIds:シャードの ID。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void openSubscriptionSession(String projectName, String topicName) {
shardId = "4";
shardIds = new ArrayList<String>();
shardIds.add("0");
shardIds.add("4");
try {
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
System.out.println(subscriptionOffset.getSessionId() + "\t"
+ subscriptionOffset.getVersionId() + "\t"
+ subscriptionOffset.getSequence());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}オフセットの取得
構文:GetSubscriptionOffsetResult getSubscriptionOffset(String projectName, String topicName, String subId, List shardIds)
getSubscriptionOffset メソッドの戻り値は GetSubscriptionOffsetResult オブジェクトで、基本的には openSubscriptionSession メソッドの戻り値と同じです。ただし、GetSubscriptionOffsetResult オブジェクトには、オフセットのセッション ID は含まれていません。getSubscriptionOffset メソッドを呼び出して、オフセットに関する情報を表示することのみができます。
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
subId:サブスクリプションの ID。
shardIds:シャードの ID。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
// オフセットを取得します。
public static void getSubscriptionOffset(String projectName, String topicName,String subId) {
shardId = "4";
shardIds = new ArrayList<String>();
shardIds.add("0");
shardIds.add("4");
try {
GetSubscriptionOffsetResult getSubscriptionOffsetResult = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = getSubscriptionOffsetResult.getOffsets().get(shardId);
System.out.println(subscriptionOffset.getVersionId() + "\t"
+ subscriptionOffset.getSequence());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}オフセットの送信
構文:CommitSubscriptionOffsetResult commitSubscriptionOffset(String projectName, String topicName, String subId, Map offsets)
オフセットを送信すると、DataHub は versionId パラメーターと sessionId パラメーターの値を検証します。値が現在のセッションの値と同じであることを確認してください。送信するオフセット情報に制限はありません。レコードの実際のシーケンス番号とタイムスタンプを入力することをお勧めします。
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
subId:サブスクリプションの ID。
offsets:シャードのオフセットマップ。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
SubscriptionOffsetResetException
SubscriptionSessionInvalidException
SubscriptionOfflineException
サンプルコード
// オフセットを送信します。
public static void commitSubscriptionOffset(String projectName, String topicName,String subId) {
while (true) {
try {
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// サンプルコードはテストにのみ使用されます。完全なコードについては、保存された消費オフセットからデータを消費し、消費中にオフセットを送信する方法の例を示すサンプルコードを参照してください。
subscriptionOffset.setSequence(10);
subscriptionOffset.setTimestamp(100);
Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
offsets.put(shardId, subscriptionOffset);
// オフセットを送信します。
datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsets);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}オフセットのリセット
構文:ResetSubscriptionOffsetResult resetSubscriptionOffset(String projectName, String topicName, String shardId, Map offsets)
オフセットを特定の時点にリセットできます。この時点で複数のレコードが関係している場合、リセットオフセットはこの時点で関係している最初のレコードを指します。オフセットがリセットされると、オフセット情報が変更され、バージョン ID が更新されます。実行中のタスクが以前のバージョン ID を使用してオフセットを送信すると、SubscriptionOffsetResetException エラーが報告されます。getSubscriptionOffset メソッドを呼び出して、新しいバージョン ID を取得できます。
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
subId:サブスクリプションの ID。
offsets:シャードのオフセットマップ。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
// オフセットをリセットします。
public static void resetSubscriptionOffset(String projectName, String topicName) throws ParseException {
List<String> shardIds = Arrays.asList("0");
// オフセットをリセットする時間を指定し、時間をタイムスタンプに変換します。
String time = "2019-07-09 10:00:00";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(time);
long timestamp = date.getTime(); // 時間に対応するタイムスタンプを取得します。
long sequence = client.getCursor(projectName, topicName, subId, CursorType.SYSTEM_TIME, timestamp).getSequence();
SubscriptionOffset offset = new SubscriptionOffset();
offset.setTimestamp(timestamp);
offset.setSequence(sequence);
Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
for (String shardId : shardIds) {
offsets.put(shardId, offset);
}
try {
datahubClient.resetSubscriptionOffset(projectName, topicName, subId, offsets);
System.out.println("reset successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataHub でデータを消費するためのサブスクリプションの関連付け
DataHub からデータを読み取るのと同様に、DataHub でデータを消費するためにサブスクリプションを関連付けることができます。サブスクリプションは消費オフセットを保存します。必要に応じて消費オフセットを選択できます。
使用上の注意:
openSubscriptionSession メソッドを呼び出してオフセットを初期化し、このサブスクリプションのバージョン ID とセッション ID を取得します。オフセットを初期化するために、このメソッドは 1 回だけ呼び出すことができます。このメソッドを複数回呼び出すと、前のセッションは無効になります。この場合、オフセットを送信できません。
getCursor メソッドを呼び出して、データを消費するサブスクリプション内のレコードのオフセットを取得します。最初のレコードを消費した後、getNextCursor メソッドを呼び出して次のレコードのオフセットを取得し、データの消費を続けます。
commitSubscriptionOffset メソッドを呼び出してオフセットを送信します。オフセットを送信すると、このサブスクリプションのバージョン ID とセッション ID を検証する必要があります。そのため、バージョン ID とセッション ID が現在のセッションのバージョン ID とセッション ID と同じであることを確認してください。
// 次のサンプルコードは、保存されたオフセットからデータを消費し、消費中にオフセットを送信する方法の例を示しています。
public static void example(String projectName, String topicName,String subId) {
String shardId = "0";
List<String> shardIds = Arrays.asList("0", "1");
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1. 現在のオフセットにあるレコードのカーソルを取得します。レコードの有効期限が切れているか、消費されていない場合は、トピックの TTL 内の最初のレコードのカーソルを取得します。
String cursor = null;
// シーケンス番号が 0 より小さい場合、レコードは消費されていません。
if (subscriptionOffset.getSequence() < 0) {
// トピックの TTL 内の最初のレコードのカーソルを取得します。
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// 次のレコードのカーソルを取得します。
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
// シーケンス番号に基づいてカーソルを取得した後に SeekOutOfRange エラーが返された場合、現在のカーソルのレコードの有効期限が切れています。
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// トピックの TTL 内の最初のレコードのカーソルを取得します。
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2. レコードを読み取り、オフセットを保存します。たとえば、タプルレコードを読み取り、1,000 レコードを読み取るたびにオフセットを保存します。
long recordCount = 0L;
// 一度に 10 レコードを読み取ります。
int fetchNum = 10;
while (true) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// 読み取ることができるレコードがない場合は、スレッドを 1,000 ミリ秒間一時停止し、レコードの読み取りを続行します。
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
// データを消費します。
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// データが消費された後にオフセットを保存します。
++recordCount;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
if (recordCount % 1000 == 0) {
// オフセットを送信します。
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
// サブスクリプションセッションが終了しました。オフライン:サブスクリプションはオフラインです。SessionChange:サブスクリプションは他のクライアントでも使用されています。
break;
} catch (SubscriptionOffsetResetException e) {
// オフセットがリセットされます。サブスクリプションのオフセット情報を再度取得する必要があります。この例では、シーケンス番号がリセットされます。
// タイムスタンプがリセットされた場合は、CursorType.SYSTEM_TIME パラメーターを使用してカーソルを取得する必要があります。
subscriptionOffset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
long nextSequence = subscriptionOffset.getSequence() + 1;
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (DatahubClientException e) {
// TODO: エラーが発生した場合に終了するかどうかを指定します。
} catch (Exception e) {
break;
}
}
}DataConnector の管理
DataHub の DataConnector は、DataHub から他のクラウドサービスにストリーミングデータを同期します。DataConnector を使用して、DataHub トピックから MaxCompute、Object Storage Service (OSS)、ApsaraDB RDS for MySQL、Tablestore、Elasticsearch、Function Compute に、リアルタイムまたはほぼリアルタイムのモードでデータを同期できます。DataConnector が構成されると、DataHub に書き込んだデータを他の Alibaba Cloud サービスで使用できます。
DataConnector の作成
構文:CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, List columnFields, SinkConfig config)、または CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, long sinkStartTime, List columnFields, SinkConfig config)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
ConnectorType:作成する DataConnector のタイプ。
columnFields:同期するフィールド。
sinkStartTime:DataHub へのデータの同期の開始時刻。単位:ミリ秒。
config:特定のタイプの DataConnector の構成の詳細。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
次のサンプルコードは、DataHub から MaxCompute にデータを同期する DataConnector を作成する方法の例を示しています。
public static void createConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
SinkOdpsConfig config = new SinkOdpsConfig() {{
setEndpoint(Constant.odps_endpoint);
setProject(Constant.odps_project);
setTable(Constant.odps_table);
setAccessId(Constant.odps_accessId);
setAccessKey(Constant.odps_accessKey);
setPartitionMode(PartitionMode.SYSTEM_TIME);
setTimeRange(60);
}};
// パーティション形式を指定します。
SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
addConfig("ds", "%Y%m%d");
addConfig("hh", "%H");
addConfig("mm", "%M");
}};
config.setPartitionConfig(partitionConfig);
try {
// DataConnector を作成します。
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ODPS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}次のサンプルコードは、DataHub から OSS にデータを同期する DataConnector を作成する方法の例を示しています。
public static void createOssConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
SinkOssConfig config = new SinkOssConfig() {{
setAccessId(Constant.oss_accessId);
setAccessKey(Constant.oss_accessKey);
setAuthMode(AuthMode.STS);
setBucket(Constant.oss_bucket);
setEndpoint(Constant.oss_endpoint);
setPrefix(Constant.oss_prefix);
setTimeFormat(Constant.oss_timeFormat);
setTimeRange(60);
}};
try {
// DataConnector を作成します。
datahubClient.createConnector(projectName,topicName, ConnectorType.SINK_OSS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}次のサンプルコードは、DataHub から Tablestore にデータを同期する DataConnector を作成する方法の例を示しています。
public static void createOtsConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkOtsConfig config = new SinkOtsConfig() {{
setAccessId(Constant.ots_accessId);
setAccessKey(Constant.ots_accessKey);
setEndpoint(Constant.ots_endpoint);
setInstance(Constant.ots_instance);
setTable(Constant.ots_table);
setAuthMode(AuthMode.AK);
}};
try {
// DataConnector を作成します。
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_OTS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}次のサンプルコードは、DataHub から Hologres にデータを同期する DataConnector を作成する方法の例を示しています。
public static void createHoloConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkHologresConfig config = new SinkHologresConfig() {{
setAccessId(Constant.accessId);
setAccessKey(Constant.accessKey);
setProjectName(Constant.projectName);
setTopicName(Constant.topicName);
setAuthMode(AuthMode.AK);
setInstanceId(Constant.instanceId);
// タイムスタンプ単位を設定します。
setTimestampUnit(TimestampUnit.MILLISECOND);
}};
try {
// DataConnector を作成します。
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_HOLOGRES, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}次のサンプルコードは、DataHub から Elasticsearch にデータを同期する DataConnector を作成する方法の例を示しています。
public static void createEsConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkEsConfig config = new SinkEsConfig() {{
setEndpoint(Constant.es_endpoint);
setIdFields(Constant.es_fields);
setIndex(Constant.es_index);
setPassword(Constant.es_password);
setProxyMode(Constant.es_proxyMode);
setTypeFields(Constant.es_typeFields);
setUser(Constant.es_user);
}};
try {
// DataConnector を作成します。
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ES, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}次のサンプルコードは、DataHub から Function Compute にデータを同期する DataConnector を作成する方法の例を示しています。
public static void createFcConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkFcConfig config = new SinkFcConfig() {{
setEndpoint(Constant.fc_endpoint);
setAccessId(Constant.fc_accessId);
setAccessKey(Constant.fc_accessKey);
setAuthMode(AuthMode.AK);
setFunction(Constant.fc_function);
setService(Constant.fc_service);
}};
try {
// DataConnector を作成します。
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_FC, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}次のサンプルコードは、DataHub から MySQL データベースにデータを同期する DataConnector を作成する方法の例を示しています。
public static void createMysqlConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkMysqlConfig config = new SinkMysqlConfig() {{
setDatabase( Constant.mysql_database);
setHost(Constant.mysql_host);
setInsertMode(InsertMode.OVERWRITE);
setPassword(Constant.mysql_password);
setPort(Constant.mysql_port);
setTable(Constant.mysql_table);
setUser(Constant.mysql_user);
}};
try {
// DataConnector を作成します。
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_MYSQL, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataConnector の削除
構文:DeleteConnectorResult deleteConnector(String projectName, String topicName, ConnectorType connectorType)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
ConnectorType:削除する DataConnector のタイプ。
columnFields:同期するフィールド。
sinkStartTime:DataHub へのデータの同期の開始時刻。単位:ミリ秒。
config:特定のタイプの DataConnector の構成の詳細。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void deleteConnector(String projectName,String topicName) {
try {
datahubClient.deleteConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println("delete connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataConnector のクエリ
構文:GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName,topicName,ConnectorType.SINK_ODPS)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
ConnectorType:クエリする DataConnector のタイプ。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void getConnector(String projectName,String topicName) {
try {
GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println(getConnectorResult.getState() + "\t" + getConnectorResult.getSubId());
for (String fieldName : getConnectorResult.getColumnFields()) {
System.out.println(fieldName);
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataConnector の更新
DataConnector の構成を更新できます。
構文:UpdateConnectorResult updateConnector(String projectName, String topicName, ConnectorType connectorType, SinkConfig config)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
ConnectorType:更新する DataConnector のタイプ。
config:特定のタイプの DataConnector の構成の詳細。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void updateConnector(String projectName,String topicName) {
SinkOdpsConfig config = (SinkOdpsConfig) datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS).getConfig();
// AccessKey ペアを変更します。
config.setTimeRange(100);
config.setAccessId(accessId);
config.setAccessKey(accessKey);
// タイムスタンプタイプを変更します。
config.setTimestampUnit(ConnectorConfig.TimestampUnit.MICROSECOND);
try {
datahubClient.updateConnector(projectName, topicName, ConnectorType.SINK_ODPS, config);
System.out.println("update connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataConnector を使用して同期するフィールドの更新
構文:UpdateConnectorResult updateConnector(String projectName, String topicName, String connectorId, List columnFields)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
connectorId:更新する DataConnector の ID。
columnFields:同期するフィールド。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void updateConnector(String projectName,String topicName) {
String connectorId = "";
// columnField パラメーターは、ダウンストリームに同期されるすべてのフィールドを指定します。これには、新しく追加されたフィールドが含まれますが、これらに限定されません。
List<String> columnField = new ArrayList<>();
columnField.add("f1");
try {
batchClient.updateConnector(projectName, topicName,connectorId,columnField);
System.out.println("update connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataConnector のステータスの更新
構文:UpdateConnectorStateResult updateConnectorState(String projectName, String topicName, ConnectorType connectorType, ConnectorState connectorState)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
ConnectorType:DataConnector のタイプ。
connectorState:DataConnector の状態。有効な値:STOPPED および RUNNING。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void updateConnectorState(String projectName,String topicName) {
try {
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
System.out.println("update connector state successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataConnector のオフセットの更新
構文:UpdateConnectorOffsetResult updateConnectorOffset(String projectName, String topicName, ConnectorType connectorType, String shardId, ConnectorOffset offset)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
ConnectorType:DataConnector のタイプ。
shardId:シャードの ID。shardID パラメーターが null に設定されている場合、すべてのシャードのオフセットが更新されます。
offset:DataConnector のオフセット。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void updateConnectorOffset(String projectName,String topicName) {
ConnectorOffset offset = new ConnectorOffset() {{
setSequence(10);
setTimestamp(1000);
}};
try {
// DataConnector のオフセットを更新する前に、DataConnector を停止します。
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
datahubClient.updateConnectorOffset(projectName, topicName, ConnectorType.SINK_ODPS, shardId, offset);
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
System.out.println("update connector offset successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataConnector の一覧表示
構文:ListConnectorResult listConnector(String projectName, String topicName)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void listConnector(String projectName,String topicName) {
try {
ListConnectorResult listConnectorResult = datahubClient.listConnector(projectName, topicName);
for (String cName : listConnectorResult.getConnectorNames()) {
System.out.println(cName);
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataConnector のシャードステータスのクエリ
構文:GetConnectorShardStatusResult getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType)、または ConnectorShardStatusEntry getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType, String shardId)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
ConnectorType:DataConnector のタイプ。
shardId:シャードの ID。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void getConnectorShardStatusByShard(String projectName,String topicName,String shardId) {
try {
ConnectorShardStatusEntry connectorShardStatusEntry = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
System.out.println(connectorShardStatusEntry.getState() + "\t"
+ connectorShardStatusEntry.getCurrSequence() + "\t"
+ connectorShardStatusEntry.getDiscardCount() + "\t"
+ connectorShardStatusEntry.getUpdateTime());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
public static void getConnectorShardStatus(String projectName,String topicName) {
try {
GetConnectorShardStatusResult getConnectorShardStatusResult = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS);
for (Map.Entry<String, ConnectorShardStatusEntry> entry : getConnectorShardStatusResult.getStatusEntryMap().entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue().getState() + "\t"
+ entry.getValue().getCurrSequence() + "\t"
+ entry.getValue().getDiscardCount() + "\t"
+ entry.getValue().getUpdateTime());
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataConnector の再起動
構文:ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType)、または ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType, String shardId)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
ConnectorType:DataConnector のタイプ。
shardId:シャードの ID。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void reloadConnector(String projectName,String topicName ) {
try {
datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println("reload connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
public static void reloadConnectorByShard(String projectName,String topicName,String shardId) {
try {
datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
System.out.println("reload connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}DataConnector の完了時刻のクエリ
構文:GetConnectorDoneTimeResult getConnectorDoneTime(String projectName, String topicName, ConnectorType connectorType)
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
ConnectorType:DataConnector のタイプ。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void getDoneTime(String projectName,String topicName ) {
try {
GetConnectorDoneTimeResult getConnectorDoneTimeResult = datahubClient.getConnectorDoneTime(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println(getConnectorDoneTimeResult.getDoneTime());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}VPC ホワイトリストの更新
構文:UpdateProjectVpcWhitelistResult updateProjectVpcWhitelist(String projectName, String vpcIds)
パラメーター
projectName:プロジェクトの名前。
vpcids:仮想プライベートクラウド (VPC) の ID。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void updateProjectVpcWhitelist(String projectName) {
String vpcid = "12345";
try {
datahubClient.updateProjectVpcWhitelist(projectName, vpcid);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}フィールドの追加
構文:AppendConnectorFieldResult appendConnectorField(String projectName, String topicName, ConnectorType connectorType, String fieldName)
MaxCompute テーブルにフィールドが含まれている場合は、DataConnector を使用して同期するフィールドを追加できます。
パラメーター
projectName:プロジェクトの名前。
topicName:トピックの名前。
ConnectorType DataConnector のタイプ。
fieldName:追加するフィールドの名前。フィールドは null に設定できます。
エラー
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
サンプルコード
public static void appendConnectorField(String projectName,String topicName) {
String newField = "newfield";
try {
// トピックと MaxCompute テーブルの両方に、追加するフィールドが含まれています。さらに、トピックのスキーマは MaxCompute テーブルのスキーマと同じです。
datahubClient.appendConnectorField(projectName, topicName, ConnectorType.SINK_ODPS, newField);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}複数のオブジェクトを同時に管理する
DataHub コンソールの コマンドラインツールを使用することをお勧めします。