SpringCloud微服务项目实战 - 6.延迟任务

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

我没有失约我与春风共至我与夏蝉共鸣我与秋叶共舞我与凛冬共至唯独你背道而行

在这里插入图片描述



系列文章目录

  1. 项目搭建
  2. App登录及网关
  3. App文章
  4. 自媒体平台博主后台
  5. 内容审核(自动)
  6. 延迟任务 - 精准发布文章

文章目录




一、延迟任务概述

1.什么是延迟任务

  • 定时任务 有固定周期的有明确的触发时间
  • 延迟队列 没有固定的开始时间它常常是由一个事件触发的而在这个事件触发之后的一段时间内触发另一个事件任务可以立即执行也可以延迟

应用场景

  • 场景一 订单下单之后30分钟后如果用户没有付钱则系统自动取消订单如果期间下单成功任务取消
  • 场景二 接口对接出现网络问题1分钟后重试如果失败2分钟重试直到出现阈值终止

2. 技术选型

⑴. DelayQueue

JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列 内部采用优先队列 PriorityQueue 存储元素同时元素必须实现 Delayed 接口在创建元素时可以指定多久才可以从队列中获取当前元素只有在延迟期满时才能从队列中提取元素

在这里插入图片描述
使用线程池或者原生DelayQueue程序挂掉之后任务都是放在内存需要考虑未处理消息的丢失带来的影响如何保证数据不丢失需要持久化磁盘


⑵. RabbitMQ实现延迟任务

  • TTL Time To Live (消息存活时间)
  • 死信队列 Dead Letter Exchange(死信交换机)当消息成为Dead message后可以重新发送另一个交换机死信交换机

在这里插入图片描述

⑶. redis实现

zset数据类型的去重有序分数排序特点进行延迟。例如时间戳作为score进行排序
在这里插入图片描述
例如
生产者添加到4个任务到延迟队列中时间毫秒值分别为97、98、99、100。当前时间的毫秒值为90
消费者端进行监听如果当前时间的毫秒值匹配到了延迟队列中的毫秒值就立即消费




二、redis实现延迟任务

1. redis实现思路

在这里插入图片描述

问题思路

  1. 为什么任务需要存储在数据库中
    延迟任务是一个通用的服务任何需要延迟得任务都可以调用该服务需要考虑数据持久化的问题存储数据库中是一种数据安全的考虑。
  2. 为什么redis中使用两种数据类型list和zset
    效率问题算法的时间复杂度
  3. 在添加zset数据的时候为什么不需要预加载
    任务模块是一个通用的模块项目中任何需要延迟队列的地方都可以调用这个接口要考虑到数据量的问题如果数据量特别大为了防止阻塞只需要把未来几分钟要执行的数据存入缓存即可。

2. 导入微服务

⑴. 模块包

资源链接 https://pan.baidu.com/s/1zKW2mT2R_XAq95kzuk9B2Q?pwd=abcd

解压至 heima-leadnews-service 目录下

⑵. Pom配置

编辑 heima-leadnews-service/pom.xml 文件

        <!--延迟任务模块-->
        <module>heima-leadnews-schedule</module>

⑶. Nacos配置

添加 leadnews-schedule 配置

spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: 123456
# 设置Mapper接口所对应的XML文件位置如果你在Mapper接口中有自定义方法需要进行该配置
mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # 设置别名包扫描路径通过该属性可以给包中的类注册别名
  type-aliases-package: com.heima.model.schedule.pojos

在这里插入图片描述

3. 表结构分析

sql链接 https://pan.baidu.com/s/1bAmVJxP8NSKOBZAwoKJGHg?pwd=abcd
在这里插入图片描述

taskinfo 任务表
在这里插入图片描述
taskinfo_logs 任务日志表
在这里插入图片描述
MySQL中BLOB是一个二进制大型对象是一个可以存储大量数据的容器LongBlob 最大存储 4G


4. 实体类

新建 heima-leadnews-model/src/main/java/com/heima/model/schedule/pojos/Taskinfo.java 文件

/**
 * <p>
 * 
 * </p>
 *
 * @author itheima
 */
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private byte[] parameters;

    /**
     * 优先级
     */
    @TableField("priority")
    private Integer priority;

    /**
     * 任务类型
     */
    @TableField("task_type")
    private Integer taskType;
}

新建 heima-leadnews-model/src/main/java/com/heima/model/schedule/pojos/TaskinfoLogs.java 文件

/**
 * <p>
 * 
 * </p>
 *
 * @author itheima
 */
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private byte[] parameters;

    /**
     * 优先级
     */
    @TableField("priority")
    private Integer priority;

    /**
     * 任务类型
     */
    @TableField("task_type")
    private Integer taskType;

    /**
     * 版本号,用乐观锁
     */
    @Version
    private Integer version;

    /**
     * 状态 0=int 1=EXECUTED 2=CANCELLED
     */
    @TableField("status")
    private Integer status;
}

5. 乐观锁

⑴. 解决并发策略

悲观锁(Pessimistic Lock)
每次去拿数据的时候都认为别人会修改所以每次在拿数据的时候都会上锁
在这里插入图片描述

乐观锁(Optimistic Lock)
每次去拿数据的时候都认为别人不会修改所以不会上锁但是在更新的时候会判断一下在此期间别人有没有去更新这个数据可以使用版本号等机制
在这里插入图片描述


⑵. 启动类

编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/ScheduleApplication.java 文件

    /**
     * mybatis-plus乐观锁支持
     * @return
     */
    @Bean
    public MybatisPlusInterceptor optimisticLockerInterceptor(){
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
        return interceptor;
    }

6. redis

⑴. 容器

# 列出本地镜像
docker images

# 创建容器
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"

# 列出容器
docker ps

⑵. redis连接

redis安装链接https://pan.baidu.com/s/1IzSRs6JPAxM64IomZv8U6g?pwd=abcd

在这里插入图片描述


⑶. 集成

①. 依赖

编辑 heima-leadnews-common/pom.xml 文件

        <!--spring data redis & cache -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- redis依赖commons-pool 这个依赖一定要添加 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

②. Nacos配置

编辑 leadnews-schedule 配置

spring:
  redis:
    host: 192.168.200.130
    password: leadnews
    port: 6379

③. 工具类

新建 heima-leadnews-common/src/main/java/com/heima/common/redis/CacheService.java 文件

@Component
public class CacheService extends CachingConfigurerSupport {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    public StringRedisTemplate getstringRedisTemplate() {
        return this.stringRedisTemplate;
    }

    /** -------------------key相关操作--------------------- */

    /**
     * 删除key
     *
     * @param key
     */
    public void delete(String key) {
        stringRedisTemplate.delete(key);
    }

    /**
     * 批量删除key
     *
     * @param keys
     */
    public void delete(Collection<String> keys) {
        stringRedisTemplate.delete(keys);
    }

    /**
     * 序列化key
     *
     * @param key
     * @return
     */
    public byte[] dump(String key) {
        return stringRedisTemplate.dump(key);
    }

    /**
     * 是否存在key
     *
     * @param key
     * @return
     */
    public Boolean exists(String key) {
        return stringRedisTemplate.hasKey(key);
    }

    /**
     * 设置过期时间
     *
     * @param key
     * @param timeout
     * @param unit
     * @return
     */
    public Boolean expire(String key, long timeout, TimeUnit unit) {
        return stringRedisTemplate.expire(key, timeout, unit);
    }

    /**
     * 设置过期时间
     *
     * @param key
     * @param date
     * @return
     */
    public Boolean expireAt(String key, Date date) {
        return stringRedisTemplate.expireAt(key, date);
    }

    /**
     * 查找匹配的key
     *
     * @param pattern
     * @return
     */
    public Set<String> keys(String pattern) {
        return stringRedisTemplate.keys(pattern);
    }

    /**
     * 将当前数据库的 key 移动到给定的数据库 db 当中
     *
     * @param key
     * @param dbIndex
     * @return
     */
    public Boolean move(String key, int dbIndex) {
        return stringRedisTemplate.move(key, dbIndex);
    }

    /**
     * 移除 key 的过期时间key 将持久保持
     *
     * @param key
     * @return
     */
    public Boolean persist(String key) {
        return stringRedisTemplate.persist(key);
    }

    /**
     * 返回 key 的剩余的过期时间
     *
     * @param key
     * @param unit
     * @return
     */
    public Long getExpire(String key, TimeUnit unit) {
        return stringRedisTemplate.getExpire(key, unit);
    }

    /**
     * 返回 key 的剩余的过期时间
     *
     * @param key
     * @return
     */
    public Long getExpire(String key) {
        return stringRedisTemplate.getExpire(key);
    }

    /**
     * 从当前数据库中随机返回一个 key
     *
     * @return
     */
    public String randomKey() {
        return stringRedisTemplate.randomKey();
    }

    /**
     * 修改 key 的名称
     *
     * @param oldKey
     * @param newKey
     */
    public void rename(String oldKey, String newKey) {
        stringRedisTemplate.rename(oldKey, newKey);
    }

    /**
     * 仅当 newkey 不存在时将 oldKey 改名为 newkey
     *
     * @param oldKey
     * @param newKey
     * @return
     */
    public Boolean renameIfAbsent(String oldKey, String newKey) {
        return stringRedisTemplate.renameIfAbsent(oldKey, newKey);
    }

    /**
     * 返回 key 所储存的值的类型
     *
     * @param key
     * @return
     */
    public DataType type(String key) {
        return stringRedisTemplate.type(key);
    }

    /** -------------------string相关操作--------------------- */

    /**
     * 设置指定 key 的值
     * @param key
     * @param value
     */
    public void set(String key, String value) {
        stringRedisTemplate.opsForValue().set(key, value);
    }

    /**
     * 获取指定 key 的值
     * @param key
     * @return
     */
    public String get(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }

    /**
     * 返回 key 中字符串值的子字符
     * @param key
     * @param start
     * @param end
     * @return
     */
    public String getRange(String key, long start, long end) {
        return stringRedisTemplate.opsForValue().get(key, start, end);
    }

    /**
     * 将给定 key 的值设为 value 并返回 key 的旧值(old value)
     *
     * @param key
     * @param value
     * @return
     */
    public String getAndSet(String key, String value) {
        return stringRedisTemplate.opsForValue().getAndSet(key, value);
    }

    /**
     * 对 key 所储存的字符串值获取指定偏移量上的位(bit)
     *
     * @param key
     * @param offset
     * @return
     */
    public Boolean getBit(String key, long offset) {
        return stringRedisTemplate.opsForValue().getBit(key, offset);
    }

    /**
     * 批量获取
     *
     * @param keys
     * @return
     */
    public List<String> multiGet(Collection<String> keys) {
        return stringRedisTemplate.opsForValue().multiGet(keys);
    }

    /**
     * 设置ASCII码, 字符串'a'的ASCII码是97, 转为二进制是'01100001', 此方法是将二进制第offset位值变为value
     *
     * @param key
     * @param
     * @param value
     *            值,true为1, false为0
     * @return
     */
    public boolean setBit(String key, long offset, boolean value) {
        return stringRedisTemplate.opsForValue().setBit(key, offset, value);
    }

    /**
     * 将值 value 关联到 key 并将 key 的过期时间设为 timeout
     *
     * @param key
     * @param value
     * @param timeout
     *            过期时间
     * @param unit
     *            时间单位, 天:TimeUnit.DAYS 小时:TimeUnit.HOURS 分钟:TimeUnit.MINUTES
     *            秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS
     */
    public void setEx(String key, String value, long timeout, TimeUnit unit) {
        stringRedisTemplate.opsForValue().set(key, value, timeout, unit);
    }

    /**
     * 只有在 key 不存在时设置 key 的值
     *
     * @param key
     * @param value
     * @return 之前已经存在返回false,不存在返回true
     */
    public boolean setIfAbsent(String key, String value) {
        return stringRedisTemplate.opsForValue().setIfAbsent(key, value);
    }

    /**
     * 用 value 参数覆写给定 key 所储存的字符串值从偏移量 offset 开始
     *
     * @param key
     * @param value
     * @param offset
     *            从指定位置开始覆写
     */
    public void setRange(String key, String value, long offset) {
        stringRedisTemplate.opsForValue().set(key, value, offset);
    }

    /**
     * 获取字符串的长度
     *
     * @param key
     * @return
     */
    public Long size(String key) {
        return stringRedisTemplate.opsForValue().size(key);
    }

    /**
     * 批量添加
     *
     * @param maps
     */
    public void multiSet(Map<String, String> maps) {
        stringRedisTemplate.opsForValue().multiSet(maps);
    }

    /**
     * 同时设置一个或多个 key-value 对当且仅当所有给定 key 都不存在
     *
     * @param maps
     * @return 之前已经存在返回false,不存在返回true
     */
    public boolean multiSetIfAbsent(Map<String, String> maps) {
        return stringRedisTemplate.opsForValue().multiSetIfAbsent(maps);
    }

    /**
     * 增加(自增长), 负数则为自减
     *
     * @param key
     * @param
     * @return
     */
    public Long incrBy(String key, long increment) {
        return stringRedisTemplate.opsForValue().increment(key, increment);
    }

    /**
     *
     * @param key
     * @param
     * @return
     */
    public Double incrByFloat(String key, double increment) {
        return stringRedisTemplate.opsForValue().increment(key, increment);
    }

    /**
     * 追加到末尾
     *
     * @param key
     * @param value
     * @return
     */
    public Integer append(String key, String value) {
        return stringRedisTemplate.opsForValue().append(key, value);
    }

    /** -------------------hash相关操作------------------------- */

    /**
     * 获取存储在哈希表中指定字段的值
     *
     * @param key
     * @param field
     * @return
     */
    public Object hGet(String key, String field) {
        return stringRedisTemplate.opsForHash().get(key, field);
    }

    /**
     * 获取所有给定字段的值
     *
     * @param key
     * @return
     */
    public Map<Object, Object> hGetAll(String key) {
        return stringRedisTemplate.opsForHash().entries(key);
    }

    /**
     * 获取所有给定字段的值
     *
     * @param key
     * @param fields
     * @return
     */
    public List<Object> hMultiGet(String key, Collection<Object> fields) {
        return stringRedisTemplate.opsForHash().multiGet(key, fields);
    }

    public void hPut(String key, String hashKey, String value) {
        stringRedisTemplate.opsForHash().put(key, hashKey, value);
    }

    public void hPutAll(String key, Map<String, String> maps) {
        stringRedisTemplate.opsForHash().putAll(key, maps);
    }

    /**
     * 仅当hashKey不存在时才设置
     *
     * @param key
     * @param hashKey
     * @param value
     * @return
     */
    public Boolean hPutIfAbsent(String key, String hashKey, String value) {
        return stringRedisTemplate.opsForHash().putIfAbsent(key, hashKey, value);
    }

    /**
     * 删除一个或多个哈希表字段
     *
     * @param key
     * @param fields
     * @return
     */
    public Long hDelete(String key, Object... fields) {
        return stringRedisTemplate.opsForHash().delete(key, fields);
    }

    /**
     * 查看哈希表 key 中指定的字段是否存在
     *
     * @param key
     * @param field
     * @return
     */
    public boolean hExists(String key, String field) {
        return stringRedisTemplate.opsForHash().hasKey(key, field);
    }

    /**
     * 为哈希表 key 中的指定字段的整数值加上增量 increment
     *
     * @param key
     * @param field
     * @param increment
     * @return
     */
    public Long hIncrBy(String key, Object field, long increment) {
        return stringRedisTemplate.opsForHash().increment(key, field, increment);
    }

    /**
     * 为哈希表 key 中的指定字段的整数值加上增量 increment
     *
     * @param key
     * @param field
     * @param delta
     * @return
     */
    public Double hIncrByFloat(String key, Object field, double delta) {
        return stringRedisTemplate.opsForHash().increment(key, field, delta);
    }

    /**
     * 获取所有哈希表中的字段
     *
     * @param key
     * @return
     */
    public Set<Object> hKeys(String key) {
        return stringRedisTemplate.opsForHash().keys(key);
    }

    /**
     * 获取哈希表中字段的数量
     *
     * @param key
     * @return
     */
    public Long hSize(String key) {
        return stringRedisTemplate.opsForHash().size(key);
    }

    /**
     * 获取哈希表中所有值
     *
     * @param key
     * @return
     */
    public List<Object> hValues(String key) {
        return stringRedisTemplate.opsForHash().values(key);
    }

    /**
     * 迭代哈希表中的键值对
     *
     * @param key
     * @param options
     * @return
     */
    public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) {
        return stringRedisTemplate.opsForHash().scan(key, options);
    }

    /** ------------------------list相关操作---------------------------- */

    /**
     * 通过索引获取列表中的元素
     *
     * @param key
     * @param index
     * @return
     */
    public String lIndex(String key, long index) {
        return stringRedisTemplate.opsForList().index(key, index);
    }

    /**
     * 获取列表指定范围内的元素
     *
     * @param key
     * @param start
     *            开始位置, 0是开始位置
     * @param end
     *            结束位置, -1返回所有
     * @return
     */
    public List<String> lRange(String key, long start, long end) {
        return stringRedisTemplate.opsForList().range(key, start, end);
    }

    /**
     * 存储在list头部
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPush(String key, String value) {
        return stringRedisTemplate.opsForList().leftPush(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPushAll(String key, String... value) {
        return stringRedisTemplate.opsForList().leftPushAll(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPushAll(String key, Collection<String> value) {
        return stringRedisTemplate.opsForList().leftPushAll(key, value);
    }

    /**
     * 当list存在的时候才加入
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPushIfPresent(String key, String value) {
        return stringRedisTemplate.opsForList().leftPushIfPresent(key, value);
    }

    /**
     * 如果pivot存在,再pivot前面添加
     *
     * @param key
     * @param pivot
     * @param value
     * @return
     */
    public Long lLeftPush(String key, String pivot, String value) {
        return stringRedisTemplate.opsForList().leftPush(key, pivot, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPush(String key, String value) {
        return stringRedisTemplate.opsForList().rightPush(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPushAll(String key, String... value) {
        return stringRedisTemplate.opsForList().rightPushAll(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPushAll(String key, Collection<String> value) {
        return stringRedisTemplate.opsForList().rightPushAll(key, value);
    }

    /**
     * 为已存在的列表添加值
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPushIfPresent(String key, String value) {
        return stringRedisTemplate.opsForList().rightPushIfPresent(key, value);
    }

    /**
     * 在pivot元素的右边添加值
     *
     * @param key
     * @param pivot
     * @param value
     * @return
     */
    public Long lRightPush(String key, String pivot, String value) {
        return stringRedisTemplate.opsForList().rightPush(key, pivot, value);
    }

    /**
     * 通过索引设置列表元素的值
     *
     * @param key
     * @param index
     *            位置
     * @param value
     */
    public void lSet(String key, long index, String value) {
        stringRedisTemplate.opsForList().set(key, index, value);
    }

    /**
     * 移出并获取列表的第一个元素
     *
     * @param key
     * @return 删除的元素
     */
    public String lLeftPop(String key) {
        return stringRedisTemplate.opsForList().leftPop(key);
    }

    /**
     * 移出并获取列表的第一个元素 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
     *
     * @param key
     * @param timeout
     *            等待时间
     * @param unit
     *            时间单位
     * @return
     */
    public String lBLeftPop(String key, long timeout, TimeUnit unit) {
        return stringRedisTemplate.opsForList().leftPop(key, timeout, unit);
    }

    /**
     * 移除并获取列表最后一个元素
     *
     * @param key
     * @return 删除的元素
     */
    public String lRightPop(String key) {
        return stringRedisTemplate.opsForList().rightPop(key);
    }

    /**
     * 移出并获取列表的最后一个元素 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
     *
     * @param key
     * @param timeout
     *            等待时间
     * @param unit
     *            时间单位
     * @return
     */
    public String lBRightPop(String key, long timeout, TimeUnit unit) {
        return stringRedisTemplate.opsForList().rightPop(key, timeout, unit);
    }

    /**
     * 移除列表的最后一个元素并将该元素添加到另一个列表并返回
     *
     * @param sourceKey
     * @param destinationKey
     * @return
     */
    public String lRightPopAndLeftPush(String sourceKey, String destinationKey) {
        return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
                destinationKey);
    }

    /**
     * 从列表中弹出一个值将弹出的元素插入到另外一个列表中并返回它 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
     *
     * @param sourceKey
     * @param destinationKey
     * @param timeout
     * @param unit
     * @return
     */
    public String lBRightPopAndLeftPush(String sourceKey, String destinationKey,
                                        long timeout, TimeUnit unit) {
        return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
                destinationKey, timeout, unit);
    }
    
    /**
     * 删除集合中值等于value得元素
     *
     * @param key
     * @param index
     *            index=0, 删除所有值等于value的元素; index>0, 从头部开始删除第一个值等于value的元素;
     *            index<0, 从尾部开始删除第一个值等于value的元素;
     * @param value
     * @return
     */
    public Long lRemove(String key, long index, String value) {
        return stringRedisTemplate.opsForList().remove(key, index, value);
    }

    /**
     * 裁剪list
     *
     * @param key
     * @param start
     * @param end
     */
    public void lTrim(String key, long start, long end) {
        stringRedisTemplate.opsForList().trim(key, start, end);
    }

    /**
     * 获取列表长度
     *
     * @param key
     * @return
     */
    public Long lLen(String key) {
        return stringRedisTemplate.opsForList().size(key);
    }


    /** --------------------set相关操作-------------------------- */

    /**
     * set添加元素
     *
     * @param key
     * @param values
     * @return
     */
    public Long sAdd(String key, String... values) {
        return stringRedisTemplate.opsForSet().add(key, values);
    }

    /**
     * set移除元素
     *
     * @param key
     * @param values
     * @return
     */
    public Long sRemove(String key, Object... values) {
        return stringRedisTemplate.opsForSet().remove(key, values);
    }

    /**
     * 移除并返回集合的一个随机元素
     *
     * @param key
     * @return
     */
    public String sPop(String key) {
        return stringRedisTemplate.opsForSet().pop(key);
    }

    /**
     * 将元素value从一个集合移到另一个集合
     *
     * @param key
     * @param value
     * @param destKey
     * @return
     */
    public Boolean sMove(String key, String value, String destKey) {
        return stringRedisTemplate.opsForSet().move(key, value, destKey);
    }

    /**
     * 获取集合的大小
     *
     * @param key
     * @return
     */
    public Long sSize(String key) {
        return stringRedisTemplate.opsForSet().size(key);
    }

    /**
     * 判断集合是否包含value
     *
     * @param key
     * @param value
     * @return
     */
    public Boolean sIsMember(String key, Object value) {
        return stringRedisTemplate.opsForSet().isMember(key, value);
    }

    /**
     * 获取两个集合的交集
     *
     * @param key
     * @param otherKey
     * @return
     */
    public Set<String> sIntersect(String key, String otherKey) {
        return stringRedisTemplate.opsForSet().intersect(key, otherKey);
    }

    /**
     * 获取key集合与多个集合的交集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<String> sIntersect(String key, Collection<String> otherKeys) {
        return stringRedisTemplate.opsForSet().intersect(key, otherKeys);
    }

    /**
     * key集合与otherKey集合的交集存储到destKey集合中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long sIntersectAndStore(String key, String otherKey, String destKey) {
        return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKey,
                destKey);
    }

    /**
     * key集合与多个集合的交集存储到destKey集合中
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long sIntersectAndStore(String key, Collection<String> otherKeys,
                                   String destKey) {
        return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKeys,
                destKey);
    }

    /**
     * 获取两个集合的并集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<String> sUnion(String key, String otherKeys) {
        return stringRedisTemplate.opsForSet().union(key, otherKeys);
    }

    /**
     * 获取key集合与多个集合的并集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<String> sUnion(String key, Collection<String> otherKeys) {
        return stringRedisTemplate.opsForSet().union(key, otherKeys);
    }

    /**
     * key集合与otherKey集合的并集存储到destKey中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long sUnionAndStore(String key, String otherKey, String destKey) {
        return stringRedisTemplate.opsForSet().unionAndStore(key, otherKey, destKey);
    }

    /**
     * key集合与多个集合的并集存储到destKey中
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long sUnionAndStore(String key, Collection<String> otherKeys,
                               String destKey) {
        return stringRedisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey);
    }

    /**
     * 获取两个集合的差集
     *
     * @param key
     * @param otherKey
     * @return
     */
    public Set<String> sDifference(String key, String otherKey) {
        return stringRedisTemplate.opsForSet().difference(key, otherKey);
    }

    /**
     * 获取key集合与多个集合的差集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<String> sDifference(String key, Collection<String> otherKeys) {
        return stringRedisTemplate.opsForSet().difference(key, otherKeys);
    }

    /**
     * key集合与otherKey集合的差集存储到destKey中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long sDifference(String key, String otherKey, String destKey) {
        return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKey,
                destKey);
    }

    /**
     * key集合与多个集合的差集存储到destKey中
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long sDifference(String key, Collection<String> otherKeys,
                            String destKey) {
        return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKeys,
                destKey);
    }

    /**
     * 获取集合所有元素
     *
     * @param key
     * @param
     * @param
     * @return
     */
    public Set<String> setMembers(String key) {
        return stringRedisTemplate.opsForSet().members(key);
    }

    /**
     * 随机获取集合中的一个元素
     *
     * @param key
     * @return
     */
    public String sRandomMember(String key) {
        return stringRedisTemplate.opsForSet().randomMember(key);
    }

    /**
     * 随机获取集合中count个元素
     *
     * @param key
     * @param count
     * @return
     */
    public List<String> sRandomMembers(String key, long count) {
        return stringRedisTemplate.opsForSet().randomMembers(key, count);
    }

    /**
     * 随机获取集合中count个元素并且去除重复的
     *
     * @param key
     * @param count
     * @return
     */
    public Set<String> sDistinctRandomMembers(String key, long count) {
        return stringRedisTemplate.opsForSet().distinctRandomMembers(key, count);
    }

    /**
     *
     * @param key
     * @param options
     * @return
     */
    public Cursor<String> sScan(String key, ScanOptions options) {
        return stringRedisTemplate.opsForSet().scan(key, options);
    }

    /**------------------zSet相关操作--------------------------------*/

    /**
     * 添加元素,有序集合是按照元素的score值由小到大排列
     *
     * @param key
     * @param value
     * @param score
     * @return
     */
    public Boolean zAdd(String key, String value, double score) {
        return stringRedisTemplate.opsForZSet().add(key, value, score);
    }

    /**
     *
     * @param key
     * @param values
     * @return
     */
    public Long zAdd(String key, Set<TypedTuple<String>> values) {
        return stringRedisTemplate.opsForZSet().add(key, values);
    }

    /**
     *
     * @param key
     * @param values
     * @return
     */
    public Long zRemove(String key, Object... values) {
        return stringRedisTemplate.opsForZSet().remove(key, values);
    }

    public Long zRemove(String key, Collection<String> values) {
        if(values!=null&&!values.isEmpty()){
            Object[] objs = values.toArray(new Object[values.size()]);
            return stringRedisTemplate.opsForZSet().remove(key, objs);
        }
       return 0L;
    }

    /**
     * 增加元素的score值并返回增加后的值
     *
     * @param key
     * @param value
     * @param delta
     * @return
     */
    public Double zIncrementScore(String key, String value, double delta) {
        return stringRedisTemplate.opsForZSet().incrementScore(key, value, delta);
    }

    /**
     * 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列
     *
     * @param key
     * @param value
     * @return 0表示第一位
     */
    public Long zRank(String key, Object value) {
        return stringRedisTemplate.opsForZSet().rank(key, value);
    }

    /**
     * 返回元素在集合的排名,按元素的score值由大到小排列
     *
     * @param key
     * @param value
     * @return
     */
    public Long zReverseRank(String key, Object value) {
        return stringRedisTemplate.opsForZSet().reverseRank(key, value);
    }

    /**
     * 获取集合的元素, 从小到大排序
     *
     * @param key
     * @param start
     *            开始位置
     * @param end
     *            结束位置, -1查询所有
     * @return
     */
    public Set<String> zRange(String key, long start, long end) {
        return stringRedisTemplate.opsForZSet().range(key, start, end);
    }
    
    /**
     * 获取zset集合的所有元素, 从小到大排序
     *
     */
    public Set<String> zRangeAll(String key) {
        return zRange(key,0,-1);
    }

    /**
     * 获取集合元素, 并且把score值也获取
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Set<TypedTuple<String>> zRangeWithScores(String key, long start,
                                                    long end) {
        return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end);
    }

    /**
     * 根据Score值查询集合元素
     *
     * @param key
     * @param min
     *            最小值
     * @param max
     *            最大值
     * @return
     */
    public Set<String> zRangeByScore(String key, double min, double max) {
        return stringRedisTemplate.opsForZSet().rangeByScore(key, min, max);
    }


    /**
     * 根据Score值查询集合元素, 从小到大排序
     *
     * @param key
     * @param min
     *            最小值
     * @param max
     *            最大值
     * @return
     */
    public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
                                                           double min, double max) {
        return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max);
    }

    /**
     *
     * @param key
     * @param min
     * @param max
     * @param start
     * @param end
     * @return
     */
    public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
                                                           double min, double max, long start, long end) {
        return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max,
                start, end);
    }

    /**
     * 获取集合的元素, 从大到小排序
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Set<String> zReverseRange(String key, long start, long end) {
        return stringRedisTemplate.opsForZSet().reverseRange(key, start, end);

    }

    public Set<String> zReverseRangeByScore(String key, long min, long max) {
        return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);

    }

    /**
     * 获取集合的元素, 从大到小排序, 并返回score值
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Set<TypedTuple<String>> zReverseRangeWithScores(String key,
                                                           long start, long end) {
        return stringRedisTemplate.opsForZSet().reverseRangeWithScores(key, start,
                end);
    }

    /**
     * 根据Score值查询集合元素, 从大到小排序
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Set<String> zReverseRangeByScore(String key, double min,
                                            double max) {
        return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
    }

    /**
     * 根据Score值查询集合元素, 从大到小排序
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Set<TypedTuple<String>> zReverseRangeByScoreWithScores(
            String key, double min, double max) {
        return stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key,
                min, max);
    }

    /**
     *
     * @param key
     * @param min
     * @param max
     * @param start
     * @param end
     * @return
     */
    public Set<String> zReverseRangeByScore(String key, double min,
                                            double max, long start, long end) {
        return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max,
                start, end);
    }

    /**
     * 根据score值获取集合元素数量
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Long zCount(String key, double min, double max) {
        return stringRedisTemplate.opsForZSet().count(key, min, max);
    }

    /**
     * 获取集合大小
     *
     * @param key
     * @return
     */
    public Long zSize(String key) {
        return stringRedisTemplate.opsForZSet().size(key);
    }

    /**
     * 获取集合大小
     *
     * @param key
     * @return
     */
    public Long zZCard(String key) {
        return stringRedisTemplate.opsForZSet().zCard(key);
    }

    /**
     * 获取集合中value元素的score值
     *
     * @param key
     * @param value
     * @return
     */
    public Double zScore(String key, Object value) {
        return stringRedisTemplate.opsForZSet().score(key, value);
    }

    /**
     * 移除指定索引位置的成员
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Long zRemoveRange(String key, long start, long end) {
        return stringRedisTemplate.opsForZSet().removeRange(key, start, end);
    }

    /**
     * 根据指定的score值的范围来移除成员
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Long zRemoveRangeByScore(String key, double min, double max) {
        return stringRedisTemplate.opsForZSet().removeRangeByScore(key, min, max);
    }

    /**
     * 获取key和otherKey的并集并存储在destKey中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long zUnionAndStore(String key, String otherKey, String destKey) {
        return stringRedisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey);
    }

    /**
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long zUnionAndStore(String key, Collection<String> otherKeys,
                               String destKey) {
        return stringRedisTemplate.opsForZSet()
                .unionAndStore(key, otherKeys, destKey);
    }

    /**
     * 交集
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long zIntersectAndStore(String key, String otherKey,
                                   String destKey) {
        return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKey,
                destKey);
    }

    /**
     * 交集
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long zIntersectAndStore(String key, Collection<String> otherKeys,
                                   String destKey) {
        return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKeys,
                destKey);
    }

    /**
     *
     * @param key
     * @param options
     * @return
     */
    public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) {
        return stringRedisTemplate.opsForZSet().scan(key, options);
    }

    /**
     * 扫描主键建议使用
     * @param patten
     * @return
     */
    public Set<String> scan(String patten){
        Set<String> keys = stringRedisTemplate.execute((RedisCallback<Set<String>>) connection -> {
            Set<String> result = new HashSet<>();
            try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder()
                    .match(patten).count(10000).build())) {
                while (cursor.hasNext()) {
                    result.add(new String(cursor.next()));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return result;
        });
        return  keys;
    }
    
    /**
     * 管道技术提高性能
     * @param type
     * @param values
     * @return
     */
    public List<Object> lRightPushPipeline(String type,Collection<String> values){
        List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
                        //集合转换数组
                        String[] strings = values.toArray(new String[values.size()]);
                        //直接批量发送
                        stringRedisConn.rPush(type, strings);
                        return null;
                    }
                });
        return results;
    }

    public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){

        List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
            @Nullable
            @Override
            public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;
                String[] strings = values.toArray(new String[values.size()]);
                stringRedisConnection.rPush(topic_key,strings);
                stringRedisConnection.zRem(future_key,strings);
                return null;
            }
        });
        return objects;
    }
}

④. 自动配置

新建 heima-leadnews-common/src/main/resources/META-INF/spring.factories 文件

  com.heima.common.redis.CacheService

⑷. 测试

①. list

新建 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/RedisTest.java 文件

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {

    @Autowired
    private CacheService cacheService;

    @Test
    public void testList() {
        // list左侧添加元素
        // cacheService.lLeftPush("list_001", "hello, redis");

        // 在右边获取元素并删除
        String list_001 = cacheService.lRightPop("list_001");
        System.out.println(list_001);
    }
}

在这里插入图片描述
在这里插入图片描述

②. Zset

编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/RedisTest.java 文件

    @Test
    public void testZset() {
        // 添加数据到zset中 分值
        /*cacheService.zAdd("zset_key_001", "hello zset 001", 8888);
        cacheService.zAdd("zset_key_001", "hello zset 002", 6666);
        cacheService.zAdd("zset_key_001", "hello zset 003", 2222);
        cacheService.zAdd("zset_key_001", "hello zset 004", 99999);*/

        // 按照分值获取数据
        Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
        System.out.println(zset_key_001);
    }

在这里插入图片描述
在这里插入图片描述


7. 服务实现

⑴. Mapper

①. taskinfoMapper

新建 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/mapper/TaskinfoMapper.java 文件

/**
 * <p>
 *  Mapper 接口
 * </p>
 *
 * @author itheima
 */
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {

    public List<Taskinfo> queryFutureTime(@Param("taskType")int type, @Param("priority")int priority, @Param("future")Date future);
}

②. xml

新建 heima-leadnews-service/heima-leadnews-schedule/src/main/resources/mapper/TaskinfoMapper.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heima.schedule.mapper.TaskinfoMapper">

    <select id="queryFutureTime" resultType="com.heima.model.schedule.pojos.Taskinfo">
        select *
        from taskinfo
        where task_type = #{taskType}
          and priority = #{priority}
          and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}
    </select>
</mapper>

③. taskinfoLogsMapper

新建 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/mapper/TaskinfoLogsMapper.java 文件

/**
 * <p>
 *  Mapper 接口
 * </p>
 *
 * @author itheima
 */
@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {

}

⑵. Dto

新建 heima-leadnews-model/src/main/java/com/heima/model/schedule/dtos/Task.java 文件

@Data
public class Task implements Serializable {

    /**
     * 任务id
     */
    private Long taskId;
    /**
     * 类型
     */
    private Integer taskType;

    /**
     * 优先级
     */
    private Integer priority;

    /**
     * 执行id
     */
    private long executeTime;

    /**
     * task参数
     */
    private byte[] parameters;
    
}

⑶. 常量

新建 heima-leadnews-common/src/main/java/com/heima/common/constants/ScheduleConstants.java 文件

public class ScheduleConstants {

    //task状态
    public static final int SCHEDULED=0;   //初始化状态

    public static final int EXECUTED=1;       //已执行状态

    public static final int CANCELLED=2;   //已取消状态

    public static String FUTURE="future_";   //未来数据key前缀

    public static String TOPIC="topic_";     //当前数据key前缀
}

⑷. Service

新建 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/TaskService.java 文件

public interface TaskService {

    /**
     * 添加延时任务
     * @param task
     * @return
     */
    public Long addTask(Task task);
}

⑸. ServiceImpl

新建 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java 文件

@Service
@Slf4j
@Transactional
public class TaskServiceImpl implements TaskService {

    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    @Override
    public long addTask(Task task) {
        //1.添加任务到数据库中
        boolean success = addTaskToDb(task);

        if(success) {
            //2.添加任务到redis
            addTaskToCache(task);
        }

        return task.getTaskId();
    }

    @Autowired
    private CacheService cacheService;

    /**
     * 添加任务到redis
     * @param task
     */
    private void addTaskToCache(Task task) {

        // 设置redis的 key
        String key = task.getTaskType() +"_" +task.getPriority();

        // 获取五分钟之后的时间 毫秒值
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, 5);
        long nextScheduleTime = calendar.getTimeInMillis();

        // 2.1 如果任务的执行时间小于等于当前时间存入list
        if(task.getExecuteTime() <= System.currentTimeMillis()) {
            cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
        } else if(task.getExecuteTime() <= nextScheduleTime){
            // 2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间未来5分钟 存入zset中
            cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
        }

    }

    @Autowired
    private TaskinfoMapper taskinfoMapper;

    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;

    /**
     * 添加任务到数据库中
     * @param task
     * @return
     */
    private boolean addTaskToDb(Task task) {

        boolean flag = false;

        try {

            // 1. 保存任务表
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task, taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);

            // 设置taskId
            task.setTaskId(taskinfo.getTaskId());

            // 2. 保存任务日志数据
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo, taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskinfoLogsMapper.insert(taskinfoLogs);

            flag = true;

        } catch (Exception e) {
            flag = false;
            e.printStackTrace();
        }

        return flag;
    }
}

⑹. 测试

①. 当下时间

新建 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java 文件

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class TaskServiceImplTest {

    @Autowired
    private TaskService taskService;

    @Test
    public void addTask() {

        Task task = new Task();
        task.setTaskType(100);
        task.setPriority(50);
        task.setParameters("task test".getBytes());
        // 当下时间
        task.setExecuteTime(new Date().getTime());

        // 未来时间
        // task.setExecuteTime(new Date().getTime() + 500);

        // 未来时间(超过五分钟)
        // task.setExecuteTime(new Date().getTime() + 500000);

        long taskId = taskService.addTask(task);
        System.out.println(taskId);
    }
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


②. 未来时间

编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java 文件

        // 当下时间
        // task.setExecuteTime(new Date().getTime());

        // 未来时间
        task.setExecuteTime(new Date().getTime() + 500);

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


③. 未来时间(超过五分钟)

编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java 文件

        // 未来时间
        // task.setExecuteTime(new Date().getTime() + 500);

        // 未来时间(超过五分钟)
        task.setExecuteTime(new Date().getTime() + 500000);

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述




三、延迟对列服务

在这里插入图片描述

1. 取消任务

⑴. 需求分析

场景第三接口网络不通使用延迟任务进行重试当达到阈值以后取消任务

在这里插入图片描述

  • 根据taskid删除任务,修改任务日志状态为
  • (取消) 删除redis中对应的任务数据包括list和zset

⑵. Service

编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/TaskService.java 文件

    /**
     * 取消任务
     * @param taskId
     * @return
     */
    public boolean cancelTask(Long taskId);

⑶. ServiceImpl

编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java 文件

    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    @Override
    public long addTask(Task task) {
        //1.添加任务到数据库中
        boolean success = addTaskToDb(task);

        if(success) {
            //2.添加任务到redis
            addTaskToCache(task);
        }

        return task.getTaskId();
    }

    /**
     * 取消任务
     * @param taskId
     * @return
     */
    @Override
    public boolean cancelTask(long taskId) {

        boolean flag = false;

        // 1. 删除任务, 更新任务日志
        Task task = updateDb(taskId, ScheduleConstants.CANCELLED);

        // 2. 删除 redis 数据
        if(task != null) {
            flag = true;
            removeTaskFromCache(task);
        }

        return flag;
    }

⑷. 测试类

1613872161971040257 任务的发布时间 小于当前时间并且 taskinfo表taskinfoLogs表Redis 均存在数据

编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java 文件

    @Test
    public void cancelTask() {
        taskService.cancelTask(1613872161971040257L);
    }

在这里插入图片描述
状态、版本已修改并且 taskinfo表Redis 中的数据已删除


2. 消费任务/拉取任务

⑴. 需求分析

在这里插入图片描述

⑵. Service

编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/TaskService.java 文件

    /**
     * 按照类型和优先级来拉取任务
     * @param type 类型
     * @param priority 优先级
     * @return
     */
    public Task taskPool(int type, int priority);

⑶. ServiceImpl

编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java 文件

    /**
     * 删除任务, 更新任务日志
     * @param taskId
     * @param status
     * @return
     */
    private Task updateDb(long taskId, int status) {

        Task task = null;

        try {
            // 删除任务
            taskinfoMapper.deleteById(taskId);

            // 更新任务日志
            TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
            taskinfoLogs.setStatus(status);
            taskinfoLogsMapper.updateById(taskinfoLogs);

            task = new Task();
            BeanUtils.copyProperties(taskinfoLogs, task);
            task.setExecuteTime(new Date().getTime());
        } catch (Exception e) {
            log.error("task cancel exception taskId={}", taskId);
        }

        return task;
    }

⑷. 测试类

①. 初始数据

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

②. 测试类

编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java 文件

    @Test
    public void taskPool() {
        Task task = taskService.taskPool(100, 50);
        System.out.println(task);
    }

③. Result

在这里插入图片描述
在这里插入图片描述
状态、版本已修改并且 taskinfo表Redis 中的数据已删除

3. 定时刷新

在这里插入图片描述

⑴. reids key值匹配

①. keys 模糊匹配

keys的模糊匹配功能很方便也很强大但是在生产环境需要慎用开发中使用keys的模糊匹配却发现redis的CPU使用率极高所以公司的redis生产环境将keys命令禁用了redis是单线程会被堵塞
在这里插入图片描述


②. scan

SCAN 命令是一个基于游标的迭代器SCAN命令每次被调用之后 都会向用户返回一个新的游标 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数 以此来延续之前的迭代过程。
在这里插入图片描述

③. 测试类

编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java 文件

    @Test
    public  void testPiple1(){
        long start =System.currentTimeMillis();
        for (int i = 0; i <10000 ; i++) {
            Task task = new Task();
            task.setTaskType(1001);
            task.setPriority(1);
            task.setExecuteTime(new Date().getTime());
            cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
        }
        System.out.println("耗时"+(System.currentTimeMillis()- start));
        // -> 耗时36104
    }


    @Test
    public void testPiple2(){
        long start  = System.currentTimeMillis();
        //使用管道技术
        List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
            @Nullable
            @Override
            public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                for (int i = 0; i <10000 ; i++) {
                    Task task = new Task();
                    task.setTaskType(1001);
                    task.setPriority(1);
                    task.setExecuteTime(new Date().getTime());
                    redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
                }
                return null;
            }
        });
        System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
        // -> 使用管道技术执行10000次自增操作共耗时:1383毫秒
    }

⑵. ServiceImpl

编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java 文件

    /**
     * 未来数据定时刷新
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void refreshTask() {

        System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");

        // 1. 获取所有未来数据集合的key值
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); // future_*

        for (String futureKey : futureKeys) {

            // 2. 获取当前数据的key topic
            String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];

            // 3. 获取该组key下当前需要消费的任务数据
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());

            // 4. 将这些任务数据添加到消费者队列中
            if (!tasks.isEmpty()) {
               cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
               System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
            }
        }
    }

⑶. 启动类

编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/ScheduleApplication.java 文件

@EnableScheduling

⑷. 测试类

①. 初始数据

编辑 heima-leadnews-service/heima-leadnews-schedule/src/test/java/com/heima/schedule/service/impl/TaskServiceImplTest.java 文件

    @Test
    public void addTaskSet() {
        for (int i = 0; i < 5; i++) {
            Task task = new Task();
            task.setTaskType(100 + i);
            task.setPriority(50);
            task.setParameters("task test".getBytes());
            task.setExecuteTime(new Date().getTime() + 500 + i);
            long taskId = taskService.addTask(task);
        }
    }

在这里插入图片描述


②. 定时刷新

启动 ScheduleApplication 服务
在这里插入图片描述
在这里插入图片描述


4. redis分布式锁

⑴. 需求分析

分布式锁的解决方案

方案说明
数据库基于表的唯一索引
zookeeper根据zookeeper中的临时有序节点排序
redis使用SETNX命令完成

sexnx SET if Not eXists 命令在指定的 key 不存在时为 key 设置指定的值。

在这里插入图片描述
这种加锁的思路是如果 key 不存在则为 key 设置 value如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值如果设置成功就表示加锁成功
  • 客户端B也去请求服务器设置key的值如果返回失败那么就代表加锁失败
  • 客户端A执行代码完成删除锁
  • 客户端B在等待一段时间后再去请求设置key的值设置成功
  • 客户端B执行代码完成删除锁

⑵. 存在的问题

如果存在 两个服务 的时候服务 启动后 两个服务都会去执行 refresh定时任务 方法

①. 服务配置

编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/resources/bootstrap.yml 文件

server:
  port: ${server.port:51701}

②. 创建服务

在这里插入图片描述
在这里插入图片描述

③. 启动服务

在这里插入图片描述
在这里插入图片描述


⑶. 工具类

编辑 heima-leadnews-common/src/main/java/com/heima/common/redis/CacheService.java 文件

    /**
     * 加锁
     *
     * @param name
     * @param expire
     * @return
     */
    public String tryLock(String name, long expire) {
        name = name + "_lock";
        String token = UUID.randomUUID().toString();
        RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
        RedisConnection conn = factory.getConnection();
        try {

            //参考redis命令
            //set key value [EX seconds] [PX milliseconds] [NX|XX]
            Boolean result = conn.set(
                    name.getBytes(),
                    token.getBytes(),
                    Expiration.from(expire, TimeUnit.MILLISECONDS),
                    RedisStringCommands.SetOption.SET_IF_ABSENT //NX
            );
            if (result != null && result)
                return token;
        } finally {
            RedisConnectionUtils.releaseConnection(conn, factory,false);
        }
        return null;
    }

⑷. 定时刷新加锁

编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java 文件

    /**
     * 未来数据定时刷新
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void refreshTask() {

        // redis 分布式锁
        String token = cacheService.tryLock("FUTURE_TASK_ASYNC", 1000 * 30);
        if(StringUtils.isNotBlank(token)) {

⑸. 测试

启动 5170151702 两个服务后只有一个服务会执行 定时刷新
在这里插入图片描述
在这里插入图片描述


5. 数据库同步到redis

⑴. 需求分析

在这里插入图片描述


⑵. 初始数据

执行 TaskServiceImplTestaddTaskSet 方法
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


⑶. 同步至redis

编辑 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/service/impl/TaskServiceImpl.java 文件

    /**
     * 同步未来数据至redis
     */
    @Scheduled(cron = "0 */5 * * * ?")
    @PostConstruct // 初始执行
    public void reloadData() {
        // 1. 清空缓存数据
        clearCache();

        // 2. 查询符合条件的任务 五分钟以内的数据
        // 2.1 获取五分钟之后的时间 毫秒值
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, 5);
        // 2.2 查询条件
        List<Taskinfo> taskinfos = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));

        // 3. 把任务添加至redis
        if(taskinfos != null && taskinfos.size() > 0) {
            for (Taskinfo taskinfo : taskinfos) {
                Task task = new Task();
                BeanUtils.copyProperties(taskinfo, task);
                task.setExecuteTime(taskinfo.getExecuteTime().getTime());
                addTaskToCache(task);
            }
        }

        System.out.println("数据库的任务同步到了redis");
    }

    /**
     * 清理缓存中的数据
     */
    public void clearCache() {
        Set<String> topicScan = cacheService.scan(ScheduleConstants.TOPIC + "*");
        Set<String> futureScan = cacheService.scan(ScheduleConstants.FUTURE + "*");
        cacheService.delete(topicScan);
        cacheService.delete(futureScan);
    }

⑷. 测试

删除redis中的部分数据启动 Schedule 服务
在这里插入图片描述
在这里插入图片描述


6. 延迟对列接口定义

⑴. 提供对外接口

新建 heima-leadnews-feign-api/src/main/java/com/heima/apis/schedule/IScheduleClient.java 文件

@FeignClient("leadnews-schedule")
public interface IScheduleClient {
    /**
     * 添加延时任务
     * @param task
     * @return
     */
    @PostMapping("app/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task);

    /**
     * 取消任务
     * @param taskId
     * @return
     */
    @GetMapping("app/v1/task/cancel/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId);

    /**
     * 按照类型和优先级来拉取任务
     * @param type 类型
     * @param priority 优先级
     * @return
     */
    @GetMapping("app/v1/task/pool/{type}/{priority}")
    public ResponseResult taskPool(@PathVariable("type") int type, @PathVariable("priority") int priority);
}

⑵. 微服务实现接口

新建 heima-leadnews-service/heima-leadnews-schedule/src/main/java/com/heima/schedule/feign/ScheduleClient.java 文件

@RestController
public class ScheduleClient implements IScheduleClient {

    @Autowired
    private TaskService taskService;

    /**
     * 添加延时任务
     * @param task
     * @return
     */
    @Override
    @PostMapping("app/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task) {
        return ResponseResult.okResult(taskService.addTask(task));
    }

    /**
     * 取消任务
     * @param taskId
     * @return
     */
    @Override
    @GetMapping("app/v1/cancel/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
        return ResponseResult.okResult(taskService.cancelTask(taskId));
    }

    /**
     * 按照类型和优先级来拉取任务
     * @param type 类型
     * @param priority 优先级
     * @return
     */
    @Override
    @GetMapping("app/v1/task/pool/{type}/{priority}")
    public ResponseResult taskPool(@PathVariable("type") int type, @PathVariable("priority") int priority) {
        return ResponseResult.okResult(taskService.taskPool(type, priority));
    }
}

7. 添加延迟任务

在这里插入图片描述

⑴. 序列化

①. 序列化工具对比

  • JdkSerialize java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化 ObjectOutputStream的writeObject()方法可序列化对象生成字节数组
  • Protostuff google开源的protostuff采用更为紧凑的二进制数组表现更加优异然后使用protostuff的编译工具生成pojo类

②. pom依赖

编辑 heima-leadnews-utils/pom.xml

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>1.6.0</version>
        </dependency>

③. JdkSerialize

编辑 heima-leadnews-utils/pom.xml

/**
 * jdk序列化
 */
public class JdkSerializeUtil {

    /**
     * 序列化
     * @param obj
     * @param <T>
     * @return
     */
    public static <T> byte[] serialize(T obj) {

        if (obj  == null){
            throw new NullPointerException();
        }

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(bos);

            oos.writeObject(obj);
            return bos.toByteArray();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return new byte[0];
    }

    /**
     * 反序列化
     * @param data
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        ByteArrayInputStream bis = new ByteArrayInputStream(data);

        try {
            ObjectInputStream ois = new ObjectInputStream(bis);
            T obj = (T)ois.readObject();
            return obj;
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        return  null;
    }
}

④. Protostuff

编辑 heima-leadnews-utils/pom.xml

public class ProtostuffUtil {

    /**
     * 序列化
     * @param t
     * @param <T>
     * @return
     */
    public static <T> byte[] serialize(T t){
        Schema schema = RuntimeSchema.getSchema(t.getClass());
        return ProtostuffIOUtil.toByteArray(t,schema,
                LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
    }

    /**
     * 反序列化
     * @param bytes
     * @param c
     * @param <T>
     * @return
     */
    public static <T> T deserialize(byte []bytes,Class<T> c) {
        T t = null;
        try {
            t = c.newInstance();
            Schema schema = RuntimeSchema.getSchema(t.getClass());
             ProtostuffIOUtil.mergeFrom(bytes,t,schema);
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        return t;
    }

    /**
     * jdk序列化与protostuff序列化对比
     * @param args
     */
    public static void main(String[] args) {
        long start =System.currentTimeMillis();
        for (int i = 0; i <1000000 ; i++) {
            WmNews wmNews =new WmNews();
            JdkSerializeUtil.serialize(wmNews);
        }
        System.out.println(" jdk 花费 "+(System.currentTimeMillis()-start));
        // -> jdk 花费 2017

        start =System.currentTimeMillis();
        for (int i = 0; i <1000000 ; i++) {
            WmNews wmNews =new WmNews();
            ProtostuffUtil.serialize(wmNews);
        }
        System.out.println(" protostuff 花费 "+(System.currentTimeMillis()-start));
        // -> protostuff 花费 292
    }
}

⑤. 测试

执行 ProtostuffUtilmain 方法

jdk 花费 2017
protostuff 花费 292

# protostuff 性能更强

⑵. 枚举

新建 heima-leadnews-model/src/main/java/com/heima/model/common/enums/TaskTypeEnum.java 文件

@Getter
@AllArgsConstructor
public enum TaskTypeEnum {

    NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
    REMOTEERROR(1002, 2,"第三方接口调用失败重试");
    private final int taskType; //对应具体业务
    private final int priority; //业务不同级别
    private final String desc; //描述信息
}

⑶. Service

新建 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/service/WmNewsTaskService.java 文件

public interface WmNewsTaskService {

    /**
     * 添加任务到延迟对列中
     * @param id 文章id
     * @param publishTime 发布的时间  可以做为任务的执行时间
     */
    public void addNewsToTask(Integer id, Date publishTime);
}

⑷. ServiceImpl

新建 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/service/impl/WmNewsTaskServiceImpl.java 文件

@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {

    @Autowired
    private IScheduleClient scheduleClient;

    /**
     * 添加任务到延迟对列中
     * @param id 文章id
     * @param publishTime 发布的时间  可以做为任务的执行时间
     */
    @Override
    @Async
    public void addNewsToTask(Integer id, Date publishTime) {

        log.info("添加任务到延迟服务中----begin");

        Task task = new Task();

        task.setExecuteTime(publishTime.getTime());
        task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
        task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());

        WmNews wmNews = new WmNews();
        wmNews.setId(id);
        task.setParameters(ProtostuffUtil.serialize(wmNews));

        scheduleClient.addTask(task);

        log.info("添加任务到延迟服务中----end");
    }
}

⑸. 调用延迟任务

编辑 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/service/impl/WmNewsServiceImpl.java 文件

    @Autowired
    private WmNewsTaskService wmNewsTaskService;

    /**
     * 发布/修改文章或发布为草稿
     * @param dto
     * @return
     */
    @Override
    public ResponseResult submitNews(WmNewsDto dto) {

        // 1. 参数校验
        if(dto == null || dto.getContent() == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        // 2. 保存或修改文章

        WmNews wmNews = new WmNews();
        // 属性拷贝 属性名词和类型相同才能拷贝
        BeanUtils.copyProperties(dto, wmNews);
        // 封面图片  list---> string
        if(dto.getImages() != null && dto.getImages().size() > 0) {
            String imageStr = StringUtils.join(dto.getImages(), ",");
            wmNews.setImages(imageStr);
        }
        // 如果当前封面为自动类型 -1
        if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)) {
            wmNews.setType(null);
        }
        saveOrUpdateWmNews(wmNews);

        // 3. 判断是否为草稿, 如果是草稿, 结束当前方法
        if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())) {
            return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
        }

        // 4. 保存文章内容图片和素材的关系
        // 提取文章内容中的图片信息
        List<String> materials = ectractUrlInfo(dto.getContent());

        saveRelativeInfoForContent(materials, wmNews.getId());

        // 5. 保存文章封面图片和素材的关系, 如果当然布局是自动, 需要匹配封面图片
        saveRelativeInfoForCover(dto, wmNews, materials);

        // 审核文章
        // wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
        wmNewsTaskService.addNewsToTask(wmNews.getId(), wmNews.getPublishTime());

        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

⑹. 测试

启动 WemediaApplicationWemediaGatewayAplicationScheduleApplication、 Nginx添加文章查看库表及redis

8. 消费任务、审核文章

在这里插入图片描述

⑴. Service

编辑 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/service/WmNewsTaskService.java 文件

    /**
     * 消费任务, 审核文章
     */
    public void scanNewsByTask();

⑵. ServiceImpl

编辑 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/service/impl/WmNewsTaskServiceImpl.java 文件

    @Autowired
    private WmNewsAutoScanService wmNewsAutoScanService;

    /**
     * 消费任务, 审核文章
     */
    @Scheduled(fixedRate = 1000)
    @Override
    public void scanNewsByTask() {

        log.info("消费任务, 审核文章");

        ResponseResult responseResult = scheduleClient.taskPool(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
        if(responseResult.getCode().equals(200) && responseResult.getData() != null) {
            Task task = JSON.parseObject(JSON.toJSONString(responseResult.getData()), Task.class);
            WmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);
            wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
        }
    }

⑶. 引导类

编辑 heima-leadnews-service/heima-leadnews-wemedia/src/main/java/com/heima/wemedia/WemediaApplication.java 文件

@EnableScheduling // 开启调度任务

⑷. 测试

启动 WemediaApplicationWemediaGatewayAplicationScheduleApplicationArticleApplication、 Nginx添加文章文章审核状态自动审核

添加未来时间 查看文章审核状态是否未来时间自动审核



阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Spring