分布式注册中心xxl-registry源码分析

野性酷女 2024-04-17 23:38 138阅读 0赞

官网概述:XXL-REGISTRY 是一个轻量级分布式服务注册中心,拥有”轻量级、秒级注册上线、多环境、跨语言、跨机房”等特性。现已开放源代码,开箱即用。

基本思路:注册中心的基本的功能一般包括服务注册(registry)、服务发现(discovery)、服务摘除(remove)等;然后就涉及到服务宕机后能否及时摘除、如何高效的服务发现、注册中心数据备份等;这些任务由注册中心的服务端完成(server),另外注册中心还会提供一个客户端(client),服务提供者通过client端进行服务注册、摘除、心跳检测等,服务消费者通过client端进行服务发现;

项目结构:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xkdzIwMTUxMDgwMzAwNg_size_16_color_FFFFFF_t_70

设计的三张表:

20190903075456260.png

a、xxlRegistry实体:

  1. public class XxlRegistry {
  2. private int id;
  3. private String biz; // 业务标识
  4. private String env; // 环境标识
  5. private String key; // 注册Key
  6. private String data; // 注册Value有效数据
  7. private int status; // 状态:0-正常、1-锁定、2-禁用

其中data为服务的注册地址集合以String类型在数据库中存储如:[“address001”, “address002”]

b、xxlRegistryData实体:

  1. public class XxlRegistryData {
  2. private int id;
  3. private String biz; // 业务标识
  4. private String env; // 环境标识
  5. private String key; // 注册Key
  6. private String value; // 注册Value
  7. private Date updateTime; // 更新时间

xxlRegistryData与xxlRegistry相当于是多对一的关系, 存数据的时候将biz+env+key对应的value取出来存到xxlRegistry的data中。

c、xxlRegistryMessage实体:

  1. public class XxlRegistryMessage {
  2. private int id;
  3. private int type; // 消息类型:0-注册更新
  4. private String data; // 消息内容
  5. private Date addTime;

这里data存的是xxlRegistry信息

一、注册中心Server端

1、API:包括Admin界面管理api、Client端访问api

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xkdzIwMTUxMDgwMzAwNg_size_16_color_FFFFFF_t_70 1

其中 Api的service层实现以及服务定时处理的各类线程都在XxlRegistryServiceImpl中完成:

  1. @Service
  2. public class XxlRegistryServiceImpl implements IXxlRegistryService, InitializingBean, DisposableBean {
  3. private static Logger logger = LoggerFactory.getLogger(XxlRegistryServiceImpl.class);
  4. @Resource
  5. private IXxlRegistryDao xxlRegistryDao;
  6. @Resource
  7. private IXxlRegistryDataDao xxlRegistryDataDao;
  8. @Resource
  9. private IXxlRegistryMessageDao xxlRegistryMessageDao;
  10. @Value("${xxl.registry.data.filepath}")
  11. private String registryDataFilePath;
  12. @Value("${xxl.registry.accessToken}")
  13. private String accessToken;
  14. private int registryBeatTime = 10;

1.1、RegistryController:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xkdzIwMTUxMDgwMzAwNg_size_16_color_FFFFFF_t_70 2

对注册中心的基本信息进行人工维护, 可以修改状态信息, 禁止自动服务注册与发现等;这里是直接针对数据库层面的修改。

1.2、ApiController:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xkdzIwMTUxMDgwMzAwNg_size_16_color_FFFFFF_t_70 3

提供服务注册、发现、摘除、心跳维护等接口

a、registry实现:

  1. @Override
  2. public ReturnT<String> registry(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) {
  3. // valid
  4. if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
  5. return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
  6. }
  7. if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
  8. return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]");
  9. }
  10. if (env==null || env.trim().length()<2 || env.trim().length()>255) {
  11. return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
  12. }
  13. if (registryDataList==null || registryDataList.size()==0) {
  14. return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid");
  15. }
  16. for (XxlRegistryData registryData: registryDataList) {
  17. if (registryData.getKey()==null || registryData.getKey().trim().length()<4 || registryData.getKey().trim().length()>255) {
  18. return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]");
  19. }
  20. if (registryData.getValue()==null || registryData.getValue().trim().length()<4 || registryData.getValue().trim().length()>255) {
  21. return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]");
  22. }
  23. }
  24. // fill + add queue
  25. for (XxlRegistryData registryData: registryDataList) {
  26. registryData.setBiz(biz);
  27. registryData.setEnv(env);
  28. }
  29. registryQueue.addAll(registryDataList);
  30. return ReturnT.SUCCESS;
  31. }

可以看到前面都在对参赛进行校验,后面是直接将数据存如registryQueue队列中,持久到数据库是通过开启线程同步到数据库中:

  1. /**
  2. * registry registry data (client-num/10 s)
  3. */
  4. for (int i = 0; i < 10; i++) {
  5. executorService.execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. while (!executorStoped) {
  9. try {
  10. XxlRegistryData xxlRegistryData = registryQueue.take();
  11. if (xxlRegistryData !=null) {
  12. // refresh or add
  13. int ret = xxlRegistryDataDao.refresh(xxlRegistryData);
  14. if (ret == 0) {
  15. xxlRegistryDataDao.add(xxlRegistryData);
  16. }
  17. // valid file status
  18. XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
  19. if (fileXxlRegistry == null) {
  20. // go on
  21. } else if (fileXxlRegistry.getStatus() != 0) {
  22. continue; // "Status limited."
  23. } else {
  24. if (fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
  25. continue; // "Repeated limited."
  26. }
  27. }
  28. // checkRegistryDataAndSendMessage
  29. checkRegistryDataAndSendMessage(xxlRegistryData);
  30. }
  31. } catch (Exception e) {
  32. if (!executorStoped) {
  33. logger.error(e.getMessage(), e);
  34. }
  35. }
  36. }
  37. }
  38. });
  39. }

这里就是从注册服务的队列取出数据并同步更新到xxl-registry-data表中,并将信息同步到xxl-registry和xxl-registry-message表中;

服务摘除与注册方式类似,通过接口存到队列,再另跑线程同步到表中(xxl-registry-data);服务摘除时会同步更新xxl-regsitry和xxl-registry-message表,另跑线程将xxl-registry-message的最新信息同步到数据文件中,以保证服务的最新可用性。

b、服务发现:其高效性就体现在从文件中直接读取数据

  1. @Override
  2. public ReturnT<Map<String, List<String>>> discovery(String accessToken, String biz, String env, List<String> keys) {
  3. // valid
  4. if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
  5. return new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid");
  6. }
  7. if (biz==null || biz.trim().length()<2 || biz.trim().length()>255) {
  8. return new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[2~255]");
  9. }
  10. if (env==null || env.trim().length()<2 || env.trim().length()>255) {
  11. return new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
  12. }
  13. if (keys==null || keys.size()==0) {
  14. return new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid.");
  15. }
  16. for (String key: keys) {
  17. if (key==null || key.trim().length()<4 || key.trim().length()>255) {
  18. return new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]");
  19. }
  20. }
  21. Map<String, List<String>> result = new HashMap<String, List<String>>();
  22. for (String key: keys) {
  23. XxlRegistryData xxlRegistryData = new XxlRegistryData();
  24. xxlRegistryData.setBiz(biz);
  25. xxlRegistryData.setEnv(env);
  26. xxlRegistryData.setKey(key);
  27. List<String> dataList = new ArrayList<String>();
  28. XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
  29. if (fileXxlRegistry!=null) {
  30. dataList = fileXxlRegistry.getDataList();
  31. }
  32. result.put(key, dataList);
  33. }
  34. return new ReturnT<Map<String, List<String>>>(result);
  35. }

文件中的注册信息通过xxl-registry-message表中同步而来,并定时清除表中

  1. /**
  2. * broadcase new one registry-data-file (1/1s)
  3. *
  4. * clean old message (1/10s)
  5. */
  6. executorService.execute(new Runnable() {
  7. @Override
  8. public void run() {
  9. while (!executorStoped) {
  10. try {
  11. // new message, filter readed
  12. List<XxlRegistryMessage> messageList = xxlRegistryMessageDao.findMessage(readedMessageIds);
  13. if (messageList!=null && messageList.size()>0) {
  14. for (XxlRegistryMessage message: messageList) {
  15. readedMessageIds.add(message.getId());
  16. if (message.getType() == 0) { // from registry、add、update、deelete,ne need sync from db, only write
  17. XxlRegistry xxlRegistry = JacksonUtil.readValue(message.getData(), XxlRegistry.class);
  18. // process data by status
  19. if (xxlRegistry.getStatus() == 1) {
  20. // locked, not updated
  21. } else if (xxlRegistry.getStatus() == 2) {
  22. // disabled, write empty
  23. xxlRegistry.setData(JacksonUtil.writeValueAsString(new ArrayList<String>()));
  24. } else {
  25. // default, sync from db (aready sync before message, only write)
  26. }
  27. // sync fileI(将数据同步到文件中)
  28. setFileRegistryData(xxlRegistry);
  29. }
  30. }
  31. }
  32. // clean old message;
  33. if ( (System.currentTimeMillis()/1000) % registryBeatTime ==0) {
  34. xxlRegistryMessageDao.cleanMessage(registryBeatTime);
  35. readedMessageIds.clear();
  36. }
  37. } catch (Exception e) {
  38. if (!executorStoped) {
  39. logger.error(e.getMessage(), e);
  40. }
  41. }
  42. try {
  43. TimeUnit.SECONDS.sleep(1);
  44. } catch (Exception e) {
  45. if (!executorStoped) {
  46. logger.error(e.getMessage(), e);
  47. }
  48. }
  49. }
  50. }
  51. });

c、定时清理xxl-registry-data数据表、同步xxl-regsitry-data到xxl-regsitry以及数据文件、清理无效数据文件

  1. /**
  2. * clean old registry-data (1/10s)
  3. *
  4. * sync total registry-data db + file (1+N/10s)
  5. *
  6. * clean old registry-data file
  7. */
  8. executorService.execute(new Runnable() {
  9. @Override
  10. public void run() {
  11. while (!executorStoped) {
  12. // align to beattime
  13. try {
  14. long sleepSecond = registryBeatTime - (System.currentTimeMillis()/1000)%registryBeatTime;
  15. if (sleepSecond>0 && sleepSecond<registryBeatTime) {
  16. TimeUnit.SECONDS.sleep(sleepSecond);
  17. }
  18. } catch (Exception e) {
  19. if (!executorStoped) {
  20. logger.error(e.getMessage(), e);
  21. }
  22. }
  23. try {
  24. // clean old registry-data in db
  25. xxlRegistryDataDao.cleanData(registryBeatTime * 3);
  26. // sync registry-data, db + file
  27. int offset = 0;
  28. int pagesize = 1000;
  29. List<String> registryDataFileList = new ArrayList<>();
  30. List<XxlRegistry> registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
  31. while (registryList!=null && registryList.size()>0) {
  32. for (XxlRegistry registryItem: registryList) {
  33. // process data by status
  34. if (registryItem.getStatus() == 1) {
  35. // locked, not updated
  36. } else if (registryItem.getStatus() == 2) {
  37. // disabled, write empty
  38. String dataJson = JacksonUtil.writeValueAsString(new ArrayList<String>());
  39. registryItem.setData(dataJson);
  40. } else {
  41. // default, sync from db
  42. List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(registryItem.getBiz(), registryItem.getEnv(), registryItem.getKey());
  43. List<String> valueList = new ArrayList<String>();
  44. if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) {
  45. for (XxlRegistryData dataItem: xxlRegistryDataList) {
  46. valueList.add(dataItem.getValue());
  47. }
  48. }
  49. String dataJson = JacksonUtil.writeValueAsString(valueList);
  50. // check update, sync db
  51. if (!registryItem.getData().equals(dataJson)) {
  52. registryItem.setData(dataJson);
  53. xxlRegistryDao.update(registryItem);
  54. }
  55. }
  56. // sync file(将最新数据同步到数据文件)
  57. String registryDataFile = setFileRegistryData(registryItem);
  58. // collect registryDataFile
  59. registryDataFileList.add(registryDataFile);
  60. }
  61. offset += 1000;
  62. registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
  63. }
  64. // clean old registry-data file(清除不在registryDataFileList中的无效数据文件)
  65. cleanFileRegistryData(registryDataFileList);
  66. } catch (Exception e) {
  67. if (!executorStoped) {
  68. logger.error(e.getMessage(), e);
  69. }
  70. }
  71. try {
  72. TimeUnit.SECONDS.sleep(registryBeatTime);
  73. } catch (Exception e) {
  74. if (!executorStoped) {
  75. logger.error(e.getMessage(), e);
  76. }
  77. }
  78. }
  79. }
  80. });

d、心跳检测(monitor):

  1. @Override
  2. public DeferredResult<ReturnT<String>> monitor(String accessToken, String biz, String env, List<String> keys) {
  3. // init
  4. DeferredResult deferredResult = new DeferredResult(30 * 1000L, new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor timeout, no key updated."));
  5. // valid
  6. if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
  7. deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid"));
  8. return deferredResult;
  9. }
  10. if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
  11. deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]"));
  12. return deferredResult;
  13. }
  14. if (env==null || env.trim().length()<2 || env.trim().length()>255) {
  15. deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]"));
  16. return deferredResult;
  17. }
  18. if (keys==null || keys.size()==0) {
  19. deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid."));
  20. return deferredResult;
  21. }
  22. for (String key: keys) {
  23. if (key==null || key.trim().length()<4 || key.trim().length()>255) {
  24. deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]"));
  25. return deferredResult;
  26. }
  27. }
  28. // monitor by client
  29. for (String key: keys) {
  30. String fileName = parseRegistryDataFileName(biz, env, key);
  31. List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
  32. if (deferredResultList == null) {
  33. deferredResultList = new ArrayList<>();
  34. registryDeferredResultMap.put(fileName, deferredResultList);
  35. }
  36. deferredResultList.add(deferredResult);
  37. }
  38. return deferredResult;
  39. }

通过Spring提供的DeferredResult类实现自动延迟返回信息给客户端,在更新文件数据时修改deferredResultList返回信息,因此文件数据是否最新都会反应在这个结果集中。

以上就是注册中心服务端的基本实现

一般问题:如果服务提供者意外宕机如何处理:一般情况下对应用程序会设置服务停止时的一些正常的清理工作,比如调用remove接口摘除宕机的服务等;另一方便xxl-regsitry-data数据三倍于心跳时间清除,因此也可保证异常服务的正常清除。

二、注册中心Client端

Client端维护着两个重要的线程,服务注册、服务发现

1、服务注册:

  1. // registry thread
  2. registryThread = new Thread(new Runnable() {
  3. @Override
  4. public void run() {
  5. while (!registryThreadStop) {
  6. try {
  7. if (registryData.size() > 0) {
  8. boolean ret = registryBaseClient.registry(new ArrayList<XxlRegistryDataParamVO>(registryData));
  9. logger.debug(">>>>>>>>>>> xxl-registry, refresh registry data {}, registryData = {}", ret?"success":"fail",registryData);
  10. }
  11. } catch (Exception e) {
  12. if (!registryThreadStop) {
  13. logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
  14. }
  15. }
  16. try {
  17. TimeUnit.SECONDS.sleep(10);
  18. } catch (Exception e) {
  19. if (!registryThreadStop) {
  20. logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
  21. }
  22. }
  23. }
  24. logger.info(">>>>>>>>>>> xxl-registry, registryThread stoped.");
  25. }
  26. });
  27. registryThread.setName("xxl-registry, XxlRegistryClient registryThread.");
  28. registryThread.setDaemon(true);
  29. registryThread.start();

定期的将注册信息同步到注册中心,通过服务摘除自动从注册中心下线等操作,供服务提供方使用

2、服务发现:

  1. // discovery thread
  2. discoveryThread = new Thread(new Runnable() {
  3. @Override
  4. public void run() {
  5. while (!registryThreadStop) {
  6. if (discoveryData.size() == 0) {
  7. try {
  8. TimeUnit.SECONDS.sleep(3);
  9. } catch (Exception e) {
  10. if (!registryThreadStop) {
  11. logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
  12. }
  13. }
  14. } else {
  15. try {
  16. // monitor
  17. boolean monitorRet = registryBaseClient.monitor(discoveryData.keySet());
  18. // avoid fail-retry request too quick
  19. if (!monitorRet){
  20. TimeUnit.SECONDS.sleep(10);
  21. }
  22. // refreshDiscoveryData, all
  23. refreshDiscoveryData(discoveryData.keySet());
  24. } catch (Exception e) {
  25. if (!registryThreadStop) {
  26. logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
  27. }
  28. }
  29. }
  30. }
  31. logger.info(">>>>>>>>>>> xxl-registry, discoveryThread stoped.");
  32. }
  33. });
  34. discoveryThread.setName("xxl-registry, XxlRegistryClient discoveryThread.");
  35. discoveryThread.setDaemon(true);
  36. discoveryThread.start();

通过心跳检测发现服务是否可用,避免请求过快造成的问题

一般问题:注册中心意外宕机对服务提供者和服务消费者的影响,因为客户端存在缓存,注册中心宕机后一般可以正常请求服务;但此过程如出现服务宕机,就会出现一些问题,所以一般需要保证尽快恢复注册中心或者注册中心的高可用

发表评论

表情:
评论列表 (有 0 条评论,138人围观)

还没有评论,来说两句吧...

相关阅读