时间:2021-05-23
前言
Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样。
链表实现消息队列
Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型。可以利用lpush和rpop来实现。但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源,好在Redis提供一种阻塞版pop命令brpop或者blpop,用法为brpop/blpop list timeout, 当链表为空的时候,brpop/blpop将阻塞,直到设置超时时间到或者list插入一个元素。
用法如下:
charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli127.0.0.1:6379> lpush list hello(integer) 1127.0.0.1:6379> brpop list 01) "list"2) "hello"127.0.0.1:6379> brpop list 0//阻塞在这里//当我在另一个客户端lpush一个元素之后,客户端输出为127.0.0.1:6379> brpop list 01) "list"2) "world"(50.60s)//阻塞的时间当链表为空的时候,brpop是阻塞的,等待超时时间到或者另一个客户端lpush一个元素。接下来,看下源码是如何实现阻塞brpop命令的。要实现客户端阻塞,只需要服务器不给客户端发送消息,那么客户端就会阻塞在read调用中,等待消息到达。这是很好实现的,关键是如何判断这个客户端阻塞的链表有数据到达以及通知客户端解除阻塞?Redis的做法是,将阻塞的键以及阻塞在这个键上的客户端链表存储在一个字典中,然后每当向数据库插入一个链表时,就判断这个新插入的链表是否有客户端阻塞,有的话,就解除这个阻塞的客户端,并且发送刚插入链表元素给客户端,客户端就这样解除阻塞。
先看下有关数据结构,以及server和client有关属性
//阻塞状态typedef struct blockingState { mstime_t timeout; dict *keys; /* The keys we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ robj *target; /* The key that should receive the element, * for BRPOPLPUSH. */ int numreplicas; long long reploffset; } blockingState;//继续列表typedef struct readyList { redisDb *db;//就绪键所在的数据库 robj *key;//就绪键} readyList;//客户端有关属性typedef struct redisClient { int btype; blockingState bpop; }//服务器有关属性struct redisServer { unsigned int bpop_blocked_clients; list *unblocked_clients; list *ready_keys; }//数据库有关属性typedef struct redisDb { //keys->redisCLient映射 dict *blocking_keys; dict *ready_keys; }redisDB必须对上述的数据结构足够了解,否则很难看懂下面的代码,因为这些代码需要操作上述的数据结构。先从brpop命令执行函数开始分析,brpop命令执行函数为
void brpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_TAIL);}//++++++++++++++++++++++++++++++++++++++++++++++++++void blockingPopGenericCommand(redisClient *c, int where) { robj *o; mstime_t timeout; int j; if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) != REDIS_OK) return;//将超时时间保存在timeout中 for (j = 1; j < c->argc-1; j++) { o = lookupKeyWrite(c->db,c->argv[j]);//在数据库中查找操作的链表 if (o != NULL) {//如果不为空 if (o->type != REDIS_LIST) {//不是链表类型 addReply(c,shared.wrongtypeerr);//报错 return; } else { if (listTypeLength(o) != 0) {//链表不为空 char *event = (where == REDIS_HEAD) ? "lpop" : "rpop"; robj *value = listTypePop(o,where);//从链表中pop出一个元素 redisAssert(value != NULL); //给客户端发送pop出来的元素信息 addReplyMultiBulkLen(c,2); addReplyBulk(c,c->argv[j]); addReplyBulk(c,value); decrRefCount(value); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event, c->argv[j],c->db->id); if (listTypeLength(o) == 0) {//如果链表为空,从数据库删除链表 dbDelete(c->db,c->argv[j]); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del", c->argv[j],c->db->id); } } } } } blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);}从源码可以看出,brpop可以操作多个链表变量,例如brpop list1 list2 0,但是只能输出第一个有元素的链表。如果list1没有元素,而list2有元素,则输出list2的元素;如果两个都有元素,则输出list1的元素;如果都没有元素,则等待其中某个链表插入一个元素,之后在2返回。最后调用blockForyKeys阻塞
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) { dictEntry *de; list *l; int j; c->bpop.timeout = timeout;//超时时间赋值给客户端blockingState属性 c->bpop.target = target;//这属性适用于brpoplpush命令的输入对象,如果是brpop, //则target为空 if (target != NULL) incrRefCount(target);//不为空,增加引用计数 for (j = 0; j < numkeys; j++) { if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue; incrRefCount(keys[j]); //将阻塞的key和客户端添加进c->db->blocking_keys de = dictFind(c->db->blocking_keys,keys[j]); if (de == NULL) { int retval; l = listCreate(); retval = dictAdd(c->db->blocking_keys,keys[j],l); incrRefCount(keys[j]); redisAssertWithInfo(c,keys[j],retval == DICT_OK); } else { l = dictGetVal(de); } listAddNodeTail(l,c);//添加到阻塞键的客户点链表中 } blockClient(c,REDIS_BLOCKED_LIST);//设置客户端阻塞标志}blockClient函数只是简单的设置客户端属性,如下
void blockClient(redisClient *c, int btype) { c->flags |= REDIS_BLOCKED;//设置标志 c->btype = btype;//阻塞操作类型 server.bpop_blocked_clients++;}由于这个函数之后,brpop命令执行函数就结束了,由于没有给客户端发送消息,所以客户端就阻塞在read调用中。那么如何解开客户端的阻塞了?
插入一个元素解阻塞
任何指令的执行函数都是在processCommand函数中调用call函数,然后在call函数中调用命令执行函数,lpush也一样。当执行完lpush之后,此时链表不为空,回到processCommand调用中,执行以下语句
if (listLength(server.ready_keys)) handleClientsBlockedOnLists();这两行代码是先检查server.ready_keys是否为空,如果不为空,说明已经有一些就绪的链表,此时可以判断是否有客户端阻塞在这个键值上,如果有,则唤醒;现在问题又来了,这个server.ready_keys在哪更新链表了?
原来是在dbAdd函数中,当往数据库中添加的值类型为REDIS-LIST时,这时就要调用signalListAsReady函数将链表指针添加进server.ready_keys:
//db.cvoid dbAdd(redisDb *db, robj *key, robj *val) { sds copy = sdsdup(key->ptr); int retval = dictAdd(db->dict, copy, val);//将数据添加进数据库 redisAssertWithInfo(NULL,key,retval == REDIS_OK); //判断是否为链表类型,如果是,调用有链表已经ready函数 if (val->type == REDIS_LIST) signalListAsReady(db, key); if (server.cluster_enabled) slotToKeyAdd(key); }//t_list.cvoid signalListAsReady(redisDb *db, robj *key) { readyList *rl; if (dictFind(db->blocking_keys,key) == NULL) return; if (dictFind(db->ready_keys,key) != NULL) return; rl = zmalloc(sizeof(*rl)); rl->key = key; rl->db = db; incrRefCount(key); listAddNodeTail(server.ready_keys,rl);//添加链表末尾 /* We also add the key in the db->ready_keys dictionary in order * to avoid adding it multiple times into a list with a simple O(1) * check. */ incrRefCount(key); //同时将这个阻塞键放入db->ready_keys redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);}OK,这时server.ready_keys上已经有就绪键了,这时就调用processCommand函数中的handleClientsBlockedOnLists()函数来处理阻塞客户端,在这个函数中,
void handleClientsBlockedOnLists(void) { while(listLength(server.ready_keys) != 0) { list *l; l = server.ready_keys; server.ready_keys = listCreate(); while(listLength(l) != 0) { listNode *ln = listFirst(l);//获取第一个就绪readyList readyList *rl = ln->value; dictDelete(rl->db->ready_keys,rl->key); robj *o = lookupKeyWrite(rl->db,rl->key); if (o != NULL && o->type == REDIS_LIST) { dictEntry *de; de = dictFind(rl->db->blocking_keys,rl->key); if (de) { list *clients = dictGetVal(de);//转换为客户端链表 int numclients = listLength(clients); while(numclients--) {//给每个客户端发送消息 listNode *clientnode = listFirst(clients); redisClient *receiver = clientnode->value;//阻塞的客户端 robj *dstkey = receiver->bpop.target;//brpoplpush命令目的链表 int where = (receiver->lastcmd && receiver->lastcmd->proc == blpopCommand) ? REDIS_HEAD : REDIS_TAIL;//获取取出的方向 robj *value = listTypePop(o,where);//取出就绪链表的元素 if (value) { /* Protect receiver->bpop.target, that will be * freed by the next unblockClient() * call. */ if (dstkey) incrRefCount(dstkey); unblockClient(receiver);//设置客户端为非阻塞状态 if (serveClientBlockedOnList(receiver, rl->key,dstkey,rl->db,value, where) == REDIS_ERR) { /* If we failed serving the client we need * to also undo the POP operation. */ listTypePush(o,value,where); }//给客户端回复链表中的元素内容 if (dstkey) decrRefCount(dstkey); decrRefCount(value); } else { break; } } } //如果链表为空,则从数据库中删除 if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key); /* We don't call signalModifiedKey() as it was already called * when an element was pushed on the list. */ } decrRefCount(rl->key); zfree(rl); listDelNode(l,ln); } listRelease(l); }}从这个源码可知,如果有两个客户端,同时阻塞在一个链表上面,那么如果链表插入一个元素之后,只有先阻塞的那个客户端收到消息,后面阻塞的那个客户端继续阻塞,这也是先阻塞先服务的思想。handleClientsBlockedOnLists函数调用了unblockClient(receiver) ,该函数功能为接触客户端阻塞标志,以及找到db阻塞在key上的客户端链表,并将接触阻塞的客户端从链表删除。然后调用serveClientBlockOnList给客户端回复刚在链表插入的元素。
int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where){ robj *argv[3]; if (dstkey == NULL) { argv[0] = (where == REDIS_HEAD) ? shared.lpop : shared.rpop; argv[1] = key; propagate((where == REDIS_HEAD) ? server.lpopCommand : server.rpopCommand, db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL); addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,value); } else { }}propagate函数主要是将命令信息发送给aof和slave。函数中省略部分是brpoplpush list list1 0命令的目的链表list1非空时,将从list链表pop出来的元素插入list1中。当给客户端发送消息之后,客户端就从read函数调用中返回,变为不阻塞。
通过超时时间解阻塞
如果链表一直没有数据插入,那么客户端将会一直阻塞下去,这肯定是不行的,所以brpop还支持超时阻塞,即阻塞时间超过一定值之后,服务器返回一个空值,这样客户端就解脱阻塞了。
对于时间超时,都放在了100ms执行一次的时间事件中;超时解脱阻塞函数也在serverCron中;在serverCron->clientsCron->clientsCronHandleTimeout
int clientsCronHandleTimeout(redisClient *c, mstime_t now_ms) { time_t now = now_ms/1000; //.......... else if (c->flags & REDIS_BLOCKED) { /* Blocked OPS timeout is handled with milliseconds resolution. * However note that the actual resolution is limited by * server.hz. */ if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) { replyToBlockedClientTimedOut(c); unblockClient(c); } } //.............把这个函数不相干的代码删除,主要部分先判断这个客户端是否阻塞,如果是,超时时间是否到期,如果是,则调用replyToBlockedClientTimedOut给客户端回复一个空回复,以及接触客户端阻塞。
总结
链表消息队列实现暂时分析到这了,大家都学会了吗?希望这篇文章给大家能带来一定的帮助,如果有疑问可以留言交流。
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
本文实例讲述了PHP实现基于Redis的MessageQueue队列封装操作。分享给大家供大家参考,具体如下:Redis的链表List可以用来做链表,高并发的特
最近在研究redis做消息队列时,顺便看了一下RabbitMQ做消息队列的实现。以下是总结的RabbitMQ中三种exchange模式的实现,分别是fanout
本文实例为大家分享了php+redis消息队列实现抢购的具体代码,供大家参考,具体内容如下实现功能:1.基于redis队列,防止高并发的超卖2.基于mysql的
目前公司用阿里云+redis的方式实现的消息队列。了解了目前几种主流的消息组件(主要包括rabbitmq、kafka、)的优缺点后,这里为了深入学习rabbit
前言Redis的list数据结构常用来做消息队列,通常使用的命令有lpop/rpop,还有带阻塞版的blpop/brpop等。Laravel5.3消息队列也是用