This topic describes how to start two Apache Kafka clusters and then a Replicator process to replicate data between the clusters.
Background information
As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends that you deploy Confluent Platform in Apache Kafka Raft (KRaft) mode. For information about how to run Apache Kafka in KRaft mode, see KRaft Overview and Quick Start for Confluent Platform.
The examples in this topic provide sample code for deployment in both KRaft mode and ZooKeeper mode.
For KRaft, the sample code uses combined mode, in which a single Kafka node acts as both a broker and a KRaft controller. Combined mode is not supported for production workloads; it is used in the sample code of this topic to simplify the procedure. To run the broker and the controller on separate Kafka nodes, use isolated mode in Confluent Platform. To learn more, see KRaft Overview and KRaft mode. A configuration sketch of the two modes follows.
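The choice between the two modes is controlled by the process.roles setting in the KRaft server.properties file. A minimal sketch, assuming the default single-node quickstart values; your file may differ:

# Combined mode: this node acts as both a broker and a KRaft controller.
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
# Isolated mode instead uses separate nodes, each with process.roles=broker
# or process.roles=controller.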
Prerequisites
Confluent Platform is installed. For more information, see Install Confluent Platform On-Premises.
Java 8, 11, or 17 is installed. For more information, see Java Downloads.
Ports for Kafka brokers and Confluent Platform components
KRaft mode
Component | Origin | Destination |
Kafka brokers | 9082 | 9092 |
KRaft controllers | 9071 | 9093 |
Metadata server listeners (in brokers) | 8091 | 8090 |
Connect Replicator worker (single instance) | 8083 | |
Control Center (single instance) | 9021 | |
ZooKeeper mode
Component | Origin | Destination |
Kafka brokers | 9082 | 9092 |
ZooKeeper | 2171 | 2181 |
Metadata server listeners (in brokers) | 8091 | 8090 |
Connect Replicator worker (single instance) | 8083 | |
Control Center (single instance) | 9021 | |
Start the destination cluster
KRaft mode
Run the following command to go to the path where Confluent Platform is installed:

cd $CONFLUENT_HOME

Run the following command to create a directory to store all example files:

mkdir my-examples

Run the following command to copy the etc/kafka/kraft/server.properties file to the examples directory and rename it to match its purpose:

cp etc/kafka/kraft/server.properties my-examples/server_destination.properties

Run the following command to use the kafka-storage tool to generate a random Universally Unique Identifier (UUID):

KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"

Note: The kafka-storage command is run only once per broker and controller. You cannot use this command to update an existing cluster. If you make a configuration mistake in this step, you must recreate the directories from scratch and perform the steps again.

Run the following command to format the log directories for the destination cluster:

./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_destination.properties

Run the following command to start the destination cluster:

./bin/kafka-server-start my-examples/server_destination.properties

For more information, see Quick Start for Confluent Platform.
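Optionally, before you continue, you can confirm that the destination broker is reachable. This check is not part of the original procedure; it queries the broker for the API versions it supports:

./bin/kafka-broker-api-versions --bootstrap-server localhost:9092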
ZooKeeper mode
Run the following command to switch to the path where Confluent Platform is installed:

cd $CONFLUENT_HOME

Run the following command to create a directory to store all example files:

mkdir my-examples

Run the following command to copy the etc/kafka/zookeeper.properties file to the examples directory and rename it to match its purpose:

cp etc/kafka/zookeeper.properties my-examples/zookeeper_destination.properties

Run the following command to copy the etc/kafka/server.properties file to the examples directory and rename it to match its purpose:

cp etc/kafka/server.properties my-examples/server_destination.properties

Start a ZooKeeper server. In this example, services run on localhost. Run the following command to start ZooKeeper in its own terminal:

./bin/zookeeper-server-start my-examples/zookeeper_destination.properties

Start a Kafka broker to serve as the single-node Kafka cluster for the destination. Run the following command to start Kafka in its own terminal:

./bin/kafka-server-start my-examples/server_destination.properties
Start the origin cluster
Configure and start the origin cluster in a new terminal window.
KRaft mode
Because the destination node runs on the default ports, you must run the origin node on different ports to prevent conflicts. For example, if the Kafka broker on the origin node is configured on port 9082, the controller can be configured on port 9071. For the port mappings, see the "Ports for Kafka brokers and Confluent Platform components" section of this topic. Copy the configuration files for the origin cluster to your examples directory and modify them in the following steps to prevent conflicts with the destination cluster.
Run the following command to switch to the path where Confluent Platform is installed:

cd $CONFLUENT_HOME

Run the following command to copy the etc/kafka/kraft/server.properties file to the examples directory and rename it to match its purpose:

cp etc/kafka/kraft/server.properties my-examples/server_origin.properties

Run the following commands to update the port numbers:

sed -i '' -e "s/9093/9071/g" my-examples/server_origin.properties
sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties
sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties
sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties
sed -i '' -e "s/confluent.metrics.reporter.bootstrap.servers=localhost:9092/confluent.metrics.reporter.bootstrap.servers=localhost:9082/g" my-examples/server_origin.properties

Run the following command to update the data directories:

sed -i '' -e "s/kraft-combined-logs/kraft-combined-logs-origin/g" my-examples/server_origin.properties

Run the following command to use the kafka-storage tool to generate a random UUID:

KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"

Run the following command to format the log directories for the origin cluster:

./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_origin.properties

Start a Kafka broker to serve as the single-node Kafka cluster for the origin. Run the following command to start Kafka in its own terminal:

./bin/kafka-server-start my-examples/server_origin.properties
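As with the destination cluster, you can optionally confirm that the origin broker is listening on its non-default port (an optional check, not part of the original procedure):

./bin/kafka-broker-api-versions --bootstrap-server localhost:9082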
ZooKeeper mode
Because the destination cluster runs on the default ports, you must run the origin cluster on different ports to prevent conflicts. For example, if the Kafka broker in the origin cluster is configured on port 9082, ZooKeeper can be configured on port 2171. For the port mappings, see the "Ports for Kafka brokers and Confluent Platform components" section of this topic. Copy the configuration files for the origin cluster to your examples directory and modify them in the following steps to prevent conflicts with the destination cluster.
Run the following commands to copy the configuration files to my-examples:

cp etc/kafka/zookeeper.properties my-examples/zookeeper_origin.properties
cp etc/kafka/server.properties my-examples/server_origin.properties

Run the following commands to update the port numbers:

sed -i '' -e "s/2181/2171/g" my-examples/zookeeper_origin.properties
sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties
sed -i '' -e "s/2181/2171/g" my-examples/server_origin.properties
sed -i '' -e "s/#listeners/listeners/g" my-examples/server_origin.properties
sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties
sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties

Run the following command to update the broker ID of the origin cluster:

sed -i '' -e "s/broker.id=0/broker.id=1/g" my-examples/server_origin.properties

Run the following commands to update the data directories:

sed -i '' -e "s/zookeeper/zookeeper_origin/g" my-examples/zookeeper_origin.properties
sed -i '' -e "s/kafka-logs/kafka-logs-origin/g" my-examples/server_origin.properties

Start the origin cluster.

Run the following command to start ZooKeeper in its own terminal:

./bin/zookeeper-server-start my-examples/zookeeper_origin.properties

Run the following command to start Kafka in its own terminal:

./bin/kafka-server-start my-examples/server_origin.properties
Create a topic
Open a new command window to run Kafka commands.
Run the following command to create a topic named test-topic in the origin cluster:
kafka-topics --create --topic test-topic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9082

Run the following command to check whether the topic was created:

kafka-topics --list --bootstrap-server localhost:9082

If output similar to the following is displayed, the topic was created. Note that topics whose names start with _confluent are internal topics.

__confluent.support.metrics
_confluent-command
test-topic

When you configure and run Replicator, the test-topic topic is replicated to the destination cluster that runs on port 9092.
Configure and run Replicator
Create the following files in $CONFLUENT_HOME/my-examples/.
Run the following command to create a file named consumer.properties that configures the connection to the origin cluster:

cp etc/kafka/consumer.properties my-examples/

Edit the file so that it contains the addresses of brokers from the origin cluster:

# Origin cluster connection configuration
bootstrap.servers=localhost:9082

Run the following command to create a file named producer.properties that configures the connection to the destination cluster:

cp etc/kafka/producer.properties my-examples/

Edit the file and make sure it contains the addresses of brokers from the destination cluster. The default broker list already matches the destination cluster that you started earlier.

# Destination cluster connection configuration
bootstrap.servers=localhost:9092

Define the Replicator configuration for the Connect worker in a new file named replication.properties:

# Replication configuration
topic.rename.format=${topic}.replica
replication.factor=1
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
confluent.topic.replication.factor=1

Note: If no port is defined in the replication.properties file, the worker runs on its default port, 8083, which is the desired configuration for this deployment.
The replication factor properties are all set to 1 because these test clusters are small. In production environments, we recommend a minimum cluster size of 3 brokers and replication factors of at least 3.
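For comparison, here is a hedged sketch of the same replication.properties sized for production; the values assume clusters of at least 3 brokers and are illustrative only:

# Illustrative production values (assumes at least 3 brokers per cluster)
topic.rename.format=${topic}.replica
replication.factor=3
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
confluent.topic.replication.factor=3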
Start Replicator
Run the following command to start the Replicator executable in its own terminal:

./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'

The command-line options are as follows:

--cluster.id: An identifier that determines which Replicator cluster this executable joins. Multiple Replicator executable instances with the same cluster.id work together.
--consumer.config: The path to the origin cluster configuration.
--producer.config: The path to the destination cluster configuration.
--replication.config: The path to a file that contains any configuration that is not connection specific. Command-line arguments override these configurations.
--whitelist: A list of topics to replicate from origin to destination.

For a full list of command-line options, see Command line parameters of Replicator executable. Look for log messages related to starting the source task and creating the replicated topic; these messages indicate that Replicator is up and running and copying topics.
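Because the Replicator executable embeds a Connect worker listening on port 8083, you can also confirm that it is running by querying the Connect REST API. A minimal check; the connector names in the response depend on your deployment:

curl http://localhost:8083/connectors

The command returns a JSON array of registered connector names, which confirms that the worker is up.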
Verify topic replication between clusters
After Replicator is initiated, it checks the origin cluster for topics that need to be replicated.
Run the following command to check whether the test-topic.replica topic exists:
./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092

Expected output:

Topic: test-topic.replica  PartitionCount: 1  ReplicationFactor: 1  Configs: message.timestamp.type=CreateTime,segment.bytes=1073741824
  Topic: test-topic.replica  Partition: 0  Leader: 0  Replicas: 0  Isr: 0  Offline: 0

You can also list the topics on the destination cluster, where test-topic.replica appears:

./bin/kafka-topics --list --bootstrap-server localhost:9092

To list topics on the origin cluster, run kafka-topics --list against localhost:9082. To view the description of the original topic, run kafka-topics --describe for test-topic against localhost:9082.
After you create the topic in the origin cluster, you can send data to the test-topic topic by using a Kafka console producer and then confirm that the data is replicated by consuming from the test-topic.replica topic in the destination cluster. For example, to send a sequence of numbers, run the following command in a new terminal window:

seq 10000 | ./bin/kafka-console-producer --topic test-topic --broker-list localhost:9082

Confirm delivery in the destination cluster by running a console consumer in its own terminal window:

./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

If the numbers 1 to 10,000 appear in the consumer output, multi-cluster replication is working.
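As an alternative to scanning the output by eye, you can count the replicated records and compare the total to the 10,000 that you produced. An illustrative sketch, not part of the original procedure; the timeout value is an assumption:

./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092 --max-messages 10000 --timeout-ms 30000 | wc -l

If replication completed, the command prints 10000.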
Use Control Center to monitor Replicator
Stop Replicator and brokers on both the origin and destination clusters. Then, stop the ZooKeeper instances.
For information about how to activate the monitoring extension for Replicator, see Replicator monitoring extension.
Add the full path of replicator-rest-extension-<version>.jar to your CLASSPATH.
Add rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension to my-examples/replication.properties.
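A hedged sketch of these two steps follows; the jar location under share/java/kafka-connect-replicator/ is an assumption about your installation layout, and <version> remains a placeholder for your actual version:

# Assumed jar location; adjust the path and <version> for your installation.
export CLASSPATH=$CONFLUENT_HOME/share/java/kafka-connect-replicator/replicator-rest-extension-<version>.jar:$CLASSPATH
# Enable the monitoring extension in the Replicator worker configuration.
echo "rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension" >> my-examples/replication.properties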
Uncomment or add the following lines in the Kafka configuration files for both the destination and the origin: my-examples/server_destination.properties and my-examples/server_origin.properties. The confluent.metrics.reporter.bootstrap.servers configuration must point to localhost:9092 in both files, so you might need to edit the port number in one or both of them.
# Your deployment is in development mode and a replication factor of 1 is used.
confluent.metrics.reporter.topic.replicas=1
# Enable metrics reporting on Control Center and provide access to the Confluent internal topic that collects and stores the monitoring data.
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=localhost:9092

Edit my-examples/producer.properties to add the monitoring interceptor for the producer:

# Monitoring interceptor for producer
interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

Edit my-examples/consumer.properties to add the monitoring interceptor for the consumer:

# Monitoring interceptor for consumer
interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

Edit etc/confluent-control-center/control-center-dev.properties to add the following two lines, which specify the origin and destination bootstrap servers for Control Center:

# multi-cluster monitoring
confluent.controlcenter.kafka.origin.bootstrap.servers=localhost:9082
confluent.controlcenter.kafka.destination.bootstrap.servers=localhost:9092

Note: Control Center requires the host and port of the Connect REST endpoint to know where to look for Replicator monitoring metrics. In the control-center-dev.properties file used for this example, the location is already configured on the default port:

# Separate multiple host names with commas (,).
confluent.controlcenter.connect.cluster=http://localhost:8083

In the production-ready configuration file control-center-production.properties, this value is commented out by default. If you use that file, have multiple connectors, or want to configure Connect clusters in different ways, you must specify the Connect endpoints, either by uncommenting the default or by specifying hosts for your own Connect clusters. For more information, see General settings.
If you run both Replicator and Connect clusters in your deployment, you must specify the Replicator and Connect clusters separately:

Connect cluster:
confluent.controlcenter.connect.<connect-cluster-name>.cluster=http://connect-host-1:8083

Replicator:
confluent.controlcenter.connect.<replicator-name>.cluster=http://replicator-host:8083
Run the following commands to restart the ZooKeeper instances on the destination and origin clusters:
./bin/zookeeper-server-start my-examples/zookeeper_destination.properties
./bin/zookeeper-server-start my-examples/zookeeper_origin.properties

Run the following commands to restart the brokers on the destination and origin clusters, each in its own terminal:

./bin/kafka-server-start my-examples/server_destination.properties
./bin/kafka-server-start my-examples/server_origin.properties

Run the following command to restart Replicator and the Connect worker:

./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'

Run the following command to launch Control Center:

./bin/control-center-start etc/confluent-control-center/control-center-dev.properties

Open Control Center in your browser. The clusters are rendered in Control Center with auto-generated names based on your configuration.
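Control Center listens on port 9021, as shown in the port table earlier in this topic, so for this local deployment it is typically reachable at:

http://localhost:9021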
(Optional) In Control Center, edit the cluster names to suit your use case.
In Control Center, select the destination cluster. In the left-side navigation pane, click Replicators. Then, use Control Center to monitor replication performance and drill down into source and replicated topics.
To view messages produced to both the original and replicated topics in Control Center, run the kafka-producer-perf-test command in a command window to auto-generate test data to the test-topic topic:
kafka-producer-perf-test \
  --producer-props bootstrap.servers=localhost:9082 \
  --topic test-topic \
  --record-size 1000 \
  --throughput 1000 \
  --num-records 3600000

Expected output:
4999 records sent, 999.8 records/sec (0.95 MB/sec), 1.1 ms avg latency, 240.0 ms max latency.
5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.5 ms avg latency, 4.0 ms max latency.
5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 5.0 ms max latency.
5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.3 ms avg latency, 3.0 ms max latency.
5001 records sent, 1000.0 records/sec (0.95 MB/sec), 0.3 ms avg latency, 4.0 ms max latency.
5000 records sent, 1000.0 records/sec (0.95 MB/sec), 0.8 ms avg latency, 24.0 ms max latency.
5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 3.0 ms max latency.
...

Run the kafka-console-consumer command to consume these messages from the command line and verify that the replica topic is receiving them:

./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

You can also verify in Control Center that the replica topic is receiving the messages.
For more information about monitoring Replicator in Control Center, see Manage Replicator Using Control Center for Confluent Platform.
After you complete the experiments in this topic, perform the following steps to clean up:
Stop producers and consumers by using Ctrl-C in each command window.
Use Ctrl-C in each command window to stop the services in the reverse order in which you started them: first Control Center, then Replicator and the Kafka brokers, and finally the ZooKeeper instances.