您现在的位置是:网站首页> 编程资料编程资料

Redis 中使用 list,streams,pub/sub 几种方式实现消息队列的问题_Redis_

2023-05-27 391人已围观

简介 Redis 中使用 list,streams,pub/sub 几种方式实现消息队列的问题_Redis_

使用 Redis 实现消息队列

Redis 中也是可以实现消息队列

不过谈到消息队列,我们会经常遇到下面的几个问题

1、消息如何防止丢失;

2、消息的重复发送如何处理;

3、消息的顺序性问题;

关于 mq 中如何处理这几个问题,可参看RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

基于List的消息队列

对于 List

使用 LPUSH 写入数据,使用 RPOP 读出数据

127.0.0.1:6379> LPUSH test "ceshi-1" (integer) 1 127.0.0.1:6379> RPOP test "ceshi-1" 

使用 RPOP 客户端就需要一直轮询,来监测是否有值可以读出,可以使用 BRPOP 可以进行阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。

127.0.0.1:6379> BRPOP test 10 

后面的 10 是监听的时间,单位是秒,10秒没数据,就退出。

如果客户端从队列中拿到一条消息时,但是还没消费,客户端宕机了,这条消息就对应丢失了, Redis 中为了避免这种情况的出现,提供了 BRPOPLPUSH 命令,BRPOPLPUSH 会在消费一条消息的时候,同时把消息插入到另一个 List,这样如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

127.0.0.1:6379> LPUSH test "ceshi-1" (integer) 1 127.0.0.1:6379> LPUSH test "ceshi-2" (integer) 2 127.0.0.1:6379> BRPOPLPUSH test a-test 100 "ceshi-1" 127.0.0.1:6379> BRPOPLPUSH test a-test 100 "ceshi-2" 127.0.0.1:6379> BRPOPLPUSH test a-test 100 127.0.0.1:6379> RPOP a-test "ceshi-1" 127.0.0.1:6379> RPOP a-test "ceshi-2" 

不过 List 类型并不支持消费组的实现,Redis 从 5.0 版本开始提供的 Streams 数据类型,来支持消息队列的场景。

分析下源码实现

在版本3.2之前,Redis中的列表是 ziplist 和 linkedlist 实现的,针对 ziplist 存在的问题, 在3.2之后,引入了 quicklist 来对 ziplist 进行优化。

对于 ziplist 来讲:

1、保存过大的元素,否则容易导致内存重新分配,甚至可能引发连锁更新的问题。

2、保存过多的元素,否则访问性能会降低。

quicklist 使多个数据项,不再用一个 ziplist 来存,而是分拆到多个 ziplist 中,每个 ziplist 用指针串起来,这样修改其中一个数据项,即便发生级联更新,也只会影响这一个 ziplist,其它 ziplist 不受影响。

下面看下 list 的实现

代码链接https://github.com/redis/redis/blob/6.2/src/t_list.c

void listTypePush(robj *subject, robj *value, int where) { if (subject->encoding == OBJ_ENCODING_QUICKLIST) { int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL; if (value->encoding == OBJ_ENCODING_INT) { char buf[32]; ll2string(buf, 32, (long)value->ptr); quicklistPush(subject->ptr, buf, strlen(buf), pos); } else { quicklistPush(subject->ptr, value->ptr, sdslen(value->ptr), pos); } } else { serverPanic("Unknown list encoding"); } } /* Wrapper to allow argument-based switching between HEAD/TAIL pop */ void quicklistPush(quicklist *quicklist, void *value, const size_t sz, int where) { if (where == QUICKLIST_HEAD) { quicklistPushHead(quicklist, value, sz); } else if (where == QUICKLIST_TAIL) { quicklistPushTail(quicklist, value, sz); } } 

可以看下上面主要用到的是 quicklist

这里再来分析下 quicklist 的数据结构

typedef struct quicklist { // quicklist的链表头 quicklistNode *head; // quicklist的链表尾 quicklistNode *tail; // 所有ziplist中的总元素个数 unsigned long count; /* total count of all entries in all ziplists */ // quicklistNodes的个数 unsigned long len; /* number of quicklistNodes */ int fill : QL_FILL_BITS; /* fill factor for individual nodes */ unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */ unsigned int bookmark_count: QL_BM_BITS; quicklistBookmark bookmarks[]; } quicklist; typedef struct quicklistNode { // 前一个quicklistNode struct quicklistNode *prev; // 后一个quicklistNode struct quicklistNode *next; // quicklistNode指向的ziplist unsigned char *zl; // ziplist的字节大小 unsigned int sz; /* ziplist size in bytes */ // ziplist中的元素个数 unsigned int count : 16; /* count of items in ziplist */ // 编码格式,原生字节数组或压缩存储 unsigned int encoding : 2; /* RAW==1 or LZF==2 */ // 存储方式 unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ // 数据是否被压缩 unsigned int recompress : 1; /* was this node previous compressed? */ // 数据能否被压缩 unsigned int attempted_compress : 1; /* node can't compress; too small */ // 预留的bit位 unsigned int extra : 10; /* more bits to steal for future usage */ } quicklistNode; 

quicklist 作为一个链表结构,在它的数据结构中,是定义了整个 quicklist 的头、尾指针,这样一来,我们就可以通过 quicklist 的数据结构,来快速定位到 quicklist 的链表头和链表尾。

来看下 quicklist 是如何插入的

/* Add new entry to head node of quicklist. * * Returns 0 if used existing head. * Returns 1 if new head created. */ int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) { quicklistNode *orig_head = quicklist->head; assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */ if (likely( // 检测插入位置的 ziplist 是否能容纳该元素 _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) { quicklist->head->zl = ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); quicklistNodeUpdateSz(quicklist->head); } else { // 容纳不了,就重新创建一个 quicklistNode quicklistNode *node = quicklistCreateNode(); node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); quicklistNodeUpdateSz(node); _quicklistInsertNodeBefore(quicklist, quicklist->head, node); } quicklist->count++; quicklist->head->count++; return (orig_head != quicklist->head); } 

quicklist 采用的是链表结构,所以当插入一个新元素的时候,首先判断下 quicklist 插入位置的 ziplist 是否能容纳该元素,即单个 ziplist 是否不超过 8KB,或是单个 ziplist 里的元素个数是否满足要求。

如果可以插入就当前的节点进行插入,否则就新建一个 quicklistNode 来保存先插入的节点。

quicklist 通过控制每个 quicklistNode 中,ziplist 的大小或是元素个数,就有效减少了在 ziplist 中新增或修改元素后,发生连锁更新的情况,从而提供了更好的访问性能。

基于 Streams 的消息队列

Streams 是 Redis 专门为消息队列设计的数据类型。

  • 是可持久化的,可以保证数据不丢失。

  • 支持消息的多播、分组消费。

  • 支持消息的有序性。

来看下几个主要的命令

XADD:插入消息,保证有序,可以自动生成全局唯一ID; XREAD:用于读取消息,可以按ID读取数据; XREADGROUP:按消费组形式读取消息; XPENDING和XACK:XPENDING命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而XACK命令用于向消息队列确认消息处理已完成。 

下面看几个常用的命令

XADD

使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列,XADD 语法格式:

$ XADD key ID field value [field value ...] 
  • key:队列名称,如果不存在就创建

  • ID:消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性

  • field value:记录

$ XADD teststream * name xiaohong surname xiaobai "1646650328883-0" 

可以看到 1646650328883-0就是自动生成的全局唯一消息ID

XREAD

使用 XREAD 以阻塞或非阻塞方式获取消息列表

$ XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] 
  • count:数量

  • milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式

  • key:队列名

  • id:消息 ID

$ XREAD BLOCK 100 STREAMS teststream 0 1) 1) "teststream" 2) 1) 1) "1646650328883-0" 2) 1) "name" 2) "xiaohong" 3) "surname" 4) "xiaobai" 

BLOCK 就是阻塞的毫秒数

XGROUP

使用 XGROUP CREATE 创建消费者组

$ XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername] 
  • key:队列名称,如果不存在就创建

  • groupname:组名

  • $:表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略

从头开始消费

$ XGROUP CREATE teststream test-consumer-group-name 0-0 

从尾部开始消费

$ XGROUP CREATE teststream test-consumer-group-name $ 

XREADGROUP GROUP

使用 XREADGROUP GROUP 读取消费组中的消息

$ XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] 
  • group:消费组名

  • consumer:消费者名

  • count:读取数量

  • milliseconds:阻塞毫秒数

  • key:队列名

  • ID:消息 ID

$ XADD teststream * name xiaohong surname xiaobai "1646653392799-0" $ XREADGROUP GROUP test-consumer-group-name test-consumer-name COUNT 1 STREAMS teststream > 1) 1) "teststream" 2) 1) 1) "1646653392799-0" 2) 1) "name" 2) "xiaohong" 3) "surname" 4) "xiaobai" 

消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。

如果没有通过 XACK 命令告知消息已经成功消费了,该消息会一直存在,可以通过 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

$ XPENDING teststream test-consumer-group-name 1) (integer) 3 2) "1646653325535-0" 3) "1646653392799-0" 4) 1) 1) "test-consumer-name" 2) "3" 

分析下源码实现

stream 的结构

typedef struct stream { // 这是使用前缀树存储数据 rax *rax; /* The radix tree holding the stream. */ uint64_t length; /* Number of elements inside this stream. */ // 当前stream的最后一个id streamID last_id; /* Zero if there are yet no items. */ // 存储当前的消费者组信息 rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ } stream; t
                
                

-六神源码网