茂展的分享博客

SpringCloud Stream

SpringCloud Stream

SpringCloud Stream(消息驱动微服务的框架)

先上官方的一张图
SpringCloud Stream架构图
从图中我们可以看出,应用程序通过inputs和outputs来与SpringCloud Stream中的binder交互,接着springcloud的binder来和消息中间件实现交互!
SpringCloud Stream为供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布—订阅,消费组,分区的三个核心概念。目前只支持RabbitMQ,Kafka.
在我看来,主要为了简化开发,开发者只关注业务即可。

既然有Rabbit、Kafka,为什么还要SpringCloud Stream消息驱动呢?

 比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,假如想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

如何实现呢,实操来体会

首先在对应的服务的pom.xml中引入依赖(我使用的底层是RabbitMQ)

1
2
3
4
 <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

然后我们定义一个接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface StreamClient {

String INPUT = "myMessage";
String INPUT2 = "myMessage2";


@Input(INPUT)
SubscribableChannel input();

@Output(INPUT)
MessageChannel output();

@Input(INPUT2)
SubscribableChannel input2();

@Output(INPUT2)
MessageChannel output2();
}

接着我们实现消息的接受者
其中的 @SendTo注解表示接收到消息并且返回给消息队列,然后@StreamListener(StreamClient.INPUT2)捕获到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {

@SendTo(StreamClient.INPUT2)
@StreamListener(StreamClient.INPUT)
public String process(Object message){
System.out.println("消息是:"+message);
return "received";
}

@StreamListener(StreamClient.INPUT2)
public void getMsg(Object msg){
System.out.println("接受到并返回:"+msg);
}
}

如何设置发送者呢
其中MessageBuilder是org.springframework.messaging.support.MessageBuilder该包下的

1
2
3
4
5
@GetMapping("/sendMessage")
public void process(){
String msg = "复习springcloud的 stream用法";
streamClient.output().send(MessageBuilder.withPayload(msg).build());
}

基本上介绍都差不多了
但是我们往往希望在rabbitMQ软件中也可以看出消息具体内容,所以我们可以这样在yml中配置

1
2
3
4
5
6
spring:
cloud:
stream:
bindings:
myMessage:
content-type: application/json

------本文结束感谢阅读------
🐶 您的支持将鼓励我继续创作 🐶