SCD2 Compare-based – Chi tiết các bước xử lý¶
🔴 Rất quan trọng · Data Platform
Tổng quan 3 cách implement SCD2¶
| Cách | Mô tả | Khi nào dùng |
|---|---|---|
| 1. Compare-based ⭐ | Load source + target, compare, phân loại I/U/D, MERGE | Phổ biến nhất, không phụ thuộc CDC |
| 2. CDC-based | Source đã phân loại I/U/D sẵn trong CDC event | Có CDC infrastructure (DMS, Debezium) |
| 3. Delta MERGE đơn giản | 1 câu MERGE trực tiếp | Bảng nhỏ-trung bình, logic đơn giản |
Cách 1: Compare-based (Chi tiết)¶
Flow tổng quan¶
df_source ──┐
├──→ [Compare] ──→ df_change (I/U/D) ──→ [5a: MERGE close] ──→ [5b: APPEND new] ──→ Target
df_target ──┘ JOIN on biz_key rec_st = 0 rec_st = 1
(rec_st=1) hash compare
Step 1: Load source (df_source)¶
Lấy toàn bộ bản ghi hiện tại từ source database. Đây là "truth" của ngày hôm nay.
Step 2: Load target current (df_target)¶
Lấy các bản ghi đang active trên bảng đích. Filter bằng 1 trong 2 cách:
| Cách filter | Điều kiện |
|---|---|
| Theo rec_st | WHERE rec_st = 1 |
| Theo end_dt | WHERE end_dt = '9999-12-31' |
Step 3: Compare & Classify¶
FULL OUTER JOIN df_source vs df_target trên business key:
| Loại | Điều kiện | Ý nghĩa |
|---|---|---|
| I (Insert) | source.biz_key NOT IN target | Khách hàng mới |
| U (Update) | Cả hai có, nhưng hash(tracked_cols) khác nhau |
Dữ liệu thay đổi |
| D (Delete) | target.biz_key NOT IN source | KH đóng TK / xóa |
| - (Skip) | Cả hai có, hash bằng nhau | Không thay đổi |
💡 So sánh bằng
hash(tracked_cols)nhanh hơn compare từng cột. Dùng MD5/SHA256.
Step 4: Build df_change¶
Gán change_type = I/U/D, thêm:
- eff_dt = data_date
- end_dt = '9999-12-31'
- rec_st = 1
Step 5: Apply changes (2 bước)¶
5a: MERGE – Close old records (U và D)¶
MERGE INTO dim_customer AS tgt
USING df_change AS src
ON tgt.customer_id = src.customer_id AND tgt.rec_st = 1
WHEN MATCHED AND src.change_type IN ('U', 'D')
THEN UPDATE SET
tgt.end_dt = src.data_date,
tgt.rec_st = 0
5b: APPEND – Insert new records (I và U)¶
df_change \
.filter("change_type IN ('I', 'U')") \
.withColumn("eff_dt", lit(data_date)) \
.withColumn("end_dt", lit("9999-12-31")) \
.withColumn("rec_st", lit(1)) \
.write.mode("append") \
.saveAsTable("dim_customer")
⚠️ Tại sao cần 2 bước? Delta MERGE chỉ làm được 1 action per matched row. SCD2 cần 2 thao tác cho Update: (a) close old record, (b) insert new record. Nên phải tách thành MERGE + APPEND.
MERGE action tóm tắt¶
| Change type | 5a: MERGE | 5b: APPEND |
|---|---|---|
| I | — | INSERT new row (eff_dt=data_date, end_dt=9999-12-31, rec_st=1) |
| U | Close old (end_dt=data_date, rec_st=0) | INSERT new row với giá trị mới |
| D | Close old (end_dt=data_date, rec_st=0) | Không insert |
Ưu nhược điểm¶
| Ưu điểm | Nhược điểm |
|---|---|
| Xử lý được I/U/D đầy đủ | Cần load toàn bộ source + target current |
| Không phụ thuộc CDC | Với bảng lớn (100M+ rows), compare có thể chậm |
| Dễ debug (so sánh 2 DataFrame) | Cần hash hoặc compare nhiều cột |