Redis发布订阅

有两种不联系:一种是忘记了,一种是放在回忆里

Posted by yishuifengxiao on 2021-01-19

一 基于RedisConnectionFactory的发布订阅

1.1 订阅方代码

配置代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

@Configuration
public class RedisConfig {

/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
*
* @param connectionFactory
* @param listenerAdapter
* @return
*/
//MessageListenerAdapter 表示监听频道的不同订阅者
@Bean
RedisMessageListenerContainer container2(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);

// 可以添加多个 messageListener,配置不同的交换机
// 这里支持通配符
container.addMessageListener(new MessageListenerAdapter(new RedisReceiver(), "onMessage"),
new PatternTopic("d?*"));

return container;
}
}

被消费的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

@Slf4j
public class RedisReceiver implements MessageListener {

private CommonBaseService commonBaseService;

@Override
public void onMessage(Message message, byte[] pattern) {
//业务处理
}

public RedisReceiver(CommonBaseService commonBaseService) {
this.commonBaseService = commonBaseService;
}
}

1.2 发布方代码

1
2
3
4
5
6
@Autowired
private RedisTemplate<String, Object> redisTemplate;


//发布消息
redisTemplate.convertAndSend(EventConstant.CHANNEL_NAME, behavior);

二 使用ReactiveRedisTemplate发布订阅

在项目里加入以下依赖

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

发布订阅的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Component
public class AfterCommond implements CommandLineRunner {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private ReactiveRedisTemplate reactiveRedisTemplate;

@Override
public void run(String... args) throws Exception {

reactiveRedisTemplate.listenToChannel("demo?").doOnNext(msg -> {
System.out.println("接收到的消息?为 " + msg);
}).subscribe();

reactiveRedisTemplate.listenToChannel("demo?*").doOnNext(msg -> {
System.out.println("接收到的消息?*为 " + msg);
}).subscribe();

reactiveRedisTemplate.listenToChannel("demo*").doOnNext(msg -> {
System.out.println("接收到的消息*为 " + msg);
}).subscribe();

reactiveRedisTemplate.listenToChannel("demo").doOnNext(msg -> {
System.out.println("接收到的消息A为 " + msg);
}).subscribe();

reactiveRedisTemplate.listenToChannel("/demo").doOnNext(msg -> {
System.out.println("接收到的消息B为 " + msg);
}).subscribe();

new Thread(() -> {
while (true) {
try {
redisTemplate.convertAndSend("demo", "A" + System.currentTimeMillis());
redisTemplate.convertAndSend("/demo", "B" + System.currentTimeMillis());
Thread.sleep(2000);
System.out.println("------->L A");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

}
}

在控制台可以看到以下日志

1
2
3
4
5
6
7
8
9
10
11
12
------->L A
接收到的消息A为 ChannelMessage {channel=demo, message=A1611801361359}
接收到的消息B为 ChannelMessage {channel=/demo, message=B1611801361360}
------->L A
接收到的消息A为 ChannelMessage {channel=demo, message=A1611801363361}
接收到的消息B为 ChannelMessage {channel=/demo, message=B1611801363362}
------->L A
接收到的消息A为 ChannelMessage {channel=demo, message=A1611801365363}
接收到的消息B为 ChannelMessage {channel=/demo, message=B1611801365364}
------->L A
接收到的消息A为 ChannelMessage {channel=demo, message=A1611801367366}
接收到的消息B为 ChannelMessage {channel=/demo, message=B1611801367367}

注意:

使用ReactiveRedisTemplate进行订阅时暂不支持通配符模式

三 使用ReactiveRedisMessageListenerContainer订阅

订阅代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Bean
public ReactiveRedisMessageListenerContainer reactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory factory) {
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);
Flux<ReactiveSubscription.Message<String, String>> receive = container.receive(ChannelTopic.of("demo"));
receive.doOnNext(t -> {

try {
ReactiveSubscription.Message ms = t;

System.out.println(new String(ms.getMessage().toString().getBytes("utf-8"), "utf-8"));

System.out.println("------> 接收到的消息为 " + t);

System.out.println("--- " + ms);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}).subscribe();
return container;
}

注意:

  • 在此模式下,发送消息时注意设置好序列化,否则在接收方容易出现乱码。
  • ChannelTopic不支持通配符

如果需要支持通配符,请使用以下模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean
public ReactiveRedisMessageListenerContainer reactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory factory) {
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);
Flux<ReactiveSubscription.PatternMessage<String, String, String>> receive = container.receive(new PatternTopic[]{PatternTopic.of("de?*")});
receive.doOnNext(t -> {

try {
ReactiveSubscription.Message ms = t;


System.out.println("------> 接收到的消息为 " + t);

System.out.println("--- " + ms);
} catch (Exception e) {
e.printStackTrace();
}
}).subscribe();
return container;
}

此时控制台收到的消息如下:

1
2
3
4
5
6
7
8
9
------->L A
------> 接收到的消息为 PatternMessage{channel=demo, pattern=de?*, message="A1611808659713"}
--- PatternMessage{channel=demo, pattern=de?*, message="A1611808659713"}
------->L A
------> 接收到的消息为 PatternMessage{channel=demo, pattern=de?*, message="A1611808661715"}
--- PatternMessage{channel=demo, pattern=de?*, message="A1611808661715"}
------->L A
------> 接收到的消息为 PatternMessage{channel=demo, pattern=de?*, message="A1611808663717"}
--- PatternMessage{channel=demo, pattern=de?*, message="A1611808663717"}