Purely Function 的平行化 (2)

持續精煉我們的核心 API

讓我們開始實作其他核心 API 吧!以下為完整程式:

import java.util.concurrent.*

object Par {
  opaque type Par[A] = ExecutorService => Future[A]

  private case class UnitFuture[A](get: A) extends Future[A]:
    def isDone = true
    def get(timeout: Long, units: TimeUnit) = get
    def isCancelled = false
    def cancel(evenIfRunning: Boolean): Boolean = false

  def unit[A](a: A): Par[A] =
    _ => UnitFuture(a)

  extension[A] (pa: Par[A])
    def run(s: ExecutorService): Future[A] = pa(s)

    def map2[B, C](pb: Par[B])(f: (A, B) => C): Par[C] =
      (es: ExecutorService) =>
        val af = pa(es)
        val bf = pb(es)
        UnitFuture(f(af.get, bf.get))


  def fork[A](a: => Par[A]): Par[A] =
    (es: ExecutorService) =>
      es.submit(new Callable[A] {
        def call = a(es).get
      })

  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
}

首先 unit 是一個把一般的值轉為 Future,該 Future 不需要使用 ExecutorService 來運行,代表也不需要支持 cancel 功能,Future 的 get 也是直接回傳 unit 的入參,

也因為 Future 並不是純粹的 functional 介面,且它有依賴 side effect 做事,基於此,我們將 Future 類別擴充成新類別 UnitFuture, 讓 unit 的實作就是直接 new UnitFuture 即可;

在來我們用了 extension 來擴展 Par,把 run 和 map2 都放在下面,讓我們能直接以 Par 來呼叫 run 和 map2;

extension 的說明可看 純粹的 functional 狀態 (2)

map2 也是在取得 Par[A] 和 Par[B] 的結果回傳 UnitFuture;

fork 我們先用一個最簡單和自然的方式來實作。


Exercise D17-1

使用 lazyUnit 實作 asyncF function,asyncF 能使用入參 f: A => B 把回傳值 high-order function 中的 A 轉成 Par[B]。

def asyncF[A, B](f: A => B): A => Par[B]

接下來我們看如何透過核心 API 來做各種組合操作,假設我們有 Par[List[Int]],然後我們想針對 Par 中的 List 做排序,

def sortPar(parList: Par[List[Int]]): Par[List[Int]]

首先我們當然可以 run Par,取得 List 結果後在排序,最後在用 unit 包裹起來,但我們應當避免調用 run,目前核心 API 中能 Par 值的 function 是 map2,所以我們應該用它來實現,

def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
  parList.map2(unit(()))((a, _) => a.sorted)

這裡可以看到 map2 的另一個參數不重要,所以我們用 unit(()) 來產生個空 Par 給 map2,既然我們只在意一個參數,我們可以抽象這個概念提升成 map function(我將此 map function 放到 extension 下),

def map[B](f: A => B): Par[B] =
  map2(unit(()))((a, _) => f(a))

現在我們的 sortPar 可以這樣實現。

def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
  parList.map(_.sorted)

更多的組合

我們可以使用 f: A => B 來平行化運行的轉換 list 中所有元素嗎?不像 map2 是合併 2 個 Par 成一個 Par,這裡需要合併 N 個 Par 成一個 Par,首先我們可以用剛剛的 asyncF 把 List[A] 轉成新的 List[Par[B]],但下一步呢?

def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] = 
    val fbs: List[Par[B]] = ps.map(asyncF(f))
  ???

看起來我們需要個 function 把 List[Par[B]] 轉成 Par[List[B]]

def sequence[A](pas: List[Par[A]]): Par[List[A]] =
  pas.foldRight(unit(List.empty[A]))((pa, acc) => pa.map2(acc)(_ :: _))

sequence 使用了 foldRight 來和 map2 來合併 List 中的所有值,

foldRight 的說明可以看 Scala 下的 Functional 資料結構 (2)

現在我們可以完善 parMap 了,這裡我們多調用了 fork 來讓它使用新的邏輯執行緒運行,即使我們的入參是很大的 List,parMap 會立即回傳 Par[List[B]] 回來。

def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] = fork:
  val fbs: List[Par[B]] = ps.map(asyncF(f))
  sequence(fbs)

Exercise D17-2

(Hard) 實作 parFilter,它能平行化的過濾 List 中所有元素。

def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]]

API 的代數性質

在 API 實作過程中,我們都是先把 function 的型態定義好,然後在隨著型態去實現它,例如 map 就是用了 map2unit 來實現,這些是自然的演變,也可以說是我們簡化了代數方程式。我們讓 API 有了代數性質,或者說當我們有了基礎定律後,然後我們照著這個代數性的遊戲規則做一連串操作;

就像所有設計選擇,定律選擇都有後果,它限制了操作的意義以及何種實現可行,讓我們用一個例子看一下 mapping 的定律。

mapping 定律

首先我們創造一個看起來合理的定律如下(定律通常都是從具體的方程式範例而來),

map(unit(1))(_ + 1) == unit(2)

unit(1)_ + 1unit(2) 相等,以 Par 來說,就是 Future 裡的值相等,

然後定律和 function 有許多共同點,就像我們一般化 (抽象) function 那樣,我們也可以一般化定律,

map(unit(x))(f) == unit(f(x))

這裡不只限定了 1 和 _ + 1,我們可以用任何的 f 來讓等號 2 邊相等,接下來我們可以繼續簡化定律,若我們將 f 替換成 function id 的話,

def id[A](a: A): A = a

我們可以簡化方程式 2 邊的表達式然後最後得到新的定律,

map(unit(x))(id) == unit(id(x))
map(unit(x))(id) == unit(x)     // 簡化
map(y)(id) == y                 // 用 y 將 2 邊的 unit(x) 替換

我們新的定律現在不需要 unit 這個 function 了。

邏輯上來看,我們能自由做這些轉換,是因為 map 不可能對其接收的 function type 不同而有不同行為,因此,當給定 map(y)(id) = y 時,它等同於 map(unit(x))(id) == unit(id(x))

這個新的第二個定律通常也被稱為 free theorem

總結

這幾天我們設計了平行化計算和非同步 library,重點是透過設計 API,來了解遭遇問題時該怎麼以 purely functional 的設計思維去解決,我們使用容器型態 Par 來做為平行化計算的依據,將 執行(run) 這個真正初始化執行緒的動作分離出來,最後設計出核心 API 來使 Par 能自由組合、操作。


Exercise answer

tshine73
tshine73
文章: 53

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *