Redis 消息队列 Stream

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

tip作为程序员一定学习编程之道一定要对代码的编写有追求不能实现就完事了。我们应该让自己写的代码更加优雅即使这会费时费力。

推荐体系化学习JavaJava面试专题

文章目录

1、什么是 Stream

Stream 是 Redis 5.0 版本中新增的一种数据结构它是一个高性能、持久化的消息队列可以用于实现消息的发布和订阅。Stream 可以看作是一个有序的消息队列每个消息都有一个唯一的 ID可以根据 ID 进行消息的查找、删除和确认。在 Stream 中消息以键值对的形式存储可以存储任意类型的数据。Stream 还支持多个消费者组每个消费者组可以独立消费消息避免消息重复消费。Stream 的引入使得 Redis 在消息队列领域更具竞争力同时也为开发者提供了一种高效、可靠的消息处理方式。

2、为什么要设计 Stream

Redis 设计 Stream 的原因主要是为了满足大规模实时数据处理的需求。在传统的消息队列中消息的消费者只能消费最新的消息而无法消费过去的消息。而在实时数据处理中往往需要对历史数据进行分析和处理因此需要一种能够存储大量历史数据并支持快速查询和消费的数据结构。Stream 的引入解决了这个问题它支持持久化存储和快速查询可以存储大量历史数据并且支持多个消费者组独立消费消息从而满足了大规模实时数据处理的需求。此外Stream 还支持消息的延迟和重试等功能使得 Redis 在消息队列领域更具竞争力。

3、Stream 命令详解

Stream 是 Redis 5.0 版本新增的一种数据结构支持高性能、持久化的消息队列。下面是 Stream 命令的详细介绍

  1. XADD key ID field1 value1 [field2 value2 …]向指定的 Stream 中添加一条消息消息的 ID 由用户指定消息的字段和值由用户指定。
127.0.0.1:6379> XADD mystream 1000 name John age 30
"1000-0"
  1. XRANGE key start end [COUNT count]返回指定 Stream 中指定范围内的消息范围由 start 和 end 指定可以使用 “-” 表示最大或最小 IDCOUNT 参数表示返回消息的数量。
127.0.0.1:6379> XRANGE mystream 1000-0 1000-1
1) 1) "1000-0"
      2) 1) "name"
         2) "John"
         3) "age"
         4) "30"
  1. XREVRANGE key end start [COUNT count]同 XRANGE 命令但是返回的消息是逆序的。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREVRANGE mystream 1003-0 1001-0
1) 1) "1003-0"
      2) 1) "name"
         2) "Jack"
         3) "age"
         4) "30"
   2) 1) "1002-0"
      2) 1) "name"
         2) "Mary"
         3) "age"
         4) "28"
   3) 1) "1001-0"
      2) 1) "name"
         2) "Tom"
         3) "age"
         4) "25"
  1. XLEN key返回指定 Stream 中消息的数量。
127.0.0.1:6379> XLEN mystream
1
  1. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [ID]从指定 Stream 中读取消息可以指定读取的消息数量和阻塞时间如果没有新的消息则等待指定时间后返回空结果。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREAD COUNT 2 BLOCK 1000 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"
      2) 1) "1002-0"
         2) 1) "name"
            2) "Mary"
            3) "age"
            4) "28"
  1. XACK key group ID [ID …]确认指定消费者组已经处理了指定 ID 的消息。
127.0.0.1:6379> XACK mystream mygroup 1000-0
(integer) 1
  1. XGROUP CREATE key groupname ID [MKSTREAM]创建一个新的消费者组可以指定组名和起始 ID如果指定 MKSTREAM 参数则会自动创建 Stream。
127.0.0.1:6379> XGROUP CREATE mystream mygroup 1000-0
OK
  1. XGROUP SETID key groupname ID设置消费者组的起始 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP SETID mystream mygroup 1002-0
OK
  1. XGROUP DESTROY key groupname销毁指定的消费者组。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP DESTROY mystream mygroup
(integer) 1
  1. XGROUP DELCONSUMER key groupname consumername从指定消费者组中删除指定的消费者。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"

127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"

127.0.0.1:6379> XGROUP DELCONSUMER mystream mygroup myconsumer
(integer) 1
  1. XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] STREAMS key [ID]从指定消费者组中读取消息可以指定读取的消息数量和阻塞时间如果没有新的消息则等待指定时间后返回空结果。
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1000-0"
         2) 1) "name"
            2) "John"
            3) "age"
            4) "30"
  1. XCLAIM key groupname consumername min-idle-time ID [ID …] [IDLE milliseconds] [TIME milliseconds] [RETRYCOUNT count] [FORCE]从指定消费者组中获取一条未被确认的消息并将其标记为正在处理可以指定最小空闲时间、最大空闲时间、重试次数等参数。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"

127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"

127.0.0.1:6379> XCLAIM mystream mygroup myconsumer 0 1001-0
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"
  1. XDEL key ID [ID …]从指定 Stream 中删除指定 ID 的消息。
127.0.0.1:6379> XDEL mystream 1000-0
(integer) 1
  1. XTRIM key MAXLEN [~] count删除 Stream 中多余的消息可以指定删除的数量或者删除到指定的 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XADD mystream 1004 name Lucy age 27
"1004-0"
127.0.0.1:6379> XADD mystream 1005 name Bob age 32
"1005-0"

127.0.0.1:6379> XTRIM mystream MAXLEN 3
(integer) 2

4、java 写一点 Stream 的 demo

我这边 redis 的版本是 Redis-x64-5.0.14.1windows 上玩的绿色版的下载地址https://github.com/tporadowski/redis/releases

在这里插入图片描述
一下是 demo 的代码

package com.pany.camp.redis;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamConsumersInfo;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;

import java.util.*;

public class RedisStreamDemo {
    private static final Logger logger = LoggerFactory.getLogger(RedisStreamDemo.class);
    private static final int MESSAGE_READ_COUNT = 1;
    private static final long MESSAGE_READ_TIMEOUT = 120000L;

    public static void main(String[] args) {
        // 创建 Jedis 实例
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            // 定义 Stream 名称和消费者组名称
            String streamName = "mystream";
            String groupName = "mygroup5";
            // 创建消费者组
            try {
                jedis.xgroupCreate(streamName, groupName, new StreamEntryID(), true);
            } catch (JedisDataException e) {
                // 如果 Stream 已经存在则忽略异常
                if (!e.getMessage().contains("BUSYGROUP")) {
                    throw e;
                }
            }
            logger.info("消费者组已创建");
            // 添加消息到 Stream 中
            Map<String, String> fields = new HashMap<>();
            fields.put("field1", "value1");
            fields.put("field2", "value2");
            StreamEntryID messageId = jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, fields);
            logger.info("消息已添加到 Stream 中消息内容为{}", JSONObject.toJSONString(fields));
            // 读取消息
            Map.Entry<String, StreamEntryID> streams = new AbstractMap.SimpleImmutableEntry<>(streamName,
                    new StreamEntryID().UNRECEIVED_ENTRY);
            List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(groupName, "c1", MESSAGE_READ_COUNT,
                    MESSAGE_READ_TIMEOUT, true, streams);
            logger.info("从 Stream 中读取了 {} 条消息", messages.size());
            for (Map.Entry<String, List<StreamEntry>> entry : messages) {
                String sn = entry.getKey();
                List<StreamEntry> streamMessages = entry.getValue();
                for (StreamEntry message : streamMessages) {
                    logger.info("Stream 名称{}", sn);
                    logger.info("Message ID{}", message.getID());
                    logger.info("Message fields{}", message.getFields());
                }
            }
            // 确认消息已经被消费
            jedis.xack(streamName, groupName, messageId);
            logger.info("消息已确认消费");
            // 删除消息
            jedis.xdel(streamName, messageId);
            logger.info("消息已删除");
        } catch (Exception e) {
            logger.error("执行 Redis 操作出错", e);
        }
    }
}
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.7.0</version>
</dependency>

在这里插入图片描述

5、Stream 的应用场景

Redis Stream 的常用的应用场景

  1. 消息队列Stream 可以作为一个高性能的消息队列使用支持多个消费者对同一 Stream 进行消费且支持消费者组的管理、消息确认和消息持久化等功能。

  2. 日志收集Stream 可以作为一个分布式的日志收集系统使用支持多个客户端将日志写入到同一 Stream 中且支持按照时间戳和 ID 进行查询和过滤。

  3. 实时数据处理Stream 可以作为一个实时数据处理系统使用支持多个客户端将实时数据写入到同一 Stream 中且支持按照时间戳和 ID 进行查询和过滤。

  4. 事件驱动架构Stream 可以作为一个事件驱动架构的基础设施使用支持多个事件源将事件写入到同一 Stream 中且支持按照事件类型和时间戳进行查询和过滤。

本文由激流原创首发于CSDN博客博客主页 https://blog.csdn.net/qq_37967783?spm=1010.2135.3001.5421
喜欢的话记得点赞收藏啊

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: redis

“Redis 消息队列 Stream” 的相关文章