SDK DataHub untuk Java
1. Dependensi Maven dan JDK
POM Maven
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.19.0-public</version>
</dependency>Versi JDKjdk: >= 1.8
2. Catatan penggunaan
Jika versi SDK Anda saat ini diperbarui dari V2.9, metode setTimestampInms digantikan oleh metode lain. Perhatikan bahwa nilai timestamp di versi baru adalah nilai di V2.9 dikalikan dengan 1.000.
Secara umum, metode
putRecords atau putRecordsByShard, dangetRecordspaling sering dipanggil untuk membaca dan menulis data. Metode lainnya, sepertigetTopic,getCursor, danlistShard, hanya dipanggil selama inisialisasi.Anda dapat menginisialisasi satu atau lebih klien DataHub dalam sebuah proyek. Beberapa klien DataHub dapat digunakan secara bersamaan.
Paket yang berbeda mungkin berisi kelas dengan nama yang sama tetapi di direktori yang berbeda. DataHub SDK untuk Java V2.12 menggunakan kelas dalam paket com.aliyun.datahub.client, sedangkan kelas dengan nama yang sama dalam paket lain disediakan untuk versi sebelum V2.12. Contoh:
Paket com.aliyun.datahub.client.model.RecordSchema digunakan untuk DataHub SDK untuk Java V2.12.
Paket com.aliyun.datahub.common.data.RecordSchema berisi kode DataHub SDK untuk Java yang versinya lebih lama dari V2.12. Jika Anda memperbarui versi SDK Anda ke V2.12 atau lebih baru tetapi tidak memodifikasi kode, kode dalam paket tersebut masih dapat digunakan.
Jika muncul kesalahan "Parse body failed, Offset: 0", Anda dapat menyetel parameter enableBinary ke false.
3. Menggunakan DataHub SDK untuk Java
Inisialisasi
Anda dapat menggunakan akun Alibaba Cloud untuk mengakses DataHub. Untuk mengakses DataHub, Anda harus memberikan ID AccessKey dan Rahasia AccessKey Anda, serta titik akhir yang digunakan untuk mengakses DataHub. Kode sampel berikut memberikan contoh cara membuat klien DataHub dengan menggunakan titik akhir DataHub:
// Dalam contoh ini, titik akhir wilayah China (Hangzhou) digunakan. Anda juga dapat menggunakan titik akhir wilayah lain sesuai kebutuhan.
String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
String accessId = "<YourAccessKeyId>";
String accessKey = "<YourAccessKeySecret>";
// Buat klien DataHub.
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
// Tentukan apakah akan mengaktifkan transmisi data biner. Di DataHub SDK untuk Java V2.12 dan yang lebih baru, server mendukung transmisi data biner.
new AliyunAccount(accessId, accessKey), true))
// Jika terjadi kesalahan di Apsara Stack DataHub, atur parameter ini ke false.
// Parameter HttpConfig bersifat opsional. Jika Anda tidak mengatur parameter HttpConfig, nilai default akan digunakan.
.setHttpConfig(new HttpConfig()
.setCompressType(HttpConfig.CompressType.LZ4) // Saat membaca data dari atau menulis data ke DataHub, kami sarankan Anda menggunakan algoritma kompresi LZ4 untuk transmisi data.
.setConnTimeout(10000))
.build();Deskripsi konfigurasi: DatahubConfig
Parameter | Deskripsi |
endpoint | Titik akhir yang digunakan untuk mengakses DataHub. |
account | Informasi tentang akun Alibaba Cloud. |
enableBinary | Menentukan apakah akan melakukan transmisi data biner. Di DataHub SDK untuk Java V2.12 dan yang lebih baru, server mendukung transmisi data biner. Jika versi SDK lebih lama dari V2.12, atur parameter ini ke false. Jika terjadi kesalahan "Parse body failed, Offset:0" di Apsara Stack DataHub, atur parameter ini ke false. |
HttpConfig
Parameter | Deskripsi |
readTimeout | Periode timeout baca/tulis soket. Satuan: detik. Nilai default: 10. |
connTimeout | Periode timeout koneksi TCP. Satuan: detik. Nilai default: 10. |
maxRetryCount | Jumlah maksimum percobaan ulang setelah kegagalan permintaan. Nilai default: 1. Kami sarankan Anda tidak mengubah nilai ini. Lapisan bisnis atas melakukan percobaan ulang. |
debugRequest | Menentukan apakah akan menampilkan log permintaan. Nilai default: false. |
compressType | Mode kompresi untuk transmisi data. Secara default, tidak ada mode kompresi yang digunakan. Mode kompresi LZ4 dan deflate didukung. |
proxyUri | Pengenal sumber daya seragam (URI) dari host proxy. |
proxyUsername | Nama pengguna yang diverifikasi oleh server proxy. |
proxyPassword | Kata sandi yang diverifikasi oleh server proxy. |
Statistik SDK Anda dapat menggunakan DataHub SDK untuk Java untuk mengumpulkan statistik pada permintaan pembacaan/tulisan data, seperti jumlah kueri yang diinisiasi per detik. Anda dapat memanggil metode berikut untuk mengumpulkan statistik:
ClientMetrics.startMetrics();Secara default, statistik pada metrik ditampilkan di file log. Dalam hal ini, Anda harus mengonfigurasi Simple Logging Facade for Java (SLF4J). Paket metrik berikut digunakan: com.aliyun.datahub.client.metrics.
Tulis data ke DataHub
Dalam contoh berikut, data ditulis ke topik tuple di DataHub.
// Tulis catatan tuple.
public static void tupleExample(String project,String topic,int retryTimes) {
// Dapatkan skema.
RecordSchema recordSchema = datahubClient.getTopic(project,topic ).getRecordSchema();
// Hasilkan 10 catatan.
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// Anda dapat menentukan atribut tambahan, seperti alamat IP dan nama mesin server, untuk setiap catatan. Jika Anda tidak menentukan atribut tambahan, penulisan data tidak terpengaruh.
recordEntry.addAttribute("key1", "value1");
TupleRecordData data = new TupleRecordData(recordSchema);
data.setField("field1", "HelloWorld");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntries.add(recordEntry);
}
try {
PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
int i = result.getFailedRecordCount();
if (i > 0) {
retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
}
} catch (DatahubClientException e) {
System.out.println("requestId:" + e.getRequestId() + "\tpesan:" + e.getErrorMessage());
}
}
// Mekanisme percobaan ulang.
public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
boolean suc = false;
while (retryTimes != 0) {
retryTimes = retryTimes - 1;
PutRecordsResult recordsResult = client.putRecords(project, topic, records);
if (recordsResult.getFailedRecordCount() > 0) {
retry(client,recordsResult.getFailedRecords(),retryTimes,project,topic);
}
suc = true;
break;
}
if (!suc) {
System.out.println("retryFailure");
}
}Buat langganan untuk mengonsumsi data DataHub
// Kode sampel berikut memberikan contoh cara mengonsumsi data dari offset yang disimpan dan mengirimkan offset selama konsumsi.
public static void example() {
String shardId = "0";
List<String> shardIds = Arrays.asList("0", "1");
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1. Dapatkan kursor dari catatan di offset saat ini. Jika catatan kedaluwarsa atau belum dikonsumsi, dapatkan kursor dari catatan pertama dalam waktu hidup (TTL) topik.
String cursor = null;
// Jika nomor urutan kurang dari 0, catatan belum dikonsumsi.
if (subscriptionOffset.getSequence() < 0) {
// Dapatkan kursor dari catatan pertama dalam TTL topik.
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// Dapatkan kursor dari catatan berikutnya.
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
// Jika kesalahan SeekOutOfRange dikembalikan setelah Anda mendapatkan kursor berdasarkan nomor urutan, catatan dari kursor saat ini kedaluwarsa.
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// Dapatkan kursor dari catatan pertama dalam TTL topik.
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2. Baca catatan dan simpan offset. Sebagai contoh, baca catatan tuple dan simpan offset setiap kali 1.000 catatan dibaca.
long recordCount = 0L;
// Baca 10 catatan setiap kali.
int fetchNum = 10;
while (true) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// Jika tidak ada catatan yang dapat dibaca, jeda thread selama 1.000 ms dan lanjutkan membaca catatan.
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
// Konsumsi data.
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// Simpan offset setelah data dikonsumsi.
++recordCount;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
if (recordCount % 1000 == 0) {
// Kirim offset.
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
System.out.println("commit offset berhasil");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
// Sesi langganan keluar. Offline: Langganan offline. SubscriptionSessionInvalid: Langganan juga digunakan di klien lain.
break;
} catch (SubscriptionOffsetResetException e) {
// Offset diatur ulang. Anda harus mendapatkan informasi offset langganan lagi. Dalam contoh ini, nomor urutan diatur ulang.
// Jika timestamp diatur ulang, Anda harus menggunakan parameter CursorType.SYSTEM_TIME untuk mendapatkan kursor.
subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
long nextSequence = subscriptionOffset.getSequence() + 1;
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (DatahubClientException e) {
// TODO: Tentukan apakah akan keluar saat terjadi kesalahan.
} catch (Exception e) {
break;
}
}
}4.Jenis kesalahan
Bagian ini menjelaskan jenis kesalahan yang terkait dengan DataHub SDK untuk Java V2.12 dan yang lebih baru. Anda dapat mengonfigurasi mekanisme try-catch untuk menangkap jenis kesalahan dan memproses kesalahan. Hanya kesalahan DatahubClientException dan LimitExceededException yang dapat diselesaikan dengan percobaan ulang. Beberapa kesalahan DatahubClientException, seperti kesalahan yang disebabkan karena server sibuk atau tidak tersedia, dapat diselesaikan dengan percobaan ulang. Kami sarankan Anda menambahkan logika percobaan ulang ke kode untuk kesalahan DatahubClientException dan LimitExceededException. Namun, jumlah percobaan ulang harus dibatasi. Tabel berikut menjelaskan jenis kesalahan yang terkait dengan DataHub SDK untuk Java V2.12 dan yang lebih baru. File kesalahan disimpan di paket berikut: com.aliyun.datahub.client.exception.
Jenis kesalahan | Pesan kesalahan | Deskripsi |
InvalidParameterException | InvalidParameter, InvalidCursor | Pesan kesalahan dikembalikan karena parameter yang ditentukan tidak valid. |
ResourceNotFoundException | ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo | Pesan kesalahan dikembalikan karena sumber daya yang akan diakses tidak ada. Jika Anda segera mengirim permintaan lain setelah membagi atau menggabungkan shard, pesan kesalahan ini dikembalikan. |
ResourceAlreadyExistException | ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist | Pesan kesalahan dikembalikan karena sumber daya sudah ada. Jika sumber daya yang ingin Anda buat sudah ada, pesan kesalahan ini dikembalikan. |
SeekOutOfRangeException | SeekOutOfRange | Pesan kesalahan dikembalikan karena nomor urutan yang ditentukan tidak valid atau timestamp yang ditentukan lebih lambat dari timestamp saat ini saat Anda mendapatkan kursor. Nomor urutan mungkin menjadi tidak valid karena catatan dari kursor kedaluwarsa. |
AuthorizationFailureException | Unauthorized | Pesan kesalahan dikembalikan karena terjadi kesalahan saat tanda tangan otorisasi sedang diuraikan. Periksa apakah pasangan AccessKey valid. |
NoPermissionException | NoPermission, OperationDenied | Pesan kesalahan dikembalikan karena Anda tidak memiliki izin. Periksa apakah konfigurasi RAM valid atau pengguna RAM telah diberi otorisasi. |
ShardSealedException | InvalidShardOperation | Pesan kesalahan dikembalikan karena shard ditutup dan data tidak dapat dibaca dari atau ditulis ke shard. Jika Anda terus menulis data ke shard atau terus membaca data setelah catatan data terakhir dibaca dari shard, pesan kesalahan ini dikembalikan. |
LimitExceededException | LimitExceeded | Pesan kesalahan dikembalikan karena batas DataHub SDK untuk Java telah dilampaui. Untuk informasi lebih lanjut, lihat Batasan. |
SubscriptionOfflineException | SubscriptionOffline | Pesan kesalahan dikembalikan karena langganan offline dan tidak dapat digunakan. |
SubscriptionSessionInvalidException | OffsetSessionChanged, OffsetSessionClosed | Pesan kesalahan dikembalikan karena sesi langganan tidak normal. Saat langganan digunakan, sesi dibuat untuk mengirimkan offset. Jika langganan juga digunakan di klien lain, pesan kesalahan ini dikembalikan. |
SubscriptionOffsetResetException | OffsetReseted | Pesan kesalahan dikembalikan karena offset langganan diatur ulang. |
MalformedRecordException | MalformedRecord,ShardNotReady | Pesan kesalahan dikembalikan karena format catatan tidak valid. Ini mungkin disebabkan karena skema tidak valid, karakter non-UTF-8 ada, atau klien menggunakan protokol buffer (PB) tetapi server tidak mendukung protokol PB. |
DatahubClientException | Semua kesalahan lainnya. Jenis kesalahan ini adalah kelas dasar dari semua kesalahan. | Pesan kesalahan dikembalikan karena kesalahan tidak termasuk dalam jenis kesalahan sebelumnya. Jenis kesalahan ini dapat diselesaikan dengan percobaan ulang. Namun, jumlah percobaan ulang harus dibatasi. |
5.Metode
Mengelola proyek
Proyek merupakan unit dasar untuk pengelolaan data di DataHub dan mencakup beberapa topik. Proyek di DataHub bersifat independen dari proyek di MaxCompute. Anda tidak dapat menggunakan kembali proyek MaxCompute di DataHub, sehingga harus membuat proyek baru di DataHub.
Buat Proyek
Sintaks: CreateProjectResult createProject(String projectName, String comment)
Saat membuat proyek, Anda harus menetapkan nama proyek dan memasukkan deskripsi proyek. Nama proyek harus 3 hingga 32 karakter panjangnya, dan dapat berisi huruf, angka, dan garis bawah (_). Nama proyek harus dimulai dengan huruf dan tidak peka huruf besar/kecil.
Parameter
projectName: nama proyek.
comment: komentar pada proyek.
Kesalahan
DatahubClientException
Kode Sampel
public static void createProject(String projectName,String projectComment) {
try {
datahubClient.createProject(projectName, projectComment);
System.out.println("proyek berhasil dibuat");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Hapus Proyek
Sintaks: DeleteProjectResult deleteProject(String projectName). Pastikan proyek tidak berisi topik sebelum Anda menghapus proyek. Parameter: projectName: nama proyek.
Kesalahan
DatahubClientException
NoPermissionException: Jika proyek berisi topik, kesalahan ini dikembalikan.
Kode Sampel
public static void deleteProject(String projectName) {
try {
datahubClient.deleteProject(projectName);
System.out.println("proyek berhasil dihapus");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Perbarui Proyek
Sintaks: UpdateProjectResult updateProject(String projectName, String comment). Anda hanya dapat memperbarui komentar pada proyek. Parameter: projectName: nama proyek. comment: komentar pada proyek.
Kesalahan
DatahubClientException
Kode Sampel
public static void updateProject(String projectName,String newComment) {
try {
datahubClient.updateProject(projectName, newComment);
System.out.println("proyek berhasil diperbarui");
} catch (DatahubClientException e) {
System.out.println("kesalahan lain");
}
}Daftar Proyek
Sintaks: ListProjectResult listProject(). Hasil kembalian metode listProject adalah objek ListProjectResult, yang berisi daftar nama proyek.
Parameter: tidak ada
Kesalahan
DatahubClientException
Kode Sampel
public static void listProject() {
try {
ListProjectResult listProjectResult = datahubClient.listProject();
if (listProjectResult.getProjectNames().size() > 0) {
for (String pName : listProjectResult.getProjectNames()) {
System.out.println(pName);
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Kueri Proyek
Sintaks: GetProjectResult getProject(String projectName). Anda dapat memanggil metode getProject untuk melihat informasi atribut proyek saat ini. Parameter: projectName: nama proyek.
Kesalahan
DatahubClientException
Kode Sampel
public static void getProject(String projectName) {
try {
GetProjectResult getProjectResult = datahubClient.getProject(projectName );
System.out.println(getProjectResult.getCreateTime() + "\t"
+ getProjectResult.getLastModifyTime() + "\t"
+ getProjectResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Mengelola topik
Topik adalah unit terkecil untuk langganan dan publikasi data di DataHub. Anda dapat menggunakan topik untuk membedakan berbagai jenis data streaming. Dua jenis topik didukung: tuple dan blob.
Anda dapat menulis blok data biner sebagai catatan ke topik blob.
Topik tuple berisi catatan yang mirip dengan catatan data di database. Setiap catatan berisi beberapa kolom. Anda harus menentukan skema catatan untuk topik tuple karena data dalam topik tuple ditransmisikan sebagai string melalui jaringan. Oleh karena itu, skema diperlukan untuk konversi tipe data. Tabel berikut menjelaskan tipe data yang didukung.
Tipe | Deskripsi | Rentang Nilai |
BIGINT | Bilangan bulat delapan byte bertanda. | -9223372036854775807 hingga 9223372036854775807. |
DOUBLE | Bilangan titik mengambang presisi ganda. Panjangnya delapan byte. | -1,0 _10^308 hingga 1,0 _10^308. |
BOOLEAN | Tipe Boolean. | True dan False, true dan false, atau 0 dan 1. |
TIMESTAMP | Tipe timestamp. | Timestamp yang akurat hingga mikrodetik. |
STRING | String. Hanya pengkodean UTF-8 yang didukung. | Ukuran semua nilai dalam kolom tipe STRING tidak boleh melebihi 2 MB. |
TINYINT | Bilangan bulat satu byte. | -128 hingga 127. |
SMALLINT | Bilangan bulat dua byte. | -32768 hingga 32767. |
INTEGER | Bilangan bulat empat byte. | -2147483648 hingga 2147483647. |
FLOAT | Bilangan titik mengambang presisi tunggal empat byte. | -3,40292347_10^38 hingga 3,40292347_10^38. |
DataHub SDK untuk Java V2.16.1-public dan yang lebih baru mendukung TINYINT, SMALLINT, INTEGER, dan FLOAT.
Buat Topik Tuple
Sintaks: CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String comment)
Parameter
projectName: nama proyek tempat Anda ingin membuat topik.
topicName: nama topik.
shardCount: jumlah shard awal dalam topik.
lifeCycle: TTL dari data. Satuan: hari. Data yang ditulis sebelum waktu tersebut tidak dapat diakses.
recordType: jenis catatan yang ingin Anda tulis. Nilai valid: TUPLE dan BLOB.
recordSchema: skema catatan untuk topik.
comment: komentar pada topik.
Kesalahan
DatahubClientException
Kode Sampel
public static void createTupleTopic(String projectName, String topicName, int shardCount, int lifeCycle, String topicComment) {
RecordSchema schema = new RecordSchema();
schema.addField(new Field("bigint_field", FieldType.BIGINT));
schema.addField(new Field("double_field", FieldType.DOUBLE));
schema.addField(new Field("boolean_field", FieldType.BOOLEAN));
schema.addField(new Field("timestamp_field", FieldType.TIMESTAMP));
schema.addField(new Field("tinyint_field", FieldType.TINYINT));
schema.addField(new Field("smallint_field", FieldType.SMALLINT));
schema.addField(new Field("integer_field", FieldType.INTEGER));
schema.addField(new Field("floar_field", FieldType.FLOAT));
schema.addField(new Field("decimal_field", FieldType.DECIMAL));
schema.addField(new Field("string_field", FieldType.STRING));
try {
datahubClient.createTopic(projectName,topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicComment);
System.out.println("topik berhasil dibuat");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Buat Topik Blob
Sintaks: CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, String comment)
Parameter
projectName: nama proyek tempat Anda ingin membuat topik.
topicName: nama topik.
shardCount: jumlah shard awal dalam topik.
lifeCycle: TTL dari data. Satuan: hari. Data yang ditulis sebelum waktu tersebut tidak dapat diakses.
recordType: jenis catatan yang ingin Anda tulis. Nilai valid: TUPLE dan BLOB.
comment: komentar pada topik.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
ResourceAlreadyExistException
Kode Sampel
public static void createBlobTopic(String projectName, String topicName, int shardCount, int lifeCycle, String topicComment) {
try {
datahubClient.createTopic(projectName, blobTopicName, shardCount, lifeCycle, RecordType.BLOB, topicComment);
System.out.println("topik berhasil dibuat");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}Hapus Topik
Pastikan topik tidak berisi langganan atau DataConnector sebelum Anda menghapus topik. Jika tidak, kesalahan NoPermission akan dilaporkan.
Sintaks: DeleteTopicResult deleteTopic(String projectName, String topicName)
Parameter
projectName: nama proyek tempat Anda ingin menghapus topik.
topicName: nama topik.
Kesalahan
DatahubClientException
NoPermissionException: Jika topik berisi langganan atau DataConnector, kesalahan ini dikembalikan.
Kode Sampel
public static void deleteTopic(String projectName, String topicName) {
try {
datahubClient.deleteTopic(projectName, topicName);
System.out.println("topik berhasil dihapus");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Daftar Topik
Sintaks: ListTopicResult listTopic(String projectName)
Parameter
projectName: nama proyek tempat Anda ingin mencantumkan proyek.
Kode Sampel
public static void listTopic(String projectName ) {
try {
ListTopicResult listTopicResult = datahubClient.listTopic(projectName);
if (listTopicResult.getTopicNames().size() > 0) {
for (String tName : listTopicResult.getTopicNames()) {
System.out.println(tName);
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Perbarui Topik
Anda dapat memperbarui komentar dan TTL topik.
Sintaks: UpdateTopicResult updateTopic(String projectName, String topicName, int lifeCycle, String comment)
Parameter
projectName: nama proyek tempat Anda ingin memperbarui topik.
topicName: nama topik.
comment: komentar yang diperbarui.
lifeCycle: TTL topik.
Kesalahan
DatahubClientException
Kode Sampel
public static void updateTopic(String projectName, String topicName, int lifeCycle, String comment) {
try {
comment = "komentar topik baru";
lifeCycle = 1;
datahubClient.updateTopic(projectName, Constant.topicName,lifeCycle, comment);
System.out.println("topik berhasil diperbarui");
// Lihat hasil pembaruan.
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Kueri Topik
Sintaks: GetTopicResult getTopic(String projectName, String topicName). Anda dapat memanggil metode getTopic untuk mendapatkan informasi atribut tentang topik.
Parameter
projectName: nama proyek tempat Anda ingin menanyakan topik.
topicName: nama topik.
Kesalahan
DatahubClientException
Kode Sampel
public static void getTopic(String projectName, String topicName) {
try {
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getShardCount() + "\t"
+ getTopicResult.getLifeCycle() + "\t"
+ getTopicResult.getRecordType() + "\t"
+ getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Tambahkan Bidang ke Topik Tuple
Anda dapat menambahkan satu bidang atau beberapa bidang sekaligus.
Sintaks: AppendFieldResult appendField(String projectName, String topicName, Field field)
Parameter
projectName: nama proyek tempat topik yang ingin Anda tambahkan bidangnya berada.
topicName: nama topik.
fields: bidang yang ingin ditambahkan. Semua bidang dapat disetel ke null.
Kesalahan
DatahubClientException
Kode Sampel
public static void appendNewField(String projectName,String topicName) {
try {
Field newField = new Field("newField", FieldType.STRING, true,"komentar");
datahubClient.appendField(projectName, topicName, newField);
System.out.println("bidang berhasil ditambahkan");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}AppendFieldResult appendField(String projectName, String topicName, List fields);
Parameter
projectName: nama proyek tempat topik yang ingin Anda tambahkan bidangnya berada.
topicName: nama topik.
fields: bidang yang ingin ditambahkan. Semua bidang dapat disetel ke null.
Kesalahan
DatahubClientException
Kode Sampel
public static void appendNewField(String projectName,String topicName) {
try {
List<Field> list = new ArrayList<>();
Field newField1 = new Field("newField1", FieldType.STRING, true,"komentar");
list.add(newField1);
datahubClient.appendField(projectName, topicName, list);
System.out.println("bidang berhasil ditambahkan");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Mengelola shard
Shard adalah terowongan konkuren yang digunakan untuk transmisi data dalam sebuah topik. Setiap shard memiliki ID dan dapat berada dalam status yang berbeda: Opening, ketika shard sedang dimulai; dan Active, ketika shard telah dimulai dan siap digunakan untuk menyediakan layanan. Setiap shard aktif mengonsumsi sumber daya server, oleh karena itu, disarankan untuk membuat shard sesuai dengan kebutuhan.
Daftar Shard
Sintaks: ListShardResult listShard(String projectName, String topicName)
Parameter
projectName: nama proyek.
topicName: nama topik.
Kesalahan
DatahubClientException
Kode Sampel
public static void listShard(String projectName, String topicName) {
try {
ListShardResult listShardResult = datahubClient.listShard(projectName, topicName);
if (listShardResult.getShards().size() > 0) {
for (ShardEntry entry : listShardResult.getShards()) {
System.out.println(entry.getShardId() + "\t"
+ entry.getState() + "\t"
+ entry.getLeftShardId() + "\t"
+ entry.getRightShardId());
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Pisahkan Shard
Anda dapat membagi shard aktif dari topik tertentu. Setelah shard dibagi, dua shard aktif baru dihasilkan, sedangkan shard asli ditutup. Anda hanya dapat membaca data dari shard yang ditutup, tetapi tidak dapat menulis data ke shard tersebut. Anda dapat menggunakan kunci pisah default atau menentukan kunci pisah untuk membagi shard.
Sintaks: SplitShardResult splitShard(String projectName, String topicName, String shardId), atau SplitShardResult splitShard(String projectName, String topicName, String shardId, String splitKey)
Parameter
projectName: nama proyek.
topicName: nama topik.
shardId: ID shard yang ingin dibagi.
splitKey: kunci pisah yang digunakan untuk membagi shard.
Kesalahan
DatahubClientException
Kode Sampel
public static void splitShard(String projectName, String topicName, String shardId) {
try {
shardId = "0";
SplitShardResult splitShardResult = datahubClient.splitShard(projectName, topicName, shardId);
for (ShardEntry entry : splitShardResult.getNewShards()) {
System.out.println(entry.getShardId());
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Gabungkan Shard
Dua shard aktif yang ingin digabungkan dalam sebuah topik harus berdekatan satu sama lain. Untuk informasi lebih lanjut tentang dua shard yang berdekatan dari sebuah shard, lihat hasil yang dikembalikan oleh metode listShard.
Sintaks: MergeShardResult mergeShard(String projectName, String topicName, String shardId, String adjacentShardId)
Parameter
projectName: nama proyek.
topicName: nama topik.
shardId: ID shard yang ingin digabungkan.
adjacentShardId: ID shard yang berdekatan dengan shard yang ditentukan.
Kesalahan
DatahubClientException
Kode Sampel
public static void mergeShard() {
try {
String shardId = "7";
// Nilai parameter adjacentShardId dan shardId harus berdekatan. Untuk informasi lebih lanjut tentang shard yang berdekatan dari sebuah shard, lihat hasil yang dikembalikan oleh metode listShard.
String adjacentShardId = "8";
MergeShardResult mergeShardResult = datahubClient.mergeShard(Constant.projectName, Constant.topicName, shardId, adjacentShardId);
System.out.println("penggabungan berhasil");
System.out.println(mergeShardResult.getShardId());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Perluas Shard
Jumlah shard yang ingin diperluas harus lebih besar dari atau sama dengan jumlah shard awal.
Sintaks: ExtendShardResult extendShard(String projectName, String topicName, int shardCount)
Parameter
projectName: nama proyek.
topicName: nama topik.
shardCount: jumlah shard yang ingin diperluas.
adjacentShardId: ID shard yang berdekatan dengan shard yang ditentukan.
Kesalahan
DatahubClientException
Kode Sampel
public static void extendTopic(String projectName, String topicName, int shardCount) { try { ExtendShardResult extendShardResult = datahubClient.extendShard(projectName, topicName, shardCount); } catch (DatahubClientException e) { System.out.println(e.getErrorMessage()); } }
Baca dan tulis data
Anda dapat membaca data dari shard aktif dan tertutup, tetapi hanya dapat menulis data ke shard aktif.
Baca Data
Untuk membaca data, Anda harus terlebih dahulu mendapatkan kursor lalu meneruskan nilai kursor ke metode getRecords. Atau, Anda dapat menggunakan fitur langganan DataHub untuk langsung mengaitkan langganan untuk mengonsumsi data. Dalam hal ini, server secara otomatis menyimpan offset konsumsi. Jika Anda ingin mengambil sampel data untuk melihat kualitas data, Anda dapat membaca data.
Dapatkan Kursor
Untuk membaca data dari sebuah topik, tentukan shard dan kursor dari mana data mulai dibaca. Anda dapat mendapatkan kursor dengan menggunakan metode berikut: OLDEST, LATEST, SEQUENCE, dan SYSTEM_TIME.
OLDEST: kursor yang menunjuk ke catatan valid paling awal dalam shard yang ditentukan.
LATEST: kursor yang menunjuk ke catatan terbaru dalam shard yang ditentukan.
SEQUENCE: kursor yang menunjuk ke catatan nomor urutan yang ditentukan.
SYSTEM_TIME: Kursor yang menunjuk ke catatan pertama dengan nilai timestamp lebih besar dari atau sama dengan timestamp yang ditentukan.
Pilih Metode untuk Mendapatkan Kursor
Data yang ingin dibaca harus valid, yang berarti data harus berada dalam TTL. Jika tidak, kesalahan akan dilaporkan.
Skenario 1: Baca data dari awal shard. Dalam hal ini, kami sarankan Anda menggunakan metode OLDEST. Jika semua data dalam shard valid, data mulai dibaca dari catatan pertama.
Skenario 2: Ambil sampel data untuk memeriksa apakah data yang nilainya timestamp lebih besar dari timestamp yang ditentukan valid. Dalam hal ini, kami sarankan Anda menggunakan metode SYSTEM_TIME. Data mulai dibaca dari catatan pertama setelah catatan pada timestamp yang ditentukan.
Skenario 3: Lihat informasi data terbaru. Dalam hal ini, kami sarankan Anda menggunakan metode LATEST. Anda dapat menggunakan metode ini untuk membaca catatan terbaru atau N catatan terbaru. Untuk mendapatkan N catatan terbaru, Anda harus terlebih dahulu mendapatkan nomor urutan catatan terbaru. Kemudian, identifikasi N catatan sebelumnya dari catatan terbaru. Nomor urutan pertama dari N catatan sebelumnya adalah nomor urutan catatan terbaru dikurangi N.
Sintaks: GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type), atau GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param)
Parameter
projectName: nama proyek.
topicName: nama topik.
shardId: ID shard.
CursorType: jenis kursor.
Kesalahan
DatahubClientException
SeekOutOfRangeException
Kode Sampel
Jika Anda ingin mengambil sampel data, ubah waktu menjadi timestamp. Lalu, dapatkan kursor.
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
// Ubah waktu menjadi timestamp.
String time = "2019-07-01 10:00:00";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long timestamp = 0L;
try {
Date date = simpleDateFormat.parse(time);
timestamp = date.getTime(); // Dapatkan timestamp yang sesuai dengan waktu.
//System.out.println(timestamp);
}
// Dapatkan kursor dari mana data mulai dibaca setelah timestamp.
String timeCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
System.out.println("dapatkan kursor berhasil");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
} catch (ParseException e) {
System.out.println(e.getErrorOffset());
}
}Baca catatan paling awal dalam shard yang ditentukan.
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
/* Gunakan metode OLDEST. */
String oldestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
System.out.println("dapatkan kursor berhasil");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Baca data terbaru yang ditulis ke shard yang ditentukan, yang melibatkan dua skenario berikut:
Baca catatan terbaru yang ditulis ke shard yang ditentukan.
Baca N catatan terbaru yang ditulis ke shard yang ditentukan.
Anda harus terlebih dahulu mendapatkan nomor urutan catatan terbaru lalu mendapatkan kursor.
public static void getcursor(String projectName,String topicName) {
String shardId = "5";
try {
/* Gunakan metode LATEST. */
String latestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getCursor();
/* Gunakan metode SEQUENCE. */
// Dapatkan nomor urutan catatan terbaru.
long seq = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getSequence();
// Dapatkan kursor untuk membaca 10 catatan terbaru.
String seqCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
}
catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Metode Pembacaan Data
Sintaks: GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit), atau GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit)
Parameter
projectName: nama proyek.
topicName: nama topik.
shardId: ID shard.
schema: skema yang diperlukan saat Anda membaca catatan dari topik tuple.
cursor: kursor dari mana data mulai dibaca.
limit: jumlah maksimum catatan yang ingin dibaca.
Kesalahan
DatahubClientException
Kode Sampel
Baca Catatan dari Topik Tuple
public static void example(String projectName,String topicName) {
// Jumlah maksimum catatan yang ingin dibaca setiap kali.
int recordLimit = 1000;
String shardId = "7";
// Dapatkan kursor dari catatan valid paling awal.
// Catatan: Secara umum, Anda hanya memanggil metode getCursor selama inisialisasi. Setelah itu, Anda dapat memanggil metode getNextCursor untuk terus mengonsumsi data.
String cursor = "";
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, recordSchema, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// Jika tidak ada catatan yang dapat dibaca, jeda thread selama 10.000 ms dan lanjutkan membaca catatan.
Thread.sleep(10000);
continue;
}
for (RecordEntry entry : result.getRecords()) {
TupleRecordData data = (TupleRecordData) entry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
}
// Dapatkan kursor berikutnya.
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// Kursor tidak valid atau telah kedaluwarsa. Tentukan kursor lain untuk memulai konsumsi.
cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());;
}
}
}Baca Catatan dari Topik Blob
public static void example(String projectName,String topicName) {
// Jumlah maksimum catatan yang ingin dibaca setiap kali.
int recordLimit = 1000;
String shardId = "7";
// Dapatkan kursor dari catatan valid paling awal.
// Catatan: Secara umum, Anda hanya memanggil metode getCursor selama inisialisasi. Setelah itu, Anda dapat memanggil metode getNextCursor untuk terus mengonsumsi data.
String cursor = "";
try {
cursor = datahubClient.getCursor(projectName, blobTopicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
while (true) {
try {
GetRecordsResult result = datahubClient.getRecords(projectName, blobTopicName, shardId, recordSchema, cursor, recordLimit);
if (result.getRecordCount() <= 0) {
// Jika tidak ada catatan yang dapat dibaca, jeda thread selama 10.000 ms dan lanjutkan membaca catatan.
Thread.sleep(10000);
continue;
}
/* Konsumsi data. */
for (RecordEntry record: result.getRecords()){
BlobRecordData data = (BlobRecordData) record.getRecordData();
System.out.println(new String(data.getData()));
}
// Dapatkan kursor berikutnya.
cursor = result.getNextCursor();
} catch (InvalidCursorException ex) {
// Kursor tidak valid atau telah kedaluwarsa. Tentukan kursor lain untuk memulai konsumsi.
cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}Tulis Data
Pada DataHub SDK for Java versi V2.12 dan yang lebih baru, server mendukung metode PutRecordsByShardResult. Pada versi sebelum V2.12, server mendukung metode putRecords. Untuk memanggil metode putRecordsByShard, Anda harus menentukan shard tempat data akan ditulis. Jika tidak ditentukan, data akan secara default ditulis ke shard aktif pertama. Parameter input untuk kedua metode tersebut adalah daftar catatan dengan tipe yang sama, seperti tuple atau blob. DataHub SDK for Java memungkinkan Anda menulis data per shard menggunakan metode putRecordsByShard atau menulis data dalam mode campuran melalui metode putRecords. Anda dapat menulis data ke DataHub per shard pada DataHub SDK for Java V2.12 dan yang lebih baru. Saat menggunakan metode putRecords, pastikan untuk memeriksa hasil kembalian guna memastikan bahwa data berhasil ditulis ke DataHub. Jika menggunakan metode putRecordsByShard tetapi penulisan data gagal, kesalahan akan dilaporkan. Jika server Anda mendukung metode putRecordsByShard, disarankan untuk menggunakan metode putRecordsByShard.
Sintaks: PutRecordsResult putRecords(String projectName, String topicName, List records), atau PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records).
Parameter
projectName: Nama proyek.
topicName: Nama topik.
shardId: ID shard.
records: Daftar catatan yang akan ditulis ke DataHub.
Kesalahan
DatahubClientException
Tulis Catatan ke Topik Tuple
// Tulis catatan tuple.
public static void tupleExample(String project,String topic,int retryTimes) {
// Dapatkan skema.
RecordSchema recordSchema = datahubClient.getTopic(project,topic ).getRecordSchema();
// Hasilkan 10 catatan.
List<RecordEntry> recordEntries = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// Anda dapat menentukan atribut tambahan, seperti alamat IP dan nama mesin server, untuk setiap catatan. Jika Anda tidak menentukan atribut tambahan, penulisan data tidak terpengaruh.
recordEntry.addAttribute("key1", "value1");
TupleRecordData data = new TupleRecordData(recordSchema);
data.setField("field1", "HelloWorld");
data.setField("field2", 1234567);
recordEntry.setRecordData(data);
recordEntries.add(recordEntry);
}
try {
PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
int i = result.getFailedRecordCount();
if (i > 0) {
retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
}
} catch (DatahubClientException e) {
System.out.println("requestId:" + e.getRequestId() + "\tpesan:" + e.getErrorMessage());
}
}
// Mekanisme percobaan ulang.
public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
boolean suc = false;
while (retryTimes != 0) {
retryTimes = retryTimes - 1;
PutRecordsResult recordsResult = client.putRecords(project, topic, records);
if (recordsResult.getFailedRecordCount() > 0) {
retry(client,recordsResult.getFailedRecords(),retryTimes,project,topic);
}
suc = true;
break;
}
if (!suc) {
System.out.println("kegagalan percobaan ulang");
}
}
'''Java
<br />
<br />** Tulis catatan ke topik blob**<br />
'''Java
// Tulis catatan blob.
public static void blobExample() {
// Hasilkan 10 catatan.
List<RecordEntry> recordEntries = new ArrayList<>();
String shardId = "4";
for (int i = 0; i < 10; ++i) {
RecordEntry recordEntry = new RecordEntry();
// Tentukan atribut tambahan untuk setiap catatan.
recordEntry.addAttribute("key1", "value1");
BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
recordEntry.setRecordData(data);
recordEntry.setShardId(shardId);
recordEntries.add(recordEntry);
recordEntry.setShardId("0");
}
while (true) {
try {
// Di DataHub SDK untuk Java V2.12 dan yang lebih baru, server mendukung metode PutRecordsByShardResult. Jika versi SDK Anda lebih lama dari V2.12, gunakan metode putRecords.
//datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
System.out.println("penulisan data berhasil");
break;
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}Tulis catatan dalam berbagai mode
Jika versi SDK Anda lebih lama dari V2.12, Anda hanya dapat menulis catatan dengan memanggil metode putRecords. Kelas RecordEntry mencakup tiga atribut berikut: shardId, partitionKey, dan hashKey. Anda dapat menentukan nilai atribut tersebut untuk menentukan shard tempat catatan akan ditulis.
Pada DataHub SDK untuk Java versi 2.12 dan yang lebih baru, disarankan untuk menggunakan metode putRecordsByShard saat menulis catatan. Hal ini membantu mencegah penurunan performa akibat repartisi di server.
Tulis catatan berdasarkan ID shard. Mode ini direkomendasikan. Berikut adalah kode sampel:
RecordEntry entry = new RecordEntry();
entry.setShardId("0");Tulis catatan berdasarkan kunci hash. Dalam mode ini, tentukan nilai algoritma pesan ringkas 5 (MD5) 128-bit. Jika Anda menulis catatan berdasarkan kunci hash, parameter BeginHashKey dan EndHashKey digunakan untuk menentukan shard tempat catatan akan ditulis. Kode sampel:
RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");Tulis catatan berdasarkan kunci partisi. Dalam mode ini, tentukan string sebagai kunci partisi. Shard tempat catatan ditulis akan ditentukan berdasarkan nilai MD5 dari string tersebut dan parameter BeginHashKey dan EndHashKey. Berikut adalah kode sampel:
RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");Kueri informasi metering
Kueri Informasi Metering
Sintaks: GetMeterInfoResult getMeterInfo(String projectName, String topicName, String shardId)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
shardId: ID shard.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Kode Sampel
public static void getMeter(String projectName,String topicName) {
String shardId = "5";
try {
GetMeterInfoResult getMeterInfoResult = datahubClient.getMeterInfo(projectName, topicName, shardId);
System.out.println("dapatkan meter berhasil");
System.out.println(getMeterInfoResult.getActiveTime() + "\t" + getMeterInfoResult.getStorage());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Mengelola langganan
DataHub memungkinkan server untuk menyimpan offset konsumsi langganan. Anda dapat memperoleh layanan penyimpanan offset dengan tingkat ketersediaan tinggi melalui konfigurasi yang sederhana.
Buat Langganan
Sintaks: CreateSubscriptionResult createSubscription(String projectName, String topicName, String comment)
Komentar pada langganan harus dalam format berikut: {"application":"Aplikasi","description":"Deskripsi"}.
Parameter
projectName: Nama proyek.
topicName: Nama topik.
comment: Komentar pada langganan.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Kode Sampel
public static void createSubscription(String projectName,String topicName) {
try {
CreateSubscriptionResult createSubscriptionResult = datahubClient.createSubscription(projectName, topicName, Constant.subscribtionComment);
System.out.println("langganan berhasil dibuat");
System.out.println(createSubscriptionResult.getSubId());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Hapus Langganan
Sintaks: DeleteSubscriptionResult deleteSubscription(String projectName, String topicName, String subId)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
subId: ID langganan.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Kode Sampel
public static void deleteSubscription(String projectName,String topicName,String subId) {
try {
datahubClient.deleteSubscription(projectName, topicName, subId);
System.out.println("langganan berhasil dihapus");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Perbarui Langganan
Anda hanya dapat memperbarui komentar pada langganan yang sudah ada.
Sintaks: UpdateSubscriptionResult updateSubscription(String projectName, String topicName, String subId, String comment)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
subId: ID langganan.
comment: Komentar yang ingin diperbarui.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Kode Sampel
public static void updateSubscription(String projectName, String topicName, String subId, String comment){
try {
datahubClient.updateSubscription(projectName,topicName,subId,comment)
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Daftar Langganan
Parameter pageNum dan pageSize pada metode listSubscription menentukan rentang langganan yang akan dicantumkan. Sebagai contoh, dengan mengatur pageNum dan pageSize ke 1 dan 10, Anda dapat mencantumkan 10 langganan pertama. Contoh lainnya adalah dengan mengatur pageNum dan pageSize ke 2 dan 5 untuk mencantumkan langganan keenam hingga kesepuluh.
Sintaks: ListSubscriptionResult listSubscription(String projectName, String topicName, int pageNum, int pageSize)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
pageNum: Nomor halaman yang akan dikembalikan.
pageSize: Jumlah entri yang akan dikembalikan per halaman.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Kode Sampel
Kode Contoh
public static void listSubscription(String projectName, String topicName, int pageNum, int pageSize) {
try {
ListSubscriptionResult listSubscriptionResult = datahubClient.listSubscription(projectName, topicName, pageNum, pageSize);
if (listSubscriptionResult.getSubscriptions().size() > 0) {
System.out.println(listSubscriptionResult.getTotalCount());
System.out.println(listSubscriptionResult.getSubscriptions().size());
for (SubscriptionEntry entry : listSubscriptionResult.getSubscriptions()) {
System.out.println(entry.getSubId() + "\t"
+ entry.getState() + "\t"
+ entry.getType() + "\t"
+ entry.getComment());
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Subscription Query
Sintaks: GetSubscriptionResult getSubscription(String projectName, String topicName, String subId)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
subId: ID langganan.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
public static void getSubscription(String projectName, String topicName, String subId) {
try {
GetSubscriptionResult getSubscriptionResult = datahubClient.getSubscription(projectName, topicName, subId);
System.out.println(getSubscriptionResult.getSubId() + "\t"
+ getSubscriptionResult.getState() + "\t"
+ getSubscriptionResult.getType() + "\t"
+ getSubscriptionResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Perbarui Status Langganan
Langganan dapat berada dalam status OFFLINE atau ONLINE, yang menunjukkan apakah langganan tersebut offline atau online.
Sintaks: UpdateSubscriptionStateResult updateSubscriptionState(String projectName, String topicName, String subId, SubscriptionState state)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
subId: ID langganan.
state: Status yang ingin diperbarui.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Kode Sampel
public static void updateSubscriptionState(String projectName, String topicName,String subId) {
try {
datahubClient.updateSubscriptionState(projectName, topicName, subId, SubscriptionState.ONLINE);
System.out.println("status langganan berhasil diperbarui");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Mengelola offset
Setelah langganan dibuat, awalnya belum dikonsumsi. Untuk menggunakan fitur penyimpanan offset langganan, lakukan operasi berikut pada offset:
Inisialisasi Offset
Untuk menginisialisasi offset, cukup panggil metode openSubscriptionSession satu kali. Pemanggilan metode ini akan menghasilkan ID sesi konsumsi baru. Setelah itu, sesi sebelumnya menjadi tidak valid, dan Anda tidak dapat mengirimkan offset.
Sintaks: OpenSubscriptionSessionResult openSubscriptionSession(String projectName, String topicName, String subId, List shardIds)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
subId: ID langganan.
shardIds: ID shard.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Kode Sampel
public static void openSubscriptionSession(String projectName, String topicName) {
shardId = "4";
shardIds = new ArrayList<String>();
shardIds.add("0");
shardIds.add("4");
try {
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
System.out.println(subscriptionOffset.getSessionId() + "\t"
+ subscriptionOffset.getVersionId() + "\t"
+ subscriptionOffset.getSequence());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Dapatkan Offset
Sintaks: GetSubscriptionOffsetResult getSubscriptionOffset(String projectName, String topicName, String subId, List shardIds)
Metode getSubscriptionOffset mengembalikan objek GetSubscriptionOffsetResult, yang secara fundamental serupa dengan hasil kembalian metode openSubscriptionSession. Namun, objek GetSubscriptionOffsetResult tidak mencakup ID sesi offset. Anda hanya dapat memanggil metode getSubscriptionOffset untuk melihat informasi terkait offset.
Parameter
projectName: Nama proyek.
topicName: Nama topik.
subId: ID langganan.
shardIds: ID shard.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Kode Sampel
// Dapatkan offset.
public static void getSubscriptionOffset(String projectName, String topicName,String subId) {
shardId = "4";
shardIds = new ArrayList<String>();
shardIds.add("0");
shardIds.add("4");
try {
GetSubscriptionOffsetResult getSubscriptionOffsetResult = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = getSubscriptionOffsetResult.getOffsets().get(shardId);
System.out.println(subscriptionOffset.getVersionId() + "\t"
+ subscriptionOffset.getSequence());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Kirim offset
Sintaks: CommitSubscriptionOffsetResult commitSubscriptionOffset(String projectName, String topicName, String subId, Map offsets)
Saat mengirim offset, DataHub akan memverifikasi nilai parameter versionId dan sessionId untuk memastikan nilainya sesuai dengan sesi saat ini. Informasi offset yang dikirimkan tidak terbatas. Disarankan untuk memasukkan nomor urutan dan timestamp aktual dari catatan.
Parameter
projectName: Nama proyek.
topicName: Nama topik.
subId: ID langganan.
offsets: Peta offset shard.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
SubscriptionOffsetResetException
SubscriptionSessionInvalidException
SubscriptionOfflineException
Kode Sampel
// Kirim offset.
public static void commitSubscriptionOffset(String projectName, String topicName,String subId) {
while (true) {
try {
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// Kode sampel ini hanya digunakan untuk pengujian. Untuk kode lengkap, lihat kode sampel yang memberikan contoh cara mengonsumsi data dari offset konsumsi yang disimpan dan mengirimkan offset selama konsumsi.
subscriptionOffset.setSequence(10);
subscriptionOffset.setTimestamp(100);
Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
offsets.put(shardId, subscriptionOffset);
// Kirim offset.
datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsets);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
}Atur Ulang Offset
Sintaks: ResetSubscriptionOffsetResult resetSubscriptionOffset(String projectName, String topicName, String shardId, Map offsets)
Anda dapat mengatur ulang offset ke titik waktu tertentu. Jika beberapa catatan terlibat pada titik waktu tersebut, offset yang diatur ulang akan menunjuk ke catatan pertama yang relevan. Setelah pengaturan ulang, informasi offset diperbarui, dan ID versi juga diperbarui. Jika tugas yang sedang berjalan mengirim offset menggunakan ID versi sebelumnya, kesalahan SubscriptionOffsetResetException akan dilaporkan. Untuk mendapatkan ID versi baru, Anda dapat memanggil metode getSubscriptionOffset.
Parameter
projectName: Nama proyek.
topicName: Nama topik.
subId: ID langganan.
offsets: Peta offset shard.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Kode Sampel
// Atur ulang offset.
public static void resetSubscriptionOffset(String projectName, String topicName) throws ParseException {
List<String> shardIds = Arrays.asList("0");
// Tentukan waktu ke mana Anda ingin mengatur ulang offset dan ubah waktu menjadi timestamp.
String time = "2019-07-09 10:00:00";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(time);
long timestamp = date.getTime(); // Dapatkan timestamp yang sesuai dengan waktu.
long sequence = client.getCursor(projectName, topicName, subId, CursorType.SYSTEM_TIME, timestamp).getSequence();
SubscriptionOffset offset = new SubscriptionOffset();
offset.setTimestamp(timestamp);
offset.setSequence(sequence);
Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
for (String shardId : shardIds) {
offsets.put(shardId, offset);
}
try {
datahubClient.resetSubscriptionOffset(projectName, topicName, subId, offsets);
System.out.println("atur ulang berhasil");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Asosiasikan langganan untuk mengonsumsi data di DataHub
Sebagaimana membaca data dari DataHub, Anda dapat mengaitkan langganan untuk mengonsumsi data di DataHub. Langganan ini menyimpan offset konsumsi, yang dapat Anda pilih sesuai kebutuhan.
Catatan Penggunaan:
Panggil metode openSubscriptionSession untuk menginisialisasi offset serta mendapatkan ID versi dan ID sesi langganan. Metode ini hanya dapat dipanggil sekali untuk inisialisasi offset. Jika dipanggil lebih dari sekali, sesi sebelumnya akan menjadi tidak valid, dan Anda tidak dapat mengirim offset.
Panggil metode getCursor untuk mendapatkan offset catatan dalam langganan guna mengonsumsi data. Setelah mengonsumsi catatan pertama, gunakan metode getNextCursor untuk mendapatkan offset catatan berikutnya dan lanjutkan proses konsumsi data.
Panggil metode commitSubscriptionOffset untuk mengirim offset. Saat mengirim offset, ID versi dan ID sesi langganan perlu diverifikasi. Pastikan ID versi dan ID sesi sesuai dengan yang ada di sesi saat ini.
// Kode sampel berikut memberikan contoh cara mengonsumsi data dari offset konsumsi yang disimpan dan mengirimkan offset selama konsumsi.
public static void example(String projectName, String topicName,String subId) {
String shardId = "0";
List<String> shardIds = Arrays.asList("0", "1");
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1. Dapatkan kursor dari catatan di offset saat ini. Jika catatan kedaluwarsa atau belum dikonsumsi, dapatkan kursor dari catatan pertama dalam waktu hidup (TTL) topik.
String cursor = null;
// Jika nomor urutan lebih kecil dari 0, catatan belum dikonsumsi.
if (subscriptionOffset.getSequence() < 0) {
// Dapatkan kursor dari catatan pertama dalam TTL topik.
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// Dapatkan kursor dari catatan berikutnya.
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
// Jika kesalahan SeekOutOfRange dikembalikan setelah Anda mendapatkan kursor berdasarkan nomor urutan, catatan dari kursor saat ini kedaluwarsa.
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// Dapatkan kursor dari catatan pertama dalam TTL topik.
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2. Baca catatan dan simpan offset. Sebagai contoh, baca catatan tuple dan simpan offset setiap kali 1.000 catatan dibaca.
long recordCount = 0L;
// Baca 10 catatan setiap kali.
int fetchNum = 10;
while (true) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// Jika tidak ada catatan yang dapat dibaca, jeda thread selama 1.000 ms dan lanjutkan membaca catatan.
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
// Konsumsi data.
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// Simpan offset setelah data dikonsumsi.
++recordCount;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
if (recordCount % 1000 == 0) {
// Kirim offset.
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
System.out.println("offset berhasil dikirim");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | OffsetSessionChangedException e) {
// Sesi langganan keluar. Offline: Langganan offline. SessionChange: Langganan juga digunakan di klien lain.
break;
} catch (OffsetResetedException e) {
// Offset diatur ulang. Anda harus mendapatkan informasi offset langganan lagi. Dalam contoh ini, nomor urutan diatur ulang.
// Jika timestamp diatur ulang, Anda harus menggunakan parameter CursorType.SYSTEM_TIME untuk mendapatkan kursor.
subscriptionOffset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
long nextSequence = subscriptionOffset.getSequence() + 1;
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (DatahubClientException e) {
// TODO: Tentukan apakah akan keluar saat terjadi kesalahan.
} catch (Exception e) {
break;
}
}
}Mengelola DataConnectors
DataConnector di DataHub menyinkronkan data streaming dari DataHub ke layanan cloud lainnya. Anda dapat menggunakan DataConnector untuk menyinkronkan data dari topik DataHub ke MaxCompute, Object Storage Service (OSS), ApsaraDB RDS for MySQL, Tablestore, Elasticsearch, dan Function Compute dalam mode real-time atau hampir real-time. Setelah DataConnector dikonfigurasi, data yang ditulis ke DataHub dapat digunakan di layanan Alibaba Cloud lainnya.
Buat DataConnector
Sintaks: CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, List columnFields, SinkConfig config), atau CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, long sinkStartTime, List columnFields, SinkConfig config).
Parameter
projectName: Nama proyek.
topicName: Nama topik.
ConnectorType: Jenis DataConnector yang ingin dibuat.
columnFields: Bidang yang akan disinkronkan.
sinkStartTime: Waktu mulai data disinkronkan ke DataHub. Satuan: milidetik.
config: Detail konfigurasi untuk tipe DataConnector tertentu.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Berikut ini adalah contoh kode yang menunjukkan cara membuat DataConnector untuk menyinkronkan data dari DataHub ke MaxCompute:
public static void createConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
SinkOdpsConfig config = new SinkOdpsConfig() {{
setEndpoint(Constant.odps_endpoint);
setProject(Constant.odps_project);
setTable(Constant.odps_table);
setAccessId(Constant.odps_accessId);
setAccessKey(Constant.odps_accessKey);
setPartitionMode(PartitionMode.SYSTEM_TIME);
setTimeRange(60);
}};
// Tentukan format partisi.
SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
addConfig("ds", "%Y%m%d");
addConfig("hh", "%H");
addConfig("mm", "%M");
}};
config.setPartitionConfig(partitionConfig);
try {
// Buat DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ODPS, columnFields, config);
System.out.println("DataConnector berhasil dibuat");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Berikut adalah contoh kode untuk membuat DataConnector guna menyinkronkan data dari DataHub ke OSS:
public static void createOssConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
SinkOssConfig config = new SinkOssConfig() {{
setAccessId(Constant.oss_accessId);
setAccessKey(Constant.oss_accessKey);
setAuthMode(AuthMode.STS);
setBucket(Constant.oss_bucket);
setEndpoint(Constant.oss_endpoint);
setPrefix(Constant.oss_prefix);
setTimeFormat(Constant.oss_timeFormat);
setTimeRange(60);
}};
try {
// Buat DataConnector.
datahubClient.createConnector(projectName,topicName, ConnectorType.SINK_OSS, columnFields, config);
System.out.println("DataConnector berhasil dibuat");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Berikut adalah contoh kode untuk membuat DataConnector guna menyinkronkan data dari DataHub ke Tablestore:
public static void createOtsConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkOtsConfig config = new SinkOtsConfig() {{
setAccessId(Constant.ots_accessId);
setAccessKey(Constant.ots_accessKey);
setEndpoint(Constant.ots_endpoint);
setInstance(Constant.ots_instance);
setTable(Constant.ots_table);
setAuthMode(AuthMode.AK);
}};
try {
// Buat DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_OTS, columnFields, config);
System.out.println("DataConnector berhasil dibuat");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Berikut adalah contoh kode yang menunjukkan cara membuat DataConnector untuk menyinkronkan data dari DataHub ke Hologres:
public static void createHoloConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkHologresConfig config = new SinkHologresConfig() {{
setAccessId(Constant.accessId);
setAccessKey(Constant.accessKey);
setProjectName(Constant.projectName);
setTopicName(Constant.topicName);
setAuthMode(AuthMode.AK);
setInstanceId(Constant.instanceId);
// Atur unit timestamp.
setTimestampUnit(TimestampUnit.MILLISECOND);
}};
try {
// Buat DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_HOLOGRES, columnFields, config);
System.out.println("DataConnector berhasil dibuat");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Berikut adalah contoh kode untuk membuat DataConnector guna menyinkronkan data dari DataHub ke Elasticsearch:
public static void createEsConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkEsConfig config = new SinkEsConfig() {{
setEndpoint(Constant.es_endpoint);
setIdFields(Constant.es_fields);
setIndex(Constant.es_index);
setPassword(Constant.es_password);
setProxyMode(Constant.es_proxyMode);
setTypeFields(Constant.es_typeFields);
setUser(Constant.es_user);
}};
try {
// Buat DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ES, columnFields, config);
System.out.println("DataConnector berhasil dibuat");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Berikut adalah contoh kode untuk membuat DataConnector guna menyinkronkan data dari DataHub ke Function Compute:
public static void createFcConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkFcConfig config = new SinkFcConfig() {{
setEndpoint(Constant.fc_endpoint);
setAccessId(Constant.fc_accessId);
setAccessKey(Constant.fc_accessKey);
setAuthMode(AuthMode.AK);
setFunction(Constant.fc_function);
setService(Constant.fc_service);
}};
try {
// Buat DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_FC, columnFields, config);
System.out.println("DataConnector berhasil dibuat");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Berikut adalah contoh kode untuk membuat DataConnector guna menyinkronkan data dari DataHub ke database MySQL:
public static void createMysqlConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkMysqlConfig config = new SinkMysqlConfig() {{
setDatabase( Constant.mysql_database);
setHost(Constant.mysql_host);
setInsertMode(InsertMode.OVERWRITE);
setPassword(Constant.mysql_password);
setPort(Constant.mysql_port);
setTable(Constant.mysql_table);
setUser(Constant.mysql_user);
}};
try {
// Buat DataConnector.
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_MYSQL, columnFields, config);
System.out.println("DataConnector berhasil dibuat");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Hapus DataConnector
Sintaks: DeleteConnectorResult deleteConnector(String projectName, String topicName, ConnectorType connectorType)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
ConnectorType: Jenis DataConnector yang ingin dihapus.
columnFields: Bidang yang akan disinkronkan.
sinkStartTime: Waktu mulai data disinkronkan ke DataHub. Satuan: milidetik.
config: Detail konfigurasi untuk jenis DataConnector tertentu.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void deleteConnector(String projectName,String topicName) {
try {
datahubClient.deleteConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println("DataConnector berhasil dihapus");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Query a DataConnector
Syntax: GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
ConnectorType: Jenis DataConnector yang ingin Anda periksa.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void getConnector(String projectName,String topicName) {
try {
GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println(getConnectorResult.getState() + "\t" + getConnectorResult.getSubId());
for (String fieldName : getConnectorResult.getColumnFields()) {
System.out.println(fieldName);
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Perbarui DataConnector
Anda dapat memperbarui konfigurasi DataConnector.
Sintaks: UpdateConnectorResult updateConnector(String projectName, String topicName, ConnectorType connectorType, SinkConfig config)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
ConnectorType: Jenis DataConnector yang ingin diperbarui.
config: Detail konfigurasi untuk jenis DataConnector tertentu.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void updateConnector(String projectName,String topicName) {
SinkOdpsConfig config = (SinkOdpsConfig) datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS).getConfig();
// Ubah pasangan AccessKey.
config.setTimeRange(100);
config.setAccessId(accessId);
config.setAccessKey(accessKey);
// Modifikasi tipe timestamp.
config.setTimestampUnit(ConnectorConfig.TimestampUnit.MICROSECOND);
try {
datahubClient.updateConnector(projectName, topicName, ConnectorType.SINK_ODPS, config);
System.out.println("DataConnector berhasil diperbarui");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Perbarui bidang yang akan disinkronkan menggunakan DataConnector
Sintaks: UpdateConnectorResult updateConnector(String projectName, String topicName, String connectorId, List columnFields)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
connectorId: ID DataConnector yang ingin diperbarui.
columnFields: Bidang yang akan disinkronkan.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void updateConnector(String projectName,String topicName) {
String connectorId = "";
// Parameter columnField menentukan semua bidang yang akan disinkronkan ke downstream, yang mencakup tetapi tidak terbatas pada bidang yang baru ditambahkan.
List<String> columnField = new ArrayList<>();
columnField.add("f1");
try {
batchClient.updateConnector(projectName, topicName,connectorId,columnField);
System.out.println("update connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Perbarui Status dari sebuah DataConnector
Sintaks: UpdateConnectorStateResult updateConnectorState(String projectName, String topicName, ConnectorType connectorType, ConnectorState connectorState)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
ConnectorType: Jenis DataConnector.
connectorState: Status DataConnector. Nilai yang valid: STOPPED dan RUNNING.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void updateConnectorState(String projectName,String topicName) {
try {
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
System.out.println("memperbarui status konektor berhasil");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Perbarui Offset dari sebuah DataConnector
Sintaks: UpdateConnectorOffsetResult updateConnectorOffset(String projectName, String topicName, ConnectorType connectorType, String shardId, ConnectorOffset offset)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
ConnectorType: Jenis DataConnector.
shardId: ID shard. Jika parameter shardID disetel ke null, offset dari semua shard akan diperbarui.
offset: Offset DataConnector.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void updateConnectorOffset(String projectName,String topicName) {
ConnectorOffset offset = new ConnectorOffset() {{
setSequence(10);
setTimestamp(1000);
}};
try {
// Sebelum memperbarui offset dari sebuah DataConnector, hentikan DataConnector tersebut.
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
datahubClient.updateConnectorOffset(projectName, topicName, ConnectorType.SINK_ODPS, shardId, offset);
datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
System.out.println("pembaruan offset konektor berhasil");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}List DataConnectors
Sintaks: ListConnectorResult listConnector(String projectName, String topicName)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void listConnector(String projectName,String topicName) {
try {
ListConnectorResult listConnectorResult = datahubClient.listConnector(projectName, topicName);
for (String cName : listConnectorResult.getConnectorNames()) {
System.out.println(cName);
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Query the shard status of a DataConnector
Sintaks: GetConnectorShardStatusResult getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType), atau ConnectorShardStatusEntry getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType, String shardId).
Parameter
projectName: Nama proyek.
topicName: Nama topik.
ConnectorType: Tipe DataConnector.
shardId: ID shard.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void getConnectorShardStatusByShard(String projectName,String topicName,String shardId) {
try {
ConnectorShardStatusEntry connectorShardStatusEntry = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
System.out.println(connectorShardStatusEntry.getState() + "\t"
+ connectorShardStatusEntry.getCurrSequence() + "\t"
+ connectorShardStatusEntry.getDiscardCount() + "\t"
+ connectorShardStatusEntry.getUpdateTime());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
public static void getConnectorShardStatus(String projectName,String topicName) {
try {
GetConnectorShardStatusResult getConnectorShardStatusResult = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS);
for (Map.Entry<String, ConnectorShardStatusEntry> entry : getConnectorShardStatusResult.getStatusEntryMap().entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue().getState() + "\t"
+ entry.getValue().getCurrSequence() + "\t"
+ entry.getValue().getDiscardCount() + "\t"
+ entry.getValue().getUpdateTime());
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Restart a DataConnector
Sintaks: ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType), atau ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType, String shardId).
Parameter
projectName: Nama proyek.
topicName: Nama topik.
ConnectorType: Jenis DataConnector.
shardId: ID shard.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void reloadConnector(String projectName,String topicName ) {
try {
datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println("reload connector berhasil");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
public static void reloadConnectorByShard(String projectName,String topicName,String shardId) {
try {
datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
System.out.println("reload connector berhasil");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Query the completion time of a DataConnector
Sintaks: GetConnectorDoneTimeResult getConnectorDoneTime(String projectName, String topicName, ConnectorType connectorType)
Parameter
projectName: Nama proyek.
topicName: Nama topik.
ConnectorType: Tipe DataConnector.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void getDoneTime(String projectName,String topicName ) {
try {
GetConnectorDoneTimeResult getConnectorDoneTimeResult = datahubClient.getConnectorDoneTime(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println(getConnectorDoneTimeResult.getDoneTime());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Perbarui Daftar Putih VPC
Sintaks: UpdateProjectVpcWhitelistResult updateProjectVpcWhitelist(String projectName, String vpcIds)
Parameter
projectName: Nama dari Proyek.
vpcids: ID dari virtual private cloud (VPC).
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Contoh Kode
public static void updateProjectVpcWhitelist(String projectName) {
String vpcid = "12345";
try {
datahubClient.updateProjectVpcWhitelist(projectName, vpcid);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Tambahkan Bidang
Sintaks: AppendConnectorFieldResult appendConnectorField(String projectName, String topicName, ConnectorType connectorType, String fieldName)
Anda dapat menambahkan bidang yang ingin disinkronkan menggunakan DataConnector, selama tabel MaxCompute mencakup bidang tersebut.
Parameter
projectName: Nama proyek.
topicName: Nama topik.
ConnectorType: Jenis DataConnector.
fieldName: Nama bidang yang ingin ditambahkan. Bidang ini dapat disetel ke null.
Kesalahan
DatahubClientException
InvalidParameterException
AuthorizationFailureException
ResourceNotFoundException
Kode Sampel
public static void appendConnectorField(String projectName,String topicName) {
String newField = "newfield";
try {
// Baik topik maupun tabel MaxCompute berisi bidang yang ingin ditambahkan. Selain itu, skema untuk topik sama dengan skema tabel MaxCompute.
datahubClient.appendConnectorField(projectName, topicName, ConnectorType.SINK_ODPS, newField);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}Mengelola beberapa objek sekaligus
Kami menyarankan Anda menggunakan alat baris perintah di konsol DataHub.