发布订阅(pub/sub)是一种消息通信模式,主要的目的是解耦消息发布者和消息订阅者之间的耦合,这点和设计模式中的观察者模式比较相似。pub /sub不仅仅解决发布者和订阅者直接代码级别耦合也解决两者在物理部署上的耦合。redis作为一个pub/sub server,在订阅者和发布者之间起到了消息路由的功能。订阅者可以通过subscribe和psubscribe命令向redis server订阅自己感兴趣的消息类型,redis将消息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的消息时。订阅该消息类型的全部client都会收到此消息。这里消息的传递是多对多的。一个client可以订阅多个 channel,也可以向多个channel发送消息。
注:
redis中消息不会持久化,当订阅者不在线时,消息就会丢失。(没有处理离线消息的功能)
订阅、退订:
1)订阅命令:
- 通过subscribe
命令可以订阅一个或多个频道,从而成为该频道的订阅者,当有其他客户端向该频道发送消息时,频道的所有订阅者会收到该消息。
1 | 127.0.0.1:6379> subscribe hello_redis |
注:可以一次性订阅多频道,如:127.0.0.1:6379> subscribe hello_redis hello_123 hello_32
- 通过psubscribe
命令可以订阅一个或多个模式,从而成为模式的订阅者,当有其他客户端向一个频道发送消息时,频道的所有订阅者会收到该消息,同时所有和这个频道匹配的模式也会收到消息。
1 | 127.0.0.1:6379> psubscribe 'hello_*' |
注:可以一次性订阅多个模式,如:127.0.0.1:6379> psubscribe he ha
2)退订命令:
- 通过unsubscribe
可以退订某个频道; - 通过punsubscribe
可以退订某个模式;
1 | 127.0.0.1:6379> unsubscribe hello_redis |
发布消息:
在redis客户端执行publish
- 将消息message发送给channel频道的所有订阅者;
- 如果有一个或者多个模式pattern与频道channel匹配,那么将消息message发送给pattern的订阅者;
订阅、发布的实现:
1、频道的订阅、退订:
redis将频道的订阅关系保存到pubsub_channels字典里,字典的键是被订阅的频道,值是一个链表,记录了所有订阅这个频道的客户端。
1)执行subscribe命令时:
- 如果频道已经有其他订阅者,那么pubsub_channel字典中一定有其他订阅者链表,redis只需要将客户端添加到链表尾部即可将;
- 如果频道没有订阅者,那么先在pubsub_channel字典中创建一个键,然后值对应一个空链表,再讲客户端添加到链表尾部;
2)执行unsubscribe命令时,直接在链表中删除客户信息即可:
- 如果链表为空,那么把pubsub_channel字典中对应的频道键也删除;
- 如果链表不为空,删除客户节点即可;
2、模式的订阅、退订:
redis将模式的订阅关系保存到pubsub_patterns链表中,链表中每个节点是pubsubPattern结构,包含了模式和客户两个信息。
1)执行psubscribe命令是:
- 创建pubsubPattern结构,设置模式和客户信息;
- 将pubsubPattern结构添加到pubsub_patterns链表中;
2)执行punsubscribe命令时:
- 在pubsub_patterns链表中查找模式为被推定模式,且客户属性为执行退订命令的客户的pubsubPattern节点;
3、发布:执行publish命令时:
- redis需要在pubsub_Channels字典中根据channel找到对应键的值(是一个链表),然后将消息发送给链表中所有客户;
- redis需要遍历pubsub_Patterns链表,查找那些与channel匹配的模式,然后将消息发送给客户;
查看订阅信息——pubsub命令:
pubsub命令:redis2.8新增命令,客户端执行该命令,可以查看频道、模式的订阅信息。
1、pubsub channels [pattern]:返回服务器当前被订阅的频道(不能查找模式):
- 如果不指定pattern,返回服务器当前所有被订阅的频道;
- 指定pattern,返回服务器当前和pattern匹配的那些被订阅的频道;
该命令,是通过遍历pubsub_channels字典的key来实现的,返回那些符合条件的频道。1
2
3
4
5127.0.0.1:6379> pubsub channels
1) "hello_redis"
2) "hello"
3) "liuxiao"
4) "hello_liuxiao"
2、pubsub numsub [channel-1]… :返回这些频道订阅者的数量。
该命令,是通过在pubsub_channels字典中找到劈到对应的链表,返回链表的长度实现的。1
2
3127.0.0.1:6379> pubsub numsub 'hello_redis'
1) "hello_redis"
2) (integer) 2
3、pubsub numpat:返回当前服务器被订阅的模式数量。
该命令,是通过返回pubsub_patterns链表长度实现的。1
2127.0.0.1:6379> pubsub numpat
(integer) 1
【java 实例】
1、MyListener :
要使用Jedis的Publish/Subscribe功能,必须编写对JedisPubSub的自己的实现。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50package redis.subpub;
import redis.clients.jedis.JedisPubSub;
public class MyListener extends JedisPubSub {
// 取得订阅的消息后的处理
@Override
public void onMessage(String channel, String message) {
// TODO Auto-generated method stub
System.out.println(channel + "=" + message);
}
// 取得按表达式的方式订阅的消息后的处理
@Override
public void onPMessage(String pattern, String channel, String message) {
// TODO Auto-generated method stub
System.out.println(pattern + ":" + channel + "=" + message);
}
// 初始化订阅时候的处理
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
System.out.println("初始化 【频道订阅】 时候的处理 ");
}
// 取消订阅时候的处理
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
System.out.println("// 取消 【频道订阅】 时候的处理 ");
}
// 初始化按表达式的方式订阅时候的处理
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
System.out.println("初始化 【模式订阅】 时候的处理 ");
}
// 取消按表达式的方式订阅时候的处理
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
System.out.println("取消 【模式订阅】 时候的处理 ");
}
}
2、Sub1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class Sub {
public static void main(String[] args) {
try {
Jedis jedis = getJedis();
MyListener ml = new MyListener();
//可以订阅多个频道
//jedis.subscribe(ml, "liuxiao","hello","hello_liuxiao","hello_redis");
//jedis.subscribe(ml, new String[]{"hello_foo","hello_test"});
//这里启动了订阅监听,线程将在这里被阻塞
//订阅得到信息在lister的onPMessage(...)方法中进行处理
//使用模式匹配的方式设置频道
jedis.psubscribe(ml, new String[]{"hello_*"});
} catch (Exception e) {
e.printStackTrace();
}
}
}
注:
调用subscribe()或psubscribe() 时,当前线程都会阻塞。
3、Pub1
2
3
4
5
6
7
8
9
10
11public class Pub {
public static void main(String[] args) {
try {
Jedis jedis = getJedis();
jedis.publish("hello_redis","hello_redis");
} catch (Exception e) {
e.printStackTrace();
}
}
}