Rubin's Blog

  • 首页
  • 关于作者
  • 隐私政策
享受恬静与美好~~~
分享生活的点点滴滴~~~
  1. 首页
  2. 并发编程
  3. 正文

java并发编程之ForkJoinPool

2022年 1月 2日 505点热度 0人点赞 0条评论

ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的Map/Reduce,多个线程并行计算。

相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。

假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。

利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而实现任务计算的负载均衡。

快排

快排有2个步骤:

  1. 利用数组的第1个元素把数组划分成两半,左边数组里面的元素小于或等于该元素,右边数组里面的元素比该元素大
  2. 对左右的两个子数组分别排序

左右两个子数组相互独立可以并行计算。利用ForkJoinPool,代码如下:

package com.rubin.concurrent.forkjoinpool;

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class ForkJoinPoolDemo2 {

    static class SortTask extends RecursiveAction {

        final long[] array;
        final int lo;
        final int hi;

        public SortTask(long[] array) {
            this.array = array;
            this.lo = 0;
            this.hi = array.length - 1;
        }

        public SortTask(long[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        private int partition(long[] array, int lo, int hi) {
            long x = array[hi];
            int i = lo - 1;
            for (int j = lo; j < hi; j++) {
                if (array[j] <= x) {
                    i++;
                    swap(array, i, j);
                }
            }
            swap(array, i + 1, hi);
            return i + 1;
        }

        private void swap(long[] array, int i, int j) {
            if (i != j) {
                long temp = array[i];
                array[i] = array[j];
                array[j] = temp;
            }
        }

        @Override
        protected void compute() {
            if (lo < hi) {
                // 找到分区的元素下标
                int pivot = partition(array, lo, hi);
                // 将数组分为两部分
                SortTask left = new SortTask(array, lo, pivot - 1);
                SortTask right = new SortTask(array, pivot + 1, hi);

                left.fork();
                right.fork();
                left.join();
                right.join();
            }
        }

    }

    public static void main(String[] args) throws InterruptedException {
        long[] array = {5, 3, 7, 9, 2, 4, 1, 8, 10};
        // 一个任务
        ForkJoinTask sort = new SortTask(array);
        // 一个pool
        ForkJoinPool pool = new ForkJoinPool();
        // ForkJoinPool开启多个线程,同时执行上面的子任务
        pool.submit(sort);
        // 结束ForkJoinPool
        pool.shutdown();
        // 等待结束Pool
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(Arrays.toString(array));
    }

}

求1到n个数的和

package com.rubin.concurrent.forkjoinpool;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPoolDemo1 {

    static class SumTask extends RecursiveTask<Long> {

        private static final int THRESHOLD = 10;
        private long start;
        private long end;

        public SumTask(long n) {
            this(1, n);
        }

        public SumTask(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            // 如果计算的范围在threshold之内,则直接进行计算
            if ((end - start) <= THRESHOLD) {
                for (long l = start; l <= end; l++) {
                    sum += l;
                }
            } else {
                // 否则找出起始和结束的中间值,分割任务
                long mid = (start + end) >>> 1;
                SumTask left = new SumTask(start, mid);
                SumTask right = new SumTask(mid + 1, end);
                left.fork();
                right.fork();
                // 收集子任务计算结果
                sum = left.join() + right.join();
            }
            return sum;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        SumTask sum = new SumTask(10);
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> future = pool.submit(sum);
        Long aLong = future.get();
        System.out.println(aLong);
        pool.shutdown();
    }

}

上面的代码用到了RecursiveAction和RecursiveTask两个类,它们都继承自抽象类ForkJoinTask,用到了其中关键的接口 fork()、join()。二者的区别是一个有返回值,一个没有返回值。

RecursiveAction/RecursiveTask类继承关系:

我们可以使用现有的接口灵活的完成工作中的各种场景的解决方案。至于该框架的实现原理比线程池要复杂,但是思想都一样,都是多线程并发的执行任务,关闭逻辑也一致,上述示例中也有优雅关闭的写法。

如果后续有精力的话,会详细补充该工具的使用逻辑。

本文的内容就到这里了。欢迎小伙伴们积极留言交流~~~

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: 并发编程
最后更新:2022年 6月 9日

RubinChu

一个快乐的小逗比~~~

打赏 点赞
< 上一篇

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复
文章目录
  • 快排
  • 求1到n个数的和
最新 热点 随机
最新 热点 随机
问题记录之Chrome设置屏蔽Https禁止调用Http行为 问题记录之Mac设置软链接 问题记录之JDK8连接MySQL数据库失败 面试系列之自我介绍 面试总结 算法思维
Tomcat之系统架构 数据结构与算法概述 SpringMVC应用 Spring AOP 应用 SpringCloud Alibaba之Nacos 数据结构之散列表

COPYRIGHT © 2021 rubinchu.com. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

京ICP备19039146号-1