3 CQRS Saga 模式

刺骨的言语ヽ痛彻心扉 2022-11-28 10:43 297阅读 0赞

概念

总线:负责发送命令、事件、持久化Saga
命令:描述一个命令
事件:描述一个事件
处理器:处理命令或事件
Saga:代表一个流程,是由完成一个流程的一系列处理器组成

什么是Saga

Saga代表一个流程

如下订单流程
下订单包含如下命令和事件:

下订单命令:
用于开启saga流程,此时saga被创建并持久化(如保存到数据库中),用户点击“下单”时,发送一个Http请求触发该命令,该命令会发送“检查商品库存命令”

检查商品库存命令:
这里会检查商品库存,如果库存充足,则创建“订单聚合”,状态为未支付,并结束Http请求

支付完成事件:
如果用户支付完成,会触发该事件,更改“订单聚合”,状态为以支付,并发送“订单创建完成事件”
此时saga以完成,将会被移除

同理配送流程
订单创建完成事件:
开启saga流程,该事件会创建配送单

Saga与处理器

Saga是执行上有顺序的步骤集合,处理器是一个步骤
应用场景示例:
Saga:下订单Saga,用于下订单
处理器:订单创建完成事件处理器,用于记录谁在什么时候下了什么订单

代码示例

命令和事件:

  1. // 表示一个命令或事件
  2. public abstract class Message {
  3. public Guid SagaId { get; }
  4. public Message (Guid sagaId) {
  5. SagaId = sagaId;
  6. }
  7. }
  8. // 下订单命令
  9. public class PlaceOrderCommand : Message {
  10. public PlaceOrderCommand () : base (Guid.NewGuid ()) { }
  11. }
  12. ...

Saga:

  1. public abstract class SagaBase {
  2. public SagaBase () {
  3. SagaId = Guid.NewGuid ();
  4. }
  5. public Guid SagaId { get; set; }
  6. public bool IsCompleted { get; set; }
  7. }
  8. public class PlaceOrderSaga : SagaBase,
  9. IHandler<PlaceOrderCommand>,
  10. IHandler<CheckInStoreCommand>,
  11. IHandler<PayCompletedEnvet> {
  12. static PlaceOrderSaga () {
  13. Bus.RegisterSaga<PlaceOrderCommand, PlaceOrderSaga> ();
  14. }
  15. public void Handle (PlaceOrderCommand message) {
  16. // 发送检查库存命令
  17. Bus.Send (new CheckInStoreCommand (SagaId));
  18. }
  19. public void Handle (CheckInStoreCommand message) {
  20. /* 检查库存 */
  21. }
  22. public void Handle (PayCompletedEnvet message) {
  23. // 发送订单完成事件
  24. Bus.Send (new OrderCompletedEvent (SagaId));
  25. // saga 完成
  26. IsCompleted = true;
  27. }
  28. }

总线:

  1. public class Bus {
  2. // 注册的处理器
  3. private static Dictionary<Type, Type> _handlers = new Dictionary<Type, Type> ();
  4. // 注册的 saga 类型
  5. private static Dictionary<Type, Type> _sagaTypes = new Dictionary<Type, Type> ();
  6. // 已启动的 saga(saga应该保存到数据库中,但这里为了方便)
  7. private static Dictionary<Guid, SagaBase> _sagaInstances = new Dictionary<Guid, SagaBase> ();
  8. // 注册 Saga
  9. public static void RegisterSaga<TMessage, TSaga> () where TMessage : Message where TSaga : SagaBase {
  10. _sagaTypes.Add (typeof (TMessage), typeof (TSaga));
  11. }
  12. // 注册 处理器
  13. public static void RegisterHandler<TMessage, THandler> () where TMessage : Message where THandler : IHandler<TMessage> {
  14. _handlers.Add (typeof (TMessage), typeof (THandler));
  15. }
  16. // 发送事件或命令
  17. public static void Send<T> (T message) where T : Message {
  18. // 运行以注册的 处理器
  19. if (_handlers.ContainsKey (typeof (T))) {
  20. var handlerType = _handlers[typeof (T)];
  21. var handler = (IHandler<T>) Activator.CreateInstance (handlerType);
  22. handler.Handle (message);
  23. }
  24. // message 是否可以启动一个 saga
  25. if (_sagaTypes.ContainsKey (typeof (T))) {
  26. var sagaType = _sagaTypes[typeof (T)];
  27. var saga = (SagaBase) Activator.CreateInstance (sagaType);
  28. var handler = (IHandler<T>) saga;
  29. handler.Handle (message);
  30. // 将 saga 持久化
  31. _sagaInstances.Add (saga.SagaId, saga);
  32. return;
  33. }
  34. // 查看已启动的 Saga 中是否存在对应的 Saga
  35. if (_sagaInstances.ContainsKey (message.SagaId)) {
  36. var saga = _sagaInstances[message.SagaId];
  37. var handler = (IHandler<T>) saga;
  38. handler.Handle (message);
  39. // 如果该 saga 以完成,则移除
  40. if (saga.IsCompleted) {
  41. _sagaInstances.Remove (message.SagaId);
  42. return;
  43. }
  44. // 否则更新 saga 的状态到数据库
  45. return;
  46. }
  47. }
  48. }

发表评论

表情:
评论列表 (有 0 条评论,297人围观)

还没有评论,来说两句吧...

相关阅读

    相关 分布式事务中的Saga模式

    服务架构(MSA)已经变得非常流行。但是,一个常见问题是如何跨多个微服务管理分布式事务。当微服务架构将单体系统分解为自封装服务时,意味着单体系统中的本地事务现在**分布*...

    相关 Seata Saga 模式详解

    Saga介绍 Saga是一种补偿协议,在Saga模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。Sa