You can use the kafka-connect-tablestore package to synchronize data from Apache Kafka to time series tables in Tablestore. This topic describes how to configure data synchronization from Kafka to time series tables in Tablestore.
Prerequisites
Apache Kafka is installed and started, and ZooKeeper is started. For more information, see the Kafka documentation.
Tablestore is activated, and an instance and a time series table are created. For more information, see Getting started with the TimeSeries model.
Note: You can also use Tablestore Sink Connector to automatically create the destination time series table. To create the time series table this way, set auto.create to true.
An AccessKey pair is obtained. For more information, see Create an AccessKey pair.
Background information
Tablestore can store time series data and supports analytics on the data. For more information, see TimeSeries model.
Step 1: Deploy Tablestore Sink Connector
Obtain the Tablestore Sink Connector package by using one of the following methods:
Download the source code from Tablestore Sink Connector source code on GitHub and compile the source code.
Run the following command to download the Tablestore Sink Connector source code by using Git:
git clone https://github.com/aliyun/kafka-connect-tablestore.git
Go to the directory in which the downloaded source code is stored, and then run the following command to package the source code by using Maven:
mvn clean package -DskipTests
After the compilation is complete, the generated package is stored in the target directory. In this example, the kafka-connect-tablestore-1.0.jar package is used.
Download the compiled kafka-connect-tablestore package.
Copy the package to the $KAFKA_HOME/libs directory on each node.
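For example, the following is a minimal sketch of the copy step on a single node. It assumes the package compiled in the previous step and that the KAFKA_HOME environment variable points to your Kafka installation; repeat the step on every node that runs Kafka Connect.
# Copy the connector package into the Kafka plugin class path.
cp target/kafka-connect-tablestore-1.0.jar $KAFKA_HOME/libs/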
Step 2: Start Tablestore Sink Connector
Tablestore Sink Connector can work in standalone mode or distributed mode. You can select a mode based on your business requirements.
To write time series data to Tablestore, message records in Kafka must be in the JSON format. Therefore, JsonConverter is required to start Tablestore Sink Connector. You do not need to extract schemas or pass in keys, but you must configure the required items in connect-standalone.properties or connect-distributed.properties. The following sample code shows how to configure the items.
If you pass in keys, you must configure key.converter and key.converter.schemas.enable based on the key format.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
This section describes how to synchronize data to a time series table in Tablestore in standalone mode. The procedure for synchronizing data to a time series table in Tablestore in distributed mode is similar to the procedure for synchronizing data to a data table in Tablestore in distributed mode. However, you must modify the preceding configuration items in the worker configuration file connect-distributed.properties and modify the time series-related configuration items in the connector configuration file connect-tablestore-sink-quickstart.json. For more information, see the configuration procedure in distributed mode in Step 2: Start Tablestore Sink Connector.
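For reference, the following is a minimal sketch of the distributed-mode flow. It assumes that the Kafka Connect REST API listens on its default port 8083 and reuses the placeholder connection values (xxx) from the connector configuration shown later in this step; adjust both to match your environment. The JSON body carries the same keys as the properties file, wrapped in the standard {"name": ..., "config": {...}} envelope that the Kafka Connect REST API expects.
# Start the workers in distributed mode.
bin/connect-distributed.sh config/connect-distributed.properties

# Register the connector through the Kafka Connect REST API.
curl -i -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data '{
    "name": "tablestore-sink",
    "config": {
      "connector.class": "TableStoreSinkConnector",
      "tasks.max": "1",
      "topics": "test",
      "tablestore.endpoint": "https://xxx.xxx.ots.aliyuncs.com",
      "tablestore.auth.mode": "aksk",
      "tablestore.access.key.id": "xxx",
      "tablestore.access.key.secret": "xxx",
      "tablestore.instance.name": "xxx",
      "table.name.format": "<topic>",
      "auto.create": "true",
      "tablestore.mode": "timeseries",
      "tablestore.timeseries.test.measurement": "m",
      "tablestore.timeseries.test.dataSource": "d",
      "tablestore.timeseries.test.tags": "region,level",
      "tablestore.timeseries.test.time": "timestamp",
      "tablestore.timeseries.test.time.unit": "MILLISECONDS"
    }
  }'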
To use Tablestore Sink Connector in standalone mode, perform the following steps:
Modify the worker configuration file connect-standalone.properties and the connector configuration file connect-tablestore-sink-quickstart.properties based on your business requirements.
Example of how to modify the worker configuration file connect-standalone.properties
The worker configuration file contains configuration items such as Kafka connection parameters, the serialization format, and the frequency at which offsets are committed. The following sample code is an example provided by Apache Kafka of how to modify the worker configuration file. For more information, see Kafka Connect.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
Example of how to modify the connector configuration file connect-tablestore-sink-quickstart.properties
The connector configuration file contains configuration items such as the connector class, Tablestore connection parameters, and data mappings. For more information, see Configuration description.
# Specify the connector name.
name=tablestore-sink
# Specify the connector class.
connector.class=TableStoreSinkConnector
# Specify the maximum number of tasks.
tasks.max=1
# Specify the list of Kafka topics from which you want to export data.
topics=test

# Specify values for the following Tablestore connection parameters:
# The endpoint of the Tablestore instance.
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# The authentication mode.
tablestore.auth.mode=aksk
# The AccessKey ID and the AccessKey secret. If tablestore.auth.mode is set to aksk, you must specify the AccessKey ID and the AccessKey secret.
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# The name of the Tablestore instance.
tablestore.instance.name=xxx

## The configuration items related to Security Token Service (STS) authentication. If STS authentication is used, the following configuration items must be specified. You must also specify ACCESS_ID and ACCESS_KEY in environment variables when STS authentication is used.
#sts.endpoint=
#region=
#account.id=
#role.name=

# Specify the format string for the name of the destination Tablestore table. You can use <topic> in the string as a placeholder for the topic from which you want to export data.
# topics.assign.tables takes precedence over table.name.format. If topics.assign.tables is specified, the configuration of table.name.format is ignored.
# For example, if table.name.format is set to kafka_<topic> and the name of the Kafka topic from which you want to export data is test, Kafka message records from the test topic are mapped to the table named kafka_test in Tablestore.
table.name.format=<topic>
# Specify the mapping between the Kafka topic and the destination Tablestore table. The value must be in the <topic>:<tablename> format. The topic name and table name are separated with a colon (:). If you want to specify multiple mappings, separate them with commas (,).
# If the mapping is not specified, the configuration of table.name.format is used.
# topics.assign.tables=test:test_kafka

# Specify whether to automatically create a destination table. Default value: false.
auto.create=true

# Specify how to process dirty data:
# An error may occur when the Kafka message records are parsed or written to the time series table. You can specify the following two parameters to determine how to fix the error:
# Specify the fault tolerance capability. Valid values: none and all. Default value: none.
# none: An error causes the data import task that uses Tablestore Sink Connector to fail.
# all: The message records for which errors are reported are skipped and logged.
runtime.error.tolerance=none
# Specify how dirty data is logged. Valid values: ignore, kafka, and tablestore. Default value: ignore.
# ignore: All errors are ignored.
# kafka: The message records for which errors are reported and the error messages are stored in a different Kafka topic.
# tablestore: The message records for which errors are reported and the error messages are stored in a Tablestore data table.
runtime.error.mode=ignore

# If you set runtime.error.mode to kafka, you must specify the Kafka cluster address and the topic.
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors

# If you set runtime.error.mode to tablestore, you must specify the name of the Tablestore data table.
# runtime.error.table.name=errors

## The following configuration items are specific to data synchronization from Apache Kafka to time series tables in Tablestore.
# The connector working mode. Default value: normal.
tablestore.mode=timeseries
# Mappings of the primary key field in the time series table.
tablestore.timeseries.test.measurement=m
tablestore.timeseries.test.dataSource=d
tablestore.timeseries.test.tags=region,level
# Mappings of the time field in the time series table.
tablestore.timeseries.test.time=timestamp
tablestore.timeseries.test.time.unit=MILLISECONDS
# Specify whether to convert the column names of the time series data field to lowercase letters. Default value: true. The names of columns in time series tables in the TimeSeries model do not support uppercase letters. If tablestore.timeseries.toLowerCase is set to false and a column name contains uppercase letters, an error is reported when data is written to the time series table.
tablestore.timeseries.toLowerCase=true
# Specify whether to store fields other than the primary key field and the time field as the time series data field in the time series table. Default value: true. If tablestore.timeseries.mapAll is set to false, only the fields that are specified by tablestore.timeseries.test.field.name are stored in the time series table as the time series data field.
tablestore.timeseries.mapAll=true
# Specify the name of the field that is contained in the time series data field. If you specify multiple fields, separate the field names with commas (,).
tablestore.timeseries.test.field.name=cpu
# Specify the type of the field that is contained in the time series data field. Valid values: double, integer, string, binary, and boolean.
# If the time series data field contains multiple fields, the field types and the field names must be configured in pairs. Separate multiple field types with commas (,).
tablestore.timeseries.test.field.type=double
Go to the $KAFKA_HOME directory and run the following command to start the standalone mode:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-tablestore-sink-quickstart.properties
Step 3: Produce message records
Go to the $KAFKA_HOME directory and run the following command to start the console producer client:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
The following table describes the parameters that you must configure to start the console producer client.
Parameter: --broker-list
Example: localhost:9092
Description: The address and port of the broker in the Kafka cluster.

Parameter: --topic
Example: test
Description: The name of the topic. By default, the topic is automatically created when you start Tablestore Sink Connector. You can also create the topic manually, as shown in the sketch after this table.
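If you want to create the topic manually, the following is a minimal sketch that uses the standard Kafka CLI. It assumes a single-broker test cluster and a Kafka version whose kafka-topics.sh accepts --bootstrap-server; older releases that manage topics through ZooKeeper use --zookeeper localhost:2181 instead.
# Create the test topic with placeholder sizing for a single-node setup.
bin/kafka-topics.sh --create --topic test \
  --bootstrap-server localhost:9092 \
  --partitions 1 --replication-factor 1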
Write a message to the topic named test.
Important: To import data to a time series table, you must write data in the JSON format to the Kafka topic.
{"m":"cpu","d":"127.0.0.1","region":"shanghai","level":1,"timestamp":1638868699090,"io":5.5,"cpu":"3.5"}
Log on to the Tablestore console to view the data.