7 MassTransit Saga 模式

╰+攻爆jí腚メ 2022-11-27 12:09 294阅读 0赞

阅读本篇文章前,你应该对CQRS Saga模式有了解

定义事件和命令

  1. // 下单命令,用于开启 Saga
  2. public class PlaceOrderCommand : CorrelatedBy<Guid>
  3. {
  4. // SagaID,此字段的值将被用来初始化 Saga 的 SagaID
  5. public Guid CorrelationId { get; }
  6. public PlaceOrderCommand () {
  7. CorrelationId = Guid.NewGuid ();
  8. }
  9. }
  10. // 检查库存命令
  11. public class CheckInStoreCommand : CorrelatedBy<Guid>
  12. {
  13. public Guid CorrelationId { get; }
  14. public CheckInStoreCommand (Guid correlationId) {
  15. CorrelationId = correlationId;
  16. }
  17. }
  18. // 支付完成事件
  19. public class PayCompletedEnvet : CorrelatedBy<Guid>
  20. {
  21. public Guid CorrelationId { get; }
  22. public PayCompletedEnvet (Guid correlationId) {
  23. CorrelationId = correlationId;
  24. }
  25. }

定义Saga

  1. // 下单 Saga
  2. public class PlaceOrderSaga:
  3. // Saga 接口,必须继承
  4. ISaga,
  5. // 用于创建Saga实例的命令
  6. InitiatedBy<PlaceOrderCommand>,
  7. // 后续命令
  8. Orchestrates<CheckInStoreCommand>,
  9. Orchestrates<PayCompletedEnvet>
  10. {
  11. // SagaId
  12. public Guid CorrelationId { get; set; }
  13. // 处理"下单开始命令"
  14. public Task Consume (ConsumeContext<PlaceOrderCommand> context) {
  15. Console.WriteLine ("下单开始");
  16. // 发送"检查库存命令"
  17. context.Publish (new CheckInStoreCommand (context.CorrelationId.Value));
  18. return Task.CompletedTask;
  19. }
  20. // 处理"检查库存命令"
  21. public Task Consume (ConsumeContext<CheckInStoreCommand> context) {
  22. Console.WriteLine ("检查库存");
  23. return Task.CompletedTask;
  24. }
  25. // 处理"支付完成事件"
  26. public Task Consume (ConsumeContext<PayCompletedEnvet> context) {
  27. Console.WriteLine ("支付完成");
  28. return Task.CompletedTask;
  29. }
  30. }

将Saga添加到队列

  1. // 创建基于内存的总线
  2. var bus = Bus.Factory.CreateUsingInMemory (config => {
  3. // 设置接收队列,队列名 test_queue
  4. config.ReceiveEndpoint ("test_queue", ep => {
  5. // 添加 Saga PlaceOrderSaga
  6. // 使用内存仓库持久化Saga
  7. ep.Saga<PlaceOrderSaga> (new InMemorySagaRepository<PlaceOrderSaga> ());
  8. });
  9. });
  10. bus.Start ();
  11. // 发布下单开始命令
  12. bus.Publish (new PlaceOrderCommand ()).Wait ();
  13. bus.Stop ();

发表评论

表情:
评论列表 (有 0 条评论,294人围观)

还没有评论,来说两句吧...

相关阅读

    相关 分布式事务中的Saga模式

    服务架构(MSA)已经变得非常流行。但是,一个常见问题是如何跨多个微服务管理分布式事务。当微服务架构将单体系统分解为自封装服务时,意味着单体系统中的本地事务现在**分布*...

    相关 Seata Saga 模式详解

    Saga介绍 Saga是一种补偿协议,在Saga模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。Sa