Java高并发编程实战,异步注解@Async自定义线程池

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

一、@Async注解

@Async的作用就是异步处理任务。

在方法上添加@Async表示此方法是异步方法
在类上添加@Async表示类中的所有方法都是异步方法
使用此注解的类必须是Spring管理的类
需要在启动类或配置类中加入@EnableAsync注解@Async才会生效
在使用@Async时如果不指定线程池的名称也就是不自定义线程池@Async是有默认线程池的使用的是Spring默认的线程池SimpleAsyncTaskExecutor。

默认线程池的默认配置如下

默认核心线程数8
最大线程数Integet.MAX_VALUE
队列使用LinkedBlockingQueue
容量是Integet.MAX_VALUE
空闲线程保留时间60s
线程池拒绝策略AbortPolicy
从最大线程数可以看出在并发情况下会无限制的创建线程我勒个吗啊。

也可以通过yml重新配置

spring:
  task:
    execution:
      pool:
        max-size: 10
        core-size: 5
        keep-alive: 3s
        queue-capacity: 1000
        thread-name-prefix: my-executor

也可以自定义线程池下面通过简单的代码来实现以下@Async自定义线程池。

二、代码实例

Spring为任务调度与异步方法执行提供了注解@Async支持通过在方法上标注@Async注解可使得方法被异步调用。在需要异步执行的方法上加入@Async注解并指定使用的线程池当然可以不指定直接写@Async。

1、导入POM

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.0.1-jre</version>
</dependency>

2、配置类

package com.nezhac.config;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

@EnableAsync// 支持异步操作
@Configuration
public class AsyncTaskConfig {

    /**
     * com.google.guava中的线程池
     * @return
     */
    @Bean("my-executor")
    public Executor firstExecutor() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-executor").build();
        // 获取CPU的处理器数量
        int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,
                200, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), threadFactory);
        threadPool.allowsCoreThreadTimeOut();
        return threadPool;
    }

    /**
     * Spring线程池
     * @return
     */
    @Bean("async-executor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // 核心线程数
        taskExecutor.setCorePoolSize(10);
        // 线程池维护线程的最大数量只有在缓冲队列满了之后才会申请超过核心线程数的线程
        taskExecutor.setMaxPoolSize(100);
        // 缓存队列
        taskExecutor.setQueueCapacity(50);
        // 空闲时间当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
        taskExecutor.setKeepAliveSeconds(200);
        // 异步方法内部线程名称
        taskExecutor.setThreadNamePrefix("async-executor-");

        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy也是丢弃任务但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy丢弃队列最前面的任务然后重新尝试执行任务重复此过程
         * ThreadPoolExecutor.CallerRunsPolicy重试添加当前的任务自动重复调用 execute() 方法直到成功
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

3、controller

package com.nezha.controller;

import com.nezha.service.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class UserController {

    private static final Logger logger = LoggerFactory.getLogger(UserController.class);

    @Autowired
    private UserService userService;

    @GetMapping("asyncTest")
    public void asyncTest() {
        logger.info("1");
        userService.asyncTest();
        asyncTest2();
        logger.info("1");
    }

    @Async("my-executor")
    public void asyncTest2() {
        logger.info("同文件内执行执行异步任务");
    }
}

4、service

package com.nezha.service;

public interface UserService {

    // 普通方法
    void test();

    // 异步方法
    void asyncTest();
}

service实现类

package com.nezha.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class UserServiceImpl implements UserService {

    private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);

    @Override
    public void test() {
        logger.info("执行普通任务");
    }

    @Async("my-executor")
    @Override
    public void asyncTest() {
        logger.info("执行异步任务");
    }
}

三、发现同文件内执行异步任务还是一个线程没有实现@Async效果why

众里寻他千百度查到了@Async失效的几个原因

注解@Async的方法不是public方法
注解@Async的返回值只能为void或Future
注解@Async方法使用static修饰也会失效
没加@EnableAsync注解
调用方和@Async不能在一个类中
在Async方法上标注@Transactional是没用的但在Async方法调用的方法上标注@Transcational是有效的
这里就不一一演示了有兴趣的小伙伴可以研究一下。

四、配置中分别使用了ThreadPoolTaskExecutor和ThreadPoolExecutor这两个有啥区别

ThreadPoolTaskExecutor是spring core包中的而ThreadPoolExecutor是JDK中的JUC。ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装。

1、initialize()

查看一下ThreadPoolTaskExecutor 的 initialize()方法

public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
        implements BeanNameAware, InitializingBean, DisposableBean {
    ...

    /**
     * Set up the ExecutorService.
     */
    public void initialize() {
        if (logger.isInfoEnabled()) {
            logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
        }
        if (!this.threadNamePrefixSet && this.beanName != null) {
            setThreadNamePrefix(this.beanName + "-");
        }
        this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }
    
    /**
     * Create the target {@link java.util.concurrent.ExecutorService} instance.
     * Called by {@code afterPropertiesSet}.
     * @param threadFactory the ThreadFactory to use
     * @param rejectedExecutionHandler the RejectedExecutionHandler to use
     * @return a new ExecutorService instance
     * @see #afterPropertiesSet()
     */
    protected abstract ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);

    ...
}

2、initializeExecutor抽象方法

再查看一下initializeExecutor抽象方法的具体实现类其中有一个就是ThreadPoolTaskExecutor类查看它的initializeExecutor方法使用的就是ThreadPoolExecutor。

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
        implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
        
    ...
    
    @Override
    protected ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

        BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

        ThreadPoolExecutor executor;
        if (this.taskDecorator != null) {
            executor = new ThreadPoolExecutor(
                    this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                    queue, threadFactory, rejectedExecutionHandler) {
                @Override
                public void execute(Runnable command) {
                    Runnable decorated = taskDecorator.decorate(command);
                    if (decorated != command) {
                        decoratedTaskMap.put(decorated, command);
                    }
                    super.execute(decorated);
                }
            };
        }
        else {
            executor = new ThreadPoolExecutor(
                    this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                    queue, threadFactory, rejectedExecutionHandler);

        }

        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }

        this.threadPoolExecutor = executor;
        return executor;
    }
    
    ...

}

因此可以了解到ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装。

五、核心线程数

配置文件中的线程池核心线程数为何配置为

// 获取CPU的处理器数量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;

Runtime.getRuntime().availableProcessors()获取的是CPU核心线程数也就是计算资源。

CPU密集型线程池大小设置为N也就是和cpu的线程数相同可以尽可能地避免线程间上下文切换但在实际开发中一般会设置为N+1为了防止意外情况出现线程阻塞如果出现阻塞多出来的线程会继续执行任务保证CPU的利用效率。
IO密集型线程池大小设置为2N这个数是根据业务压测出来的如果不涉及业务就使用推荐。
在实际中需要对具体的线程池大小进行调整可以通过压测及机器设备现状进行调整大小。
如果线程池太大则会造成CPU不断的切换对整个系统性能也不会有太大的提升反而会导致系统缓慢。

六、线程池执行过程

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Java