Fescar Source Code Study: the Service Coordinator TC

妖狐艹你老母 2022-04-23 05:40

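The previous two posts covered the TM and RM roles in Fescar. This post covers the TC.

"Fescar Source Code Study: the Transaction Manager TM (service caller)"

"Fescar Source Code Study: the Resource Manager RM (service provider)"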

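I. Introduction

The TC (Fescar Server) is the global transaction coordinator. Its main responsibilities are:

(1) When a TM or RM starts, it registers with the TC, and the TC stores the registration information (the code in section II.2 keeps it in in-memory maps).

(2) When a TM begins a transaction, it first asks the TC for a global transaction xid.

(3) Before an RM executes its database transaction, it first asks the TC for a branch transaction branchId.

(4) When a TM commits or rolls back, it sends the request to the TC together with the global transaction xid. The TC then notifies each branch RM, driving every branch transaction under that xid to complete the commit or rollback.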
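The four steps above can be sketched as a toy in-memory coordinator. This is only an illustration of the flow, not Fescar code: ToyCoordinator and all of its methods are hypothetical names, ids are plain counters, and the "notification" of branches is reduced to returning the list of branch ids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical toy coordinator; none of these names are Fescar classes.
final class ToyCoordinator {
    private final AtomicLong ids = new AtomicLong();
    // global transaction id -> branch ids registered under it
    private final Map<Long, List<Long>> branches = new ConcurrentHashMap<>();

    // Step 2: a TM asks for a global transaction id.
    long begin() {
        long xid = ids.incrementAndGet();
        branches.put(xid, new ArrayList<>());
        return xid;
    }

    // Step 3: an RM registers a branch under an existing global transaction.
    long registerBranch(long xid) {
        List<Long> list = branches.get(xid);
        if (list == null) {
            throw new IllegalStateException("unknown xid " + xid);
        }
        long branchId = ids.incrementAndGet();
        synchronized (list) {
            list.add(branchId);
        }
        return branchId;
    }

    // Step 4: the TM ends the transaction; the branches to notify are returned.
    List<Long> end(long xid) {
        List<Long> list = branches.remove(xid);
        return list == null ? new ArrayList<>() : list;
    }

    public static void main(String[] args) {
        ToyCoordinator tc = new ToyCoordinator();
        long xid = tc.begin();
        tc.registerBranch(xid);
        tc.registerBranch(xid);
        System.out.println("branches to notify: " + tc.end(xid));
    }
}
```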

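II. Source Code Analysis

1. Receiving requests

Fescar provides RpcServer to receive and process all incoming requests.

(1) Service registration requests are handled by ServerMessageListener's onRegRmMessage method.

(2) Transaction management requests are handled by ServerMessageListener.onTrxMessage.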

    @Override
    public void dispatch(long msgId, ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof RegisterRMRequest) {
            // service registration request
            serverMessageListener.onRegRmMessage(msgId, ctx, (RegisterRMRequest)msg, this,
                checkAuthHandler);
        } else {
            if (ChannelManager.isRegistered(ctx.channel())) {
                // transaction management request
                serverMessageListener.onTrxMessage(msgId, ctx, msg, this);
            } else {
                try {
                    closeChannelHandlerContext(ctx);
                } catch (Exception exx) {
                    LOGGER.error(exx.getMessage());
                }
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
                }
            }
        }
    }
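The branching in dispatch boils down to three outcomes: accept a registration, handle a message from a registered channel, or close an unknown connection. A minimal sketch of that decision, with hypothetical stand-in types (a String channel id instead of a Netty channel, empty marker classes instead of the real request types):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins; these are not Fescar or Netty types.
final class DispatchSketch {
    static final class RegisterRequest { }
    static final class TrxRequest { }

    private final Set<String> registered = ConcurrentHashMap.newKeySet();

    // Returns a label describing which branch of the dispatch logic ran.
    String dispatch(String channelId, Object msg) {
        if (msg instanceof RegisterRequest) {
            registered.add(channelId);   // a registration request is always accepted
            return "registered";
        }
        if (registered.contains(channelId)) {
            return "handled";            // transaction message from a known channel
        }
        return "closed";                 // unknown channel: the connection is dropped
    }

    public static void main(String[] args) {
        DispatchSketch d = new DispatchSketch();
        System.out.println(d.dispatch("c1", new TrxRequest()));
        System.out.println(d.dispatch("c1", new RegisterRequest()));
        System.out.println(d.dispatch("c1", new TrxRequest()));
    }
}
```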

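2. Handling service registration

Registration is handled by DefaultServerMessageListenerImpl, the implementation of ServerMessageListener. Its onRegRmMessage method, together with the registerTMChannel and registerRMChannel methods shown below, ends up adding the RM and TM information to maps.

RM: dbkey + appname + ip + port -> context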

    /**
     * dbkey+appname+ip port context
     */
    private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
        RpcContext>>>>
        RM_CHANNELS
        = new ConcurrentHashMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
        RpcContext>>>>();

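TM: ip + appname, port -> context (in registerTMChannel the key is built as applicationId + CLIENT_ID_SPLIT_CHAR + client IP)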

    /**
     * ip+appname,port
     */
    private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS
        = new ConcurrentHashMap<String, ConcurrentMap<Integer, RpcContext>>();

Registration itself is fairly simple: the Channel information is added to these maps.

    /**
     * Register tm channel.
     *
     * @param request the request
     * @param channel the channel
     * @throws IncompatibleVersionException the incompatible version exception
     */
    public static void registerTMChannel(RegisterTMRequest request, Channel channel)
        throws IncompatibleVersionException {
        Version.checkVersion(request.getVersion());
        RpcContext rpcContext = buildChannelHolder(TransactionRole.TMROLE, request.getVersion(),
            request.getApplicationId(),
            request.getTransactionServiceGroup(),
            null, channel);
        rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
        String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
            + getClientIpFromChannel(channel);
        TM_CHANNELS.putIfAbsent(clientIdentified, new ConcurrentHashMap<Integer, RpcContext>());
        ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = TM_CHANNELS.get(clientIdentified);
        rpcContext.holdInClientChannels(clientIdentifiedMap);
    }

    /**
     * Register rm channel.
     *
     * @param resourceManagerRequest the resource manager request
     * @param channel the channel
     * @throws IncompatibleVersionException the incompatible version exception
     */
    public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
        throws IncompatibleVersionException {
        Version.checkVersion(resourceManagerRequest.getVersion());
        Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
        RpcContext rpcContext;
        if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
            rpcContext = buildChannelHolder(TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
                resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
                resourceManagerRequest.getResourceIds(), channel);
            rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
        } else {
            rpcContext = IDENTIFIED_CHANNELS.get(channel);
            rpcContext.addResources(dbkeySet);
        }
        if (null == dbkeySet || dbkeySet.isEmpty()) { return; }
        for (String resourceId : dbkeySet) {
            RM_CHANNELS.putIfAbsent(resourceId,
                new ConcurrentHashMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>>());
            ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap
                = RM_CHANNELS.get(resourceId);
            applicationIdMap.putIfAbsent(resourceManagerRequest.getApplicationId(),
                new ConcurrentHashMap<String, ConcurrentMap<Integer, RpcContext>>());
            ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> clientIpMap = applicationIdMap.get(
                resourceManagerRequest.getApplicationId());
            String clientIp = getClientIpFromChannel(channel);
            clientIpMap.putIfAbsent(clientIp, new ConcurrentHashMap<Integer, RpcContext>());
            ConcurrentMap<Integer, RpcContext> portMap = clientIpMap.get(clientIp);
            rpcContext.holdInResourceManagerChannels(resourceId, portMap);
            updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
        }
    }
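The repeated putIfAbsent-then-get pairs above build a four-level map of resourceId -> applicationId -> client IP -> port -> context. On Java 8 or later the same structure can be filled with computeIfAbsent. The sketch below is not Fescar code: RmRegistrySketch, register and lookup are hypothetical names, and a String stands in for RpcContext.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry; a String stands in for Fescar's RpcContext.
final class RmRegistrySketch {
    // resourceId -> applicationId -> clientIp -> port -> context
    private final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String,
        ConcurrentMap<Integer, String>>>> channels = new ConcurrentHashMap<>();

    // Creates each missing level atomically, then stores the context at the leaf.
    void register(String resourceId, String applicationId, String ip, int port, String context) {
        channels.computeIfAbsent(resourceId, k -> new ConcurrentHashMap<>())
            .computeIfAbsent(applicationId, k -> new ConcurrentHashMap<>())
            .computeIfAbsent(ip, k -> new ConcurrentHashMap<>())
            .put(port, context);
    }

    // Walks the levels; returns null as soon as one of them is missing.
    String lookup(String resourceId, String applicationId, String ip, int port) {
        ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, String>>> apps
            = channels.get(resourceId);
        if (apps == null) {
            return null;
        }
        ConcurrentMap<String, ConcurrentMap<Integer, String>> ips = apps.get(applicationId);
        if (ips == null) {
            return null;
        }
        ConcurrentMap<Integer, String> ports = ips.get(ip);
        return ports == null ? null : ports.get(port);
    }

    public static void main(String[] args) {
        RmRegistrySketch registry = new RmRegistrySketch();
        registry.register("jdbc:mysql://db1", "order-app", "10.0.0.5", 50001, "ctx-a");
        System.out.println(registry.lookup("jdbc:mysql://db1", "order-app", "10.0.0.5", 50001));
    }
}
```

Keying the outermost level by resource makes it cheap to find every client that can serve a given database, which is what the coordinator needs when it has to reach an RM for a branch.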

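3. The begin operation

Before making the Dubbo remote call, the TM calls begin to request a global transaction xid from the TC. On the TC side, the request is processed through the handle method of GlobalBeginRequest.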

    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }

The call lands in the handle method of AbstractTCInboundHandler.

    @Override
    public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
        GlobalBeginResponse response = new GlobalBeginResponse();
        exceptionHandleTemplate(new Callback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                doGlobalBegin(request, response, rpcContext);
            }
        }, request, response);
        return response;
    }

DefaultCoordinator implements doGlobalBegin, which in turn calls the begin method of DefaultCore.

    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout()));
    }

The begin method of DefaultCore generates the global transaction xid.

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
        GlobalSession session = GlobalSession.createGlobalSession(
            applicationId, transactionServiceGroup, name, timeout);
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // the global session (and with it the xid) is persisted here
        session.begin();
        return XID.generateXID(session.getTransactionId());
    }

That is how the global transaction xid is generated.
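The code above shows XID.generateXID turning a numeric transaction id into a String xid, and the commit and rollback code later uses XID.getTransactionId to go back the other way. The post does not show the XID class itself, so the following is only a sketch of one way such a round trip can work, under the assumption that the xid is the coordinator address followed by the transaction id. XidSketch, its address format and both method names are hypothetical.

```java
// Hypothetical xid helper; the "address:transactionId" format is an assumption.
final class XidSketch {
    private final String address; // assumed coordinator address, e.g. "10.0.0.1:8091"

    XidSketch(String address) {
        this.address = address;
    }

    // Builds a String xid from a numeric transaction id.
    String generate(long transactionId) {
        return address + ":" + transactionId;
    }

    // Recovers the numeric transaction id from the text after the last ':'.
    static long transactionIdOf(String xid) {
        return Long.parseLong(xid.substring(xid.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        XidSketch xids = new XidSketch("10.0.0.1:8091");
        String xid = xids.generate(42L);
        System.out.println(xid + " -> " + transactionIdOf(xid));
    }
}
```

Embedding the coordinator address in the id would let any participant tell which coordinator owns a transaction from the xid alone.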

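4. The commit operation

When the TM commits, the request ends up in the commit method of the TC's DefaultCore. It looks up the GlobalSession by xid and closes it so that no more branches can register. If the session can be committed asynchronously it is handed to asyncCommit; otherwise doGlobalCommit calls commit on each branch transaction in turn.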

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        GlobalStatus status = globalSession.getStatus();
        globalSession.closeAndClean(); // Highlight: Firstly, close the session, then no more branch can be registered.
        if (status == GlobalStatus.Begin) {
            if (globalSession.canBeCommittedAsync()) {
                asyncCommit(globalSession);
            } else {
                doGlobalCommit(globalSession, false);
            }
        }
        return globalSession.getStatus();
    }

    @Override
    public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        // find all branch transactions and commit them one by one
        for (BranchSession branchSession : globalSession.getSortedBranches()) {
            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                continue;
            }
            try {
                BranchStatus branchStatus = resourceManagerInbound.branchCommit(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());
                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        globalSession.removeBranch(branchSession);
                        continue;
                    case PhaseTwo_CommitFailed_Unretriable:
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error("By [" + branchStatus + "], failed to commit branch " + branchSession);
                            continue;
                        } else {
                            globalSession.changeStatus(GlobalStatus.CommitFailed);
                            globalSession.end();
                            LOGGER.error("Finally, failed to commit global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] commit failed");
                            return;
                        }
                    default:
                        if (!retrying) {
                            queueToRetryCommit(globalSession);
                            return;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error("By [" + branchStatus + "], failed to commit branch " + branchSession);
                            continue;
                        } else {
                            LOGGER.error("Failed to commit global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] commit failed, will retry later.");
                            return;
                        }
                }
            } catch (Exception ex) {
                LOGGER.info("Exception committing branch " + branchSession, ex);
                if (!retrying) {
                    queueToRetryCommit(globalSession);
                    if (ex instanceof TransactionException) {
                        throw (TransactionException) ex;
                    } else {
                        throw new TransactionException(ex);
                    }
                }
            }
        }
        if (globalSession.hasBranch()) {
            LOGGER.info("Global[" + globalSession.getTransactionId() + "] committing is NOT done.");
            return;
        }
        globalSession.changeStatus(GlobalStatus.Committed);
        globalSession.end();
        LOGGER.info("Global[" + globalSession.getTransactionId() + "] committing is successfully done.");
    }
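Stripped of logging and the asynchronous-commit cases, the loop in doGlobalCommit follows a simple pattern: commit branches in order, drop each one that succeeds, stop with a failure on an unretriable error, and hand the rest to a retry queue on any other outcome. A simplified sketch of that pattern follows. CommitLoopSketch and its enums are hypothetical, the Function parameter stands in for the RPC to each RM, and the retry queue is reduced to a return value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical simplification of the synchronous commit loop; not Fescar code.
final class CommitLoopSketch {
    enum BranchResult { COMMITTED, FAILED_UNRETRIABLE, FAILED_RETRIABLE }
    enum GlobalResult { COMMITTED, COMMIT_FAILED, RETRY_QUEUED }

    // Commits branches in order; 'committer' stands in for the call to each RM.
    // Successfully committed branches are removed from the list as the loop goes.
    static GlobalResult commitAll(List<Long> branches, Function<Long, BranchResult> committer) {
        Iterator<Long> it = branches.iterator();
        while (it.hasNext()) {
            long branchId = it.next();
            switch (committer.apply(branchId)) {
                case COMMITTED:
                    it.remove();
                    break;
                case FAILED_UNRETRIABLE:
                    return GlobalResult.COMMIT_FAILED;  // give up on the whole transaction
                default:
                    return GlobalResult.RETRY_QUEUED;   // remaining branches are retried later
            }
        }
        return GlobalResult.COMMITTED;
    }

    public static void main(String[] args) {
        List<Long> branches = new ArrayList<>(Arrays.asList(1L, 2L, 3L));
        GlobalResult result = commitAll(branches,
            id -> id == 2L ? BranchResult.FAILED_RETRIABLE : BranchResult.COMMITTED);
        System.out.println(result + ", still pending: " + branches);
    }
}
```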

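5. The rollback operation

rollback is almost identical to commit: it calls rollback on each branch transaction in turn, except that the branches are visited in reverse order (getReverseSortedBranches).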

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        GlobalStatus status = globalSession.getStatus();
        globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
        if (status == GlobalStatus.Begin) {
            globalSession.changeStatus(GlobalStatus.Rollbacking);
            doGlobalRollback(globalSession, false);
        }
        return globalSession.getStatus();
    }

    @Override
    public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
            BranchStatus currentBranchStatus = branchSession.getStatus();
            if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                continue;
            }
            try {
                BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());
                switch (branchStatus) {
                    case PhaseTwo_Rollbacked:
                        globalSession.removeBranch(branchSession);
                        LOGGER.error("Successfully rolled back branch " + branchSession);
                        continue;
                    case PhaseTwo_RollbackFailed_Unretriable:
                        GlobalStatus currentStatus = globalSession.getStatus();
                        if (currentStatus.name().startsWith("Timeout")) {
                            globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed);
                        } else {
                            globalSession.changeStatus(GlobalStatus.RollbackFailed);
                        }
                        globalSession.end();
                        LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] rollback failed");
                        return;
                    default:
                        LOGGER.info("Failed to rollback branch " + branchSession);
                        if (!retrying) {
                            queueToRetryRollback(globalSession);
                        }
                        return;
                }
            } catch (Exception ex) {
                LOGGER.info("Exception rollbacking branch " + branchSession, ex);
                if (!retrying) {
                    queueToRetryRollback(globalSession);
                    if (ex instanceof TransactionException) {
                        throw (TransactionException) ex;
                    } else {
                        throw new TransactionException(ex);
                    }
                }
            }
        }
        GlobalStatus currentStatus = globalSession.getStatus();
        if (currentStatus.name().startsWith("Timeout")) {
            globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
        } else {
            globalSession.changeStatus(GlobalStatus.Rollbacked);
        }
        globalSession.end();
    }
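Two details of doGlobalRollback are worth isolating: branches are undone newest first, and the final status depends on whether the current status name starts with "Timeout". The sketch below illustrates both with hypothetical names (RollbackSketch, its Status enum, rollbackOrder and finalStatus). The enum lists only the values needed here and is not Fescar's GlobalStatus.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of the rollback ordering and status choice; not Fescar code.
final class RollbackSketch {
    enum Status { Rollbacking, TimeoutRollbacking, Rollbacked, TimeoutRollbacked }

    // Branches are rolled back in the reverse of their registration order.
    static List<String> rollbackOrder(List<String> registrationOrder) {
        List<String> reversed = new ArrayList<>(registrationOrder);
        Collections.reverse(reversed);
        return reversed;
    }

    // Mirrors the name-prefix check used to pick the final status.
    static Status finalStatus(Status current) {
        return current.name().startsWith("Timeout") ? Status.TimeoutRollbacked : Status.Rollbacked;
    }

    public static void main(String[] args) {
        System.out.println(rollbackOrder(Arrays.asList("order-db", "stock-db", "account-db")));
        System.out.println(finalStatus(Status.TimeoutRollbacking));
    }
}
```

Undoing the most recent branch first is the usual way to unwind dependent work: a later branch may have built on the changes of an earlier one, so it is reverted before the earlier one is.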

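Summary:

(1) The TC acts as a registry that holds the TM and RM service information.

(2) The TC creates the global transaction xid and the branch transaction branchId.

(3) The TC forwards the TM's commit or rollback to the RMs of the branch transactions.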
