Event-Driven Architecture(事件驅動架構)
是什麼?
Event-Driven Architecture(EDA)是一種以事件為核心的架構模式。系統中的元件透過產生和消費事件來溝通,而不是直接呼叫彼此。
ℹ️三個關鍵概念
Event Notification — 通知其他服務「某件事發生了」。Event-Carried State Transfer — 事件中攜帶足夠的資料讓消費者不需要回呼。Event Sourcing — 用事件序列作為資料的唯一真相來源。
核心觀念
Event Sourcing vs 傳統 CRUD
| 面向 | 傳統 CRUD | Event Sourcing | |------|-----------|---------------| | 儲存 | 最新狀態 | 事件序列 | | 歷史 | 覆蓋舊資料 | 完整保留 | | 查詢 | 直接查詢 | 從事件重建狀態 | | 除錯 | 只看到現在 | 可以重播到任何時間點 | | 審計 | 需要額外的 audit log | 事件本身就是 audit log |
CQRS(Command Query Responsibility Segregation)
CQRS 將「寫入」和「讀取」分離成兩個獨立的模型:
- Command Side:處理寫入,將變更存為事件(Event Store)
- Query Side:從事件建立讀取優化的投影(Read Model / Projection)
- 讀寫可以獨立擴展:讀多寫少就擴展 Query Side
事件設計原則
- 事件名稱用過去式:
OrderPlaced、PaymentCompleted(已發生的事實) - 事件是不可變的(Immutable)— 已發生的事實不能修改
- 事件應包含足夠的上下文資訊(避免消費者需要回呼取資料)
- 事件的 schema 要考慮向後相容(新版本的消費者能處理舊事件)
常見誤區
⚠️常犯錯誤
- 所有東西都用 Event Sourcing(簡單的 CRUD 不需要,複雜度會大幅增加)
- 事件中包含太少資訊(消費者被迫回呼生產者取資料,重新產生耦合)
- 沒有處理事件的順序問題(同一個 entity 的事件必須按序處理)
- 把 Event Sourcing 和 Message Queue 搞混(Event Store 是永久儲存,MQ 是暫時性的傳輸通道)
執行流程
Command 進入
使用者操作產生 Command(如 PlaceOrder)
產生事件
Command Handler 驗證後產生事件(如 OrderPlaced)
持久化事件
事件寫入 Event Store(唯一真相來源)
發布事件
事件發布到 Message Broker 通知其他服務
更新投影
Event Handler 更新 Read Model 供查詢使用
流程解讀:在 Event Sourcing + CQRS 架構中,寫入流程從 Command 開始。Command Handler 執行業務邏輯驗證後產生一或多個事件,這些事件是系統的唯一真相。事件被持久化到 Event Store 後,發布到 Message Broker。下游的 Event Handler 消費事件並更新各種讀取優化的投影(如 SQL 報表、Elasticsearch 索引)。讀取時直接查詢投影,不需要重播事件。
程式碼範例
C# 版本
// Event 定義
public record OrderPlaced(
string OrderId, string CustomerId,
List<OrderItem> Items, decimal Total, DateTime OccurredAt);
public record OrderShipped(
string OrderId, string TrackingNumber, DateTime OccurredAt);
// Event Store(簡化版)
public class EventStore
{
private readonly List<DomainEvent> _events = new();
public void Append(string streamId, DomainEvent evt)
{
_events.Add(evt);
}
public IEnumerable<DomainEvent> GetStream(string streamId)
{
return _events.Where(e => e.StreamId == streamId)
.OrderBy(e => e.Version);
}
}
// Aggregate 從事件重建狀態
public class Order
{
public string Id { get; private set; }
public string Status { get; private set; }
public decimal Total { get; private set; }
public static Order FromEvents(IEnumerable<DomainEvent> events)
{
var order = new Order();
foreach (var evt in events)
order.Apply(evt);
return order;
}
private void Apply(DomainEvent evt) => _ = evt switch
{
OrderPlaced e => (Id, Status, Total) = (e.OrderId, "Placed", e.Total),
OrderShipped e => Status = "Shipped",
_ => throw new InvalidOperationException()
};
}TypeScript 版本
// Event 定義
interface OrderPlaced {
type: "OrderPlaced";
orderId: string;
customerId: string;
items: { productId: string; quantity: number; price: number }[];
total: number;
occurredAt: string;
}
interface OrderShipped {
type: "OrderShipped";
orderId: string;
trackingNumber: string;
occurredAt: string;
}
type OrderEvent = OrderPlaced | OrderShipped;
// Event Store
class EventStore {
private streams = new Map<string, OrderEvent[]>();
append(streamId: string, event: OrderEvent) {
const stream = this.streams.get(streamId) ?? [];
stream.push(event);
this.streams.set(streamId, stream);
}
getStream(streamId: string): OrderEvent[] {
return this.streams.get(streamId) ?? [];
}
}
// 從事件重建狀態
function rebuildOrder(events: OrderEvent[]) {
let state = { id: "", status: "", total: 0 };
for (const event of events) {
switch (event.type) {
case "OrderPlaced":
state = { id: event.orderId, status: "Placed", total: event.total };
break;
case "OrderShipped":
state.status = "Shipped";
break;
}
}
return state;
}
// CQRS — 讀取側的投影更新
async function handleOrderPlaced(event: OrderPlaced) {
// 更新讀取模型(例如 PostgreSQL 的報表 table)
await db.query(
"INSERT INTO order_summary (id, customer_id, total, status) VALUES ($1, $2, $3, $4)",
[event.orderId, event.customerId, event.total, "Placed"]
);
}Python 版本
from dataclasses import dataclass
from datetime import datetime
from typing import Union
# Event 定義
@dataclass(frozen=True) # frozen = immutable
class OrderPlaced:
order_id: str
customer_id: str
items: list
total: float
occurred_at: datetime
@dataclass(frozen=True)
class OrderShipped:
order_id: str
tracking_number: str
occurred_at: datetime
OrderEvent = Union[OrderPlaced, OrderShipped]
# Event Store
class EventStore:
def __init__(self):
self._streams: dict[str, list[OrderEvent]] = {}
def append(self, stream_id: str, event: OrderEvent):
self._streams.setdefault(stream_id, []).append(event)
def get_stream(self, stream_id: str) -> list[OrderEvent]:
return self._streams.get(stream_id, [])
# 從事件重建 Aggregate
class Order:
def __init__(self):
self.id = ""
self.status = ""
self.total = 0.0
@classmethod
def from_events(cls, events: list[OrderEvent]) -> "Order":
order = cls()
for event in events:
order._apply(event)
return order
def _apply(self, event: OrderEvent):
match event:
case OrderPlaced():
self.id = event.order_id
self.status = "Placed"
self.total = event.total
case OrderShipped():
self.status = "Shipped"
# Projection(讀取側投影)
class OrderSummaryProjection:
def __init__(self, db):
self.db = db
async def handle(self, event: OrderEvent):
match event:
case OrderPlaced():
await self.db.execute(
"INSERT INTO order_summary VALUES (%s, %s, %s)",
(event.order_id, event.total, "Placed")
)
case OrderShipped():
await self.db.execute(
"UPDATE order_summary SET status = %s WHERE id = %s",
("Shipped", event.order_id)
)結構圖
圖中展示了 CQRS + Event Sourcing 的完整流程。Command(寫入)經過 Command Handler 產生事件,存入 Event Store。Event Store 是唯一的真相來源。事件透過 Message Broker 發布給 Projection Handler,更新 Read Model。Query(讀取)直接查詢 Read Model,不經過 Event Store。讀寫路徑完全分離。
面試常見問題
Q: Event Sourcing 的主要優勢和代價?
A: 優勢:完整的歷史審計追蹤、可以重播到任何時間點重建狀態、天然的事件驅動整合、利於除錯和分析。代價:查詢複雜度增加(需要 CQRS 搭配投影)、Event Store 的 schema 演進困難、最終一致性增加 UX 設計難度、學習曲線陡峭。適合金融交易、審計需求高、領域邏輯複雜的系統。
Q: 如何處理 Event Schema 的演進?
A: 三種策略:Upcasting(讀取舊事件時即時轉換為新 schema)、Versioned Events(事件帶版本號,消費者根據版本處理)、Event Transformation(批次將 Event Store 中的舊事件轉換為新格式)。最常用的是 Upcasting,因為它不需要修改已存儲的事件。
Q: Event Sourcing 和 Change Data Capture(CDC)的差異?
A: Event Sourcing 在應用層產生事件,事件是明確的業務概念(OrderPlaced)。CDC 在資料庫層捕獲資料變更(INSERT/UPDATE/DELETE),是技術層級的事件。Event Sourcing 的事件包含業務語義;CDC 的事件只有資料變更,需要消費者自己推斷業務含義。
理解測驗
🤔 Event Sourcing 中的 Event Store 儲存的是什麼?
🤔 CQRS 的核心概念是什麼?
🤔 事件名稱為什麼要用過去式(如 OrderPlaced)?
重點整理
💡一句話記住
Event-Driven Architecture = 記錄發生了什麼,而非現在是什麼。 口訣:「事件不可變,歷史可重播,讀寫要分離」
| 概念 | 說明 | |------|------| | Event Sourcing | 用事件序列作為唯一真相來源 | | CQRS | 讀寫分離,各自優化 | | Event Store | 持久化事件的儲存(如 EventStoreDB) | | Projection | 從事件建立的讀取優化模型 | | 核心價值 | 完整歷史、可重播、天然審計 |