美团Leaf源码——号段模式源码解析 £神魔★判官ぃ 2022-02-01 05:39 901阅读 1赞 ## 前言 ## 分布式ID生成策略基本要求就是全局不重复,最好还能递增,长度较短,性能高,可用性强。关于相关的实现方案有很多,本文着重使用美团开源的分布式ID生成解决方案——Leaf。 关于Leaf,美团官方的介绍文档主要如下,强烈建议阅读文章大致了解Leaf的工作流程与原理,这对本文后续的源码解析有很大的帮助。 1. [Leaf:美团分布式ID生成服务开源][Leaf_ID] 2. [Leaf——美团点评分布式ID生成系统][Leaf_ID 1] 本系列Leaf源码解析部分按照使用的方式也分为号段模式和snowflake模式两篇文章,本文就来着重研究号段模式的源码实现。 本文的Leaf源码注释地址:[https://github.com/MrSorrow/Leaf][https_github.com_MrSorrow_Leaf] ## I. 导入项目 ## Leaf由Maven构建,源码地址:[https://github.com/Meituan-Dianping/Leaf][https_github.com_Meituan-Dianping_Leaf] 首先先Fork官方仓库到自己的仓库,我的源码注释版:[https://github.com/MrSorrow/Leaf][https_github.com_MrSorrow_Leaf] 下载源码,导入IDEA,导入成功依赖下载完成后项目结构大致如下: ![项目框架][20190513161616625.png] ## II. 测试号段模式 ## #### 「创建数据库表」 #### DROP TABLE IF EXISTS `leaf_alloc`; CREATE TABLE `leaf_alloc` ( `biz_tag` varchar(128) NOT NULL DEFAULT '' COMMENT '业务key', `max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '当前已经分配了的最大id', `step` int(11) NOT NULL COMMENT '初始步长,也是动态调整的最小步长', `description` varchar(256) DEFAULT NULL COMMENT '业务key的描述', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据库维护的更新时间', PRIMARY KEY (`biz_tag`) ) ENGINE=InnoDB; #### 「开启号段模式」 #### leaf.name=com.sankuai.leaf.opensource.test leaf.segment.enable=true leaf.jdbc.url=jdbc:mysql://localhost:3306/leaf_test?useUnicode=true&characterEncoding=utf8&characterSetResults=utf8 leaf.jdbc.username=root leaf.jdbc.password=1234 leaf.snowflake.enable=false #leaf.snowflake.zk.address= #leaf.snowflake.port= #### 「测试号段模式」 #### 开启号段模式并配置好数据库连接后,点击启动 `leaf-server` 模块的 `LeafServerApplication`,将服务跑起来。 浏览器输入[http://localhost:8080/api/segment/get/leaf-segment-test][http_localhost_8080_api_segment_get_leaf-segment-test]来获取分布式递增id; ![获取id][id] 监控号段模式:[http://localhost:8080/cache][http_localhost_8080_cache] ![监控页面][20190513165741933.png] 数据库表: ![数据库][2019051317572554.png] ## III. 号段模式源码解析 ## 正式进入源码前,再强烈建议阅读官方的两篇博客,对Leaf的号段模式工作模式有个大致的理解。 我们从[http://localhost:8080/api/segment/get/leaf-segment-test][http_localhost_8080_api_segment_get_leaf-segment-test]入口来分析。该请求会交由 `com.sankuai.inf.leaf.server.LeafController` 处理: @Autowired SegmentService segmentService; /** * 号段模式获取id * @param key 对应数据库表的biz_tag * @return */ @RequestMapping(value = "/api/segment/get/{key}") public String getSegmentID(@PathVariable("key") String key) { // 核心是segmentService的getId方法 return get(key, segmentService.getId(key)); } private String get(@PathVariable("key") String key, Result id) { Result result; if (key == null || key.isEmpty()) { throw new NoKeyException(); } result = id; if (result.getStatus().equals(Status.EXCEPTION)) { throw new LeafServerException(result.toString()); } return String.valueOf(result.getId()); } 可以看到主要是调用 `SegmentService` 的 `getId(key)` 方法。`key` 参数其实就是路径上对应的 `leaf-segment-test`,也就是数据库对应的 `biz_tag`。 `getId(key)` 方法返回的是 `com.sankuai.inf.leaf.common.Result` 对象,封装了 `id` 和 状态 `status`: public class Result { private long id; private Status status; // getter and setter.... } public enum Status { SUCCESS, EXCEPTION } ### 创建SegmentService ### 我们进入 `SegmentService` 类中,再调用 `getId(key)` 方法之前,我们先看一下 `SegmentService` 类的实例化构造函数逻辑。可以看到: package com.sankuai.inf.leaf.server; @Service("SegmentService") public class SegmentService { private Logger logger = LoggerFactory.getLogger(SegmentService.class); IDGen idGen; DruidDataSource dataSource; /** * 构造函数,注入单例SegmentService时,完成以下几件事: * 1. 加载leaf.properties配置文件解析配置 * 2. 创建Druid dataSource * 3. 创建IDAllocDao * 4. 创建ID生成器实例SegmentIDGenImpl并初始化 * @throws SQLException * @throws InitException */ public SegmentService() throws SQLException, InitException { // 1. 加载leaf.properties配置文件 Properties properties = PropertyFactory.getProperties(); // 是否开启号段模式 boolean flag = Boolean.parseBoolean(properties.getProperty(Constants.LEAF_SEGMENT_ENABLE, "true")); if (flag) { // 2. 创建Druid dataSource dataSource = new DruidDataSource(); dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL)); dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME)); dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD)); dataSource.init(); // 3. 创建Dao IDAllocDao dao = new IDAllocDaoImpl(dataSource); // 4. 创建ID生成器实例SegmentIDGenImpl idGen = new SegmentIDGenImpl(); ((SegmentIDGenImpl) idGen).setDao(dao); // 初始化SegmentIDGenImpl(加载db的tags至内存cache中,并开启定时同步更新任务) if (idGen.init()) { logger.info("Segment Service Init Successfully"); } else { throw new InitException("Segment Service Init Fail"); } } else { // ZeroIDGen一直返回id=0 idGen = new ZeroIDGen(); logger.info("Zero ID Gen Service Init Successfully"); } } /** * 根据key获取id * @param key * @return */ public Result getId(String key) { return idGen.get(key); } /** * 获取号段模式id生成器SegmentIDGenImpl * @return */ public SegmentIDGenImpl getIdGen() { if (idGen instanceof SegmentIDGenImpl) { return (SegmentIDGenImpl) idGen; } return null; } } `SegmentService` 类的构造函数,主要完成以下几件事: 1. 加载 `leaf.properties` 配置文件,并解析配置 2. 创建 `Druid` 数据源对象 `dataSource` 3. 创建 `IDAllocDao` 接口实例 `IDAllocDaoImpl` 4. 创建ID生成器实例 `SegmentIDGenImpl` 并初始化 #### ① 解析leaf.properties配置文件 #### 通过 `PropertyFactory` 读取了 `leaf.properties` 配置文件并进行解析。其中所以的key-value配置信息最终封装为 `Properties` 中。 /** * 加载leaf.properties配置文件中配置信息 */ public class PropertyFactory { private static final Logger logger = LoggerFactory.getLogger(PropertyFactory.class); private static final Properties prop = new Properties(); static { try { prop.load(PropertyFactory.class.getClassLoader().getResourceAsStream("leaf.properties")); logger.debug("Load leaf.properties successfully!"); } catch (IOException e) { logger.warn("Load Properties Ex", e); } } public static Properties getProperties() { return prop; } } #### ② 手动创建数据源 #### 解析完配置文件后需要判断是否开启号段模式: // 是否开启号段模式 boolean flag = Boolean.parseBoolean(properties.getProperty(Constants.LEAF_SEGMENT_ENABLE, "true")); if (flag) { // 2. 创建Druid dataSource dataSource = new DruidDataSource(); dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL)); dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME)); dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD)); dataSource.init(); ······ } else { // ZeroIDGen一直返回id=0 idGen = new ZeroIDGen(); logger.info("Zero ID Gen Service Init Successfully"); } 如果没有开启号段模式,则创建默认返回id为0的id生成器 `ZeroIDGen`。 public class ZeroIDGen implements IDGen { @Override public Result get(String key) { return new Result(0, Status.SUCCESS); } @Override public boolean init() { return true; } } 第二步主要通过配置文件配置的数据库连接信息,手动创建出数据源 `DruidDataSource`。 #### ③ 创建IDAllocDaoImpl #### 我们先来查看 `IDAllocDao` 接口中的方法。 public interface IDAllocDao { List<LeafAlloc> getAllLeafAllocs(); LeafAlloc updateMaxIdAndGetLeafAlloc(String tag); LeafAlloc updateMaxIdByCustomStepAndGetLeafAlloc(LeafAlloc leafAlloc); List<String> getAllTags(); } 再查看 `IDAllocDaoImpl` 实现类对应的方法实现。 public class IDAllocDaoImpl implements IDAllocDao { SqlSessionFactory sqlSessionFactory; public IDAllocDaoImpl(DataSource dataSource) { // 手动初始化sqlSessionFactory TransactionFactory transactionFactory = new JdbcTransactionFactory(); Environment environment = new Environment("development", transactionFactory, dataSource); Configuration configuration = new Configuration(environment); configuration.addMapper(IDAllocMapper.class); sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration); } /** * 获取所有的业务key对应的发号配置 * @return */ @Override public List<LeafAlloc> getAllLeafAllocs() { SqlSession sqlSession = sqlSessionFactory.openSession(false); try { return sqlSession.selectList("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getAllLeafAllocs"); } finally { sqlSession.close(); } } /** * 更新数据库的最大id值,并返回LeafAlloc * @param tag * @return */ @Override public LeafAlloc updateMaxIdAndGetLeafAlloc(String tag) { SqlSession sqlSession = sqlSessionFactory.openSession(); try { // 更新tag对应记录中的max_id,max_id = max_id + step,step为数据库中设置的step sqlSession.update("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.updateMaxId", tag); // 获取更新完的记录,封装成LeafAlloc对象返回 LeafAlloc result = sqlSession.selectOne("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getLeafAlloc", tag); // 提交事务 sqlSession.commit(); return result; } finally { sqlSession.close(); } } /** * 依据动态调整的step值,更新DB的最大id值,并返回更新后的记录 * @param leafAlloc * @return */ @Override public LeafAlloc updateMaxIdByCustomStepAndGetLeafAlloc(LeafAlloc leafAlloc) { SqlSession sqlSession = sqlSessionFactory.openSession(); try { sqlSession.update("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.updateMaxIdByCustomStep", leafAlloc); LeafAlloc result = sqlSession.selectOne("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getLeafAlloc", leafAlloc.getKey()); sqlSession.commit(); return result; } finally { sqlSession.close(); } } /** * 从数据库查询出所有的biz_tag * @return */ @Override public List<String> getAllTags() { // 设置false,表示手动事务 SqlSession sqlSession = sqlSessionFactory.openSession(false); try { return sqlSession.selectList("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getAllTags"); } finally { sqlSession.close(); } } } 对于接口的四个方法的作用都有详细的注释,读者大致有个印象,我们后面解析获取id流程时会继续详细查看。比较陌生的应该是方法的返回实体类 `LeafAlloc`,其他它就是对应着数据库表。 /** * 分配bean,和数据库表记录基本对应 */ public class LeafAlloc { private String key; // 对应biz_tag private long maxId; // 对应最大id private int step; // 对应步长 private String updateTime; // 对应更新时间 // getter and setter } 我们先来看一下这一步创建 `IDAllocDaoImpl` 中构造函数的逻辑,可以看到主要是按照使用MyBatis的流程创建出 `SqlSessionFactory` 对象。 #### ④ 创建并初始化ID生成器 #### 先来查看ID生成器接口: public interface IDGen { /** * 获取指定key下一个id * @param key * @return */ Result get(String key); /** * 初始化 * @return */ boolean init(); } 接口主要包含两个方法,分别是获取指定key的下一个id值,和初始化生成器的方法。 该接口的实现类有三个,分别是号段模式、snowflake以及默认一直返回0的生成器。 ![ID生成器][ID] **创建号段模式ID生成器** `com.sankuai.inf.leaf.segment.SegmentIDGenImpl` 是我们分析整个流程的重点,我们先来简单的查看其内部几个重要的成员变量: /** * 号段模式ID生成器 */ public class SegmentIDGenImpl implements IDGen { ······· /** * 最大步长不超过100,0000 */ private static final int MAX_STEP = 1000000; /** * 一个Segment维持时间为15分钟 */ private static final long SEGMENT_DURATION = 15 * 60 * 1000L; /** * 线程池,用于执行异步任务,比如异步准备双buffer中的另一个buffer */ private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory()); /** * 标记自己是否初始化完毕 */ private volatile boolean initOK = false; /** * cache,存储所有业务key对应双buffer号段,所以是基于内存的发号方式 */ private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>(); /** * 查询数据库的dao */ private IDAllocDao dao; ········ } `cache` 是号段模式基于内存发号的关键,它是一个key为数据库表中不同业务的tag,value是一个 `SegmentBuffer` 对象,如果阅读过官方的博客可以知道双 `buffer` 优化的事情,这里的`SegmentBuffer` 对象就是封装了两个 `Segment` 号段的数据结构。 回到 `SegmentService` 构造函数的第四步中来,创建 `SegmentIDGenImpl` 实例时使用的是默认构造函数,紧接着将第三步创建数据库 `dao` 注入进 `SegmentIDGenImpl` 。然后调用生成器的初始化方法。 // 4. 创建ID生成器实例SegmentIDGenImpl idGen = new SegmentIDGenImpl(); ((SegmentIDGenImpl) idGen).setDao(dao); // 初始化SegmentIDGenImpl(加载db的tags至内存cache中,并开启定时同步更新任务) if (idGen.init()) { logger.info("Segment Service Init Successfully"); } else { throw new InitException("Segment Service Init Fail"); } **初始化号段模式ID生成器** 我们查看 `SegmentIDGenImpl` 的初始化方法逻辑,可以看到主要调用了两个方法,并且设置了自己的初始化标记为OK状态。如果没有初始化成功,会抛出异常,这在上面代码可以看出。 @Override public boolean init() { logger.info("Init ..."); // 确保加载到kv后才初始化成功 updateCacheFromDb(); initOK = true; // 定时1min同步一次db和cache updateCacheFromDbAtEveryMinute(); return initOK; } 我们具体来查看 `updateCacheFromDb()` 和 `updateCacheFromDbAtEveryMinute()` 方法逻辑。通过方法名其实我们可以推测方法含义是从数据库中取出数据更新 `cache`,第二个方法则是一个定时任务,每分钟都执行一遍第一个方法。我们具体查看一下。 /** * 将数据库表中的tags同步到cache中 */ private void updateCacheFromDb() { logger.info("update cache from db"); StopWatch sw = new Slf4JStopWatch(); try { // 获取数据库表中所有的biz_tag List<String> dbTags = dao.getAllTags(); if (dbTags == null || dbTags.isEmpty()) { return; } // 获取当前的cache中所有的tag List<String> cacheTags = new ArrayList<String>(cache.keySet()); // 数据库中的tag List<String> insertTags = new ArrayList<String>(dbTags); List<String> removeTags = new ArrayList<String>(cacheTags); // 下面两步操作:保证cache和数据库tags同步 // 1. cache新增上数据库表后添加的tags // 2. cache删除掉数据库表后删除的tags // 1. db中新加的tags灌进cache,并实例化初始对应的SegmentBuffer insertTags.removeAll(cacheTags); for (String tag : insertTags) { SegmentBuffer buffer = new SegmentBuffer(); buffer.setKey(tag); // 零值初始化当前正在使用的Segment号段 Segment segment = buffer.getCurrent(); segment.setValue(new AtomicLong(0)); segment.setMax(0); segment.setStep(0); cache.put(tag, buffer); logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer); } // 2. cache中已失效的tags从cache删除 removeTags.removeAll(dbTags); for (String tag : removeTags) { cache.remove(tag); logger.info("Remove tag {} from IdCache", tag); } } catch (Exception e) { logger.warn("update cache from db exception", e); } finally { sw.stop("updateCacheFromDb"); } } 首先通过dao层查询出数据库表中最新的所有的 `biz_tag`,紧接着就是同步数据库中的 `tags` 和内存中的 `cache`。同步的方式包含两步操作: 1. 插入 `cache` 中不存在但是数据库新增的 `biz_tag` ; 2. 删除 `cache` 中仍然存在但是数据库表中已经删除的`biz_tag`。 上面这段代码主要完成的就是这两步操作,代码逻辑仔细阅读还是比较清晰的,配合注释读者可以相应理解,不再赘述。 需要额外提及的是 `cache` 的key我们已经知道是 `biz_tag`,但value我们仅仅知道是封装了两个 `Segment` 号段的 `SegmentBuffer`。我们具体来看看 `SegmentBuffer` 的定义。 /** * 双buffer——双号段 * 双Buffer的方式,保证无论何时DB出现问题,都能有一个Buffer的号段可以正常对外提供服务 * 只要DB在一个Buffer的下发的周期内恢复,就不会影响整个Leaf的可用性 */ public class SegmentBuffer { private String key; // 数据库的业务tag private Segment[] segments; //双buffer,双号段 private volatile int currentPos; //当前的使用的segment的index private volatile boolean nextReady; //下一个segment是否处于可切换状态 private volatile boolean initOk; //是否DB数据初始化完成 private final AtomicBoolean threadRunning; //线程是否在运行中 private final ReadWriteLock lock; // 读写锁 private volatile int step; // 动态调整的step private volatile int minStep; // 最小step private volatile long updateTimestamp; // 更新时间戳 public SegmentBuffer() { // 创建双号段,能够异步准备,并切换 segments = new Segment[]{ new Segment(this), new Segment(this)}; currentPos = 0; nextReady = false; initOk = false; threadRunning = new AtomicBoolean(false); lock = new ReentrantReadWriteLock(); } public int nextPos() { return (currentPos + 1) % 2; } public void switchPos() { currentPos = nextPos(); } public Lock rLock() { return lock.readLock(); } public Lock wLock() { return lock.writeLock(); } } 可以看见 `SegmentBuffer` 中包含了一个号段数组,包含两个 `Segment`,每一次只用一个,另一个异步的准备好,等到当前号段用完,就可以切换另一个,像Young GC的两个Survivor区倒来倒去的思想。我们再来看一下号段 `Segment` 的定义。 /** * 号段类 */ public class Segment { /** * 内存生成的每一个id号 */ private AtomicLong value = new AtomicLong(0); /** * 当前号段允许的最大id值 */ private volatile long max; /** * 步长,会根据数据库的step动态调整 */ private volatile int step; /** * 当前号段所属的SegmentBuffer */ private SegmentBuffer buffer; public Segment(SegmentBuffer buffer) { this.buffer = buffer; } /** * 获取号段的剩余量 * @return */ public long getIdle() { return this.getMax() - getValue().get(); } } `value` 就是用来产生id值的,它是一个 `AtomicLong` 类型,多线程下可以利用它的一些原子API操作。`max` 则代表自己(号段对象)能产生的最大的id值,也就是value的上限,用完了就需要切换号段,自己重新从数据库获取下一个号段区间。`step` 是动态调整的步长,关于动态调整,官方博客也有所解释,这里先不赘述。当自己用完了,就需要从数据库请求新的号段区间,区间大小就是由这个 `step` 决定的。 介绍完Leaf的号段,双Buffer数据结构后,我们回过头查看同步DB到 `cache` 的逻辑中插入新的 `SegmentBuffer` 是如何创建的。 for (String tag : insertTags) { SegmentBuffer buffer = new SegmentBuffer(); buffer.setKey(tag); // 零值初始化当前正在使用的Segment号段 Segment segment = buffer.getCurrent(); segment.setValue(new AtomicLong(0)); segment.setMax(0); segment.setStep(0); cache.put(tag, buffer); } 可以看到对于 `SegmentBuffer` 我们仅仅设置了key,然后就是依靠 `SegmentBuffer` 自身的构造函数对其内部成员进行了默认初始化,也可以说是零值初始化。特别注意,此时 `SegmentBuffer` 的 `initOk` 标记还是 `false`,这也说明这个标记其实并不是标记零值初始化是否完成。然后程序接着对0号 `Segment` 的所有成员进行了零值初始化。 同步完成后,即将数据库中的所有 `tags` 记录加载到内存后,便将ID生成器的初始化标记设置为 `true`。 我们再来查看 `updateCacheFromDbAtEveryMinute()` 方法逻辑。 /** * 每分钟同步db到cache */ private void updateCacheFromDbAtEveryMinute() { ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("check-idCache-thread"); t.setDaemon(true); return t; } }); service.scheduleWithFixedDelay(new Runnable() { @Override public void run() { updateCacheFromDb(); } }, 60, 60, TimeUnit.SECONDS); } 可以看到方法中创建了一个定时执行任务线程池,任务就是 `updateCacheFromDb()`,也就是上面那个方法,定时时间为60s,也就是1min。 ### 获取ID ### 上一小节我们主要是在分析创建 `SegmentService` 过程中做了哪些事情,总结下来最重要的就是从数据库表中准备好 `cache`, `cache` 中包含每个key对应的双号段,经过第一部分已经零值初始化好双号段的当前使用号段。接下来我们继续分析 `SegmentService` 的 `getId()` 方法,我们的控制层就是通过该方法获取id的。 /** * 根据key获取id * @param key * @return */ public Result getId(String key) { return idGen.get(key); } 再次分析号段生成器 `SegmentIDGenImpl` 的 `get()` 方法。 /** * 获取对应key的下一个id值 * @param key * @return */ @Override public Result get(final String key) { // 必须在 SegmentIDGenImpl 初始化后执行init()方法 // 也就是必须将数据库中的tags加载到内存cache中,并开启定时同步任务 if (!initOK) { return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION); } if (cache.containsKey(key)) { // 获取cache中对应的SegmentBuffer,SegmentBuffer中包含双buffer,两个号段 SegmentBuffer buffer = cache.get(key); // 双重判断,避免多线程重复执行SegmentBuffer的初始化值操作 // 在get id前检查是否完成DB数据初始化cache中key对应的的SegmentBuffer(之前只是零值初始化),需要保证线程安全 if (!buffer.isInitOk()) { synchronized (buffer) { if (!buffer.isInitOk()) { // DB数据初始化SegmentBuffer try { // 根据数据库表中key对应的记录 来初始化SegmentBuffer当前正在使用的Segment updateSegmentFromDb(key, buffer.getCurrent()); logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent()); buffer.setInitOk(true); } catch (Exception e) { logger.warn("Init buffer {} exception", buffer.getCurrent(), e); } } } } // SegmentBuffer准备好之后正常就直接从cache中生成id即可 return getIdFromSegmentBuffer(cache.get(key)); } // cache中不存在对应的key,则返回异常错误 return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION); } 首先先从 `cache` 中获取 `key` 对应的 `SegmentBuffer`,然后判断 `SegmentBuffer` 是否是初始化完成,也就是 `SegmentBuffer` 的 `initOk` 标记。这里用了双重判断+`synchronized` 方式确保 `SegmentBuffer` 只被初始化一次。那么这里初始化究竟是指什么,才算初始化完成呢? #### ① 初始化SegmentBuffer #### 初始化 `SegmentBuffer` 的核心逻辑就是调用下面这个方法。 // 根据数据库表中key对应的记录 来初始化SegmentBuffer当前正在使用的Segment updateSegmentFromDb(key, buffer.getCurrent()); 查看方法名,也可以知道是从数据库表查询数据更新号段 `Segment`,对于号段初始状态来说,该方法含义可以理解为初始化 `Segment` 的值,对于用完的号段来讲,可以理解为从数据库获取下一号段值。 所以这里初始化是指DB数据初始化当前号段,初始化完成就标记 `SegmentBuffer` 的 `initOk` 为 `true`,也就表明 `SegmentBuffer` 中有一个号段已经准备完成了。 我们具体查看 `updateSegmentFromDb(key, buffer.getCurrent())` 方法: /** * 从数据库表中读取数据更新SegmentBuffer中的Segment * @param key * @param segment */ public void updateSegmentFromDb(String key, Segment segment) { StopWatch sw = new Slf4JStopWatch(); /** * 1. 先设置SegmentBuffer */ // 获取Segment号段所属的SegmentBuffer SegmentBuffer buffer = segment.getBuffer(); LeafAlloc leafAlloc; // 如果buffer没有DB数据初始化(也就是第一次进行DB数据初始化) if (!buffer.isInitOk()) { // 更新数据库中key对应记录的maxId(maxId表示当前分配到的最大id,maxId=maxId+step),并查询更新后的记录返回 leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); // 数据库初始设置的step赋值给当前buffer的初始step,后面后动态调整 buffer.setStep(leafAlloc.getStep()); // leafAlloc中的step为DB中设置的step,buffer这里是未进行DB数据初始化的,所以DB中step代表动态调整的最小下限 buffer.setMinStep(leafAlloc.getStep()); } // 如果buffer的更新时间是0(初始是0,也就是第二次调用updateSegmentFromDb()) else if (buffer.getUpdateTimestamp() == 0) { // 更新数据库中key对应记录的maxId(maxId表示当前分配到的最大id,maxId=maxId+step),并查询更新后的记录返回 leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); // 记录buffer的更新时间 buffer.setUpdateTimestamp(System.currentTimeMillis()); // leafAlloc中的step为DB中的step buffer.setMinStep(leafAlloc.getStep()); } // 第三次以及之后的进来 动态设置nextStep else { // 计算当前更新操作和上一次更新时间差 long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp(); int nextStep = buffer.getStep(); /** * 动态调整step * 1) duration < 15 分钟 : step 变为原来的2倍, 最大为 MAX_STEP * 2) 15分钟 <= duration < 30分钟 : nothing * 3) duration >= 30 分钟 : 缩小step, 最小为DB中配置的step * * 这样做的原因是认为15min一个号段大致满足需求 * 如果updateSegmentFromDb()速度频繁(15min多次),也就是 * 如果15min这个时间就把step号段用完,为了降低数据库访问频率,我们可以扩大step大小 * 相反如果将近30min才把号段内的id用完,则可以缩小step */ // duration < 15 分钟 : step 变为原来的2倍. 最大为 MAX_STEP if (duration < SEGMENT_DURATION) { if (nextStep * 2 > MAX_STEP) { //do nothing } else { // 步数 * 2 nextStep = nextStep * 2; } } // 15分钟 < duration < 30分钟 : nothing else if (duration < SEGMENT_DURATION * 2) { //do nothing with nextStep } // duration > 30 分钟 : 缩小step ,最小为DB中配置的步数 else { nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep; } logger.info("leafKey[{}], dbStep[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep); /** * 根据动态调整的nextStep更新数据库相应的maxId */ // 为了高效更新记录,创建一个LeafAlloc,仅设置必要的字段的信息 LeafAlloc temp = new LeafAlloc(); temp.setKey(key); temp.setStep(nextStep); // 根据动态调整的step更新数据库的maxId leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp); // 记录更新时间 buffer.setUpdateTimestamp(System.currentTimeMillis()); // 记录当前buffer的动态调整的step值 buffer.setStep(nextStep); // leafAlloc的step为DB中的step,所以DB中的step值代表着下限 buffer.setMinStep(leafAlloc.getStep()); } /** * 2. 准备当前Segment号段 */ // 设置Segment号段id的起始值,value就是id(start=max_id-step) long value = leafAlloc.getMaxId() - buffer.getStep(); // must set value before set max(https://github.com/Meituan-Dianping/Leaf/issues/16) segment.getValue().set(value); segment.setMax(leafAlloc.getMaxId()); segment.setStep(buffer.getStep()); sw.stop("updateSegmentFromDb", key + " " + segment); } 这个函数的逻辑非常重要,还包含了动态调整步长的逻辑。首先,该方法被调用的时机我们需要明确,每当我们需要从数据库获取一个号段才会被调用。方法的第一部分主要先通过数据库并设置 `SegmentBuffer` 相关值,第二部分再准备 `Segment`。 第一部分的逻辑按照调用该方法的次数分为第一次准备号段、第二次准备号段和第三次及之后的准备号段。 1. 第一次准备号段,也就是 `SegmentBuffer` 还没有DB初始化,我们要从数据库获取一个号段,记录 `SegmentBuffer` 的当前步长、最小步长都是数据库设置的步长; 2. 第二次准备号段,也就是双buffer的异步准备另一个号段 `Segment` 时,会进入这一逻辑分支。仍然从数据库获取一个号段,此时记录这次获取下一个号段的时间戳,设置最小步长是数据库设置的步长; 3. 之后再次准备号段,首先要动态调整这次申请号段的区间大小,也就是代码中的 `nextStep`,调整规则主要跟号段申请频率有关,具体可以查看注释以及代码。计算出动态调整的步长,需要根据新的步长去数据库申请号段,同时记录这次获取号段的时间戳,保存动态调整的步长到 `SegmentBuffer`,设置最小步长是数据库设置的步长。 第二部分逻辑主要是准备 `Segment` 号段,将 `Segment` 号段的四个成员变量进行新一轮赋值,`value` 就是 `id`(`start=max_id-step`)。 #### ② 从号段中获取id #### 当 `SegmentBuffer` 和 其中一个号段 `Segment` 准备好,就可以进行从号段中获取id。我们具体查看号段ID生成器 `SegmentIDGenImpl` 的 `getIdFromSegmentBuffer()` 方法。 /** * 从SegmentBuffer生成id返回 * @param buffer * @return */ public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) { // 自旋获取id while (true) { try { // 获取buffer的共享读锁,在平时不操作Segment的情况下益于并发 buffer.rLock().lock(); // 获取当前正在使用的Segment final Segment segment = buffer.getCurrent(); // ===============异步准备双buffer的另一个Segment============== // 1. 另一个Segment没有准备好 // 2. 当前Segment已经使用超过10%则开始异步准备另一个Segment // 3. buffer中的threadRunning字段. 代表是否已经提交线程池运行,是否有其他线程已经开始进行另外号段的初始化工作.使用CAS进行更新保证buffer在任意时刻,只会有一个线程进行异步更新另外一个号段. if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) { // 线程池异步执行【准备Segment】任务 service.execute(new Runnable() { @Override public void run() { // 获得另一个Segment对象 Segment next = buffer.getSegments()[buffer.nextPos()]; boolean updateOk = false; try { // 从数据库表中准备Segment updateSegmentFromDb(buffer.getKey(), next); updateOk = true; logger.info("update segment {} from db {}", buffer.getKey(), next); } catch (Exception e) { logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e); } finally { // 如果准备成功,则通过独占写锁设置另一个Segment准备标记OK,threadRunning为false表示准备完毕 if (updateOk) { // 读写锁是不允许线程先获得读锁继续获得写锁,这里可以是因为这一段代码其实是线程池线程去完成的,不是获取到读锁的线程 buffer.wLock().lock(); buffer.setNextReady(true); buffer.getThreadRunning().set(false); buffer.wLock().unlock(); } else { // 失败了,则还是没有准备好,threadRunning恢复false,以便于下次获取id时重新再异步准备Segment buffer.getThreadRunning().set(false); } } } }); } // 原子value++(返回旧值),也就是下一个id,这一步是多线程操作的,每一个线程加1都是原子的,但不一定保证顺序性 long value = segment.getValue().getAndIncrement(); // 如果获取到的id小于maxId if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } } finally { // 释放读锁 buffer.rLock().unlock(); } // 等待线程池异步准备号段完毕 waitAndSleep(buffer); // 执行到这里,说明当前号段已经用完,应该切换另一个Segment号段使用 try { // 获取独占式写锁 buffer.wLock().lock(); // 获取当前使用的Segment号段 final Segment segment = buffer.getCurrent(); // 重复获取value, 多线程执行时,Segment可能已经被其他线程切换。再次判断, 防止重复切换Segment long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } // 执行到这里, 说明其他的线程没有进行Segment切换,并且当前号段所有号码用完,需要进行切换Segment // 如果准备好另一个Segment,直接切换 if (buffer.isNextReady()) { buffer.switchPos(); buffer.setNextReady(false); } // 如果另一个Segment没有准备好,则返回异常双buffer全部用完 else { logger.error("Both two segments in {} are not ready!", buffer); return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION); } } finally { // 释放写锁 buffer.wLock().unlock(); } } } 首先该方法最外层套了一个循环,不断地尝试获取id。整个方法的逻辑大致包含: 1. 首先获取共享读锁,多个线程能够同时进来获取id。如果能够不需要异步准备双buffer的另一个 `Segment` 且分发的id号没有超出maxId,那么可以直接返回id号。多个线程并发获取id号,靠 `AtomicLong` 的 `getAndIncrement()` 原子操作保证不出问题。 2. 如果需要异步准备另一个 `Segment`,则将准备任务提交到线程池中进行完成。多线程执行下,要保证只有一个线程去提交任务。这一点是靠 `SegmentBuffer` 中的 `threadRunning` 字段实现的。`threadRunning` 字段用 `volatile` 修饰保证多线程可见性,其含义代表了异步准备号段任务是否已经提交线程池运行,是否有其他线程已经开始进行另外号段的初始化工作。使用CAS操作进行更新,保证 `SegmentBuffer` 在任意时刻只会有一个线程进行异步更新另外一个号段。 3. 如果号段分配的 `id` 号超出了maxId,则需要进行切换双buffer的操作。在进行直接切换之前,需要再次判断是否 `id` 还大于 maxId,因为多线程下,号段已经被其他线程切换成功,自己还不知道,所以为了避免重复切换出错,需要再次判断。切换操作为了保证同一时间只能有一个线程切换,这里利用了独占式的写锁。 [Leaf_ID]: https://tech.meituan.com/2019/03/07/open-source-project-leaf.html [Leaf_ID 1]: https://tech.meituan.com/2017/04/21/mt-leaf.html [https_github.com_MrSorrow_Leaf]: https://github.com/MrSorrow/Leaf [https_github.com_Meituan-Dianping_Leaf]: https://github.com/Meituan-Dianping/Leaf [20190513161616625.png]: /images/20220201/3c6b187ede404ff4a7f6361da61d2419.png [http_localhost_8080_api_segment_get_leaf-segment-test]: http://localhost:8080/api/segment/get/leaf-segment-test [id]: /images/20220201/7dc824788f0645d0beb6af6c58b13a70.png [http_localhost_8080_cache]: http://localhost:8080/cache [20190513165741933.png]: /images/20220201/ba6db337a2694bd58f22dc098ee2fe72.png [2019051317572554.png]: /images/20220201/7e5ddaac38944e02811c8a1b2fa0bd2e.png [ID]: /images/20220201/dd96cd860ba949daae2a8a9b99336aad.png
相关 美团开源分布式ID生成系统——Leaf源码阅读笔记(Leaf的号段模式) Leaf 最早期需求是各个业务线的订单ID生成需求。在美团早期,有的业务直接通过DB自增的方式生成ID,有的业务通过redis缓存来生成ID,也有的业务直接用UUID这种方式来 末蓝、/ 2023年06月10日 15:28/ 0 赞/ 190 阅读
相关 MyBatis源码- SqlSession门面模式 & selectList 源码解析 文章目录 Pre 工程概览 pom.xml mybatis-config.xml UserMapper 测试类 selectLi 阳光穿透心脏的1/2处/ 2023年02月17日 12:24/ 0 赞/ 5 阅读
相关 深度解析leaf分布式id生成服务源码(号段模式) > 原创不易,转载请注明出处 文章目录 前言 1.实现原理推演 1.1 基于mysql最简单分布式I 曾经终败给现在/ 2022年10月12日 01:40/ 1 赞/ 422 阅读
相关 HashMap源码解析 一、HashMap概述 HashMap基于哈希表的 Map 接口的实现。此实现提供所有可选的映射操作,并允许使用 null 值和 null 键。(除了不同步 不念不忘少年蓝@/ 2022年07月12日 14:07/ 0 赞/ 384 阅读
相关 美团 Cat 源码文章 注意的地方: 1,Cat使用的是`plexus`作为底层容器,又在此容器基础之上封装了一个容器叫`org.unidal.framework`。一些Web请求应该都是通过框架 小鱼儿/ 2022年06月09日 04:55/ 0 赞/ 310 阅读
相关 HashMap源码解析 以下针对JDK 1.8版本中的HashMap进行分析。 概述 哈希表基于`Map`接口的实现。此实现提供了所有可选的映射操作,并且允许键为`null`,值也为`null 太过爱你忘了你带给我的痛/ 2022年04月05日 10:13/ 0 赞/ 506 阅读
相关 hashMap源码解析 源码来自jdk:1.8,和其他jdk版本可能有少许差异。 一.hashMap的实现原理 hashMap底层是一个有Node组成的数组,每个Node都有一个key 冷不防/ 2022年03月25日 06:58/ 0 赞/ 470 阅读
相关 Fabric 源码解析——源码目录解析 Fabric 源码解析——源码目录解析 源码目录 目录解析 源码目录 ![ALT][] 目录解析 这里对重要的一些目录进行说明: 1. b 布满荆棘的人生/ 2022年03月17日 04:46/ 0 赞/ 624 阅读
相关 美团Leaf源码——号段模式源码解析 前言 分布式ID生成策略基本要求就是全局不重复,最好还能递增,长度较短,性能高,可用性强。关于相关的实现方案有很多,本文着重使用美团开源的分布式ID生成解决方案——Lea £神魔★判官ぃ/ 2022年02月01日 05:39/ 1 赞/ 902 阅读
相关 美团Leaf源码——snowflake模式源码解析 前言 上一篇文章介绍了如何使用Leaf的号段模式生成分布式全局唯一id,参照下图我们简单总结一下。当我们部署Leaf集群时(图中是3个),每个节点起初都包含一个双 `bu 曾经终败给现在/ 2022年01月31日 06:57/ 0 赞/ 687 阅读