溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

JVM中Java和Scala并發(fā)性基礎(chǔ)是什么

發(fā)布時間:2021-10-23 17:16:31 來源:億速云 閱讀:109 作者:柒染 欄目:編程語言

本篇文章給大家分享的是有關(guān)JVM中Java和Scala并發(fā)性基礎(chǔ)是什么,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

處理器速度數(shù)十年來一直持續(xù)快速發(fā)展,并在世紀(jì)交替之際走到了終點(diǎn)。從那時起,處理器制造商更多地是通過增加核心來提高芯片性能,而不再通過增加時鐘速率來提高芯片性能。多核系統(tǒng)現(xiàn)在成為了從手機(jī)到企業(yè)服務(wù)器等所有設(shè)備的標(biāo)準(zhǔn),而這種趨勢可能繼續(xù)并有所加速。開發(fā)人員越來越需要在他們的應(yīng)用程序代碼中支持多個核心,這樣才能滿足性能需求。

在本系列文章中,您將了解一些針對 Java 和 Scala 語言的并發(fā)編程的新方法,包括 Java 如何將 Scala 和其他基于 JVM 的語言中已經(jīng)探索出來的理念結(jié)合在一起。***期文章將介紹一些背景,通過介紹 Java 7 和 Scala 的一些***技術(shù),幫助了解 JVM 上的并發(fā)編程的全景。您將了解如何使用 Java ExecutorService 和 ForkJoinPool 類來簡化并發(fā)編程。還將了解一些將并發(fā)編程選項(xiàng)擴(kuò)展到純 Java 中的已有功能之外的基本 Scala 特性。在此過程中,您會看到不同的方法對并發(fā)編程性能有何影響。后續(xù)幾期文章將會介紹 Java 8 中的并發(fā)性改進(jìn)和一些擴(kuò)展,包括用于執(zhí)行可擴(kuò)展的 Java 和 Scala 編程的 Akka 工具包。

Java 并發(fā)性支持

在 Java 平臺誕生之初,并發(fā)性支持就是它的一個特性,線程和同步的實(shí)現(xiàn)為它提供了超越其他競爭語言的優(yōu)勢。Scala 基于 Java 并在 JVM 上運(yùn)行,能夠直接訪問所有 Java 運(yùn)行時(包括所有并發(fā)性支持)。所以在分析 Scala 特性之前,我首先會快速回顧一下 Java 語言已經(jīng)提供的功能。

Java 線程基礎(chǔ)

在 Java 編程過程中創(chuàng)建和使用線程非常容易。它們由 java.lang.Thread 類表示,線程要執(zhí)行的代碼為 java.lang.Runnable 實(shí)例的形式。如果需要的話,可以在應(yīng)用程序中創(chuàng)建大量線程,您甚至可以創(chuàng)建數(shù)千個線程。在有多個核心時,JVM 使用它們來并發(fā)執(zhí)行多個線程;超出核心數(shù)量的線程會共享這些核心。

Java 5:并發(fā)性的轉(zhuǎn)折點(diǎn)

Java 從一開始就包含對線程和同步的支持。但在線程間共享數(shù)據(jù)的最初規(guī)范不夠完善,這帶來了 Java 5 的 Java 語言更新中的重大變化 (JSR-133)。Java Language Specification for Java 5 更正并規(guī)范化了 synchronized 和 volatile 操作。該規(guī)范還規(guī)定不變的對象如何使用多線程。(基本上講,只要在執(zhí)行構(gòu)造函數(shù)時不允許引用 “轉(zhuǎn)義”,不變的對象始終是線程安全的。)以前,線程間的交互通常需要使用阻塞的 synchronized 操作。這些更改支持使用 volatile 在線程間執(zhí)行非阻塞協(xié)調(diào)。因此,在 Java 5 中添加了新的并發(fā)集合類來支持非阻塞操作 — 這與早期僅支持阻塞的線程安全方法相比是一項(xiàng)重大改進(jìn)。

線程操作的協(xié)調(diào)難以讓人理解。只要從程序的角度讓所有內(nèi)容保持一致,Java 編譯器和 JVM 就不會對您代碼中的操作重新排序,這使得問題變得更加復(fù)雜。例如:如果兩個相加操作使用了不同的變量,編譯器或 JVM 可以安裝與指定的順序相反的順序執(zhí)行這些操作,只要程序不在兩個操作都完成之前使用兩個變量的總數(shù)。這種重新排序操作的靈活性有助于提高 Java 性能,但一致性只被允許應(yīng)用在單個線程中。硬件也有可能帶來線程問題。現(xiàn)代系統(tǒng)使用了多種緩存內(nèi)存級別,一般來講,不是系統(tǒng)中的所有核心都能同樣看到這些緩存。當(dāng)某個核心修改內(nèi)存中的一個值時,其他核心可能不會立即看到此更改。

由于這些問題,在一個線程使用另一個線程修改的數(shù)據(jù)時,您必須顯式地控制線程交互方式。Java 使用了特殊的操作來提供這種控制,在不同線程看到的數(shù)據(jù)視圖中建立順序?;静僮魇?,線程使用 synchronized 關(guān)鍵字來訪問一個對象。當(dāng)某個線程在一個對象上保持同步時,該線程將會獲得此對象所獨(dú)有的一個鎖的獨(dú)占訪問。如果另一個線程已持有該鎖,等待獲取該鎖的線程必須等待,或者被阻塞,直到該鎖被釋放。當(dāng)該線程在一個 synchronized 代碼塊內(nèi)恢復(fù)執(zhí)行時,Java 會保證該線程可以 “看到了” 以前持有同一個鎖的其他線程寫入的所有數(shù)據(jù),但只是這些線程通過離開自己的 synchronized 鎖來釋放該鎖之前寫入的數(shù)據(jù)。這種保證既適用于編譯器或 JVM 所執(zhí)行的操作的重新排序,也適用于硬件內(nèi)存緩存。一個 synchronized 塊的內(nèi)部是您代碼中的一個穩(wěn)定性孤島,其中的線程可依次安全地執(zhí)行、交互和共享信息。

在變量上對 volatile 關(guān)鍵字的使用,為線程間的安全交互提供了一種稍微較弱的形式。synchronized 關(guān)鍵字可確保在您獲取該鎖時可以看到其他線程的存儲,而且在您之后,獲取該鎖的其他線程也會看到您的存儲。volatile 關(guān)鍵字將這一保證分解為兩個不同的部分。如果一個線程向volatile 變量寫入數(shù)據(jù),那么首先將會擦除它在這之前寫入的數(shù)據(jù)。如果某個線程讀取該變量,那么該線程不僅會看到寫入該變量的值,還會看到寫入的線程所寫入的其他所有值。所以讀取一個 volatile 變量會提供與輸入 一個 synchronized 塊相同的內(nèi)存保證,而且寫入一個volatile 變量會提供與離開 一個 synchronized 塊相同的內(nèi)存保證。但二者之間有很大的差別:volatile 變量的讀取或?qū)懭虢^不會受阻塞。

抽象 Java 并發(fā)性

同步很有用,而且許多多線程應(yīng)用程序都是在 Java 中僅使用基本的 synchronized 塊開發(fā)出來的。但協(xié)調(diào)線程可能很麻煩,尤其是在處理許多線程和許多塊的時候。確保線程僅在安全的方式下交互 避免潛在的死鎖(兩個或更多線程等待對方釋放鎖之后才能繼續(xù)執(zhí)行),這很困難。支持并發(fā)性而不直接處理線程和鎖的抽象,這為開發(fā)人員提供了處理常見用例的更好方法。

java.util.concurrent 分層結(jié)構(gòu)包含一些集合變形,它們支持并發(fā)訪問、針對原子操作的包裝器類,以及同步原語。這些類中的許多都是為支持非阻塞訪問而設(shè)計的,這避免了死鎖的問題,而且實(shí)現(xiàn)了更高效的線程。這些類使得定義和控制線程之間的交互變得更容易,但他們?nèi)匀幻媾R著基本線程模型的一些復(fù)雜性。

java.util.concurrent 包中的一對抽象,支持采用一種更加分離的方法來處理并發(fā)性:Future<T> 接口、Executor 和ExecutorService 接口。這些相關(guān)的接口進(jìn)而成為了對 Java 并發(fā)性支持的許多 Scala 和 Akka 擴(kuò)展的基礎(chǔ),所以更詳細(xì)地了解這些接口和它們的實(shí)現(xiàn)是值得的。

Future<T> 是一個 T 類型的值的持有者,但奇怪的是該值一般在創(chuàng)建 Future 之后才能使用。正確執(zhí)行一個同步操作后,才會獲得該值。收到Future 的線程可調(diào)用方法來:

  • 查看該值是否可用

  • 等待該值變?yōu)榭捎?/p>

  • 在該值可用時獲取它

  • 如果不再需要該值,則取消該操作

Future 的具體實(shí)現(xiàn)結(jié)構(gòu)支持處理異步操作的不同方式。

Executor 是一種圍繞某個執(zhí)行任務(wù)的東西的抽象。這個 “東西” 最終將是一個線程,但該接口隱藏了該線程處理執(zhí)行的細(xì)節(jié)。Executor 本身的適用性有限,ExecutorService 子接口提供了管理終止的擴(kuò)展方法,并為任務(wù)的結(jié)果生成了 Future。Executor 的所有標(biāo)準(zhǔn)實(shí)現(xiàn)還會實(shí)現(xiàn)ExecutorService,所以實(shí)際上,您可以忽略根接口。

線程是相對重量級的資源,而且與分配并丟棄它們相比,重用它們更有意義。ExecutorService 簡化了線程間的工作共享,還支持自動重用線程,實(shí)現(xiàn)了更輕松的編程和更高的性能。ExecutorService 的 ThreadPoolExecutor 實(shí)現(xiàn)管理著一個執(zhí)行任務(wù)的線程池。

應(yīng)用 Java 并發(fā)性

并發(fā)性的實(shí)際應(yīng)用常常涉及到需要與您的主要處理邏輯獨(dú)立的外部交互的任務(wù)(與用戶、存儲或其他系統(tǒng)的交互)。這類應(yīng)用很難濃縮為一個簡單的示例,所以在演示并發(fā)性的時候,人們通常會使用簡單的計算密集型任務(wù),比如數(shù)學(xué)計算或排序。我將使用一個類似的示例。

任務(wù)是找到離一個未知的輸入最近的已知單詞,其中的最近 是按照Levenshtein 距離 來定義的:將輸入轉(zhuǎn)換為已知的單詞所需的最少的字符增加、刪除或更改次數(shù)。我使用的代碼基于 Wikipedia 上的 Levenshtein 距離 文章中的一個示例,該示例計算了每個已知單詞的 Levenshtein 距離,并返回***匹配值(或者如果多個已知的單詞擁有相同的距離,那么返回結(jié)果是不確定的)。

清單 1 給出了計算 Levenshtein 距離的 Java 代碼。該計算生成一個矩陣,將行和列與兩個對比的文本的大小進(jìn)行匹配,在每個維度上加 1。為了提高效率,此實(shí)現(xiàn)使用了一對大小與目標(biāo)文本相同的數(shù)組來表示矩陣的連續(xù)行,將這些數(shù)組包裝在每個循環(huán)中,因?yàn)槲抑恍枰弦恍械闹稻涂梢杂嬎阆乱恍小?/p>

清單 1. Java 中的 Levenshtein 距離計算
/**   * Calculate edit distance from targetText to known word.   *   * @param word known word   * @param v0 int array of length targetText.length() + 1   * @param v1 int array of length targetText.length() + 1   * @return distance   */ private int editDistance(String word, int[] v0, int[] v1) {         // initialize v0 (prior row of distances) as edit distance for empty 'word'      for (int i = 0; i < v0.length; i++) {          v0[i] = i;      }         // calculate updated v0 (current row distances) from the previous row v0      for (int i = 0; i < word.length(); i++) {             // first element of v1 = delete (i+1) chars from target to match empty 'word'          v1[0] = i + 1;             // use formula to fill in the rest of the row          for (int j = 0; j < targetText.length(); j++) {              int cost = (word.charAt(i) == targetText.charAt(j)) ? 0 : 1;              v1[j + 1] = minimum(v1[j] + 1, v0[j + 1] + 1, v0[j] + cost);          }             // swap v1 (current row) and v0 (previous row) for next iteration          int[] hold = v0;          v0 = v1;          v1 = hold;      }         // return final value representing best edit distance      return v0[targetText.length()];  }

如果有大量已知詞匯要與未知的輸入進(jìn)行比較,而且您在一個多核系統(tǒng)上運(yùn)行,那么您可以使用并發(fā)性來加速處理:將已知單詞的集合分解為多個塊,將每個塊作為一個獨(dú)立任務(wù)來處理。通過更改每個塊中的單詞數(shù)量,您可以輕松地更改任務(wù)分解的粒度,從而了解它們對總體性能的影響。清單 2 給出了分塊計算的 Java 代碼,摘自 示例代碼 中的 ThreadPoolDistance 類。清單 2 使用一個標(biāo)準(zhǔn)的 ExecutorService,將線程數(shù)量設(shè)置為可用的處理器數(shù)量。

清單 2. 在 Java 中通過多個線程來執(zhí)行分塊的距離計算
private final ExecutorService threadPool;  private final String[] knownWords;  private final int blockSize;     public ThreadPoolDistance(String[] words, int block) {      threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());      knownWords = words;      blockSize = block;  }     public DistancePair bestMatch(String target) {         // build a list of tasks for matching to ranges of known words      List<DistanceTask> tasks = new ArrayList<DistanceTask>();         int size = 0;      for (int base = 0; base < knownWords.length; base += size) {          size = Math.min(blockSize, knownWords.length - base);          tasks.add(new DistanceTask(target, base, size));      }      DistancePair best;      try {             // pass the list of tasks to the executor, getting back list of futures          List<Future<DistancePair>> results = threadPool.invokeAll(tasks);             // find the best result, waiting for each future to complete          best = DistancePair.WORST_CASE;          for (Future<DistancePair> future: results) {              DistancePair result = future.get();              best = DistancePair.best(best, result);          }         } catch (InterruptedException e) {          throw new RuntimeException(e);      } catch (ExecutionException e) {          throw new RuntimeException(e);      }      return best;  }     /**   * Shortest distance task implementation using Callable.   */ public class DistanceTask implements Callable<DistancePair>  {      private final String targetText;      private final int startOffset;      private final int compareCount;         public DistanceTask(String target, int offset, int count) {          targetText = target;          startOffset = offset;          compareCount = count;      }         private int editDistance(String word, int[] v0, int[] v1) {          ...      }         /* (non-Javadoc)       * @see java.util.concurrent.Callable#call()       */     @Override     public DistancePair call() throws Exception {             // directly compare distances for comparison words in range          int[] v0 = new int[targetText.length() + 1];          int[] v1 = new int[targetText.length() + 1];          int bestIndex = -1;          int bestDistance = Integer.MAX_VALUE;          boolean single = false;          for (int i = 0; i < compareCount; i++) {              int distance = editDistance(knownWords[i + startOffset], v0, v1);              if (bestDistance > distance) {                  bestDistance = distance;                  bestIndex = i + startOffset;                  single = true;              } else if (bestDistance == distance) {                  single = false;              }          }          return single ? new DistancePair(bestDistance, knownWords[bestIndex]) :                  new DistancePair(bestDistance);      }  }

清單 2 中的 bestMatch() 方法構(gòu)造一個 DistanceTask 距離列表,然后將該列表傳遞給 ExecutorService。這種對 ExecutorService 的調(diào)用形式將會接受一個 Collection<? extends Callable<T>> 類型的參數(shù),該參數(shù)表示要執(zhí)行的任務(wù)。該調(diào)用返回一個 Future<T> 列表,用它來表示執(zhí)行的結(jié)果。ExecutorService 使用在每個任務(wù)上調(diào)用 call() 方法所返回的值,異步填寫這些結(jié)果。在本例中,T 類型為DistancePair&mdash; 一個表示距離和匹配的單詞的簡單的值對象,或者在沒有找到惟一匹配值時近表示距離。

bestMatch() 方法中執(zhí)行的原始線程依次等待每個 Future 完成,累積***的結(jié)果并在完成時返回它。通過多個線程來處理 DistanceTask 的執(zhí)行,原始線程只需等待一小部分結(jié)果。剩余結(jié)果可與原始線程等待的結(jié)果并發(fā)地完成。

并發(fā)性性能

要充分利用系統(tǒng)上可用的處理器數(shù)量,必須為 ExecutorService 配置至少與處理器一樣多的線程。您還必須將至少與處理器一樣多的任務(wù)傳遞給ExecutorService 來執(zhí)行。實(shí)際上,您或許希望擁有比處理器多得多的任務(wù),以實(shí)現(xiàn)***的性能。這樣,處理器就會繁忙地處理一個接一個的任務(wù),近在***才空閑下來。但是因?yàn)樯婕暗介_銷(在創(chuàng)建任務(wù)和 future 的過程中,在任務(wù)之間切換線程的過程中,以及最終返回任務(wù)的結(jié)果時),您必須保持任務(wù)足夠大,以便開銷是按比例減小的。

圖 1 展示了我在使用 Oracle 的 Java 7 for 64-bit Linux&reg; 的四核 AMD 系統(tǒng)上運(yùn)行測試代碼時測量的不同任務(wù)數(shù)量的性能。每個輸入單詞依次與 12,564 個已知單詞相比較,每個任務(wù)在一定范圍的已知單詞中找到***的匹配值。全部 933 個拼寫錯誤的輸入單詞會重復(fù)運(yùn)行,每輪運(yùn)行之間會暫停片刻供 JVM 處理,該圖中使用了 10 輪運(yùn)行后的***時間。從圖 1 中可以看出,每秒的輸入單詞性能在合理的塊大小范圍內(nèi)(基本來講,從 256 到大于 1,024)看起來是合理的,只有在任務(wù)變得非常小或非常大時,性能才會極速下降。對于塊大小 16,384,***的值近創(chuàng)建了一個任務(wù),所以顯示了單線程性能。

圖 1. ThreadPoolDistance 性能

JVM中Java和Scala并發(fā)性基礎(chǔ)是什么

Fork-Join

Java 7 引入了 ExecutorService 的另一種實(shí)現(xiàn):ForkJoinPool 類。ForkJoinPool 是為高效處理可反復(fù)分解為子任務(wù)的任務(wù)而設(shè)計的,它使用 RecursiveAction 類(在任務(wù)未生成結(jié)果時)或 RecursiveTask<T> 類(在任務(wù)具有一個 T 類型的結(jié)果時)來處理任務(wù)。RecursiveTask<T> 提供了一種合并子任務(wù)結(jié)果的便捷方式,如清單 3 所示。

清單 3. RecursiveTask<DistancePair> 示例
private ForkJoinPool threadPool = new ForkJoinPool();     private final String[] knownWords;     private final int blockSize;     public ForkJoinDistance(String[] words, int block) {      knownWords = words;      blockSize = block;  }     public DistancePair bestMatch(String target) {      return threadPool.invoke(new DistanceTask(target, 0, knownWords.length, knownWords));  }     /**   * Shortest distance task implementation using RecursiveTask.   */ public class DistanceTask extends RecursiveTask<DistancePair>  {      private final String compareText;      private final int startOffset;      private final int compareCount;      private final String[] matchWords;         public DistanceTask(String from, int offset, int count, String[] words) {          compareText = from;          startOffset = offset;          compareCount = count;          matchWords = words;      }         private int editDistance(int index, int[] v0, int[] v1) {          ...      }         /* (non-Javadoc)       * @see java.util.concurrent.RecursiveTask#compute()       */     @Override     protected DistancePair compute() {          if (compareCount > blockSize) {                 // split range in half and find best result from bests in each half of range              int half = compareCount / 2;              DistanceTask t1 = new DistanceTask(compareText, startOffset, half, matchWords);              t1.fork();              DistanceTask t2 = new DistanceTask(compareText, startOffset + half,                  compareCount - half, matchWords);              DistancePair p2 = t2.compute();              return DistancePair.best(p2, t1.join());          }             // directly compare distances for comparison words in range          int[] v0 = new int[compareText.length() + 1];          int[] v1 = new int[compareText.length() + 1];          int bestIndex = -1;          int bestDistance = Integer.MAX_VALUE;          boolean single = false;          for (int i = 0; i < compareCount; i++) {              int distance = editDistance(i + startOffset, v0, v1);              if (bestDistance > distance) {                  bestDistance = distance;                  bestIndex = i + startOffset;                  single = true;              } else if (bestDistance == distance) {                  single = false;              }          }          return single ? new DistancePair(bestDistance, knownWords[bestIndex]) :              new DistancePair(bestDistance);      }  }

圖 2 顯示了清單 3 中的 ForkJoin 代碼與 清單 2 中的 ThreadPool 代碼的性能對比。ForkJoin 代碼在所有塊大小中穩(wěn)定得多,僅在您只有單個塊(意味著執(zhí)行是單線程的)時性能會顯著下降。標(biāo)準(zhǔn)的 ThreadPool 代碼僅在塊大小為 256 和 1,024 時會表現(xiàn)出更好的性能。

圖 2. ThreadPoolDistance 與 ForkJoinDistance 的性能對比

JVM中Java和Scala并發(fā)性基礎(chǔ)是什么

這些結(jié)果表明,如果可調(diào)節(jié)應(yīng)用程序中的任務(wù)大小來實(shí)現(xiàn)***的性能,那么使用標(biāo)準(zhǔn) ThreadPool 比 ForkJoin 更好。但請注意,ThreadPool的 “***性能點(diǎn)” 取決于具體任務(wù)、可用處理器數(shù)量以及您系統(tǒng)的其他因素。一般而言,ForkJoin 以最小的調(diào)優(yōu)需求帶來了優(yōu)秀的性能,所以***盡可能地使用它。

Scala 并發(fā)性基礎(chǔ)

Scala 通過許多方式擴(kuò)展了 Java 編程語言和運(yùn)行時,其中包括添加更多、更輕松的處理并發(fā)性的方式。對于初學(xué)者而言,Future<T> 的 Scala 版本比 Java 版本靈活得多。您可以直接從代碼塊中創(chuàng)建 future,可向 future 附加回調(diào)來處理這些 future 的完成。清單 4 顯示了 Scala future 的一些使用示例。該代碼首先定義了 futureInt() 方法,以便按需提供 Future<Int>,然后通過三種不同的方式來使用 future。

清單 4. Scala Future<T> 示例代碼
import ExecutionContext.Implicits.global     val lastInteger = new AtomicInteger  def futureInt() = future {    Thread sleep 2000   lastInteger incrementAndGet  }     // use callbacks for completion of futures  val a1 = futureInt  val a2 = futureInt  a1.onSuccess {      case i1 => {        a2.onSuccess {          case i2 => println("Sum of values is " + (i1 + i2))        }      }  }  Thread sleep 3000    // use for construct to extract values when futures complete  val b1 = futureInt  val b2 = futureInt  for (i1 <- b1; i2 <- b2) yield println("Sum of values is " + (i1 + i2))  Thread sleep 3000    // wait directly for completion of futures  val c1 = futureInt  val c2 = futureInt  println("Sum of values is " + (Await.result(c1, Duration.Inf) +    Await.result(c2, Duration.Inf)))

清單 4 中的***個示例將回調(diào)閉包附加到一對 future 上,以便在兩個 future 都完成時,將兩個結(jié)果值的和打印到控制臺上?;卣{(diào)是按照創(chuàng)建它們的順序直接嵌套在 future 上,但是,即使更改順序,它們也同樣有效。如果在您附加回調(diào)時 future 已完成,該回調(diào)仍會運(yùn)行,但無法保證它會立即運(yùn)行。原始執(zhí)行線程會在 Thread sleep 3000 行上暫停,以便在進(jìn)入下一個示例之前完成 future。

第二個示例演示了使用 Scala for comprehension 從 future 中異步提取值,然后直接在表達(dá)式中使用它們。for comprehension 是一種 Scala 結(jié)構(gòu),可用于簡潔地表達(dá)復(fù)雜的操作組合(mapfilter、flatMap 和 foreach)。它一般與各種形式的集合結(jié)合使用,但 Scala future 實(shí)現(xiàn)了相同的單值方法來訪問集合值。所以可以使用 future 作為一種特殊的集合,一種包含最多一個值(可能甚至在未來某個時刻之前之后才包含該值)的集合。在這種情況下,for 語句要求獲取 future 的結(jié)果,并在表達(dá)式中使用這些結(jié)果值。在幕后,這種技術(shù)會生成與***個示例完全相同的代碼,但以線性代碼的形式編寫它會得到更容易理解的更簡單的表達(dá)式。和***個示例一樣,原始執(zhí)行線程會暫停,以便在進(jìn)入下一個示例之前完成 future。

第三個示例使用阻塞等待來獲取 future 的結(jié)果。這與 Java future 的工作原理相同,但在 Scala 中,一個獲取***等待時間參數(shù)的特殊Await.result() 方法調(diào)用會讓阻塞等待變得更為明顯。

清單 4 中的代碼沒有顯式地將 future 傳遞給 ExecutorService 或等效的對象,所以如果沒有使用過 Scala,那么您可能想知道 future 內(nèi)部的代碼是如何執(zhí)行的。答案取決于 清單 4 中最上面一行:import ExecutionContext.Implicits.global。Scala API 常常為代碼塊中頻繁重用的參數(shù)使用 implicit 值。future { } 結(jié)構(gòu)要求 ExecutionContext 以隱式參數(shù)的形式提供。這個 ExecutionContext 是 JavaExecutorService 的一個 Scala 包裝器,以相同方式用于使用一個或多個托管線程來執(zhí)行任務(wù)。

除了 future 的這些基本操作之外,Scala 還提供了一種方式將任何集合轉(zhuǎn)換為使用并行編程的集合。將集合轉(zhuǎn)換為并行格式后,您在集合上執(zhí)行的任何標(biāo)準(zhǔn)的 Scala 集合操作(比如 map、filter 或 fold)都會自動地盡可能并行完成。(本文稍后會在 清單 7 中提供一個相關(guān)示例,該示例使用 Scala 查找一個單詞的***匹配值。)

錯誤處理

Java 和 Scala 中的 future 都必須解決錯誤處理的問題。在 Java 中,截至 Java 7,future 可拋出一個 ExecutionException 作為返回結(jié)果的替代方案。應(yīng)用程序可針對具體的失敗類型而定義自己的 ExecutionException 子類,或者可連鎖異常來傳遞詳細(xì)信息,但這限制了靈活性。

Scala future 提供了更靈活的錯誤處理。您可以通過兩種方式完成 Scala future:成功時提供一個結(jié)果值(假設(shè)要求一個結(jié)果值),或者在失敗時提供一個關(guān)聯(lián)的 Throwable。您也可以采用多種方式處理 future 的完成。在 清單 4 中,onSuccess 方法用于附加回調(diào)來處理 future 的成功完成。您還可以使用 onComplete 來處理任何形式的完成(它將結(jié)果或 throwable 包裝在一個 Try 中來適應(yīng)兩種情況),或者使用 onFailure 來專門處理錯誤結(jié)果。Scala future 的這種靈活性擴(kuò)展到了您可以使用 future 執(zhí)行的所有操作,所以您可以將錯誤處理直接集成到代碼中。

這個 Scala Future<T> 還有一個緊密相關(guān)的 Promise<T> 類。future 是一個結(jié)果的持有者,該結(jié)果在某個時刻可能可用(或不可用 &mdash; 無法內(nèi)在地確保一個 future 將完成)。future 完成后,結(jié)果是固定的,不會發(fā)生改變。promise 是這個相同契約的另一端:結(jié)果的一個一次性、可分配的持有者,具有結(jié)果值或 throwable 的形式。可從 promise 獲取 future,在 promise 上設(shè)置了結(jié)果后,就可以在該 future 上設(shè)置此結(jié)果。

應(yīng)用 Scala 并發(fā)性

現(xiàn)在您已熟悉一些基本的 Scala 并發(fā)性概念,是時候來了解一下解決 Levenshtein 距離問題的代碼了。清單 5 顯示了 Levenshtein 距離計算的一個比較符合語言習(xí)慣的 Scala 實(shí)現(xiàn),該代碼基本上與 清單 1 中的 Java 代碼類似,但采用了函數(shù)風(fēng)格。

清單 5. Scala 中的 Levenshtein 距離計算
val limit = targetText.length  /** Calculate edit distance from targetText to known word.    *    * @param word known word    * @param v0 int array of length targetText.length + 1    * @param v1 int array of length targetText.length + 1    * @return distance    */ def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {       val length = word.length       @tailrec   def distanceByRow(rnum: Int, r0: Array[Int], r1: Array[Int]): Int = {      if (rnum >= length) r0(limit)      else {           // first element of r1 = delete (i+1) chars from target to match empty 'word'        r1(0) = rnum + 1          // use formula to fill in the rest of the row        for (j <- 0 until limit) {          val cost = if (word(rnum) == targetText(j)) 0 else 1         r1(j + 1) = min(r1(j) + 1, r0(j + 1) + 1, r0(j) + cost);        }           // recurse with arrays swapped for next row        distanceByRow(rnum + 1, r1, r0)      }    }       // initialize v0 (prior row of distances) as edit distance for empty 'word'    for (i <- 0 to limit) v0(i) = i       // recursively process rows matching characters in word being compared to find best    distanceByRow(0, v0, v1)  }

清單 5 中的代碼對每個行值計算使用了尾部遞歸 distanceByRow() 方法。此方法首先檢查計算了多少行,如果該數(shù)字與檢查的單詞中的字符數(shù)匹配,則返回結(jié)果距離。否則會計算新的行值,然后遞歸地調(diào)用自身來計算下一行(將兩個行數(shù)組包裝在該進(jìn)程中,以便正確地傳遞新的***的行值)。Scala 將尾部遞歸方法轉(zhuǎn)換為與 Java while 循環(huán)等效的代碼,所以保留了與 Java 代碼的相似性。

但是,此代碼與 Java 代碼之間有一個重大區(qū)別。清單 5 中的 for comprehension 使用了閉包。閉包并不總是得到了當(dāng)前 JVM 的高效處理(參閱Why is using for/foreach on a Range slow?,了解有關(guān)的詳細(xì)信息),所以它們在該計算的最里層循環(huán)上增加了大量開銷。如上所述,清單 5 中的代碼的運(yùn)行速度沒有 Java 版本那么快。清單 6 重寫了代碼,將 for comprehension 替換為添加的尾部遞歸方法。這個版本要詳細(xì)得多,但執(zhí)行效率與 Java 版本相當(dāng)。

清單 6. 為提升性能而重新構(gòu)造的計算代碼
val limit = targetText.length     /** Calculate edit distance from targetText to known word.    *    * @param word known word    * @param v0 int array of length targetText.length + 1    * @param v1 int array of length targetText.length + 1    * @return distance    */ def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {       val length = word.length       @tailrec   def distanceByRow(row: Int, r0: Array[Int], r1: Array[Int]): Int = {      if (row >= length) r0(limit)      else {           // first element of v1 = delete (i+1) chars from target to match empty 'word'        r1(0) = row + 1          // use formula recursively to fill in the rest of the row        @tailrec       def distanceByColumn(col: Int): Unit = {          if (col < limit) {            val cost = if (word(row) == targetText(col)) 0 else 1           r1(col + 1) = min(r1(col) + 1, r0(col + 1) + 1, r0(col) + cost)            distanceByColumn(col + 1)          }        }        distanceByColumn(0)           // recurse with arrays swapped for next row        distanceByRow(row + 1, r1, r0)      }    }       // initialize v0 (prior row of distances) as edit distance for empty 'word'    @tailrec   def initArray(index: Int): Unit = {      if (index <= limit) {        v0(index) = index        initArray(index + 1)      }    }    initArray(0)       // recursively process rows matching characters in word being compared to find best    distanceByRow(0, v0, v1)  }

清單 7 給出的 Scala 代碼執(zhí)行了與 清單 2 中的 Java 代碼相同的阻塞的距離計算。bestMatch() 方法找到由 Matcher 類實(shí)例處理的特定單詞塊中與目標(biāo)文本最匹配的單詞,使用尾部遞歸 best() 方法來掃描單詞。*Distance 類創(chuàng)建多個 Matcher 實(shí)例,每個對應(yīng)一個單詞塊,然后協(xié)調(diào)匹配結(jié)果的執(zhí)行和組合。

清單 7. Scala 中使用多個線程的一次阻塞距離計算
class Matcher(words: Array[String]) {       def bestMatch(targetText: String) = {         val limit = targetText.length      val v0 = new Array[Int](limit + 1)      val v1 = new Array[Int](limit + 1)         def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {        ...      }         @tailrec     /** Scan all known words in range to find best match.        *          * @param index next word index        * @param bestDist minimum distance found so far        * @param bestMatch unique word at minimum distance, or None if not unique        * @return best match        */     def best(index: Int, bestDist: Int, bestMatch: Option[String]): DistancePair =        if (index < words.length) {          val newDist = editDistance(words(index), v0, v1)          val next = index + 1         if (newDist < bestDist) best(next, newDist, Some(words(index)))          else if (newDist == bestDist) best(next, bestDist, None)          else best(next, bestDist, bestMatch)        } else DistancePair(bestDist, bestMatch)         best(0, Int.MaxValue, None)    }  }     class ParallelCollectionDistance(words: Array[String], size: Int) extends TimingTestBase {       val matchers = words.grouped(size).map(l => new Matcher(l)).toList       def shutdown = {}       def blockSize = size       /** Find best result across all matchers, using parallel collection. */   def bestMatch(target: String) = {      matchers.par.map(m => m.bestMatch(target)).        foldLeft(DistancePair.worstMatch)((a, m) => DistancePair.best(a, m))    }  }     class DirectBlockingDistance(words: Array[String], size: Int) extends TimingTestBase {       val matchers = words.grouped(size).map(l => new Matcher(l)).toList       def shutdown = {}       def blockSize = size       /** Find best result across all matchers, using direct blocking waits. */   def bestMatch(target: String) = {      import ExecutionContext.Implicits.global      val futures = matchers.map(m => future { m.bestMatch(target) })      futures.foldLeft(DistancePair.worstMatch)((a, v) =>        DistancePair.best(a, Await.result(v, Duration.Inf)))    }  }

清單 7 中的兩個 *Distance 類顯示了協(xié)調(diào) Matcher 結(jié)果的執(zhí)行和組合的不同方式。ParallelCollectionDistance 使用前面提到的 Scala 的并行集合 feature 來隱藏并行計算的細(xì)節(jié),只需一個簡單的 foldLeft 就可以組合結(jié)果。

DirectBlockingDistance 更加明確,它創(chuàng)建了一組 future,然后在該列表上為每個結(jié)果使用一個 foldLeft 和嵌套的阻塞等待。

性能再分析

清單 7 中的兩個 *Distance 實(shí)現(xiàn)都是處理 Matcher 結(jié)果的合理方法。(它們不僅合理,而且非常高效。示例代碼 包含我在試驗(yàn)中嘗試的其他兩種實(shí)現(xiàn),但未包含在本文中。)在這種情況下,性能是一個主要問題,所以圖 3 顯示了這些實(shí)現(xiàn)相對于 Java ForkJoin 代碼的性能。

圖 3. ForkJoinDistance 與 Scala 替代方案的性能對比

JVM中Java和Scala并發(fā)性基礎(chǔ)是什么

圖 3 顯示,Java ForkJoin 代碼的性能比每種 Scala 實(shí)現(xiàn)都更好,但 DirectBlockingDistance 在 1,024 的塊大小下提供了更好的性能。兩種 Scala 實(shí)現(xiàn)在大部分塊大小下,都提供了比 清單 1 中的 ThreadPool 代碼更好的性能。

這些性能結(jié)果僅是演示結(jié)果,不具權(quán)威性。如果您在自己的系統(tǒng)上運(yùn)行計時測試,可能會看到不同的性能,尤其在使用不同數(shù)量的核心的時候。如果希望為距離任務(wù)獲得***的性能,那么可以實(shí)現(xiàn)一些優(yōu)化:可以按照長度對已知單詞進(jìn)行排序,首先與長度和輸入相同的單詞進(jìn)行比較(因?yàn)榫庉嬀嚯x總是不低于與單詞長度之差)。或者我可以在距離計算超出之前的***值時,提前退出計算。但作為一個相對簡單的算法,此試驗(yàn)公平地展示了兩種并發(fā)操作是如何提高性能的,以及不同的工作共享方法的影響。

在性能方面,清單 7 中的 Scale 控制代碼與 清單 2 和 清單 3 中的 Java 代碼的對比結(jié)果很有趣。Scala 代碼短得多,而且(假設(shè)您熟悉 Scala?。┍?Java 代碼更清晰。Scala 和 Java 可很好的相互操作,您可以在本文的 完整示例代碼 中看到:Scala 代碼對 Scala 和 Java 代碼都運(yùn)行了計時測試,Java 代碼進(jìn)而直接處理 Scala 代碼的各部分。得益于這種輕松的互操作性,您可以將 Scala 引入現(xiàn)有的 Java 代碼庫中,無需進(jìn)行通盤修改。最初使用 Scala 為 Java 代碼實(shí)現(xiàn)高水平控制常常很有用,這樣您就可以充分利用 Scala 強(qiáng)大的表達(dá)特性,同時沒有閉包或轉(zhuǎn)換的任何重大性能影響。

清單 7 中的 ParallelCollectionDistance Scala 代碼的簡單性非常具有吸引力。使用此方法,您可以從代碼中完全抽象出并發(fā)性,從而編寫類似單線程應(yīng)用程序的代碼,同時仍然獲得多個處理器的優(yōu)勢。幸運(yùn)的是,對于喜歡此方法的簡單性但又不愿意或無法執(zhí)行 Scala 開發(fā)的人而言,Java 8 帶來了一種執(zhí)行直接的 Java 編程的類似特性。

現(xiàn)在您已經(jīng)了解了 Java 和 Scala 并發(fā)性操作的基礎(chǔ)知識,Java 8 的許多改動您看起來可能都很熟悉(Scala 并發(fā)性特性中使用的許多相同的概念都包含在 Java 8 中),所以您很快就能夠在普通的 Java 代碼中使用一些 Scala 技術(shù)。

以上就是JVM中Java和Scala并發(fā)性基礎(chǔ)是什么,小編相信有部分知識點(diǎn)可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI