本文介绍如何使用Spring Cloud框架接入消息队列Kafka版并收发消息。

背景信息

Spring Cloud是用于构建消息驱动的微服务应用程序的框架。详细信息,请参见Spring Cloud Stream

前提条件

公网环境(消息传输需鉴权与加密)

公网环境,消息采用SASL_SSL协议进行鉴权并加密。客户端通过SSL接入点访问消息队列Kafka版。接入点的详细信息,请参见接入点对比

本示例将Demo包上传在/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo路径。

  1. 登录Linux系统,执行以下命令,进入Demo包所在路径/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
    cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
  2. 执行以下命令,进入配置文件路径。
    cd sasl-ssl/src/main/resources/
  3. 执行以下命令,编辑application.properties文件,并根据表 1配置实例信息。
    vi application.properties
    ##以下参数,您需配置为实际使用的实例信息。
    kafka.bootstrap-servers=47.99.XX.XX:9093,118.178.XX.XX:9093,47.110.XX.XX:9093,116.62.XX.XX:9093,101.37.XX.XX:9093
    kafka.consumer.group=test-spring
    kafka.output.topic.name=test-output
    kafka.input.topic.name=test-input
    kafka.ssl.truststore.location=/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl/src/main/resources/kafka.client.truststore.jks
    
    ### 配置Binding参数可以把消息队列Kafka版和Spring Cloud Stream的Binder绑定在一起,以下参数保持默认即可。
    spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name}
    spring.cloud.stream.bindings.MyOutput.contentType=text/plain
    spring.cloud.stream.bindings.MyInput.group=${kafka.consumer.group}
    spring.cloud.stream.bindings.MyInput.destination=${kafka.input.topic.name}
    spring.cloud.stream.bindings.MyInput.contentType=text/plain
    
    ### Binder是Spring Cloud对消息中间件的封装模块,以下参数保持默认即可。
    spring.cloud.stream.kafka.binder.autoCreateTopics=false
    spring.cloud.stream.kafka.binder.brokers=${kafka.bootstrap-servers}
    spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
    spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
    spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location}
    spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsCl****
    ### 如果Demo中没有以下参数,请手动增加。该参数表示是否需要进行服务器主机名验证。因消息传输使用SASL身份校验,可设置为空字符串关闭服务器主机名验证。
    ### 服务器主机名验证是验证SSL证书的主机名与服务器的主机名是否匹配,默认为HTTPS。
    spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
    表 1. 参数列表
    参数 描述
    kafka.bootstrap-servers 消息队列Kafka版实例接入点。您可在消息队列Kafka版控制台实例详情页面的接入点信息区域获取。
    kafka.consumer.group 订阅消息的Group。您可以在消息队列Kafka版控制台Group 管理页面创建。具体操作,请参见步骤三:创建资源
    kafka.output.topic.name 消息的Topic。控制台程序通过此Topic每隔一段时间发送消息,内容是固定的。您可以在消息队列Kafka版控制台Topic 管理页面创建。具体操作,请参见步骤三:创建资源
    kafka.input.topic.name 消息的Topic。您可以通过此Topic在控制台发送消息,Demo程序会消费消息,并将消息打印在日志中。
    kafka.ssl.truststore.location SSL根证书kafka.client.truststore.jks的存放路径。
  4. 执行以下命令,打开kafka_client_jaas.conf文件,配置实例的用户名与密码。
    vi kafka_client_jaas.conf
    说明
    • 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面获取默认用户的用户名和密码。
    • 如果实例已开启ACL,请确保要使用的SASL用户为PLAIN类型且已授权收发消息的权限。具体信息,请参见SASL用户授权
    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="XXX"
      password="XXX";
    };
  5. 进入/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl路径,执行以下命令,运行Demo。
    sh run_demo.sh
    程序打印如下信息,说明接收到控制台程序通过 kafka.output.topic.name配置的Topic所发送的消息。
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
  6. 登录消息队列Kafka版控制台验证消息收发是否成功。
    • 查询kafka.output.topic.name配置的Topic是否接收到控制台程序发送的消息。具体操作,请参见查询消息
    • kafka.input.topic.name配置的Topic发送消息,查看Demo程序日志中是否会打印消息。具体操作,请参见发送消息

VPC环境(消息传输不鉴权不加密)

VPC环境,消息可以采用PLAINTEXT协议不鉴权不加密传输。客户端通过默认接入点访问消息队列Kafka版。接入点的详细信息,请参见接入点对比

本示例将Demo包上传在/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo路径。

  1. 登录Linux系统,执行以下命令,进入Demo包所在路径/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
    cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
  2. 执行以下命令,进入配置文件路径。
    cd vpc/src/main/resources/
  3. 执行以下命令,编辑application.properties文件,并根据表 1配置实例信息。
    vi application.properties
    ###以下参数请修改为实际使用的实例的信息
    kafka.bootstrap-servers=192.168.XX.XX:9092,192.168.XX.XX:9092,192.168.XX.XX:9092,192.168.XX.XX:9092,192.168.XX.XX:9092
    kafka.consumer.group=test-spring
    kafka.output.topic.name=test-output
    kafka.input.topic.name=test-input
  4. 进入/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/vpc路径,执行以下命令,运行Demo。
    sh run_demo.sh
    程序打印如下信息。
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
  5. 登录消息队列Kafka版控制台验证消息收发是否成功。
    • 查询kafka.output.topic.name配置的Topic是否接收到控制台程序发送的消息。具体操作,请参见查询消息
    • kafka.input.topic.name配置的Topic发送消息,查看Demo程序日志中是否会打印消息。具体操作,请参见发送消息