溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java并行處理的實(shí)現(xiàn)方法

發(fā)布時(shí)間:2021-07-14 13:39:52 來(lái)源:億速云 閱讀:491 作者:chen 欄目:開(kāi)發(fā)技術(shù)

本篇內(nèi)容介紹了“Java并行處理的實(shí)現(xiàn)方法”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

目錄
  • 1. 背景

  • 2.知識(shí)

  • 3. Java 中的并行處理

  • 4. 擴(kuò)展

    • 線程池方式實(shí)現(xiàn)并行處理

    • 使用 fork/join框架

  • 5.參考:

    1. 背景

    本文是一個(gè)短文章,介紹Java 中的并行處理。
    說(shuō)明:10多分鐘讀完的文章我稱之為短文章,適合快速閱讀。

    2.知識(shí)

    并行計(jì)算(parallel computing)一般是指許多指令得以同時(shí)進(jìn)行的計(jì)算模式。在同時(shí)進(jìn)行的前提下,可以將計(jì)算的過(guò)程分解成小部分,之后以并發(fā)方式來(lái)加以解決。

    也就是分解為幾個(gè)過(guò)程:

    1、將一個(gè)大任務(wù)拆分成多個(gè)子任務(wù),子任務(wù)還可以繼續(xù)拆分。
    2、各個(gè)子任務(wù)同時(shí)進(jìn)行運(yùn)算執(zhí)行。
    3、在執(zhí)行完畢后,可能會(huì)有個(gè) " 歸納 " 的任務(wù),比如 求和,求平均等。

    再簡(jiǎn)化一點(diǎn)的理解就是: 先拆分  -->  在同時(shí)進(jìn)行計(jì)算  --> 最后“歸納”
    為什么要“并行”,優(yōu)點(diǎn)呢?

    1、為了獲得 “節(jié)省時(shí)間”,“快”。適合用于大規(guī)模運(yùn)算的場(chǎng)景。從理論上講,在 n 個(gè)并行處理的執(zhí)行速度可能會(huì)是在單一處理機(jī)上執(zhí)行的速度的 n 倍。
    2、以前的計(jì)算機(jī)是單核的,現(xiàn)代的計(jì)算機(jī)Cpu都是多核的,服務(wù)器甚至都是多Cpu的,并行計(jì)算可以充分利用硬件的性能。

    3. Java 中的并行處理

    JDK 8 新增的Stream API(java.util.stream)將生成環(huán)境的函數(shù)式編程引入了Java庫(kù)中,可以方便開(kāi)發(fā)者能夠?qū)懗龈佑行?、更加?jiǎn)潔的代碼。

    steam 的另一個(gè)價(jià)值是創(chuàng)造性地支持并行處理(parallel processing)。示例:

    final Collection< Task > tasks = Arrays.asList(
        new Task( Status.OPEN, 5 ),
        new Task( Status.OPEN, 13 ),
        new Task( Status.CLOSED, 8 ) 
    );
    
    // 并行執(zhí)行多個(gè)任務(wù),并 求和
    final double totalPoints = tasks
       .stream()
       .parallel()
       .map( task -> task.getPoints() ) // or map( Task::getPoints ) 
       .reduce( 0, Integer::sum );
     
    System.out.println( "Total points (all tasks): " + totalPoints );

    對(duì)于上面的tasks集合,上面的代碼計(jì)算所有任務(wù)的點(diǎn)數(shù)之和。
    它使用 parallel 方法并行處理所有的task,并使用 reduce 方法計(jì)算最終的結(jié)果。

    4. 擴(kuò)展

    線程池方式實(shí)現(xiàn)并行處理

    jdk1.5引入了并發(fā)包,其中包括了ThreadPoolExecutor,相關(guān)代碼如下:

    public class ExecutorServiceTest {
     
        public static final int THRESHOLD = 10_000;
        public static long[] numbers;
     
        public static void main(String[] args) throws Exception {
            numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
            ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
            CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor);
            int taskSize = (int) (numbers.length / THRESHOLD);
            for (int i = 1; i <= taskSize; i++) {
                final int key = i;
                completionService.submit(new Callable<Long>() {
     
                    @Override
                    public Long call() throws Exception {
                        return sum((key - 1) * THRESHOLD, key * THRESHOLD);
                    }
                });
            }
            long sumValue = 0;
            for (int i = 0; i < taskSize; i++) {
                sumValue += completionService.take().get();
            }
            // 所有任務(wù)已經(jīng)完成,關(guān)閉線程池
            System.out.println("sumValue = " + sumValue);
            executor.shutdown();
        }
     
        private static long sum(int start, int end) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
    }

    使用 fork/join框架

    分支/合并框架的目的是以遞歸的方式將可以并行的認(rèn)為拆分成更小的任務(wù),然后將每個(gè)子任務(wù)的結(jié)果合并起來(lái)生成整體結(jié)果;相關(guān)代碼如下:

    public class ForkJoinTest extends java.util.concurrent.RecursiveTask<Long> {
        
        private static final long serialVersionUID = 1L;
        private final long[] numbers;
        private final int start;
        private final int end;
        public static final long THRESHOLD = 10_000;
     
        public ForkJoinTest(long[] numbers) {
            this(numbers, 0, numbers.length);
        }
     
        private ForkJoinTest(long[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }
     
        @Override
        protected Long compute() {
            int length = end - start;
            if (length <= THRESHOLD) {
                return computeSequentially();
            }
            ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
            leftTask.fork();
            ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
            Long rightResult = rightTask.compute();
            // 注:join方法會(huì)阻塞,因此有必要在兩個(gè)子任務(wù)的計(jì)算都開(kāi)始之后才執(zhí)行join方法
            Long leftResult = leftTask.join();
            return leftResult + rightResult;
        }
     
        private long computeSequentially() {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
     
        public static void main(String[] args) {
            System.out.println(forkJoinSum(10_000_000));
        }
     
        public static long forkJoinSum(long n) {
            long[] numbers = LongStream.rangeClosed(1, n).toArray();
            ForkJoinTask<Long> task = new ForkJoinTest(numbers);
            return new ForkJoinPool().invoke(task);
        }
    }

    上面的代碼實(shí)現(xiàn)了 遞歸方式拆分子任務(wù),并放入到線程池中執(zhí)行。

    5.參考:

    https://zh.wikipedia.org/wiki/%E5%B9%B6%E8%A1%8C%E8%AE%A1%E7%AE%97

    “Java并行處理的實(shí)現(xiàn)方法”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

    向AI問(wèn)一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI