持續精煉我們的核心 API
讓我們開始實作其他核心 API 吧!以下為完整程式:
import java.util.concurrent.*
object Par {
opaque type Par[A] = ExecutorService => Future[A]
private case class UnitFuture[A](get: A) extends Future[A]:
def isDone = true
def get(timeout: Long, units: TimeUnit) = get
def isCancelled = false
def cancel(evenIfRunning: Boolean): Boolean = false
def unit[A](a: A): Par[A] =
_ => UnitFuture(a)
extension[A] (pa: Par[A])
def run(s: ExecutorService): Future[A] = pa(s)
def map2[B, C](pb: Par[B])(f: (A, B) => C): Par[C] =
(es: ExecutorService) =>
val af = pa(es)
val bf = pb(es)
UnitFuture(f(af.get, bf.get))
def fork[A](a: => Par[A]): Par[A] =
(es: ExecutorService) =>
es.submit(new Callable[A] {
def call = a(es).get
})
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
}
首先 unit
是一個把一般的值轉為 Future,該 Future 不需要使用 ExecutorService 來運行,代表也不需要支持 cancel 功能,Future 的 get 也是直接回傳 unit 的入參,
也因為 Future 並不是純粹的 functional 介面,且它有依賴 side effect 做事,基於此,我們將 Future 類別擴充成新類別 UnitFuture, 讓 unit 的實作就是直接 new UnitFuture 即可;
在來我們用了 extension 來擴展 Par,把 run 和 map2 都放在下面,讓我們能直接以 Par 來呼叫 run 和 map2;
extension 的說明可看 純粹的 functional 狀態 (2)。
map2
也是在取得 Par[A] 和 Par[B] 的結果回傳 UnitFuture;
fork
我們先用一個最簡單和自然的方式來實作。
Exercise D17-1
使用 lazyUnit 實作 asyncF function,asyncF 能使用入參 f: A => B
把回傳值 high-order function 中的 A 轉成 Par[B]。
def asyncF[A, B](f: A => B): A => Par[B]
接下來我們看如何透過核心 API 來做各種組合操作,假設我們有 Par[List[Int]]
,然後我們想針對 Par 中的 List 做排序,
def sortPar(parList: Par[List[Int]]): Par[List[Int]]
首先我們當然可以 run Par,取得 List 結果後在排序,最後在用 unit 包裹起來,但我們應當避免調用 run,目前核心 API 中能 Par 值的 function 是 map2,所以我們應該用它來實現,
def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
parList.map2(unit(()))((a, _) => a.sorted)
這裡可以看到 map2 的另一個參數不重要,所以我們用 unit(())
來產生個空 Par 給 map2,既然我們只在意一個參數,我們可以抽象這個概念提升成 map function(我將此 map function 放到 extension 下),
def map[B](f: A => B): Par[B] =
map2(unit(()))((a, _) => f(a))
現在我們的 sortPar 可以這樣實現。
def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
parList.map(_.sorted)
更多的組合
我們可以使用 f: A => B
來平行化運行的轉換 list 中所有元素嗎?不像 map2 是合併 2 個 Par 成一個 Par,這裡需要合併 N 個 Par 成一個 Par,首先我們可以用剛剛的 asyncF 把 List[A]
轉成新的 List[Par[B]]
,但下一步呢?
def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] =
val fbs: List[Par[B]] = ps.map(asyncF(f))
???
看起來我們需要個 function 把 List[Par[B]]
轉成 Par[List[B]]
,
def sequence[A](pas: List[Par[A]]): Par[List[A]] =
pas.foldRight(unit(List.empty[A]))((pa, acc) => pa.map2(acc)(_ :: _))
sequence
使用了 foldRight 來和 map2 來合併 List 中的所有值,
foldRight 的說明可以看 Scala 下的 Functional 資料結構 (2)。
現在我們可以完善 parMap 了,這裡我們多調用了 fork
來讓它使用新的邏輯執行緒運行,即使我們的入參是很大的 List,parMap 會立即回傳 Par[List[B]] 回來。
def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] = fork:
val fbs: List[Par[B]] = ps.map(asyncF(f))
sequence(fbs)
Exercise D17-2
(Hard) 實作 parFilter,它能平行化的過濾 List 中所有元素。
def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]]
API 的代數性質
在 API 實作過程中,我們都是先把 function 的型態定義好,然後在隨著型態去實現它,例如 map
就是用了 map2
和 unit
來實現,這些是自然的演變,也可以說是我們簡化了代數方程式。我們讓 API 有了代數性質,或者說當我們有了基礎定律後,然後我們照著這個代數性的遊戲規則做一連串操作;
就像所有設計選擇,定律選擇都有後果,它限制了操作的意義以及何種實現可行,讓我們用一個例子看一下 mapping 的定律。
mapping 定律
首先我們創造一個看起來合理的定律如下(定律通常都是從具體的方程式範例而來),
map(unit(1))(_ + 1) == unit(2)
unit(1)
和 _ + 1
與 unit(2)
相等,以 Par 來說,就是 Future 裡的值相等,
然後定律和 function 有許多共同點,就像我們一般化 (抽象) function 那樣,我們也可以一般化定律,
map(unit(x))(f) == unit(f(x))
這裡不只限定了 1 和 _ + 1
,我們可以用任何的 f 來讓等號 2 邊相等,接下來我們可以繼續簡化定律,若我們將 f 替換成 function id 的話,
def id[A](a: A): A = a
我們可以簡化方程式 2 邊的表達式然後最後得到新的定律,
map(unit(x))(id) == unit(id(x))
map(unit(x))(id) == unit(x) // 簡化
map(y)(id) == y // 用 y 將 2 邊的 unit(x) 替換
我們新的定律現在不需要 unit 這個 function 了。
邏輯上來看,我們能自由做這些轉換,是因為 map 不可能對其接收的 function type 不同而有不同行為,因此,當給定 map(y)(id) = y
時,它等同於 map(unit(x))(id) == unit(id(x))
,
這個新的第二個定律通常也被稱為 free theorem。
總結
這幾天我們設計了平行化計算和非同步 library,重點是透過設計 API,來了解遭遇問題時該怎麼以 purely functional 的設計思維去解決,我們使用容器型態 Par 來做為平行化計算的依據,將 執行(run) 這個真正初始化執行緒的動作分離出來,最後設計出核心 API 來使 Par 能自由組合、操作。