
AnalyticDB:Import data into AnalyticDB for MySQL using Logstash

Last Updated: Mar 30, 2026

If your data already flows through an Apache Kafka pipeline, you can route it directly into AnalyticDB for MySQL Data Warehouse Edition (V3.0) using Logstash. Because AnalyticDB for MySQL is fully compatible with MySQL, the standard Logstash JDBC output plug-in connects without additional drivers or adapters.

How Logstash processes data

Logstash moves data through three sequential plug-in stages:

| Stage | Plug-in type | Role |
|---|---|---|
| Collect | Input | Reads events from a source (Kafka, files, HTTP, and others) |
| Transform | Filter | Parses, enriches, and reshapes events (Grok parsing, IP geolocation, PII anonymization) |
| Write | Output | Sends transformed events to a destination (AnalyticDB for MySQL via JDBC) |
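Conceptually, the three stages form a collect → transform → write chain. The following Python sketch is purely illustrative (real Logstash plug-ins are Ruby/Java and run inside the Logstash process); it models the flow with an in-memory source and sink:

```python
# Toy model of Logstash's three plug-in stages (illustrative only).

def collect(source):
    """Input stage: read raw events from a source."""
    for raw in source:
        yield {"message": raw}

def transform(events):
    """Filter stage: parse and enrich each event."""
    for event in events:
        # Grok-style field extraction: split off a leading log level.
        level, _, text = event["message"].partition(" ")
        event["level"] = level
        event["message"] = text
        yield event

def write(events, sink):
    """Output stage: send transformed events to a destination."""
    for event in events:
        sink.append(event)

source = ["INFO service started", "ERROR disk full"]
sink = []
write(transform(collect(source)), sink)
print(sink[1]["level"])  # -> ERROR
```

Each stage consumes the previous stage's event stream, which is why filters can be chained freely between any input and output.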

Prerequisites

Before you begin, make sure you have:

  • Logstash 1.5 or later (version 1.5 introduced built-in Apache Kafka integration)

  • An Apache Kafka cluster with at least one topic containing data to import

  • An AnalyticDB for MySQL Data Warehouse Edition (V3.0) cluster endpoint, database name, username, and password

  • The MySQL JDBC driver (com.mysql.jdbc.Driver) on the Logstash classpath

Import Kafka data into AnalyticDB for MySQL

Step 1: Install and update plug-ins

In the root directory of the Logstash installation (not the Kafka server), install the JDBC output plug-in and update the installed plug-ins:

bin/plugin install logstash-output-jdbc
bin/plugin update

For the full list of available Logstash plug-ins, see the Logstash plug-ins repository on GitHub.

Step 2: Configure the pipeline

Create a Logstash configuration file, for example config/kafka-to-adb.conf, with an input block and an output block.

Input configuration

The following example configures a Kafka consumer:

input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "Logstash"
        topic_id => "test"
        codec => plain
        reset_beginning => false # boolean (optional), default: false
        consumer_threads => 5  # number (optional), default: 1
        decorate_events => true # boolean (optional), default: false
    }
}
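One caveat with consumer_threads: within a consumer group, each partition is consumed by at most one thread, so threads beyond the partition count sit idle. A toy Python sketch of round-robin partition assignment (illustrative only, not Kafka's actual rebalancing algorithm):

```python
def assign_partitions(num_partitions: int, num_threads: int) -> dict:
    """Round-robin partitions over consumer threads; each partition is
    owned by exactly one thread, so surplus threads receive nothing."""
    assignment = {t: [] for t in range(num_threads)}
    for p in range(num_partitions):
        assignment[p % num_threads].append(p)
    return assignment

# With the example's consumer_threads => 5 and a 3-partition topic,
# two of the five threads are left idle:
print(assign_partitions(3, 5))
# -> {0: [0], 1: [1], 2: [2], 3: [], 4: []}
```

In practice, size consumer_threads to the topic's partition count or lower.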

Input parameters

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| zk_connect | string | Yes | — | ZooKeeper connection string, for example localhost:2181. |
| group_id | string | Yes | — | Consumer group ID. Consumption by different consumer groups is isolated. |
| topic_id | string | Yes | — | The Kafka topic to subscribe to and consume from. |
| reset_beginning | boolean | No | false | Offset position when Logstash starts: false resumes from the last committed offset (or the earliest offset if no prior offset exists); true starts from the earliest offset and switches to follow mode (tail -F) after consuming the last message. |
| consumer_threads | number | No | 1 | Number of parallel consumer threads. |
| decorate_events | boolean | No | false | When true, attaches metadata to each event: message size, topic source, and consumer group. |
| rebalance_max_retries | number | No | — | Number of retries for registering a partition owner node in ZooKeeper during a consumer group rebalance. |
| consumer_timeout_ms | number | No | — | Timeout period for receiving messages. Change this value only if the default causes issues in your environment. |
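The reset_beginning behavior reduces to a small decision. This hypothetical Python sketch mirrors where consumption starts under each setting:

```python
EARLIEST = 0  # stand-in for the earliest available offset

def starting_offset(committed_offset, reset_beginning: bool):
    """Mirror reset_beginning semantics: True always restarts from the
    earliest offset; False resumes from the committed offset when one
    exists, otherwise falls back to the earliest."""
    if reset_beginning or committed_offset is None:
        return EARLIEST
    return committed_offset

print(starting_offset(42, False))    # -> 42 (resume where we left off)
print(starting_offset(42, True))     # -> 0  (re-read everything)
print(starting_offset(None, False))  # -> 0  (no prior offset recorded)
```

Leaving reset_beginning at false is the safe default for ongoing imports; set it to true only when you deliberately want to re-import the whole topic.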

For a complete parameter reference, see the logstash-kafka README on GitHub.

Note

To consume messages from the same topic in parallel, divide the topic into multiple partitions and assign the same group_id and topic_id to each consumer. Note that Kafka guarantees message order only within a single partition, not across the topic as a whole.
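The per-partition ordering guarantee follows from how keyed messages are routed: records with the same key always hash to the same partition, so only they share an ordering guarantee. A minimal Python sketch of that routing (using a toy additive hash for reproducibility; Kafka's default partitioner actually uses murmur2):

```python
from collections import defaultdict

NUM_PARTITIONS = 3

def partition_for(key: str) -> int:
    # Deterministic toy hash, for illustration only.
    return sum(key.encode()) % NUM_PARTITIONS

partitions = defaultdict(list)
for key, value in [("user-1", "login"), ("user-2", "login"),
                   ("user-1", "logout"), ("user-2", "logout")]:
    partitions[partition_for(key)].append((key, value))

# All events for a given key land in one partition, preserving their
# relative order; events for different keys may interleave across
# partitions and consumer threads.
```

If your import requires a strict global order, the topic must use a single partition, at the cost of parallelism.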

Output configuration

The following example writes events to AnalyticDB for MySQL using the JDBC output plug-in:

output {
    jdbc {
        driver_class => "com.mysql.jdbc.Driver"
        connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
        statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
    }
}

Replace the placeholders in connection_string:

| Placeholder | Description |
|---|---|
| HOSTNAME | Endpoint of the AnalyticDB for MySQL cluster |
| DATABASE | Target database name |
| USER | Database username |
| PASSWORD | Database password |
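One pitfall when substituting real values: characters such as & or @ in the username or password break the URL's query string unless they are percent-encoded. A small Python helper, hypothetical and shown only to illustrate the encoding (the endpoint below is a made-up example):

```python
from urllib.parse import quote

def build_connection_string(host: str, database: str,
                            user: str, password: str) -> str:
    """Assemble a JDBC URL for AnalyticDB for MySQL,
    percent-encoding special characters in the credentials."""
    return (f"jdbc:mysql://{host}/{database}"
            f"?user={quote(user, safe='')}"
            f"&password={quote(password, safe='')}")

url = build_connection_string(
    "am-example.ads.aliyuncs.com:3306", "logdb", "loguser", "p@ss&word")
print(url)
# -> jdbc:mysql://am-example.ads.aliyuncs.com:3306/logdb?user=loguser&password=p%40ss%26word
```

Generate the string once and paste it into connection_string rather than hand-escaping each character.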

Output parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| driver_class | string | Yes | JDBC driver class name. Use com.mysql.jdbc.Driver for AnalyticDB for MySQL. |
| connection_string | string | Yes | JDBC connection URL for the AnalyticDB for MySQL cluster. |
| statement | array | Yes | An array where the first element is the INSERT statement with ? placeholders, followed by the Logstash field names that map to each placeholder in order. |
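The statement array's positional mapping can be sketched in a few lines of Python: the first element is the parameterized SQL, and each remaining element names the event field bound to the matching ? placeholder, in order. (A hypothetical helper, not part of the plug-in.)

```python
def bind_statement(statement, event):
    """Split a Logstash-style statement array into the SQL string and
    the ordered parameter values pulled from the event."""
    sql, *field_names = statement
    params = tuple(event[name] for name in field_names)
    return sql, params

statement = ["INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)",
             "host", "@timestamp", "message"]
event = {"host": "web-1",
         "@timestamp": "2026-03-30T00:00:00Z",
         "message": "request served"}

sql, params = bind_statement(statement, event)
# params == ("web-1", "2026-03-30T0000:00Z"[:10] + "T00:00:00Z", ...) is
# simply the event values in field order:
# ("web-1", "2026-03-30T00:00:00Z", "request served")
```

The count of ? placeholders must equal the number of field names that follow, and the table's column order must match the placeholder order.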

For a complete parameter reference, see the logstash-output-jdbc README on GitHub.

Step 3: Start the pipeline

In the Logstash installation directory, run:

bin/logstash -f config/kafka-to-adb.conf

Logstash starts consuming messages from the configured Kafka topic and writing them to AnalyticDB for MySQL.