java基于quasar实现协程池【后篇】

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

java基于quasar实现协程池【前篇】java基于quasar实现协程池_爪哇盘古的博客-CSDN博客

         在上一个文章中讲述了通过仿照java自写线程池的方式改写成quasar协程池功能可以说实现了效果但是遇到了一个烦恼就是在协程阻塞过程中会疯狂报警告如果您的项目有日志文件产生当遇到一个非常耗时的任务时后面的任务阻塞产生警告那么该日志文件的体量是致命的所以为了摆脱这个问题不要尝试、不要猜。要看文档【我对英文很不友好的】

 quasar纤程文档FiberExecutorScheduler (Quasar 0.8.0)

 

在该文档中我发现了FiberExecutorScheduler类这个类将是本文阐述quasar协程池的正确打开方式【全是泪】

分析经历我一直对Quasar及其轻质纤维替代Threads感到好奇。那么quasar本身是否有自己的纤程池呢于是看是翻阅文档找吧挨个看吧

线程池ThreadPoolExecutor类的实现。

int maxThreadPoolSize = 10;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        maxThreadPoolSize,
        maxThreadPoolSize,
        10, TimeUnit.MINUTES,
        new ArrayBlockingQueue<Runnable>(maxThreadPoolSize),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy()
);

for (int i = 0; i < 100; i++) {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // run some code
        }
    });
}

上面的代码创建了一个具有10个线程的池一个在池前的队列(该队列可以容纳10个元素)和一个拒绝策略(当队列已满时)以使主线程自己执行Runnable任务。当for循环创建100个可运行对象时它们将在池中一次执行10个排队等待10个并且主线程自己拾取一个Runnable直到其他对象完成然后主线程返回将Runnables添加到执行程序。

每个光纤调度器调度的光纤当您创建不带调度器的光纤时将创建一个FiberForkJoinScheduler并将其分配给该光纤。

简而言之如果要管理线程池中的光纤请使用FiberExecutorScheduler
Quasar关于调度光纤的文档


您的代码可能像这样

        //线程池任务数量
        int maxThreadPoolSize = 10;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                maxThreadPoolSize,
                maxThreadPoolSize,
                10, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(maxThreadPoolSize),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        //重点通过FiberExecutorScheduler类将线程池传为协程池
        FiberExecutorScheduler scheduler = new FiberExecutorScheduler("FibersInAPool", executor);
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            Fiber fiber = new Fiber<>(scheduler
                    , new SuspendableRunnable() {
                @Override
                public void run() throws SuspendExecution, InterruptedException {
                    // run some code
                    System.out.println(finalI);
                    Thread.sleep(1000);
                }
            });
            fiber.start();
        }

这个操作看着可能会很奇怪一个纤程池干嘛要利用线程池来加载使用这可能与java底层本身不支持协程有关吧【猜】不过通过这种形式也有好处就是可以直接通过开始的线程池的一些功能。方便了使用习惯无非就是创建个线程池通过FiberExecutorScheduler来改为协程池进行协程操作这样对于springboot的bean也会更友好的实现了吧毕竟是通过线程池创建bean

以下是实现效果

 

 不会出现阻塞警告了

 

 光纤非常便宜因此您根本不需要池(及其异步作业调度模型)只需启动光纤并在每次需要新的顺序进程与其他进程同时运行时让其运行常规顺序代码即可。

当然它也支持类似go的管道【看文档】可以自行开发你的业务逻辑。

Channel (Quasar 0.8.0)

 

package testgrp;
 
import java.util.concurrent.ExecutionException;
 
import co.paralleluniverse.strands.SuspendableCallable;
import co.paralleluniverse.strands.SuspendableRunnable;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.IntChannel;
 
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
 
/**
 * Increasing-Echo Quasar Example
 *
 * @author circlespainter
 */
public class QuasarIncreasingEchoApp {
    static public Integer doAll() throws ExecutionException, InterruptedException {
        final IntChannel increasingToEcho = Channels.newIntChannel(0); // Synchronizing channel (buffer = 0)
        final IntChannel echoToIncreasing = Channels.newIntChannel(0); // Synchronizing channel (buffer = 0)
 
        Fiber<Integer> increasing = new Fiber<>("INCREASER", new SuspendableCallable<Integer>() { @Override public Integer run() throws SuspendExecution, InterruptedException {
            // The following is enough to test instrumentation of synchronizing methods
            // synchronized(new Object()) {}
 
            int curr = 0;
            for (int i = 0; i < 10 ; i++) {
                Fiber.sleep(10);
                System.out.println("INCREASER sending: " + curr);
                increasingToEcho.send(curr);
                curr = echoToIncreasing.receive();
                System.out.println("INCREASER received: " + curr);
                curr++;
                System.out.println("INCREASER now: " + curr);
            }
            System.out.println("INCREASER closing channel and exiting");
            increasingToEcho.close();
            return curr;
        } }).start();
 
        Fiber<Void> echo = new Fiber<Void>("ECHO", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException {
            Integer curr;
            while (true) {
                Fiber.sleep(1000);
                curr = increasingToEcho.receive();
                System.out.println("ECHO received: " + curr);
 
                if (curr != null) {
                    System.out.println("ECHO sending: " + curr);
                    echoToIncreasing.send(curr);
                } else {
                    System.out.println("ECHO detected closed channel, closing and exiting");
                    echoToIncreasing.close();
                    return;
                }
            }
        } }).start();
 
        try {
            increasing.join();
            echo.join();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        return increasing.get();
    }
 
    static public void main(String[] args) throws ExecutionException, InterruptedException {
        doAll();
    }
}

我们大java真是无所不能通过这次实践也是让我倍感骄傲以后谁也别说什么java不支持协程了~

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Java