Purely Function 的平行化 (1)

摩爾定律 所賜,現在的計算資源越來越強,有越來越多的程式可以分散地、平行化 (parallelism) 的運行,平行運行下的程式勢必會遇到諸如 race condition、deadlock 或難以測試的問題,之後幾天我們會使用 pure function 來建立支援平行化和異步計算的 library,除此之外,我們還可以學習從 functional programming 的角度應對平行運行時的設計思維,

pure function 的好處之一就是易於組合和模組化,所以我們會維持一貫主題 分離關注點,直到真的 ”運行” 之前,把所有計算、轉換當成某種表達式來 “組合” 起來。

就讓我們從簡單的使用案例開始吧!

推導出 library 中需要哪些核心 API

假設我們有個 sum function 加總 Seq,

def sum(ints: IndexedSeq[Int]) =
  ints.foldLeft(0)(_ + _)

如果我們不想依序的加總,我們可以用 divide-and-conquer (分治) 算法來加速計算,

def sum(ints: IndexedSeq[Int]): Int =
  if ints.size <= 1 then
    ints.headOption.getOrElse(0)
  else
    val (l, r) = ints.splitAt(ints.size / 2)
    sum(l) + sum(r)

sum(l) 和 sum(r) 是我們可以進行平行運算的地方,所以我們可以定義 Par[A] 這個容器型態來表示它可能會在不同執行緒上運行,然後將欲回傳的屬性表示 A,以此處來說就是 Int,最後我們得取得運行結果,所以我們需要以下 2 個 function:

  • def unit[A](a: => A): Par[A] 接受還沒 evaluate (運行) 的 call-by-name 表達式參數,然後回傳 Par,表示它可能會在不同執行緒上運行。
  • def get[A](a: Par[A]): A 從平行化運行的 Par 中取得結果。

根據我們定義的 type 修改後的程式如下。

def sum(ints: IndexedSeq[Int]): Int =
  if ints.size <= 1 then
    ints.headOption.getOrElse(0)
  else
    val (l, r) = ints.splitAt(ints.size / 2)
    val sumL: Par[Int] = Par.unit(sum(l))
    val sumR: Par[Int] = Par.unit(sum(r))
    Par.get(sumL) + Par.get(sumR)

為什麼我們不使用 java.lang.Thraed 呢?最大的原因是 Thread 的 start 和 join 並沒有回傳有意義的值,所以若我們要從 Runnable 取得結果時,勢必會發生 side effect。

public interface Runnable {
    public abstract void run();
}

public class Thread implements Runnable {
    public synchronized void start()
    public final void join()
}

在這裡,如果我們使用 Substitution Model 把 sumL 和 sumR 替換掉的話,雖然結果還是正確,但它失去平行化功能了,

Par.get(Par.unit(sum(l))) + Par.get(Par.unit(sum(r)))

代表 unit 在給 get 當參數用時,有著 side effect,我們不能直接內嵌 unit 進去,因為 get 得等待 Par 運行完成然後取得結果,

所以看起來我們要避免調用 get,或者減少調用次數,在最終階段才調用,且我們也想要 Par 型態是有能力組合異步計算,而不用等待執行緒完成,

或許我們可定義一個新 function map2 來嘗試解決,

def sum(ints: IndexedSeq[Int]): Par[Int] =
  if ints.size <= 1 then
    Par.unit(ints.headOption.getOrElse(0))
  else
    val (l, r) = ints.splitAt(ints.size / 2)
    Par.map2(sum(l), sum(r))(_ + _)

但此時又有新的問題了,如果我們細部拆解調用順序,如下圖,

可以發現因為 scala function 預設是 strict 的關係,evaluate 參數的順序是由左到右,所以 sum(r) 不會馬上執行,而是要等到 sum(l) 做完才會輪到 sum(r),看起來我們得讓 map2 lazy,而且能立即把 2 個參數平行化運行。

Par.map2(Par.unit(sum(l)), Par.unit(sum(r)))(_ + _)

明確的 fork

但這樣真的好嗎?如果是以下程式平行化運行好像沒什麼太意義,我們真的需要分隔 logical thread (邏輯執行緒) 來運行嗎?

Par.map2(Par.unit(1), Par.unit(1))(_ + _)

這也點出了另一個問題,我們沒有選項,能明確讓程式知道我們真的要把平行化運行從主執行緒 fork 出來,因此我們可以在定義一個 fork function 表達 fork 的決心,

def fork[A](a: => Par[A]): Par[A]

有了 fork,我們可以讓 map2 保持 strict,最後我們的 sum 會長的下面這樣。

def sum(ints: IndexedSeq[Int]): Par[Int] =
  if ints.size <= 1 then
    Par.unit(ints.headOption.getOrElse(0))
  else
    val (l, r) = ints.splitAt(ints.size / 2)
    Par.map2(Par.fork(sum(l)), Par.fork(sum(r)))(_ + _)

現在來看一下之前的 unit function 要 strict 還是 lazy,因為有 fork function 的關係,我們可以讓 unit 變為 strict,然後使用 fork 和 unit 來實作 lazyUnit function,

def unit[A](a:  A): Par[A]
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

最後來看一下 fork 該如何實現,因為 Par 只是個容器型態,表示我們會用多執行緒運行程式,

如果 fork 的實作會立即建立多執行緒,我們會失去控制平行化策略的彈性,從 基於介面而非實現開發 這個觀點來看,我們需要一個 function 來負責啟動,倘若不立即建立多執行緒,那我們就更需要一個 function 來啟動,

所以我們可以把 get 改名為 run,來命令我們的 library 開始運行,並取回運行結果。

def run[A](a: Par[A]): A

統整一下 Par 容器型態的 function

我們的平行化 library 中定義了 Par[A] 容器型態,然後核心 API 中包含了以下 function,

def unit[A](a:  A): Par[A]
def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C]
def fork[A](a: => Par[A]): Par[A]
def lazyUnit[A](a: => A): Par[A]
def run[A](a: Par[A]): A

我們也簡單定義了每個 function 的職責:

  • unit 使一般的值變成平行化計算。
  • map2 使用 high-order function 把 2 個平行化計算的合併成 1 個平行化計算。
  • fork 將某一個平行化計算從主執行緒 fork 出來,此計算不會馬上運行,直到被 run 觸發。
  • lazyUnit 包裹一個還未計算的表達式成為平行化計算並 fork 它
  • run 實際運行平行化計算並從中提取值

run

run 需要真正的開始執行平行化計算了,我們或許可以自行實現底層 API,但是,Java 標準庫中已經有 java.util.concurrent.ExecutorService 可供我們使用,如果把它翻譯成 Scala 的話 API 定義如下:

trait ExecutorService:
  def submit[A](a: Callable[A]): Future[A]

trait Callable[A]:
  def call: A

trait Future[A]:
  def get: A
  def get(timeout: Long, unit: TimeUnit): A
  def cancel(mayInterruptIfRunning: Boolean): Boolean
  def isDone: Boolean
  def isCancelled: Boolean

ExecutorService 需提交 Callable 然後回傳 Future,Future 隱含著會執行在不同執行緒上,然後透過會等待的 get 取得值,

我們嘗試讓 run 存取 ExecutorService,

def run[A](s: ExecutorService)(a: Par[A]): A

所以 Par[A] 可以想像成代表了 ExecutorService => A,因為 A 代表了要從 Future 中 get,為了更好的推展延遲運行,以及用到 Future 那些 function,我們可以把 Par[A] 型態改成這樣 ExecutorService => Future[A],然後讓我們的 run 回傳 Future,

type Par[A] = ExecutorService => Future[A]

def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

type 的意思可以參考 純粹的 functional 狀態 (1) 中的 更好的 RNG 型態表達方式 段落。

現在我們的 Par[A] 是 function ExecutorService => Future[A] 的別名,操作 Par 型態也不用擔心會立即建立新的執行緒,除非我們調用 run 並提供 ExecutorService,才會取得實際執行在不同執行緒上的 Future 物件。


明天來持續精煉 API。

tshine73
tshine73
文章: 53

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *