Domain Events(領域事件)

是什麼?

Domain Event 是對領域中已經發生的重要事情的描述。它是不可變的、用過去式命名的物件,用來實現 Aggregate 之間的鬆耦合通訊。

ℹ️DDD 整合模式

Domain Events 是解決「一個交易只修改一個 Aggregate」限制的關鍵工具。當一個 Aggregate 的操作需要觸發另一個 Aggregate 的變更時,用事件來通訊。

核心觀念

常見誤區

⚠️常見誤區

誤區一:用現在式命名事件(如 PlaceOrder)。事件描述的是「已經發生的事」,必須用過去式(OrderPlaced)。現在式的是 Command,不是 Event。

誤區二:在事件處理器中修改發出事件的那個 Aggregate。事件處理器應該觸發其他 Aggregate 的操作,或執行副作用(寄信、通知)。

誤區三:事件攜帶太少資訊,導致訂閱者需要回頭查詢發布者的資料,造成耦合。

事件處理流程

1

領域操作發生

Aggregate Root 執行了一個業務操作(如 Order.Place())

2

產生事件

在 Aggregate 內部建立 Domain Event 物件(OrderPlaced)

3

收集事件

事件暫存在 Aggregate 內部的清單中

4

發布事件

在交易完成後,由基礎設施層取出並發布事件

5

處理事件

訂閱者收到事件後執行各自的邏輯(寄信、更新庫存等)

程式碼範例

C# 版本

csharp
// Domain Event 基底
public interface IDomainEvent
{
    DateTime OccurredAt { get; }
}
 
// 具體事件
public record OrderPlacedEvent(
    Guid OrderId,
    Guid CustomerId,
    decimal TotalAmount,
    DateTime OccurredAt
) : IDomainEvent;
 
public record PaymentReceivedEvent(
    Guid OrderId,
    decimal Amount,
    string PaymentMethod,
    DateTime OccurredAt
) : IDomainEvent;
 
// Aggregate Root 帶事件收集能力
public abstract class AggregateRoot<TId> : Entity<TId> where TId : notnull
{
    private readonly List<IDomainEvent> _domainEvents = new();
    public IReadOnlyList<IDomainEvent> DomainEvents => _domainEvents.AsReadOnly();
 
    protected AggregateRoot(TId id) : base(id) { }
 
    protected void AddDomainEvent(IDomainEvent domainEvent)
    {
        _domainEvents.Add(domainEvent);
    }
 
    public void ClearDomainEvents() => _domainEvents.Clear();
}
 
// Order Aggregate 發出事件
public class Order : AggregateRoot<Guid>
{
    public void Place()
    {
        if (!_lineItems.Any())
            throw new InvalidOperationException("Cannot place an empty order.");
 
        Status = OrderStatus.Placed;
 
        AddDomainEvent(new OrderPlacedEvent(
            OrderId: Id,
            CustomerId: CustomerId,
            TotalAmount: TotalAmount.Amount,
            OccurredAt: DateTime.UtcNow
        ));
    }
}
 
// 事件處理器
public class SendOrderConfirmationEmail : IEventHandler<OrderPlacedEvent>
{
    public async Task Handle(OrderPlacedEvent @event)
    {
        // 寄出訂單確認信
        await _emailService.SendOrderConfirmation(@event.OrderId, @event.CustomerId);
    }
}
 
public class UpdateInventory : IEventHandler<OrderPlacedEvent>
{
    public async Task Handle(OrderPlacedEvent @event)
    {
        // 扣減庫存
        await _inventoryService.DeductStock(@event.OrderId);
    }
}

TypeScript 版本

typescript
// Domain Event
interface DomainEvent {
  readonly occurredAt: Date;
}
 
class OrderPlacedEvent implements DomainEvent {
  readonly occurredAt = new Date();
  constructor(
    public readonly orderId: string,
    public readonly customerId: string,
    public readonly totalAmount: number,
  ) {}
}
 
// Aggregate Root 帶事件收集
abstract class AggregateRoot<TId> {
  private domainEvents: DomainEvent[] = [];
 
  constructor(public readonly id: TId) {}
 
  protected addDomainEvent(event: DomainEvent): void {
    this.domainEvents.push(event);
  }
 
  pullDomainEvents(): DomainEvent[] {
    const events = [...this.domainEvents];
    this.domainEvents = [];
    return events;
  }
}
 
// Order 發出事件
class Order extends AggregateRoot<string> {
  place(): void {
    if (this.lineItems.length === 0) {
      throw new Error("Cannot place an empty order.");
    }
    this.status = OrderStatus.Placed;
    this.addDomainEvent(
      new OrderPlacedEvent(this.id, this.customerId, this.totalAmount.amount)
    );
  }
}

Python 版本

python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List
from abc import ABC, abstractmethod
 
@dataclass(frozen=True)
class DomainEvent(ABC):
    occurred_at: datetime = field(default_factory=datetime.utcnow)
 
@dataclass(frozen=True)
class OrderPlacedEvent(DomainEvent):
    order_id: str = ""
    customer_id: str = ""
    total_amount: float = 0.0
 
class AggregateRoot(ABC):
    def __init__(self):
        self._domain_events: List[DomainEvent] = []
 
    def _add_domain_event(self, event: DomainEvent) -> None:
        self._domain_events.append(event)
 
    def pull_domain_events(self) -> List[DomainEvent]:
        events = list(self._domain_events)
        self._domain_events.clear()
        return events
 
class Order(AggregateRoot):
    def place(self) -> None:
        if not self._line_items:
            raise ValueError("Cannot place an empty order.")
        self._status = OrderStatus.PLACED
        self._add_domain_event(OrderPlacedEvent(
            order_id=str(self.id),
            customer_id=str(self.customer_id),
            total_amount=self._total_amount.amount,
        ))

概念圖

Aggregate (Publisher)
產生
Domain Event
發布到
Event Handler A(寄信)
Event Handler B(更新庫存)
Event Handler C(通知)
Event Bus / Mediator

此圖展示事件驅動的廣播模型:Aggregate 產生事件,Event Bus 負責分發,多個 Handler 各自獨立處理。新增或移除 Handler 不需要改動 Aggregate 的程式碼,這就是鬆耦合的體現。

Domain Event vs Integration Event

| 面向 | Domain Event | Integration Event | |------|-------------|-------------------| | 作用範圍 | 單一 Bounded Context 內部 | 跨 Bounded Context / 跨服務 | | 傳遞方式 | 進程內(MediatR、事件匯流排) | 跨進程(RabbitMQ、Azure Service Bus、Kafka) | | 一致性 | 通常同步或同一交易 | 非同步、最終一致性 | | 失敗處理 | 可以拋例外回滾交易 | 需要 Retry、Dead Letter Queue | | 實作複雜度 | 低 | 高(序列化、冪等性、順序性) |

實戰補充

💡資深開發者筆記

在 C# 專案中,MediatR 是最常用的 in-process 事件發布工具。搭配 EF Core 的 SaveChangesInterceptor,可以在 SaveChanges 時自動取出所有 Aggregate 的 Domain Events 並發布。

事件分為兩層:Domain Events(進程內,同步或非同步)和 Integration Events(跨服務,透過 Message Queue)。Domain Events 用 MediatR 處理;Integration Events 用 RabbitMQ、Azure Service Bus 等。

設計事件時,攜帶的資料要遵守「胖事件」原則:包含訂閱者需要的所有資訊,避免訂閱者還要回頭查詢發布者。但也不要把整個 Aggregate 塞進事件裡。

理解測驗

🤔 Domain Event 的命名規則是什麼?

🤔 Domain Events 主要解決什麼問題?

🤔 以下哪個是正確的事件處理方式?

重點整理

💡一句話記住

口訣:「過去式、不可改、帶足料」—— 事件三要素,少一個就出問題。

| 概念 | 說明 | |------|------| | 過去式命名 | OrderPlaced、PaymentReceived | | 不可變 | 事件發出後不能修改 | | 發布/訂閱 | 發布者和訂閱者互不依賴 | | 最終一致性 | 跨 Aggregate 的一致性透過事件實現 | | 攜帶足夠資訊 | 訂閱者不需要回頭查詢發布者 |

你可能也想看

Aggregate(聚合)Bounded Context

按 ← → 鍵切換課程