溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java 并行數(shù)據(jù)處理和性能分析

發(fā)布時間:2020-10-22 19:11:12 來源:腳本之家 閱讀:178 作者:此心光明-超然 欄目:開發(fā)技術(shù)

并行流

并行流是一個把元素分成多個塊的流,每個塊用不同的線程處理??梢宰詣臃謪^(qū),讓所有的處理器都忙起來。

假設(shè)要寫一個方法,接受一個數(shù)量n做參數(shù),計算1-n的和??梢赃@樣實現(xiàn):

  public long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
        .limit(n)
        .reduce(0L, Long::sum);
  }

也許可以使用parallel方法,簡單地使用并行計算,提高程序性能:

  public long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
        .limit(n)
        .parallel()
        .reduce(0L, Long::sum);
  }

這樣,流可能在內(nèi)部被分成多個塊,導(dǎo)致reduction操作可以在不同的塊上互不依賴地并行地各自工作。最后,reduction操作組合每個子流的并行reductions的返回值,返回的結(jié)果就是整個流的結(jié)果。見下面的示意圖

Java 并行數(shù)據(jù)處理和性能分析

實際上,調(diào)用parallel方法,流自身不會有任何變化。在內(nèi)部,設(shè)置一個布爾類型的標(biāo)記,標(biāo)明你想在并行模式執(zhí)行操作,接下來的操作都是并行的。

類似地,你也可以使用sequential方法,把并行流轉(zhuǎn)成串行的。你也許認(rèn)為可以組合這兩個方法:

    stream.parallel()
      .filter(...)
      .sequential()
      .map(...)
      .parallel()
      .reduce();

但是,最后一次調(diào)用parallel或者sequential才會全局地影響管道。上面的例子,管道將被并行地執(zhí)行。

配置并行流使用的線程池

并行流內(nèi)部使用ForkJoinPool。默認(rèn)地,線程數(shù)量等于處理器數(shù)量(Runtime.getRuntime().availableProcessors())。但是,可以修改系統(tǒng)屬性java.util.concurrent.ForkJoinPool.common.parallelism,配置線程數(shù)量。

這是全局配置,所以,除非你認(rèn)為對性能有幫助,否則不要修改。

測量流的性能

我們聲稱并行加法應(yīng)該比串行的或者自己的迭代方法快。我們可以使用JMH測量一下。這是一個工具,使用基于注解的方法,可以為JVM程序增加

可靠的microbenchmarks。如果使用maven,可以這樣引入:

    <dependency>
      <groupId>org.openjdk.jmh</groupId>
      <artifactId>jmh-core</artifactId>
      <version>1.21</version>
    </dependency>
    <dependency>
      <groupId>org.openjdk.jmh</groupId>
      <artifactId>jmh-generator-annprocess</artifactId>
      <version>1.21</version>
    </dependency>

第一個庫是核心實現(xiàn),第二個包含一個注解處理器,幫助生成JAR文件,通過它可以方便地運行你的benchmark。maven配置里還應(yīng)該有下面的plugin:

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>benchmarks</finalName>
              <transformers>
                <transformer
                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.openjdk.jmh.Main</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>

程序代碼如下

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

//測量平均時間
@BenchmarkMode(Mode.AverageTime)
//以毫秒為單位,打印benchmark結(jié)果
@OutputTimeUnit(TimeUnit.MILLISECONDS)
//執(zhí)行兩次,增加可靠性。堆空間是4Gb
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"})
@State(Scope.Benchmark)
public class ParallelStreamBenchmark {
  private static final long N = 10_000_000L;

  @Benchmark
  public long sequentialSum() {
    return Stream.iterate(1L, i -> i + 1).limit(N)
        .reduce(0L, Long::sum);
  }
  
  //每次執(zhí)行benchmark后,執(zhí)行GC
  @TearDown(Level.Invocation)
  public void tearDown() {
    System.gc();
  }
}

使用大內(nèi)存,和每次迭代以后試著GC都是為了盡量減少GC的影響。盡管如此,結(jié)果應(yīng)該再加一些鹽。很多因素會影響執(zhí)行時間,比如你的機器有多少核。

默認(rèn)地,JMH一般先執(zhí)行5次熱身迭代,這樣可以讓HotSpot優(yōu)化代碼,然后再執(zhí)行5次迭代用來計算最終的結(jié)果。你可以使用-w和-i命令行參數(shù)修改這些配置。

在我的機器上,使用JDK 1.8.0_121, Java HotSpot™ 64-Bit Server VM,執(zhí)行結(jié)果是

Benchmark Mode Cnt Score Error Units

ParallelStreamBenchmark.sequentialSum avgt 10 83.565 ± 1.841 ms/op

你應(yīng)該期望,使用經(jīng)典的for循環(huán)的迭代版本運行得更快,因為它在更低層(level)工作,而且,更重要的是,它不需要執(zhí)行原始類型的裝箱和拆箱操作。我們測試一下這個方法:

  @Benchmark
  public long iterativeSum() {
    long result = 0;
    for (long i = 1L; i <= N; i++) {
      result += i;
    }
    return result;
  }

執(zhí)行結(jié)果是

Benchmark Mode Cnt Score Error Units

ParallelStreamBenchmark.iterativeSum avgt 10 6.877 ± 0.068 ms/op

證實了我們的期望:迭代版本比串行流快了10倍。讓我們使用并行流試一試:

Benchmark Mode Cnt Score Error Units

ParallelStreamBenchmark.parallelSum avgt 10 110.157 ± 1.882 ms/op

非常令人失望:并行版本的求和一點都沒有發(fā)揮多核的優(yōu)勢,比串行版還要慢。為什么會這樣?有兩個問題混在一起:

迭代生成了裝箱對象,它們在做加法前,必須拆箱成數(shù)字

迭代很難劃分獨立的塊來并行地執(zhí)行

第二點是特別有趣的,不是所有的流都是適合并行處理的。特別是,迭代的流就很難,這是因為,函數(shù)的輸入依賴上一個函數(shù)的結(jié)果。見下圖:

Java 并行數(shù)據(jù)處理和性能分析

這意味著,reduction過程并沒有像第一張圖里所表示的那樣執(zhí)行。reduction開始的時候,還沒有整個數(shù)字列表,所以沒法分塊。把流標(biāo)記為并行的,反而增加了在不同線程上執(zhí)行的求和要被串行處理的負(fù)擔(dān)。

使用更專業(yè)的方法

LongStream.rangeClosed方法使用的是原始long類型,所以不用裝箱和拆箱。而且,它生產(chǎn)的數(shù)的范圍,可以很容易地分成不依賴的塊。比如,范圍1-20可以被分成1-5、6-10、11-15和16-20。

  @Benchmark
  public long rangedSum() {
    return LongStream.rangeClosed(1, N)
        .reduce(0L, Long::sum);
  }

輸出是

Benchmark Mode Cnt Score Error Units

ParallelStreamBenchmark.rangedSum avgt 10 7.660 ± 1.643 ms/op

可以看出來,比并行流快了很多,僅比經(jīng)典的for循環(huán)慢了一點。LongStream支持并行:

  @Benchmark
  public long parallelRangedSum() {
    return LongStream.rangeClosed(1, N)
        .parallel()
        .reduce(0L, Long::sum);
  }

輸出是

Benchmark Mode Cnt Score Error Units

ParallelStreamBenchmark.parallelRangedSum avgt 10 4.790 ± 5.142 ms/op

可以發(fā)現(xiàn),并行生效了。甚至比for循環(huán)還快了1/3。

正確使用并行流

濫用并行流產(chǎn)生錯誤的主要原因是使用了改變共享狀態(tài)的算法。下面是一個通過改變共享的累加器來實現(xiàn)前n個自然數(shù)求和的例子:

  public long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
  }
  
  public class Accumulator {
    public long total = 0;
    public void add(long value) { 
      total += value; 
    }
  }

這種代碼很常見,特別對熟悉命令式編程范式的開發(fā)者而言。當(dāng)你迭代數(shù)字列表時,經(jīng)常這樣做:初始化一個累加器,遍歷元素,使用累加器相加。

這代碼有什么錯?它是串行的,失去了并行性。讓我們試著使用并行流:

  public long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;
  }

多執(zhí)行幾次,你會發(fā)現(xiàn),每次返回的結(jié)果都不一樣,而且都不是正確的50000005000000。這是因為多線程累加的時候,total += value并不是原子操作。那么怎樣才能寫出并行情況下,正確的代碼呢?

如果有懷疑,就做測試

注意裝箱問題。Java提供的原始類型流(IntStream、LongStream和DoubleStream)可以避免類似的問題,盡量使用他們

有些操作使用并行流性能更差。尤其是像limit和findFirst這種依賴元素順序的操作,使用并行是非常昂貴的。比如,findAny就比findFirst性能好,因為它跟順序無關(guān)。調(diào)用unordered方法,可以把一個有順序的流變成無順序的流。比如,如果你需要流的N個元素,而你對前M個感興趣,在一個無順序的流上調(diào)用limit比有順序的高效

如果數(shù)據(jù)量不大,不要選擇并行流

要考慮流的底層數(shù)據(jù)結(jié)構(gòu)的可分解程度。比如,ArrayList比LinkedList分解起來更高效,因為不遍歷就可以分割。使用range工廠增加的原始類型流也很容易分割??梢酝ㄟ^實現(xiàn)自己的Spliterator分割流

流的特征,以及中間操作如何修改流的元素,會改變分解過程的性能。比如,一個SIZED流可以被分解成兩個相等的部分,并且每個部分可以高效得并行處理,但是,filter會過濾掉任何不滿足條件的元素,導(dǎo)致流的size成了未知的

考慮結(jié)束操作是廉價的還是昂貴的merge步驟(比如,Collector的combiner方法)。如果是昂貴的,組合并行結(jié)果的代價會比并行流帶來的好處還要高

下面的表格,總結(jié)一些流在可分解性方面的并行友好性

可分解性
ArrayList 優(yōu)秀
LinkedList
IntStream.range 優(yōu)秀
Stream.iterate
HashSet
TreeSet

fork/join框架

fork/join框架用來遞歸地把可并行的任務(wù)分解成小任務(wù),然后組合每個子任務(wù)的結(jié)果,以生成總的結(jié)果。它實現(xiàn)了ExecutorService接口,這樣所有的子任務(wù)都在一個線程池(ForkJoinPool)內(nèi)工作。

RecursiveTask

要向ForkJoinPool提交任務(wù),你不得不增加RecursiveTask的子類-R是并行任務(wù)(以及每個子任務(wù))的返回類型,或者

增加RecursiveAction的子類-當(dāng)沒有返回值的時候。要定義RecursiveTask,需要實現(xiàn)它唯一的抽象方法:

protected abstract R compute();

該方法定義分割任務(wù)和不能繼續(xù)被分割時處理一個子任務(wù)的算法的邏輯。該方法的實現(xiàn),經(jīng)常像下面的偽代碼:

if (任務(wù)足夠小,不再被分) {
  順序執(zhí)行任務(wù)
} else {
  把任務(wù)分成兩個子任務(wù)
  遞歸地調(diào)用本方法,盡量分割每個子任務(wù)
  等待所有子任務(wù)的完成
  組合每個子任務(wù)的結(jié)果
}

可以發(fā)現(xiàn),這是分治算法的并行實現(xiàn)。我們繼續(xù)求和的例子,演示怎么使用fork/join框架。首先需要擴展RecursiveTask類:

import java.util.concurrent.RecursiveTask;

/**
 * Created by leishu on 18-12-11.
 */
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
  //分割任務(wù)的閾值
  public static final long THRESHOLD = 10_000;
  //要被求和的數(shù)組
  private final long[] numbers;
  private final int start;
  private final int end;

  public ForkJoinSumCalculator(long[] numbers) {
    this(numbers, 0, numbers.length);
  }
  //生成子任務(wù)的私有構(gòu)造器
  private ForkJoinSumCalculator(long[] numbers, int start, int end) {
    this.numbers = numbers;
    this.start = start;
    this.end = end;
  }

  @Override
  protected Long compute() {
    //子任務(wù)的大小
    int length = end - start;
    if (length <= THRESHOLD) {
      return computeSequentially();//小于閾值,不分割
    }
    //增加第一個子任務(wù)
    ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
    //異步執(zhí)行,新的子任務(wù)使用ForkJoinPool的另一個線程
    leftTask.fork();
    ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
    //同步執(zhí)行第二個子任務(wù),允許遞歸
    Long rightResult = rightTask.compute();
    //讀取第一個子任務(wù)的結(jié)果,如果沒完成就等待
    Long leftResult = leftTask.join();
    //組合
    return leftResult + rightResult;
  }

  //順序執(zhí)行
  private long computeSequentially() {
    long sum = 0;
    for (int i = start; i < end; i++) {
      sum += numbers[i];
    }
    return sum;
  }
}

Java 并行數(shù)據(jù)處理和性能分析

使用fork/join的最佳實踐

調(diào)用任務(wù)的join方法,會阻塞調(diào)用者,直到返回結(jié)果。所以,要在兩個子任務(wù)都啟動以后在調(diào)用它

不要在RecursiveTask內(nèi)使用ForkJoinPool的invoke方法

子任務(wù)的fork方法是用來做調(diào)度的。在兩個子任務(wù)上直接調(diào)用它似乎是很自然的,但是,在其中一個上調(diào)用compute效率更高,因為這樣能重用相同的線程

偷工作

任務(wù)被分給ForkJoinPool里的線程。每個線程有一個保存任務(wù)的雙端鏈表,順序地執(zhí)行鏈表中的任務(wù)。如果由于某種原因(比如I/O),一個線程完成了分配給他的全部任務(wù),它會隨機地從其他線程選擇一個隊列,從隊列的尾部偷一個任務(wù)。這個過程會持續(xù),直到所有的隊列都空了為止。所以,要有大量的小任務(wù),而不是幾個大任務(wù),這樣可以更好地平衡線程的負(fù)荷。

Java 并行數(shù)據(jù)處理和性能分析

Spliterator

Spliterator是Java 8 提供的新接口,意思是“splitable iterator”,用來并行地迭代源中的元素。也許你不用開發(fā)自己的Spliterator,但是,理解了它,也就明白了并行流是如何工作的。Java 8已經(jīng)在Collections框架內(nèi)提供了Spliterator的默認(rèn)實現(xiàn)。Collection接口有一個default方法spliterator(),它就返回一個Spliterator對象。我們先看看Spliterator接口的定義:

public interface Spliterator<T> {
  //用來按順序消費Spliterator的元素,如果還有元素就返回true
  boolean tryAdvance(Consumer<? super T> action);
  //把一些元素分到一個新的Spliterator,以允許他們并行處理
  Spliterator<T> trySplit();
  //剩余的可被遍歷的元素數(shù)量估值
  long estimateSize();
  int characteristics();
}

tryAdvance方法的行為類似于迭代器,用來按順序消費Spliterator的元素,如果還有元素就返回true。trySplit方法

用來把一些元素分到一個新的Spliterator,以允許他們并行處理。

分割過程

把一個流分割成多個部分是一個遞歸過程,如下圖所示。首先,在第一個Spliterator上調(diào)用trySplit生成一個新的。然后,在這兩個Spliterator上調(diào)用trySplit,這樣產(chǎn)生四個。一直進行下去,直到該方法返回null,標(biāo)志著不能再被分割。最后,當(dāng)所有的trySplit都返回null時,遞歸過程結(jié)束。

Java 并行數(shù)據(jù)處理和性能分析

分割過程也會受到Spliterator的特征(由characteristics方法聲明)的影響。

Spliterator特征

characteristics方法返回一個整數(shù),用來更好地控制和優(yōu)化Spliterator的用法。

Characteristic 描述
ORDERED 元素是有順序的(比如List),所以Spliterator使用該順序做遍歷和分區(qū)
DISTINCT 對于每對遍歷的元素x和y,x.equals(y)返回false
SORTED 遍歷的元素遵循預(yù)定義的排序順序
SIZED 源的size是已知的(比如set),所以estimatedSize()返回的值是精確的
NON-NULL 元素不會為空
IMMUTABLE 源是不可變的,說明遍歷的時候,元素不會被增加、修改和刪除
CONCURRENT 源是并發(fā)安全的,并發(fā)修改的時候,不用任何同步
SUBSIZED Spliterator和接下來產(chǎn)生的Spliterator都是SIZED

實現(xiàn)自己的Spliterator

我們開發(fā)一個簡單的方法,用來計算字符串中的單詞數(shù)。

  public int countWordsIteratively(String s) {
    int counter = 0;
    boolean lastSpace = true;
    for (char c : s.toCharArray()) {
      if (Character.isWhitespace(c)) {
        lastSpace = true;
      } else {
        if (lastSpace) counter++;
        lastSpace = false;
      }
    }
    return counter;
  }

要計算的字符串是但丁的“地域”的第一句

    public static final String SENTENCE =
        " Nel  mezzo del cammin di nostra vita "
            + "mi ritrovai in una selva oscura"
            + " che la dritta via era  smarrita ";

    System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");

注意,兩個單詞間的空格數(shù)是隨機的。執(zhí)行結(jié)果

Found 19 words

使用函數(shù)式實現(xiàn)

首先需要把字符串轉(zhuǎn)換成一個流。原始類型int、long和double才有原始的的流,所以,我們使用Stream:

Stream<Character> stream = IntStream.range(0, SENTENCE.length())

.mapToObj(SENTENCE::charAt);

可以使用reduction計算單詞數(shù)量。當(dāng)reduce的時候,你不得不攜帶由兩個變量組成的狀態(tài):整數(shù)型的總數(shù)和布爾型的字符是否是空格。因為Java沒有tuples,你得增加一個新類-WordCounter-封裝狀態(tài):

  class WordCounter {
    private final int counter;
    private final boolean lastSpace;

    public WordCounter(int counter, boolean lastSpace) {
      this.counter = counter;
      this.lastSpace = lastSpace;
    }
    
    //遍歷,累加
    public WordCounter accumulate(Character c) {
      if (Character.isWhitespace(c)) {
        return lastSpace ? this : new WordCounter(counter, true);
      } else {
        //如果上一個字符是空格,而當(dāng)前的不是,就加1
        return lastSpace ? new WordCounter(counter + 1, false) : this;
      }
    }

    //組合,求和
    public WordCounter combine(WordCounter wordCounter) {
      return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
    }

    public int getCounter() {
      return counter;
    }
  }

下面是遍歷一個新字符時,WordCounter的狀態(tài)圖

Java 并行數(shù)據(jù)處理和性能分析

然后,我們就可以使用流的reduce方法了

  private int countWords(Stream<Character> stream) {
    WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
        WordCounter::accumulate,
        WordCounter::combine);
    return wordCounter.getCounter();
  }

我們做一下測試

    Stream<Character> stream = IntStream.range(0, SENTENCE.length())
        .mapToObj(SENTENCE::charAt);
    System.out.println("Found " + countWords(stream) + " words");

執(zhí)行結(jié)果是正確的。

并行的實現(xiàn)

我們修改一下代碼

System.out.println("Found " + countWords(stream.parallel()) + " words");

執(zhí)行結(jié)果不是找到19個單詞了。因為源字符串在隨意的位置被分割,一個字符被多次分割。要解決這個問題,就需要實現(xiàn)自己的Spliterator。

  class WordCounterSpliterator implements Spliterator<Character> {

    private final String string;
    private int currentChar = 0;

    private WordCounterSpliterator(String string) {
      this.string = string;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
      //消費當(dāng)前字符
      action.accept(string.charAt(currentChar++));
      //如果還有字符可被消費,返回true
      return currentChar < string.length();
    }

    @Override
    public Spliterator<Character> trySplit() {
      int currentSize = string.length() - currentChar;
      //小于閾值,不再分割
      if (currentSize < 10) {
        return null;
      }
      //候選的分割位置是字符串的一半長度
      for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
        //如果是空格,才分割
        if (Character.isWhitespace(string.charAt(splitPos))) {
          Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
          //當(dāng)前位置修改為分割位置
          currentChar = splitPos;
          return spliterator;
        }
      }
      return null;
    }

    @Override
    public long estimateSize() {
      return string.length() - currentChar;
    }

    @Override
    public int characteristics() {
      return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
    }
  }

然后,我們做測試

    Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
    Stream<Character> stream = StreamSupport.stream(spliterator, true);

    System.out.println("Found " + countWords(stream) + " words");

這回沒問題了。

以上這篇Java 并行數(shù)據(jù)處理和性能分析就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持億速云。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI