本文以Java SDK为例介绍如何在VPC环境下使用SDK接入消息队列Kafka版的默认接入点并收发消息。

安装Java依赖库

pom.xml中添加以下依赖。
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.2</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
说明 建议您保持服务端和客户端版本一致,即保持客户端库版本和消息队列Kafka版实例的大版本一致。您可以消息队列Kafka版控制台的实例详情页面获取消息队列Kafka版实例的大版本。

准备配置

  1. 创建Log4j配置文件log4j.properties
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    log4j.rootLogger=INFO, STDOUT
    
    log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
    log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
    log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%n
  2. 创建Kafka配置文件。
    ## 配置接入点,即控制台的实例详情页面显示的默认接入点。
    bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx
    ## 配置Topic,可以在控制台上创建Topic。
    topic=alikafka-topic-demo
    ## 配置Consumer Group,可以在控制台创建Consumer Group。
    group.id=CID-consumer-group-demo
  3. 创建配置文件加载程序JavaKafkaConfigurer.java
    public class JavaKafkaConfigurer {
        private static Properties properties;
        public synchronized static Properties getKafkaProperties() {
           if (null != properties) {
               return properties;
           }
           //获取配置文件kafka.properties的内容。
           Properties kafkaProperties = new Properties();
           try {
               kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
           } catch (Exception e) {
               //没加载到文件,程序要考虑退出。
               e.printStackTrace();
           }
           properties = kafkaProperties;
           return kafkaProperties;
        }
    }

多语言SDK

其他语言SDK请参见SDK概述