Spring Cloud之数据流Stream

Spring Cloud Stream是Netflix提供的数据流操作开发包,是为了提供微服务间消息通信而产生的一种框架,封装了与RabbitMQ、Kafka等中间件交互的操作。Spring Cloud Stream基于Spring Boot构建,开发者可以很方便地实现自己的消息通信实例。Spring Cloud Stream为不同的消息中间件提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

1. Spring Cloud Stream说明

Spring Cloud Stream应用由第三方的中间件组成。应用间的通信通过输入通道(input channel)和输出通道(output channel)完成。这些通道是有Spring Cloud Stream 注入的。而通道与外部的消息队列的连接又是通过Binder实现的。
image

如上图所示,在实际的生产环境下,开发者只需要定义input和output输出流即可,Middleware中间件目前支持Kafka和RabbitMQ。

体验Spring Cloud Stream需要你的主机有以下环境依赖(以RabbitMQ为例):

软件环境 版本信息
jdk 1.8+
RabbitMQ 3.0+

2. 生产者实现

本文介绍的实例主要实现了三种message类型,分别是String、JSON、Object对象等。
首先,pom.xml文件中加入依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

增加消息通道接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface StreamChannelConfig {
/**
* 消息生产者
*/
String ENTITY_PRODUCER = "testStreamEntityProducer";
@Output(ENTITY_PRODUCER)
MessageChannel producerEntity();
/**
* 消息生产者
*/
String JSON_PRODUCER = "testStreamJSONProducer";
@Output(JSON_PRODUCER)
MessageChannel producerJSON();
/**
* 消息生产者
*/
String STRING_PRODUCER = "testStreamStringProducer";
@Output(STRING_PRODUCER)
MessageChannel producerString();
}

实现消息发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
@EnableBinding(StreamChannelConfig.class)
public class TestProducer {
private static final Logger LOG = LoggerFactory.getLogger(TestProducer.class);
@Resource
private StreamChannelConfig testStreamChannel;
/**
* 发送消息
* @param object
*/
public void sendEntityMessage(Object object) {
LOG.info("发送消息:{}", object.toString());
testStreamChannel.producerEntity().send(MessageBuilder.withPayload(object).build());
}
/**
* 发送消息
* @param json
*/
public void sendJSONMessage(JSONObject json) {
LOG.info("发送消息:{}", json.toJSONString());
testStreamChannel.producerJSON().send(MessageBuilder.withPayload(json).build());
}
/**
* 发送消息
* @param message
*/
public void sendStringMessage(String message) {
LOG.info("发送消息:{}", message);
testStreamChannel.producerString().send(MessageBuilder.withPayload(message).build());
}
}

配置文件中加入相关配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#rabbitmq
spring.rabbitmq.addresses=RabbitMQ所在IP:Port
spring.rabbitmq.username=XXX
spring.rabbitmq.password=XXX
#stream
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.testStreamEntityProducer.destination=testStreamEntityExchange
spring.cloud.stream.rabbit.bindings.testStreamEntityProducer.producer.routing-key-expression='testStreamEntityRoutingKey'
spring.cloud.stream.bindings.testStreamJSONProducer.destination=testStreamJSONExchange
spring.cloud.stream.rabbit.bindings.testStreamJSONProducer.producer.routing-key-expression='testStreamJSONRoutingKey'
spring.cloud.stream.bindings.testStreamStringProducer.destination=testStreamStringExchange
spring.cloud.stream.rabbit.bindings.testStreamStringProducer.producer.routing-key-expression='testStreamStringRoutingKey'

至此,已经实现了消息的生产者端,这就能看到Spring Cloud Stream的优点了,通过不到10行的代码,就能实现了对RabbitMQ的消息发送。

3. 消费者实现

添加依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

定义消费者通道接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface StreamChannelConfig {
/**
* 消费者
*/
String ENTITY_CONSUMER = "testStreamEntityConsumer";
@Input(ENTITY_CONSUMER)
SubscribableChannel consumerEntity();
/**
* 消息消费者
*/
String JSON_CONSUMER = "testStreamJSONConsumer";
@Input(JSON_CONSUMER)
SubscribableChannel consumerJSON();
/**
* 消息消费者
*/
String STRING_CONSUMER = "testStreamStringConsumer";
@Input(STRING_CONSUMER)
SubscribableChannel consumerString();
}

实现消息接收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Component
@EnableBinding(StreamChannelConfig.class)
public class TestConsumer {
private static final Logger LOG = LoggerFactory.getLogger(TestConsumer.class);
/**
* 消息消费
* @param object
*/
@StreamListener(StreamChannelConfig.ENTITY_CONSUMER)
public void consumerEntity(Object object) {
LOG.info("接收消息:{}", object.toString());
}
/**
* 消息消费
* @param json
*/
@StreamListener(StreamChannelConfig.JSON_CONSUMER)
public void consumerJSON(JSONObject json) {
LOG.info("接收消息:{}", json.toJSONString());
}
/**
* 消息消费
* @param message
*/
@StreamListener(StreamChannelConfig.STRING_CONSUMER)
public void consumerString(String message) {
LOG.info("接收消息:{}", message);
}
}

配置文件增加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#rabbitmq
spring.rabbitmq.addresses=RabbitMQ所在IP:Port
spring.rabbitmq.username=XXX
spring.rabbitmq.password=XXXX
#stream
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.testStreamEntityConsumer.destination=testStreamEntityExchange
spring.cloud.stream.bindings.testStreamEntityConsumer.group=testStreamEntityQueueName
spring.cloud.stream.rabbit.bindings.testStreamEntityConsumer.consumer.bindingRoutingKey=testStreamEntityRoutingKey
spring.cloud.stream.bindings.testStreamJSONConsumer.destination=testStreamJSONExchange
spring.cloud.stream.bindings.testStreamJSONConsumer.group=testStreamJSONQueueName
spring.cloud.stream.rabbit.bindings.testStreamJSONConsumer.consumer.bindingRoutingKey=testStreamJSONRoutingKey
spring.cloud.stream.bindings.testStreamStringConsumer.destination=testStreamStringExchange
spring.cloud.stream.bindings.testStreamStringConsumer.group=testStreamStringQueueName
spring.cloud.stream.rabbit.bindings.testStreamStringConsumer.consumer.bindingRoutingKey=testStreamStringRoutingKey

注意配置文件中的destination为消息的目的地(类似于Kafka的topic与RabbitMQ的exchange),需要生产者和消费者保持相同,exchangeType表示采用的是topic模式。group为一个组(Spring Cloud Stream中每个group都会接收都消息,且只接收一次)。

4. 总结

Spring Cloud Stream 最大的好处在于,抽象了微服务开发中事件驱动的一些概念,对消息中间件的使用做了进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(从Kafka切换到RabbitMQ而不需要改代码)。使得微服务的消息通信开发做到了高度解耦。

以上内容就是关于Spring Cloud微服务框架之数据流Stream的全部内容了,谢谢你阅读到了这里!

Author:zhaoyh