Event-Driven Architecture(事件驅動架構)

是什麼?

Event-Driven Architecture(EDA)是一種以事件為核心的架構模式。系統中的元件透過產生和消費事件來溝通,而不是直接呼叫彼此。

ℹ️三個關鍵概念

Event Notification — 通知其他服務「某件事發生了」。Event-Carried State Transfer — 事件中攜帶足夠的資料讓消費者不需要回呼。Event Sourcing — 用事件序列作為資料的唯一真相來源。

核心觀念

Event Sourcing vs 傳統 CRUD

| 面向 | 傳統 CRUD | Event Sourcing | |------|-----------|---------------| | 儲存 | 最新狀態 | 事件序列 | | 歷史 | 覆蓋舊資料 | 完整保留 | | 查詢 | 直接查詢 | 從事件重建狀態 | | 除錯 | 只看到現在 | 可以重播到任何時間點 | | 審計 | 需要額外的 audit log | 事件本身就是 audit log |

CQRS(Command Query Responsibility Segregation)

CQRS 將「寫入」和「讀取」分離成兩個獨立的模型:

事件設計原則

常見誤區

⚠️常犯錯誤

  • 所有東西都用 Event Sourcing(簡單的 CRUD 不需要,複雜度會大幅增加)
  • 事件中包含太少資訊(消費者被迫回呼生產者取資料,重新產生耦合)
  • 沒有處理事件的順序問題(同一個 entity 的事件必須按序處理)
  • 把 Event Sourcing 和 Message Queue 搞混(Event Store 是永久儲存,MQ 是暫時性的傳輸通道)

執行流程

1

Command 進入

使用者操作產生 Command(如 PlaceOrder)

2

產生事件

Command Handler 驗證後產生事件(如 OrderPlaced)

3

持久化事件

事件寫入 Event Store(唯一真相來源)

4

發布事件

事件發布到 Message Broker 通知其他服務

5

更新投影

Event Handler 更新 Read Model 供查詢使用

流程解讀:在 Event Sourcing + CQRS 架構中,寫入流程從 Command 開始。Command Handler 執行業務邏輯驗證後產生一或多個事件,這些事件是系統的唯一真相。事件被持久化到 Event Store 後,發布到 Message Broker。下游的 Event Handler 消費事件並更新各種讀取優化的投影(如 SQL 報表、Elasticsearch 索引)。讀取時直接查詢投影,不需要重播事件。

程式碼範例

C# 版本

csharp
// Event 定義
public record OrderPlaced(
    string OrderId, string CustomerId,
    List<OrderItem> Items, decimal Total, DateTime OccurredAt);
 
public record OrderShipped(
    string OrderId, string TrackingNumber, DateTime OccurredAt);
 
// Event Store(簡化版)
public class EventStore
{
    private readonly List<DomainEvent> _events = new();
 
    public void Append(string streamId, DomainEvent evt)
    {
        _events.Add(evt);
    }
 
    public IEnumerable<DomainEvent> GetStream(string streamId)
    {
        return _events.Where(e => e.StreamId == streamId)
                      .OrderBy(e => e.Version);
    }
}
 
// Aggregate 從事件重建狀態
public class Order
{
    public string Id { get; private set; }
    public string Status { get; private set; }
    public decimal Total { get; private set; }
 
    public static Order FromEvents(IEnumerable<DomainEvent> events)
    {
        var order = new Order();
        foreach (var evt in events)
            order.Apply(evt);
        return order;
    }
 
    private void Apply(DomainEvent evt) => _ = evt switch
    {
        OrderPlaced e => (Id, Status, Total) = (e.OrderId, "Placed", e.Total),
        OrderShipped e => Status = "Shipped",
        _ => throw new InvalidOperationException()
    };
}

TypeScript 版本

typescript
// Event 定義
interface OrderPlaced {
  type: "OrderPlaced";
  orderId: string;
  customerId: string;
  items: { productId: string; quantity: number; price: number }[];
  total: number;
  occurredAt: string;
}
 
interface OrderShipped {
  type: "OrderShipped";
  orderId: string;
  trackingNumber: string;
  occurredAt: string;
}
 
type OrderEvent = OrderPlaced | OrderShipped;
 
// Event Store
class EventStore {
  private streams = new Map<string, OrderEvent[]>();
 
  append(streamId: string, event: OrderEvent) {
    const stream = this.streams.get(streamId) ?? [];
    stream.push(event);
    this.streams.set(streamId, stream);
  }
 
  getStream(streamId: string): OrderEvent[] {
    return this.streams.get(streamId) ?? [];
  }
}
 
// 從事件重建狀態
function rebuildOrder(events: OrderEvent[]) {
  let state = { id: "", status: "", total: 0 };
  for (const event of events) {
    switch (event.type) {
      case "OrderPlaced":
        state = { id: event.orderId, status: "Placed", total: event.total };
        break;
      case "OrderShipped":
        state.status = "Shipped";
        break;
    }
  }
  return state;
}
 
// CQRS — 讀取側的投影更新
async function handleOrderPlaced(event: OrderPlaced) {
  // 更新讀取模型(例如 PostgreSQL 的報表 table)
  await db.query(
    "INSERT INTO order_summary (id, customer_id, total, status) VALUES ($1, $2, $3, $4)",
    [event.orderId, event.customerId, event.total, "Placed"]
  );
}

Python 版本

python
from dataclasses import dataclass
from datetime import datetime
from typing import Union
 
# Event 定義
@dataclass(frozen=True)  # frozen = immutable
class OrderPlaced:
    order_id: str
    customer_id: str
    items: list
    total: float
    occurred_at: datetime
 
@dataclass(frozen=True)
class OrderShipped:
    order_id: str
    tracking_number: str
    occurred_at: datetime
 
OrderEvent = Union[OrderPlaced, OrderShipped]
 
# Event Store
class EventStore:
    def __init__(self):
        self._streams: dict[str, list[OrderEvent]] = {}
 
    def append(self, stream_id: str, event: OrderEvent):
        self._streams.setdefault(stream_id, []).append(event)
 
    def get_stream(self, stream_id: str) -> list[OrderEvent]:
        return self._streams.get(stream_id, [])
 
# 從事件重建 Aggregate
class Order:
    def __init__(self):
        self.id = ""
        self.status = ""
        self.total = 0.0
 
    @classmethod
    def from_events(cls, events: list[OrderEvent]) -> "Order":
        order = cls()
        for event in events:
            order._apply(event)
        return order
 
    def _apply(self, event: OrderEvent):
        match event:
            case OrderPlaced():
                self.id = event.order_id
                self.status = "Placed"
                self.total = event.total
            case OrderShipped():
                self.status = "Shipped"
 
# Projection(讀取側投影)
class OrderSummaryProjection:
    def __init__(self, db):
        self.db = db
 
    async def handle(self, event: OrderEvent):
        match event:
            case OrderPlaced():
                await self.db.execute(
                    "INSERT INTO order_summary VALUES (%s, %s, %s)",
                    (event.order_id, event.total, "Placed")
                )
            case OrderShipped():
                await self.db.execute(
                    "UPDATE order_summary SET status = %s WHERE id = %s",
                    ("Shipped", event.order_id)
                )

結構圖

Command (Write)
PlaceOrder
Command Handler
append events
Event Store
publish
Message Broker
update
Read Model (Projection)
Query (Read)

圖中展示了 CQRS + Event Sourcing 的完整流程。Command(寫入)經過 Command Handler 產生事件,存入 Event Store。Event Store 是唯一的真相來源。事件透過 Message Broker 發布給 Projection Handler,更新 Read Model。Query(讀取)直接查詢 Read Model,不經過 Event Store。讀寫路徑完全分離。

面試常見問題

Q: Event Sourcing 的主要優勢和代價?

A: 優勢:完整的歷史審計追蹤、可以重播到任何時間點重建狀態、天然的事件驅動整合、利於除錯和分析。代價:查詢複雜度增加(需要 CQRS 搭配投影)、Event Store 的 schema 演進困難、最終一致性增加 UX 設計難度、學習曲線陡峭。適合金融交易、審計需求高、領域邏輯複雜的系統。

Q: 如何處理 Event Schema 的演進?

A: 三種策略:Upcasting(讀取舊事件時即時轉換為新 schema)、Versioned Events(事件帶版本號,消費者根據版本處理)、Event Transformation(批次將 Event Store 中的舊事件轉換為新格式)。最常用的是 Upcasting,因為它不需要修改已存儲的事件。

Q: Event Sourcing 和 Change Data Capture(CDC)的差異?

A: Event Sourcing 在應用層產生事件,事件是明確的業務概念(OrderPlaced)。CDC 在資料庫層捕獲資料變更(INSERT/UPDATE/DELETE),是技術層級的事件。Event Sourcing 的事件包含業務語義;CDC 的事件只有資料變更,需要消費者自己推斷業務含義。

理解測驗

🤔 Event Sourcing 中的 Event Store 儲存的是什麼?

🤔 CQRS 的核心概念是什麼?

🤔 事件名稱為什麼要用過去式(如 OrderPlaced)?

重點整理

💡一句話記住

Event-Driven Architecture = 記錄發生了什麼,而非現在是什麼。 口訣:「事件不可變,歷史可重播,讀寫要分離」

| 概念 | 說明 | |------|------| | Event Sourcing | 用事件序列作為唯一真相來源 | | CQRS | 讀寫分離,各自優化 | | Event Store | 持久化事件的儲存(如 EventStoreDB) | | Projection | 從事件建立的讀取優化模型 | | 核心價值 | 完整歷史、可重播、天然審計 |

你可能也想看

Circuit BreakerDecomposition

按 ← → 鍵切換課程