DataWorks menyediakan Tablestore Stream Reader untuk membaca data inkremental dari sumber data Stream Tablestore. Topik ini menjelaskan kemampuan sinkronisasi data dari sumber data Stream Tablestore.
Siapkan lingkungan Tablestore Stream sebelum sinkronisasi data
Sebelum menggunakan Tablestore Stream Reader, pastikan fitur Stream diaktifkan untuk tabel sumber Anda. Secara default, fitur Stream diaktifkan untuk tabel seri waktu. Anda dapat mengaktifkan fitur Stream saat membuat tabel atau memanggil operasi UpdateTable dalam SDK Tablestore setelah tabel dibuat. Contoh kode berikut menunjukkan cara mengaktifkan fitur Stream:
SyncClient client = new SyncClient("", "", "", "");
#Metode 1: Aktifkan fitur Stream saat Anda membuat tabel.
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // Nilai 24 menunjukkan bahwa Tablestore menyimpan data inkremental selama 24 jam.
client.createTable(createTableRequest);
#Metode 2: Jika Anda tidak mengaktifkan fitur Stream saat membuat tabel, Anda dapat memanggil operasi UpdateTable untuk mengaktifkan fitur ini setelah tabel dibuat.
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);Perhatikan hal-hal berikut saat memanggil operasi UpdateTable dalam SDK Tablestore untuk mengaktifkan fitur Stream:
Anda dapat menentukan waktu kedaluwarsa untuk data inkremental. Dengan cara ini, Tablestore Stream Reader dapat membaca data inkremental dari Tablestore. Setelah fitur Stream diaktifkan, server Tablestore menyimpan log operasi tabel Anda. Setiap partisi mengantre log operasi secara berurutan. Setiap log operasi didaur ulang setelah waktu kedaluwarsa yang ditentukan berlalu.
SDK Tablestore menyediakan beberapa operasi API terkait Stream yang digunakan untuk membaca log operasi. Tablestore Stream Reader memanggil operasi API ini untuk membaca data inkremental. Jika Anda menggunakan Tablestore Stream Reader untuk membaca data dalam mode kolom, data inkremental dikonversi menjadi beberapa 6-tupel. Setiap 6-tupel terdiri dari pk, colName, version, colValue, opType, dan sequenceInfo. Jika Anda membaca data dalam mode baris, Tablestore Stream Reader membaca data inkremental per baris.
Mode baca dan tipe data yang didukung
Tablestore Stream Reader memungkinkan Anda membaca data inkremental dari Tablestore dalam mode kolom atau baris. Bagian ini menjelaskan proses pembacaan data dan tipe data yang didukung oleh mode baca tersebut.
Membaca data dalam mode kolom
Dalam mode multi-versi Tablestore, data tabel diorganisasikan dalam struktur tiga tingkat: baris, kolom, dan versi. Satu baris dapat memiliki beberapa kolom dengan nama kolom yang tidak tetap. Setiap kolom dapat memiliki beberapa versi, dan setiap versi memiliki timestamp tertentu sebagai nomor versi.
Anda dapat memanggil operasi API Tablestore untuk membaca dan menulis data. Tablestore mencatat operasi tulis dan modifikasi terbaru pada data tabel untuk mencatat data inkremental. Oleh karena itu, data inkremental dapat dianggap sebagai kumpulan catatan operasi.
Tablestore mendukung jenis-jenis operasi berikut:
PutRow: Menulis sebuah baris. Jika baris tersebut ada, baris tersebut akan ditimpa.
UpdateRow: Memperbarui sebuah baris tanpa memodifikasi data lain dari baris aslinya. Anda dapat menggunakan UpdateRow untuk menambahkan nilai kolom, menimpa nilai kolom jika versi kolom ada, atau menghapus versi tertentu atau semua versi kolom.
DeleteRow: Menghapus sebuah baris.
Tablestore menghasilkan catatan data inkremental berdasarkan setiap jenis operasi. Tablestore Stream Reader membaca catatan-catatan ini dan mengonversinya ke format yang didukung oleh Data Integration.
Tablestore mendukung kolom dinamis dan mode multi-versi. Oleh karena itu, sebuah baris yang dihasilkan oleh Tablestore Stream Reader adalah versi dari sebuah kolom, bukan baris di Tablestore. Setelah Tablestore Stream Reader membaca data dari sebuah baris di Tablestore, data tersebut dikonversi menjadi beberapa baris. Setiap baris mencakup nilai kunci utama, nama kolom, timestamp versi untuk kolom (nomor versi), nilai versi, dan tipe operasi. Jika parameter isExportSequenceInfo disetel ke true, informasi deret waktu juga disertakan.
Jenis-jenis operasi berikut didefinisikan untuk data yang dikonversi ke format yang didukung oleh Data Integration:
U (UPDATE): Menulis versi sebuah kolom.
DO (DELETE_ONE_VERSION): Menghapus versi sebuah kolom.
DA (DELETE_ALL_VERSION): Menghapus semua versi sebuah kolom berdasarkan nilai kunci utama dan nama kolom.
DR (DELETE_ROW): Menghapus sebuah baris berdasarkan nilai kunci utama.
Tabel berikut mencantumkan data yang dikonversi oleh Tablestore Stream Reader dari sebuah tabel yang memiliki dua kolom kunci utama (pkName1 dan pkName2).
pkName1 | pkName2 | columnName | timestamp | columnValue | opType |
pk1_V1 | pk2_V1 | col_a | 1441803688001 | col_val1 | U |
pk1_V1 | pk2_V1 | col_a | 1441803688002 | col_val2 | U |
pk1_V1 | pk2_V1 | col_b | 1441803688003 | col_val3 | U |
pk1_V2 | pk2_V2 | col_a | 1441803688000 | - | DO |
pk1_V2 | pk2_V2 | col_b | - | - | DA |
pk1_V3 | pk2_V3 | - | - | - | DR |
pk1_V3 | pk2_V3 | col_a | 1441803688005 | col_val1 | U |
Pada contoh sebelumnya, tiga baris dalam tabel Tablestore dibaca dan dikonversi menjadi tujuh baris. Kunci utama untuk ketiga baris tersebut adalah (pk1_V1, pk2_V1), (pk1_V2, pk2_V2), dan (pk1_V3, pk2_V3).
Untuk baris yang kunci utamanya adalah (pk1_V1, pk2_V1), dua versi kolom col_a dan satu versi kolom col_b ditulis.
Untuk baris yang kunci utamanya adalah (pk1_V2, pk2_V2), satu versi kolom col_a dan semua versi kolom col_b dihapus.
Untuk baris yang kunci utamanya adalah (pk1_V3, pk2_V3), satu versi kolom col_a ditulis, dan baris tersebut dihapus.
Membaca data dalam mode baris
Tabel Lebar
Anda juga dapat menggunakan Tablestore Stream Reader untuk membaca data dalam mode baris. Dalam mode ini, Tablestore Stream Reader membaca catatan operasi sebagai baris. Anda harus mengonfigurasi parameter mode dan menentukan kolom-kolom yang ingin dibaca.
"parameter": { # Setel mode ke single_version_and_update_only, setel isExportSequenceInfo ke false, dan konfigurasikan parameter lainnya, seperti datasource dan table, berdasarkan kebutuhan bisnis Anda. "mode": "single_version_and_update_only", # Mode baca. "column":[ # Kolom-kolom yang ingin Anda baca datanya dari Tablestore. Anda dapat menentukan kolom berdasarkan kebutuhan bisnis Anda. { "name": "uid" # Nama kolom, yang bisa berupa kolom kunci utama atau kolom properti. }, { "name": "name" # Nama kolom, yang bisa berupa kolom kunci utama atau kolom properti. }, ], "isExportSequenceInfo": false, # Menentukan apakah membaca informasi deret waktu. Jika Anda menyetel parameter mode ke single_version_and_update_only, parameter ini hanya dapat disetel ke false. }Tabel Seri Waktu
Fitur Stream secara otomatis diaktifkan saat Anda membuat tabel seri waktu.
Tablestore Stream Reader memungkinkan Anda membaca data inkremental dari tabel seri waktu. Jika tabel tersebut merupakan tabel seri waktu, Anda harus mengonfigurasi parameter berikut:
"parameter": { # Konfigurasikan parameter berikut dan parameter lainnya, seperti datasource dan table, berdasarkan kebutuhan bisnis Anda. "mode": "single_version_and_update_only", # Mode baca. "isTimeseriesTable":"true", # Menentukan apakah tabel tersebut merupakan tabel seri waktu. "column":[ # Kolom-kolom yang ingin Anda baca datanya dari Tablestore. Anda dapat menentukan kolom berdasarkan kebutuhan bisnis Anda. { "name": "_m_name" # Nama kolom nama metrik. }, { "name": "_data_source" # Nama kolom sumber data. }, { "name": "_tags" # Nama kolom tag. Tag diubah menjadi data tipe string. }, { "name": "tag1", # Nama kunci tag. "is_timeseries_tag":"true" # Menentukan apakah bidang tersebut merupakan bidang internal dari tag. }, { "name": "time" # Nama kolom timestamp. }, { "name": "name" # Nama kolom properti. }, ], "isExportSequenceInfo": false, # Menentukan apakah membaca informasi deret waktu. Jika Anda menyetel parameter mode ke single_version_and_update_only, parameter ini hanya dapat disetel ke false. }Data yang dibaca per baris lebih dekat dengan data dalam baris aslinya, memudahkan pemrosesan data lebih lanjut. Jika Anda membaca data per baris, perhatikan poin-poin berikut:
Baris-baris yang dibaca diekstraksi dari catatan operasi. Setiap baris sesuai dengan operasi tulis atau pembaruan. Jika Anda hanya memperbarui beberapa kolom untuk sebuah baris, catatan operasi hanya berisi kolom-kolom yang diperbarui.
Nomor versi setiap kolom, yaitu timestamp setiap kolom, tidak dapat dibaca atau dihapus.
Pemetaan tipe data
Tablestore Stream Reader mendukung semua tipe data Tablestore. Tabel berikut mencantumkan pemetaan tipe data berdasarkan mana Tablestore Stream Reader mengonversi tipe data.
Kategori | Tipe data Tablestore |
Integer | INTEGER |
Floating point | DOUBLE |
String | STRING |
Boolean | BOOLEAN |
Binary | BINARY |
Mengembangkan tugas sinkronisasi data: Panduan prosedur untuk sinkronisasi inkremental dari Tablestore
Untuk informasi tentang prosedur konfigurasi, lihat Konfigurasikan Tugas Sinkronisasi Batch Menggunakan UI Tanpa Kode dan Konfigurasikan Tugas Sinkronisasi Batch Menggunakan Editor Kode.
Untuk informasi tentang semua parameter yang dikonfigurasi dan kode yang dijalankan saat menggunakan editor kode untuk mengonfigurasi tugas sinkronisasi batch, lihat Lampiran: Kode dan Parameter.
Lampiran: Kode dan parameter
Lampiran: Konfigurasikan tugas sinkronisasi batch menggunakan editor kode
Kode untuk Tablestore Stream Reader
Membaca Data dalam Mode Kolom
{ "type":"job", "version":"2.0",// Nomor versi. "steps":[ { "stepType":"otsstream",// Nama plugin. "parameter":{ "datasource":"$srcDatasource",// Nama sumber data. "dataTable":"",// Nama tabel. "statusTable":"TableStoreStreamReaderStatusTable",// Nama tabel yang digunakan oleh Tablestore Stream Reader untuk menyimpan catatan status. "maxRetries":30,// Jumlah maksimum percobaan ulang untuk setiap permintaan membaca data inkremental dari Tablestore. Nilai default: 30. "isExportSequenceInfo":false,// Menentukan apakah membaca informasi deret waktu. "startTimeString":"${startTime}${hh}",// Waktu mulai data inkremental, yang sama dengan waktu ketika tugas sinkronisasi mulai dijalankan. Konfigurasikan parameter ini dalam format yyyymmddhh24miss. "endTimeString":"${endTime}${hh}"// Waktu akhir data inkremental, yang sama dengan waktu ketika tugas sinkronisasi selesai dijalankan. Konfigurasikan parameter ini dalam format yyyymmddhh24miss. }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"// Jumlah maksimum catatan data kotor yang diizinkan. }, "speed":{ "throttle":true,// Menentukan apakah throttling diaktifkan. Nilai false menunjukkan bahwa throttling dinonaktifkan, dan nilai true menunjukkan bahwa throttling diaktifkan. Parameter mbps hanya berlaku saat parameter throttle disetel ke true. "concurrent":1,// Jumlah maksimum thread paralel. "mbps":"12"// Laju transmisi maksimum. Unit: MB/s. } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }Membaca Data dari Tabel Lebar dalam Mode Baris
{ "type":"job", "version":"2.0",// Nomor versi. "steps":[ { "stepType":"otsstream",// Nama plugin. "parameter":{ "datasource":"$srcDatasource",// Nama sumber data. "dataTable":"",// Nama tabel. "statusTable":"TableStoreStreamReaderStatusTable",// Nama tabel yang digunakan oleh Tablestore Stream Reader untuk menyimpan catatan status. "maxRetries":30,// Jumlah maksimum percobaan ulang untuk setiap permintaan membaca data inkremental dari Tablestore. Nilai default: 30. "isExportSequenceInfo":false,// Menentukan apakah membaca informasi deret waktu. "startTimeString":"${startTime}${hh}",// Waktu mulai data inkremental, yang sama dengan waktu ketika tugas sinkronisasi mulai dijalankan. Konfigurasikan parameter ini dalam format yyyymmddhh24miss. "endTimeString":"${endTime}${hh}"// Waktu akhir data inkremental, yang sama dengan waktu ketika tugas sinkronisasi selesai dijalankan. Konfigurasikan parameter ini dalam format yyyymmddhh24miss. "mode": "single_version_and_update_only", "column":[ { "name":"pId" }, { "name": "uId" }, { "name":"col0" }, { "name": "col1" } ], }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"// Jumlah maksimum catatan data kotor yang diizinkan. }, "speed":{ "throttle":true,// Menentukan apakah throttling diaktifkan. Nilai false menunjukkan bahwa throttling dinonaktifkan, dan nilai true menunjukkan bahwa throttling diaktifkan. Parameter mbps hanya berlaku saat parameter throttle disetel ke true. "concurrent":1,// Jumlah maksimum thread paralel. "mbps":"12" // Laju transmisi maksimum. } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }Membaca Data dari Tabel Seri Waktu dalam Mode Baris
{ "type":"job", "version":"2.0",// Nomor versi. "steps":[ { "stepType":"otsstream",// Nama plugin. "parameter":{ "datasource":"$srcDatasource",// Nama sumber data. "dataTable":"",// Nama tabel. "statusTable":"TableStoreStreamReaderStatusTable",// Nama tabel yang digunakan oleh Tablestore Stream Reader untuk menyimpan catatan status. "maxRetries":30,// Jumlah maksimum percobaan ulang untuk setiap permintaan membaca data inkremental dari Tablestore. Nilai default: 30. "isExportSequenceInfo":false,// Menentukan apakah membaca informasi deret waktu. "startTimeString":"${startTime}${hh}",// Waktu mulai data inkremental, yang sama dengan waktu ketika tugas sinkronisasi mulai dijalankan. Konfigurasikan parameter ini dalam format yyyymmddhh24miss. "endTimeString":"${endTime}${hh}"// Waktu akhir data inkremental, yang sama dengan waktu ketika tugas sinkronisasi selesai dijalankan. Konfigurasikan parameter ini dalam format yyyymmddhh24miss. "mode": "single_version_and_update_only", "isTimeseriesTable":"true", "column": [ { "name": "_m_name" }, { "name": "_data_source", }, { "name": "_tags", }, { "name": "string_column", } ] }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"// Jumlah maksimum catatan data kotor yang diizinkan. }, "speed":{ "throttle":true,// Menentukan apakah throttling diaktifkan. Nilai false menunjukkan bahwa throttling dinonaktifkan, dan nilai true menunjukkan bahwa throttling diaktifkan. Parameter mbps hanya berlaku saat parameter throttle disetel ke true. "concurrent":1,// Jumlah maksimum thread paralel. "mbps":"12"// Laju transmisi maksimum. Unit: MB/s. } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Parameter dalam kode untuk Tablestore Stream Reader
Parameter | Deskripsi | Diperlukan | Nilai default |
datasource | Nama sumber data. Harus sama dengan nama sumber data yang telah ditambahkan. Anda dapat menambahkan sumber data menggunakan editor kode. | Ya | Tidak ada nilai default |
dataTable | Nama tabel dari mana Anda ingin membaca data inkremental. Fitur Stream harus diaktifkan untuk tabel tersebut. Anda dapat mengaktifkan fitur Stream untuk sebuah tabel saat membuat tabel tersebut. Anda juga dapat memanggil operasi UpdateTable untuk mengaktifkan fitur ini setelah tabel dibuat. | Ya | Tidak ada nilai default |
statusTable | Nama tabel yang digunakan oleh Tablestore Stream Reader untuk menyimpan catatan status. Catatan-catatan ini membantu menemukan data yang tidak diperlukan dan meningkatkan efisiensi pembacaan. Jika tabel yang ditentukan tidak ada, Tablestore Stream Reader secara otomatis membuat tabel tersebut. Setelah tugas pembacaan offline selesai, Anda tidak perlu menghapus tabel tersebut. Catatan status dalam tabel dapat digunakan untuk tugas pembacaan berikutnya.
Anda dapat mengonfigurasi nama yang mirip dengan TableStoreStreamReaderStatusTable. Pastikan nama tersebut berbeda dari nama tabel terkait bisnis. | Ya | Tidak ada nilai default |
startTimestampMillis | Waktu mulai data inkremental, dalam milidetik. Waktu mulai adalah batas kiri rentang waktu tertutup-kiri, terbuka-kanan dari data inkremental.
| Tidak | Tidak ada nilai default |
endTimestampMillis | Waktu akhir data inkremental, dalam milidetik. Waktu akhir adalah batas kanan rentang waktu tertutup-kiri, terbuka-kanan dari data inkremental.
| Tidak | Tidak ada nilai default |
date | Tanggal ketika data yang ingin Anda baca dihasilkan. Konfigurasikan parameter ini dalam format yyyyMMdd, seperti 20151111. Anda harus mengonfigurasi parameter date, parameter startTimestampMillis dan endTimestampMillis, atau parameter startTimeString dan endTimeString. Sebagai contoh, Pusat Pemrosesan Data Alibaba Cloud menjadwalkan tugas per hari. Oleh karena itu, Anda perlu mengonfigurasi parameter date dan tidak perlu mengonfigurasi parameter startTimestampMillis dan endTimestampMillis atau parameter startTimeString dan endTimeString. | Tidak | Tidak ada nilai default |
isExportSequenceInfo | Menentukan apakah membaca informasi deret waktu. Informasi deret waktu mencakup waktu ketika data ditulis. Nilai default adalah false, yang menunjukkan bahwa informasi deret waktu tidak dibaca. | Tidak | false |
maxRetries | Jumlah maksimum percobaan ulang untuk setiap permintaan membaca data inkremental dari Tablestore. Nilai default: 30. Percobaan ulang dilakukan pada interval tertentu. Durasi total 30 percobaan ulang adalah sekitar 5 menit. Anda dapat mempertahankan pengaturan default. | Tidak | 30 |
startTimeString | Waktu mulai data inkremental, dalam detik. Waktu mulai adalah batas kiri rentang waktu tertutup-kiri, terbuka-kanan dari data inkremental. Konfigurasikan parameter ini dalam format | Tidak | Tidak ada nilai default |
endTimeString | Waktu akhir data inkremental, dalam detik. Waktu akhir adalah batas kanan rentang waktu tertutup-kiri, terbuka-kanan dari data inkremental. Konfigurasikan parameter ini dalam format | Tidak | Tidak ada nilai default |
enableSeekIterator | Menentukan apakah akan menentukan offset dari mana Tablestore Stream Reader mulai membaca data inkremental. Jika data inkremental sering dibaca, Tablestore Stream Reader secara otomatis menentukan offset berdasarkan offset dari mana data dibaca terakhir kali. Jika Tablestore Stream Reader belum dijalankan sebelumnya, data dibaca dari waktu mulai data inkremental. Secara default, data inkremental disimpan selama tujuh hari. Sebelum waktu mulai tiba, tidak ada data yang diekspor. Anda dapat menambahkan | Tidak | false |
mode | Mode baca. Jika parameter ini disetel ke single_version_and_update_only, data dibaca dalam mode baris. | Tidak | Tidak ada nilai default |
isTimeseriesTable | Menentukan apakah tabel tersebut merupakan tabel seri waktu. Parameter ini berlaku hanya jika parameter mode disetel ke single_version_and_update_only. | Tidak | false |
column | Nama kolom dari mana Anda ingin membaca data ketika Anda menyetel parameter mode ke
Catatan Jika Anda ingin membaca data dalam mode baris, Anda harus mengonfigurasi parameter ini. Jika tidak, data tidak dapat dibaca. |
| Tidak ada nilai default |