基于redis构建消息队列

一般来说,消息队列有两种场景:一种是发布者订阅者模式;一种是生产者消费者模式。利用redis这两种场景的消息队列都能够实现。定义:

  • 生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。(常用于处理高并发写操作)
  • 发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。(常用来作为日志收集中一份原始数据对多个应用场景)

初步探究

1、redis作为消息中间件:
1)Producer/ConsumerMode:
该方式是借助redis的list结构实现的。Producer调用redis的lpush往特定key里塞入消息,Consumer调用brpop(阻塞方法)去不断监听该key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// producer code
String key = "demo:mq:test";
String msg = "hello world";
redisDao.lpush(key, msg);

// consumer code
String key = "demo:mq:test";
while (true) {
// block invoke
List<String> msgs = redisDao.brpop(BLOCK_TIMEOUT, listKey);
if (msgs == null) continue;
String jobMsg = msgs.get(1);
processMsg(jobMsg);
}

2)PubSub Mode:
redis 从 2.0.0 版本开始支持 pub/sub 指令。实现思想很简单,Publisher调用redis的publish方法往特定的channel发送消息,Subscriber在初始化的时候要subscribe到该channel,一旦有消息就会立即接收,否则会阻塞。

:这种订阅是非持久化的。但是可以通过把订阅的消息额外的保存到redis中来实现持久化。

2、为什么采用单独的消息队列中间件?

  • 由于应用会部署到多个结点,所以无法直接采用java的BlockingQueue阻塞队列(在分布式环境下),所以采用redis提供的队列支持。
  • 如果要做到统计的绝对实时,最好采用大数据的实时计算的解决方案:kafka+storm 来实现

3、实现阻塞队列原理:

  • redis中有一个blpop、brpop的命令,阻塞的从列表(list)中取数据,当列表为空,则阻塞知道取到数据或超时。
  • blpop命令后面参数中可以跟多个list的key,内部是按照顺序进行访问的,基于这个原理可以实现高优先级队列。

深入探究

1、使用redis怎么做消息队列

  1. 首先redis它的设计是用来做缓存的,但是由于它自身的某种特性使得他可以用来做消息队列。它有几个阻塞式的API(brpop、Sub,他们都是阻塞版的)可以使用,正是这些阻塞式的API让他有做消息队列的能力。
  2. 其次,消息队列的其他特性例如FIFO也很容易实现,只需要一个List对象从头取数据,从尾部塞数据即可实现。

2、简单FIFO队列:
1)一些基础redis基础知识的说明
redis> blpop tasklist 0
“im task 01”
这个例子使用blpop命令会阻塞方式地从tasklist列表中取头一个数据,最后一个参数就是等待超时的时间。如果设置为0则表示无限等 待。另外redis存放的数据都只能是string类型,所以在任务传递的时候只能是传递字符串。我们只需要简单的将负责数据序列化成json格式的字符串,然后消费者那边再转换一下即可。

2)实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import redis, time

def handle(task):
print task
time.sleep(4)

def main():
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
while 1:
result = r.brpop('tasklist', 0)
handle(result[1])

if __name__ == "__main__":
main()

上例子即使一个最简单的消费者,我们通过一个无限循环不断地从redis的队列中取数据。如果队列中没有数据则没有超时的阻塞在那里,有数据则取出往下执行。
一般情况取出来是个复杂的字符串,我们可能需要将其格式化后作为再传给处理函数,但是为了简单我们的例子就是一个普通字符串。另外例子中的处理函数不做任何处理,仅仅sleep 用来模拟耗时的操作。
我们另开一个redis的客户端来模拟生产者,自带的客户端就可以。多往tasklist 队列里面塞上一些数据。
1
2
3
4
5
redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> lpush tasklist 'im task 03'
redis> lpush tasklist 'im task 04'
redis> lpush tasklist 'im task 05'

随后在消费者端便会看到这些模拟出来的任务被挨个消费掉。

3、简单优先级的队列:
假设一种简单的需求,只需要高优先级的比低优先级的任务率先处理掉。其他任务之间的顺序一概不管,这种我们只需要在在遇到高优先级任务的时候将它塞到队列的前头,而不是push到最后面即可。因为我们的队列是使用的redis的 list,所以很容易实现。遇到高优先级的使用rpush 遇到低优先级的使用lpush。

1
2
3
4
5
6
redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> rpush tasklist 'im high task 01'
redis> rpush tasklist 'im high task 01'
redis> lpush tasklist 'im task 03'
redis> rpush tasklist 'im high task 03'

随后会看到,高优先级的总是比低优先级的率先执行。但是这个方案的缺点是高优先级的任务之间的执行顺序是先进后出的。

4、较完善的优先级队列:
1)弊端:
上例只是简单的将高优先级的任务塞到队列最前面,低优先级的塞到最后面。这样保证不了高优先级任务之间的顺序,假设当所有的任务都是高优先级的话,那么他们的执行顺序将是相反的。这样明显违背了队列的FIFO原则。
不过只要稍加改进就可以完善我们的队列。

2)跟使用rabbitmq一样,我们设置两个队列,一个高优先级一个低优先级的队列。高优先级任务放到高队列中,低的放在低优先队列中。redis和rabbitmq不同的是它可以要求队列消费者从哪个队列里面先读。

1
2
3
4
5
6
def main():
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
while 1:
result = r.brpop(['high_task_queue', 'low_task_queue'], 0)
handle(result[1])

上面的代码,会阻塞地从’high_task_queue’, ‘low_task_queue’这两个队列里面取数据,如果第一个没有再从第二个里面取。所以只需要将队列消费者做这样的改进便可以达到目的。
1
2
3
4
5
6
7
8
redis> lpush low_task_queue low001
redis> lpush low_task_queue low002
redis> lpush low_task_queue low003
redis> lpush low_task_queue low004
redis> lpush high_task_queue low001
redis> lpush high_task_queue low002
redis> lpush high_task_queue low003
redis> lpush high_task_queue low004

通过上面的测试看到,高优先级的会被率先执行,并且高优先级之间也是保证了FIFO的原则。这种方案我们可以支持不同阶段的优先级队列,例如高中低三个级别或者更多的级别都可以。

5、优先级级别很多的情况
假设有个这样的需求,优先级不是简单的高中低或者0-10这些固定的级别。而是类似0-99999这么多级别。那么我们第三种方案将不太合适了。

虽然redis有sorted set这样的可以排序的数据类型,看是很可惜它没有阻塞版的接口。于是我们还是只能使用list类型通过其他方式来完成目的。

有个简单的做法我们可以只设置一个队列,并保证它是按照优先级排序号的。然后通过二分查找法查找一个任务合适的位置,并通过 lset 命令插入到相应的位置。
例如队列里面包含着写优先级的任务[1, 3, 6, 8, 9, 14],当有个优先级为7的任务过来,我们通过自己的二分算法一个个从队列里面取数据出来反和目标数据比对,计算出相应的位置然后插入到指定地点即可。

因为二分查找是比较快的,并且redis本身也都在内存中,理论上速度是可以保证的。但是如果说数据量确实很大的话我们也可以通过一些方式来调优。

回想我们第三种方案,把第三种方案结合起来就会很大程度上减少开销。例如数据量十万的队列,它们的优先级也是随机0-十万的区间。我们可以设置 10个或者100个不同的队列,0-一万的优先级任务投放到1号队列,一万-二万的任务投放到2号队列。这样将一个队列按不同等级拆分后它单个队列的数据 就减少许多,这样二分查找匹配的效率也会高一点。但是数据所占的资源基本是不变的,十万数据该占多少内存还是多少。只是系统里面多了一些队列而已。

参考:http://www.cnblogs.com/happyday56/p/4142761.html