分布式注册中心xxl-registry源码分析
官网概述:XXL-REGISTRY 是一个轻量级分布式服务注册中心,拥有”轻量级、秒级注册上线、多环境、跨语言、跨机房”等特性。现已开放源代码,开箱即用。
基本思路:注册中心的基本的功能一般包括服务注册(registry)、服务发现(discovery)、服务摘除(remove)等;然后就涉及到服务宕机后能否及时摘除、如何高效的服务发现、注册中心数据备份等;这些任务由注册中心的服务端完成(server),另外注册中心还会提供一个客户端(client),服务提供者通过client端进行服务注册、摘除、心跳检测等,服务消费者通过client端进行服务发现;
项目结构:
设计的三张表:
a、xxlRegistry实体:
public class XxlRegistry {
private int id;
private String biz; // 业务标识
private String env; // 环境标识
private String key; // 注册Key
private String data; // 注册Value有效数据
private int status; // 状态:0-正常、1-锁定、2-禁用
其中data为服务的注册地址集合以String类型在数据库中存储如:[“address001”, “address002”]
b、xxlRegistryData实体:
public class XxlRegistryData {
private int id;
private String biz; // 业务标识
private String env; // 环境标识
private String key; // 注册Key
private String value; // 注册Value
private Date updateTime; // 更新时间
xxlRegistryData与xxlRegistry相当于是多对一的关系, 存数据的时候将biz+env+key对应的value取出来存到xxlRegistry的data中。
c、xxlRegistryMessage实体:
public class XxlRegistryMessage {
private int id;
private int type; // 消息类型:0-注册更新
private String data; // 消息内容
private Date addTime;
这里data存的是xxlRegistry信息
一、注册中心Server端
1、API:包括Admin界面管理api、Client端访问api
其中 Api的service层实现以及服务定时处理的各类线程都在XxlRegistryServiceImpl中完成:
@Service
public class XxlRegistryServiceImpl implements IXxlRegistryService, InitializingBean, DisposableBean {
private static Logger logger = LoggerFactory.getLogger(XxlRegistryServiceImpl.class);
@Resource
private IXxlRegistryDao xxlRegistryDao;
@Resource
private IXxlRegistryDataDao xxlRegistryDataDao;
@Resource
private IXxlRegistryMessageDao xxlRegistryMessageDao;
@Value("${xxl.registry.data.filepath}")
private String registryDataFilePath;
@Value("${xxl.registry.accessToken}")
private String accessToken;
private int registryBeatTime = 10;
1.1、RegistryController:
对注册中心的基本信息进行人工维护, 可以修改状态信息, 禁止自动服务注册与发现等;这里是直接针对数据库层面的修改。
1.2、ApiController:
提供服务注册、发现、摘除、心跳维护等接口
a、registry实现:
@Override
public ReturnT<String> registry(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) {
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]");
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
}
if (registryDataList==null || registryDataList.size()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid");
}
for (XxlRegistryData registryData: registryDataList) {
if (registryData.getKey()==null || registryData.getKey().trim().length()<4 || registryData.getKey().trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]");
}
if (registryData.getValue()==null || registryData.getValue().trim().length()<4 || registryData.getValue().trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]");
}
}
// fill + add queue
for (XxlRegistryData registryData: registryDataList) {
registryData.setBiz(biz);
registryData.setEnv(env);
}
registryQueue.addAll(registryDataList);
return ReturnT.SUCCESS;
}
可以看到前面都在对参赛进行校验,后面是直接将数据存如registryQueue队列中,持久到数据库是通过开启线程同步到数据库中:
/**
* registry registry data (client-num/10 s)
*/
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
try {
XxlRegistryData xxlRegistryData = registryQueue.take();
if (xxlRegistryData !=null) {
// refresh or add
int ret = xxlRegistryDataDao.refresh(xxlRegistryData);
if (ret == 0) {
xxlRegistryDataDao.add(xxlRegistryData);
}
// valid file status
XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
if (fileXxlRegistry == null) {
// go on
} else if (fileXxlRegistry.getStatus() != 0) {
continue; // "Status limited."
} else {
if (fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
continue; // "Repeated limited."
}
}
// checkRegistryDataAndSendMessage
checkRegistryDataAndSendMessage(xxlRegistryData);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
}
这里就是从注册服务的队列取出数据并同步更新到xxl-registry-data表中,并将信息同步到xxl-registry和xxl-registry-message表中;
服务摘除与注册方式类似,通过接口存到队列,再另跑线程同步到表中(xxl-registry-data);服务摘除时会同步更新xxl-regsitry和xxl-registry-message表,另跑线程将xxl-registry-message的最新信息同步到数据文件中,以保证服务的最新可用性。
b、服务发现:其高效性就体现在从文件中直接读取数据
@Override
public ReturnT<Map<String, List<String>>> discovery(String accessToken, String biz, String env, List<String> keys) {
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
return new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
if (biz==null || biz.trim().length()<2 || biz.trim().length()>255) {
return new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[2~255]");
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
return new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
}
if (keys==null || keys.size()==0) {
return new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid.");
}
for (String key: keys) {
if (key==null || key.trim().length()<4 || key.trim().length()>255) {
return new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]");
}
}
Map<String, List<String>> result = new HashMap<String, List<String>>();
for (String key: keys) {
XxlRegistryData xxlRegistryData = new XxlRegistryData();
xxlRegistryData.setBiz(biz);
xxlRegistryData.setEnv(env);
xxlRegistryData.setKey(key);
List<String> dataList = new ArrayList<String>();
XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
if (fileXxlRegistry!=null) {
dataList = fileXxlRegistry.getDataList();
}
result.put(key, dataList);
}
return new ReturnT<Map<String, List<String>>>(result);
}
文件中的注册信息通过xxl-registry-message表中同步而来,并定时清除表中
/**
* broadcase new one registry-data-file (1/1s)
*
* clean old message (1/10s)
*/
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
try {
// new message, filter readed
List<XxlRegistryMessage> messageList = xxlRegistryMessageDao.findMessage(readedMessageIds);
if (messageList!=null && messageList.size()>0) {
for (XxlRegistryMessage message: messageList) {
readedMessageIds.add(message.getId());
if (message.getType() == 0) { // from registry、add、update、deelete,ne need sync from db, only write
XxlRegistry xxlRegistry = JacksonUtil.readValue(message.getData(), XxlRegistry.class);
// process data by status
if (xxlRegistry.getStatus() == 1) {
// locked, not updated
} else if (xxlRegistry.getStatus() == 2) {
// disabled, write empty
xxlRegistry.setData(JacksonUtil.writeValueAsString(new ArrayList<String>()));
} else {
// default, sync from db (aready sync before message, only write)
}
// sync fileI(将数据同步到文件中)
setFileRegistryData(xxlRegistry);
}
}
}
// clean old message;
if ( (System.currentTimeMillis()/1000) % registryBeatTime ==0) {
xxlRegistryMessageDao.cleanMessage(registryBeatTime);
readedMessageIds.clear();
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
c、定时清理xxl-registry-data数据表、同步xxl-regsitry-data到xxl-regsitry以及数据文件、清理无效数据文件
/**
* clean old registry-data (1/10s)
*
* sync total registry-data db + file (1+N/10s)
*
* clean old registry-data file
*/
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
// align to beattime
try {
long sleepSecond = registryBeatTime - (System.currentTimeMillis()/1000)%registryBeatTime;
if (sleepSecond>0 && sleepSecond<registryBeatTime) {
TimeUnit.SECONDS.sleep(sleepSecond);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
// clean old registry-data in db
xxlRegistryDataDao.cleanData(registryBeatTime * 3);
// sync registry-data, db + file
int offset = 0;
int pagesize = 1000;
List<String> registryDataFileList = new ArrayList<>();
List<XxlRegistry> registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
while (registryList!=null && registryList.size()>0) {
for (XxlRegistry registryItem: registryList) {
// process data by status
if (registryItem.getStatus() == 1) {
// locked, not updated
} else if (registryItem.getStatus() == 2) {
// disabled, write empty
String dataJson = JacksonUtil.writeValueAsString(new ArrayList<String>());
registryItem.setData(dataJson);
} else {
// default, sync from db
List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(registryItem.getBiz(), registryItem.getEnv(), registryItem.getKey());
List<String> valueList = new ArrayList<String>();
if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) {
for (XxlRegistryData dataItem: xxlRegistryDataList) {
valueList.add(dataItem.getValue());
}
}
String dataJson = JacksonUtil.writeValueAsString(valueList);
// check update, sync db
if (!registryItem.getData().equals(dataJson)) {
registryItem.setData(dataJson);
xxlRegistryDao.update(registryItem);
}
}
// sync file(将最新数据同步到数据文件)
String registryDataFile = setFileRegistryData(registryItem);
// collect registryDataFile
registryDataFileList.add(registryDataFile);
}
offset += 1000;
registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
}
// clean old registry-data file(清除不在registryDataFileList中的无效数据文件)
cleanFileRegistryData(registryDataFileList);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(registryBeatTime);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
d、心跳检测(monitor):
@Override
public DeferredResult<ReturnT<String>> monitor(String accessToken, String biz, String env, List<String> keys) {
// init
DeferredResult deferredResult = new DeferredResult(30 * 1000L, new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor timeout, no key updated."));
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid"));
return deferredResult;
}
if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]"));
return deferredResult;
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]"));
return deferredResult;
}
if (keys==null || keys.size()==0) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid."));
return deferredResult;
}
for (String key: keys) {
if (key==null || key.trim().length()<4 || key.trim().length()>255) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]"));
return deferredResult;
}
}
// monitor by client
for (String key: keys) {
String fileName = parseRegistryDataFileName(biz, env, key);
List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
if (deferredResultList == null) {
deferredResultList = new ArrayList<>();
registryDeferredResultMap.put(fileName, deferredResultList);
}
deferredResultList.add(deferredResult);
}
return deferredResult;
}
通过Spring提供的DeferredResult类实现自动延迟返回信息给客户端,在更新文件数据时修改deferredResultList返回信息,因此文件数据是否最新都会反应在这个结果集中。
以上就是注册中心服务端的基本实现
一般问题:如果服务提供者意外宕机如何处理:一般情况下对应用程序会设置服务停止时的一些正常的清理工作,比如调用remove接口摘除宕机的服务等;另一方便xxl-regsitry-data数据三倍于心跳时间清除,因此也可保证异常服务的正常清除。
二、注册中心Client端
Client端维护着两个重要的线程,服务注册、服务发现
1、服务注册:
// registry thread
registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!registryThreadStop) {
try {
if (registryData.size() > 0) {
boolean ret = registryBaseClient.registry(new ArrayList<XxlRegistryDataParamVO>(registryData));
logger.debug(">>>>>>>>>>> xxl-registry, refresh registry data {}, registryData = {}", ret?"success":"fail",registryData);
}
} catch (Exception e) {
if (!registryThreadStop) {
logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
}
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!registryThreadStop) {
logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-registry, registryThread stoped.");
}
});
registryThread.setName("xxl-registry, XxlRegistryClient registryThread.");
registryThread.setDaemon(true);
registryThread.start();
定期的将注册信息同步到注册中心,通过服务摘除自动从注册中心下线等操作,供服务提供方使用
2、服务发现:
// discovery thread
discoveryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!registryThreadStop) {
if (discoveryData.size() == 0) {
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
if (!registryThreadStop) {
logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
}
}
} else {
try {
// monitor
boolean monitorRet = registryBaseClient.monitor(discoveryData.keySet());
// avoid fail-retry request too quick
if (!monitorRet){
TimeUnit.SECONDS.sleep(10);
}
// refreshDiscoveryData, all
refreshDiscoveryData(discoveryData.keySet());
} catch (Exception e) {
if (!registryThreadStop) {
logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-registry, discoveryThread stoped.");
}
});
discoveryThread.setName("xxl-registry, XxlRegistryClient discoveryThread.");
discoveryThread.setDaemon(true);
discoveryThread.start();
通过心跳检测发现服务是否可用,避免请求过快造成的问题
一般问题:注册中心意外宕机对服务提供者和服务消费者的影响,因为客户端存在缓存,注册中心宕机后一般可以正常请求服务;但此过程如出现服务宕机,就会出现一些问题,所以一般需要保证尽快恢复注册中心或者注册中心的高可用
还没有评论,来说两句吧...