Delta Lake 基本操作

前言

前一篇文章 delta lake 101 簡單的介紹了什麼是 Delta Lake,接下來這篇文章就來實際操作一下 Delta Lake 啦,透過基本操作來體驗一下 Delta Lake 的主要特點,今天我們是用 jupyter notebook 的方式來執行,然後分散式引擎用的是 Spark,就讓我們開始吧!

環境準備

Java 和 Python

若你是第一次執行 Spark,還沒安裝 Java 和 Python 的話,建議參考 第一個 Spark Application 文章所寫的安裝 Java 和 Python。

安裝 Python Library

相關程式都放在我的 Github 底下了,clone 後,使用以下指令安裝 python library。

cd apps/delta_lake_tutorial
pip install -r requirements.txt

啟動 Jupyter Lab

切換到 notebook 資料夾後,執行 Jupyter Lab。

cd apps/delta_lake_tutorial/notebook
jupyter lab

基本操作

Init Spark application

在建立 Spark session 時,只要在 config 中設定好 spark.sql.extensions, spark.sql.catalog.spark_catalogorg.apache.spark.sql.delta.catalog.DeltaCatalog 後,就可以使用 Delta Lake 了,

因為我們並沒有啟動 Spark Cluster 給 notebook 使用,所以這裡的 Spark session 是以 local 的方式啟動,因此多加 1 個 config spark.sql.warehouse.dir 來指定資料儲存路徑。

若加入 config spark.driver.extraJavaOptions 和呼叫 function enableHiveSupport,可啟用 hive。

import pyspark
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("delta_lake_tutorial")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )    
    .config("spark.sql.warehouse.dir", "../spark-warehouse")    
    # .config("spark.driver.extraJavaOptions", "-Dderby.system.home=../")        
    # .enableHiveSupport()
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Create delta table

下一步就是建立 delta table,

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("../data/people_10m.csv")


# If you know the table does not already exist, you can call this instead:
df.write.format("delta").mode("Overwrite").saveAsTable("people_10m")

建立完成後,實際的資料會存放在 Spark 預設的資料儲存位置 spark-warehouse 下。

Describe created table

這時就可以用以下程式查看 table 資訊,尤其 describe table EXTENDED 能看到更多細節。

spark.sql("SHOW tables").show()

spark.sql("describe table EXTENDED people_10m").show(100, False)
+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|  default|people_10m|      false|
+---------+----------+-----------+

+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                             |comment|
+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
|id                          |int                                                                                                                   |NULL   |
|firstName                   |string                                                                                                                |NULL   |
|middleName                  |string                                                                                                                |NULL   |
|lastName                    |string                                                                                                                |NULL   |
|gender                      |string                                                                                                                |NULL   |
|birthDate                   |timestamp                                                                                                             |NULL   |
|ssn                         |string                                                                                                                |NULL   |
|salary                      |int                                                                                                                   |NULL   |
|                            |                                                                                                                      |       |
|# Detailed Table Information|                                                                                                                      |       |
|Name                        |spark_catalog.default.people_10m                                                                                      |       |
|Type                        |MANAGED                                                                                                               |       |
|Location                    |file:/Users/tshine73/geek/engineer/repository/docker-spark-cluster/apps/delta_lake_tutorial/spark-warehouse/people_10m|       |
|Provider                    |delta                                                                                                                 |       |
|Table Properties            |[delta.minReaderVersion=1,delta.minWriterVersion=2]                                                                   |       |
+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+

Query table

只要取得 df,就可用 show() 方式查看資料,資料有 1000 筆。

people_df = spark.read.table("people_10m")

people_df.show()
people_df.count()

Update a table

Delta Lake 的 update 可選擇用 SQL-formatted 或 Spark SQL function 的方式指定更新條件,這裡我們把 gender 欄位的代號改為完整的字串,

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

然後在看一下資料是否有成功被更新。

people_df = spark.read.table("people_10m")

people_df.show()

Merge table

Delta Lake 也支持是用 merge 的方式更新資料,可以留意到只要 Spark df 只要沒有調用 write function,該資料並不會出現在 spark-warehouse 底下。

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Query with conditions

想當然爾我們可以查詢一下剛剛 merge 進去的資料,共 6 筆資料被新增,資料筆數是 1006 筆。

df = spark.read.table("people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
df_filtered.show()

df_filtered.count()

Delete from a table

刪除資料時,Delta Lake 一樣可以選擇 SQL-formatted 或 Spark SQL function 來做為條件指定的方式,這裡我們進行刪除操作 2 次,最終是把生日年份小於 1960 年的都刪掉。

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

確認一下資料是否被刪除,其資料筆數縮小為 839。

df = spark.read.table("people_10m")
cnt = df.count()
print(f"the data count: {cnt}")
the data count: 839

Query an earlier version of the table (time travel)

然後就要來體驗一下 Delta Lake 強大的功能之一了,time travel ,如同 delta lake 101 所述,它能從快照歷史中查詢到舊的資料,首先我們先來看一下我們有幾個快照歷史版本,

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "people_10m")
deltaHistory = deltaTable.history()

deltaHistory.show(100, False)

共有 5 個版本,可透過 operation 欄位知道這幾個版本分別了是做了什麼操作,因為 delete 操作執行了 2 次,1 次是刪除生日年份小於 1955,1 次是小於 1960,因此有 2 個版本。

Query older version

最後我們可以從這些快照歷史中取得舊的資料,這裡我們查詢 gender 欄位改為單字的版本 1,

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "people_10m")
deltaHistory = deltaTable.history()


df = spark.read.option('versionAsOf', 1).table("people_10m")
# # Or:
# df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("people_10m")

df.show()
df.count()

Save older version as new table

然後我們可以把最初的版本在存成新 table 並命名為 people_10m_bronze

df = spark.read.option('versionAsOf', 1).table("people_10m")
df.write.format("delta").mode("Overwrite").saveAsTable("people_10m_bronze")

同時在 spark-warehouse 下我們看到對應 table 名稱的資料夾被建立了,裡頭的資料就是透過 time travel 從 people_10m 取回的舊快照資料。

Clean up snapshots with VACUUM

time travel 的快照歷史預設會保留 7 天,除非你執行 Delta Lake 的專屬 VACUUM 指令,否則對應的空間不會被釋放。

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "people_10m")
deltaTable.vacuum()

結論

以上就是在 Spark 上執行的 Delta Lake 基本操作,Delta Lake 用起來真的挺方便,因為預設就是用 parquet 儲存,你也不用特別設定,又有 time travel 這種功能保留歷史資料,若你平常會用 SCD 保存歷史資料,Delta Lake 的 time travel 功能可以讓我們重新考量、設計我們的 data model,讓資料更完整且容易取用。

下一篇就來結合一下如何將 notebook 掛在既有的 Docker Spark Application 上。

tshine73
tshine73
文章: 67

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *