前言
前一篇文章 delta lake 101 簡單的介紹了什麼是 Delta Lake,接下來這篇文章就來實際操作一下 Delta Lake 啦,透過基本操作來體驗一下 Delta Lake 的主要特點,今天我們是用 jupyter notebook 的方式來執行,然後分散式引擎用的是 Spark,就讓我們開始吧!
環境準備
Java 和 Python
若你是第一次執行 Spark,還沒安裝 Java 和 Python 的話,建議參考 第一個 Spark Application 文章所寫的安裝 Java 和 Python。
安裝 Python Library
相關程式都放在我的 Github 底下了,clone 後,使用以下指令安裝 python library。
cd apps/delta_lake_tutorial
pip install -r requirements.txt
啟動 Jupyter Lab
切換到 notebook 資料夾後,執行 Jupyter Lab。
cd apps/delta_lake_tutorial/notebook
jupyter lab
基本操作
Init Spark application
在建立 Spark session 時,只要在 config 中設定好 spark.sql.extensions
, spark.sql.catalog.spark_catalog
和 org.apache.spark.sql.delta.catalog.DeltaCatalog
後,就可以使用 Delta Lake 了,
因為我們並沒有啟動 Spark Cluster 給 notebook 使用,所以這裡的 Spark session 是以 local 的方式啟動,因此多加 1 個 config spark.sql.warehouse.dir
來指定資料儲存路徑。
若加入 config
spark.driver.extraJavaOptions
和呼叫 functionenableHiveSupport
,可啟用 hive。
import pyspark
from delta import *
builder = (
pyspark.sql.SparkSession.builder.appName("delta_lake_tutorial")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
.config("spark.sql.warehouse.dir", "../spark-warehouse")
# .config("spark.driver.extraJavaOptions", "-Dderby.system.home=../")
# .enableHiveSupport()
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Create delta table
下一步就是建立 delta table,
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("../data/people_10m.csv")
# If you know the table does not already exist, you can call this instead:
df.write.format("delta").mode("Overwrite").saveAsTable("people_10m")
建立完成後,實際的資料會存放在 Spark 預設的資料儲存位置 spark-warehouse
下。

Describe created table
這時就可以用以下程式查看 table 資訊,尤其 describe table EXTENDED
能看到更多細節。
spark.sql("SHOW tables").show()
spark.sql("describe table EXTENDED people_10m").show(100, False)
+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
| default|people_10m| false|
+---------+----------+-----------+
+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
|col_name |data_type |comment|
+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
|id |int |NULL |
|firstName |string |NULL |
|middleName |string |NULL |
|lastName |string |NULL |
|gender |string |NULL |
|birthDate |timestamp |NULL |
|ssn |string |NULL |
|salary |int |NULL |
| | | |
|# Detailed Table Information| | |
|Name |spark_catalog.default.people_10m | |
|Type |MANAGED | |
|Location |file:/Users/tshine73/geek/engineer/repository/docker-spark-cluster/apps/delta_lake_tutorial/spark-warehouse/people_10m| |
|Provider |delta | |
|Table Properties |[delta.minReaderVersion=1,delta.minWriterVersion=2] | |
+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
Query table
只要取得 df,就可用 show()
方式查看資料,資料有 1000 筆。
people_df = spark.read.table("people_10m")
people_df.show()
people_df.count()

Update a table
Delta Lake 的 update 可選擇用 SQL-formatted 或 Spark SQL function 的方式指定更新條件,這裡我們把 gender 欄位的代號改為完整的字串,
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
然後在看一下資料是否有成功被更新。
people_df = spark.read.table("people_10m")
people_df.show()

Merge table
Delta Lake 也支持是用 merge 的方式更新資料,可以留意到只要 Spark df 只要沒有調用 write
function,該資料並不會出現在 spark-warehouse
底下。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Query with conditions
想當然爾我們可以查詢一下剛剛 merge 進去的資料,共 6 筆資料被新增,資料筆數是 1006 筆。
df = spark.read.table("people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
df_filtered.show()
df_filtered.count()

Delete from a table
刪除資料時,Delta Lake 一樣可以選擇 SQL-formatted 或 Spark SQL function 來做為條件指定的方式,這裡我們進行刪除操作 2 次,最終是把生日年份小於 1960 年的都刪掉。
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
確認一下資料是否被刪除,其資料筆數縮小為 839。
df = spark.read.table("people_10m")
cnt = df.count()
print(f"the data count: {cnt}")
the data count: 839
Query an earlier version of the table (time travel)
然後就要來體驗一下 Delta Lake 強大的功能之一了,time travel ,如同 delta lake 101 所述,它能從快照歷史中查詢到舊的資料,首先我們先來看一下我們有幾個快照歷史版本,
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "people_10m")
deltaHistory = deltaTable.history()
deltaHistory.show(100, False)
共有 5 個版本,可透過 operation 欄位知道這幾個版本分別了是做了什麼操作,因為 delete 操作執行了 2 次,1 次是刪除生日年份小於 1955,1 次是小於 1960,因此有 2 個版本。

Query older version
最後我們可以從這些快照歷史中取得舊的資料,這裡我們查詢 gender 欄位改為單字的版本 1,
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "people_10m")
deltaHistory = deltaTable.history()
df = spark.read.option('versionAsOf', 1).table("people_10m")
# # Or:
# df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("people_10m")
df.show()
df.count()

Save older version as new table
然後我們可以把最初的版本在存成新 table 並命名為 people_10m_bronze
,
df = spark.read.option('versionAsOf', 1).table("people_10m")
df.write.format("delta").mode("Overwrite").saveAsTable("people_10m_bronze")
同時在 spark-warehouse
下我們看到對應 table 名稱的資料夾被建立了,裡頭的資料就是透過 time travel 從 people_10m
取回的舊快照資料。

Clean up snapshots with VACUUM
time travel 的快照歷史預設會保留 7 天,除非你執行 Delta Lake 的專屬 VACUUM 指令,否則對應的空間不會被釋放。
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "people_10m")
deltaTable.vacuum()
結論
以上就是在 Spark 上執行的 Delta Lake 基本操作,Delta Lake 用起來真的挺方便,因為預設就是用 parquet 儲存,你也不用特別設定,又有 time travel 這種功能保留歷史資料,若你平常會用 SCD 保存歷史資料,Delta Lake 的 time travel 功能可以讓我們重新考量、設計我們的 data model,讓資料更完整且容易取用。
下一篇就來結合一下如何將 notebook 掛在既有的 Docker Spark Application 上。