If your data already flows through an Apache Kafka pipeline, you can route it directly into AnalyticDB for MySQL Data Warehouse Edition (V3.0) using Logstash. Because AnalyticDB for MySQL is fully compatible with MySQL, the standard Logstash JDBC output plug-in connects without additional drivers or adapters.
How Logstash processes data
Logstash moves data through three sequential plug-in stages:
| Stage | Plug-in type | Role |
|---|---|---|
| Collect | Input | Reads events from a source (Kafka, files, HTTP, and others) |
| Transform | Filter | Parses, enriches, and reshapes events (Grok parsing, IP geolocation, PII anonymization) |
| Write | Output | Sends transformed events to a destination (AnalyticDB for MySQL via JDBC) |
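The three stages above map directly onto the blocks of a pipeline configuration file. A minimal sketch (the topic name, JSON filter, and stdout output are illustrative choices, not part of the import procedure below; the filter stage is optional):

```
input {
  kafka {
    zk_connect => "localhost:2181"
    group_id   => "Logstash"
    topic_id   => "test"
  }
}

filter {
  # Parse each Kafka message body as JSON (assumes the messages are JSON).
  json {
    source => "message"
  }
}

output {
  # Print events to the console for debugging before wiring up JDBC.
  stdout { codec => rubydebug }
}
```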
Prerequisites
Before you begin, make sure you have:
- Logstash 1.5 or later (version 1.5 introduced built-in Apache Kafka integration)
- An Apache Kafka cluster with at least one topic containing data to import
- An AnalyticDB for MySQL Data Warehouse Edition (V3.0) cluster endpoint, database name, username, and password
- The MySQL JDBC driver (com.mysql.jdbc.Driver) on the Logstash classpath
Import Kafka data into AnalyticDB for MySQL
Step 1: Install and update plug-ins
In the Logstash installation directory, install any plug-ins that are not bundled with your Logstash release and update the ones that are. For example, assuming the community logstash-output-jdbc plug-in provides the JDBC output used below:

```
bin/plugin install logstash-output-jdbc
bin/plugin update
```
For the full list of available Logstash plug-ins, see the Logstash plug-ins repository on GitHub.
Step 2: Configure the pipeline
Create a Logstash configuration file, for example config/kafka-to-adb.conf, with an input block and an output block.
Input configuration
The following example configures a Kafka consumer:
```
input {
  kafka {
    zk_connect       => "localhost:2181"
    group_id         => "Logstash"
    topic_id         => "test"
    codec            => plain
    reset_beginning  => false   # boolean (optional), default: false
    consumer_threads => 5       # number (optional), default: 1
    decorate_events  => true    # boolean (optional), default: false
  }
}
```
Input parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| zk_connect | string | Yes | — | ZooKeeper connection string, for example localhost:2181. |
| group_id | string | Yes | — | Consumer group ID. Consumption across different consumer groups is isolated from each other. |
| topic_id | string | Yes | — | The Kafka topic to subscribe to and consume from. |
| reset_beginning | boolean | No | false | Offset position when Logstash starts: false resumes from the last committed offset (or the earliest offset if no prior offset exists); true starts from the earliest offset and switches to follow mode (similar to tail -F) after consuming the last message. |
| consumer_threads | number | No | 1 | Number of parallel consumer threads. |
| decorate_events | boolean | No | false | When true, attaches metadata to each event: message size, topic source, and consumer group. |
| rebalance_max_retries | number | No | — | Number of retries for registering a partition owner node in ZooKeeper during a consumer group rebalance. |
| consumer_timeout_ms | number | No | — | Timeout period for receiving messages. Change this value only if the default causes issues in your environment. |
For a complete parameter reference, see the logstash-kafka README on GitHub.
To consume messages from the same topic in parallel, divide the topic into multiple partitions and assign the same group_id and topic_id to each consumer. Kafka then balances the partitions across the consumers in the group; note that message order is guaranteed only within a single partition, not across the topic as a whole.
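For example, the following input block (a sketch; the ZooKeeper address, group, and topic names are illustrative) runs five consumer threads against a topic that has been divided into five partitions. Running the same block on a second Logstash instance with the same group_id and topic_id spreads the partitions across both instances:

```
input {
  kafka {
    zk_connect       => "localhost:2181"
    group_id         => "Logstash"   # same group on every instance
    topic_id         => "test"       # same topic on every instance
    consumer_threads => 5            # at most one active thread per partition
  }
}
```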
Output configuration
The following example writes events to AnalyticDB for MySQL using the JDBC output plug-in:
```
output {
  jdbc {
    driver_class      => "com.mysql.jdbc.Driver"
    connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
    statement         => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
  }
}
```
Replace the placeholders in connection_string:
| Placeholder | Description |
|---|---|
HOSTNAME |
Endpoint of the AnalyticDB for MySQL cluster |
DATABASE |
Target database name |
USER |
Database username |
PASSWORD |
Database password |
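The INSERT statement in the output example assumes a log table with host, timestamp, and message columns that already exists in the target database. A minimal sketch of a matching table definition (the column types and sizes are assumptions; adjust them to your data and to your cluster's table requirements):

```sql
CREATE TABLE log (
  host      VARCHAR(255),  -- Logstash host field
  timestamp DATETIME,      -- event @timestamp
  message   TEXT           -- raw message body
);
```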
Output parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| driver_class | string | Yes | JDBC driver class name. Use com.mysql.jdbc.Driver for AnalyticDB for MySQL. |
| connection_string | string | Yes | JDBC connection URL for the AnalyticDB for MySQL cluster. |
| statement | array | Yes | An array whose first element is the INSERT statement with ? placeholders, followed by the Logstash field names that map to each placeholder in order. |
For a complete parameter reference, see the JDBC output plug-in's README on GitHub.
Step 3: Start the pipeline
In the Logstash installation directory, run:
```
bin/logstash -f config/kafka-to-adb.conf
```
Logstash starts consuming messages from the configured Kafka topic and writing them to AnalyticDB for MySQL.
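To confirm that rows are arriving, query the target table from any MySQL-compatible client connected to the cluster (the table and column names follow the example output configuration above):

```sql
SELECT host, timestamp, message
FROM log
ORDER BY timestamp DESC
LIMIT 10;
```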