RocketMQ源码解析一(NameServer启动流程)

秒速五厘米 2022-09-12 07:46 81阅读 0赞

RocketMQ版本4.6.0,记录自己看源码的过程

从启动类开始看

  1. public class NamesrvStartup {
  2. // 不懂,应该跟日志,配置和命令行参数有关
  3. private static InternalLogger log;
  4. private static Properties properties = null;
  5. private static CommandLine commandLine = null;
  6. public static void main(String[] args) {
  7. main0(args);
  8. }
  9. public static NamesrvController main0(String[] args) {
  10. try {
  11. // 解析配置并创建NamesrvController这一核心处理请求组件
  12. NamesrvController controller = createNamesrvController(args);
  13. // 初始化并启动NamesrvController,创建netty服务器,请求处理器,后台定时任务等
  14. start(controller);
  15. String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
  16. log.info(tip);
  17. System.out.printf("%s%n", tip);
  18. return controller;
  19. } catch (Throwable e) {
  20. e.printStackTrace();
  21. System.exit(-1);
  22. }
  23. return null;
  24. }
  25. }

首先第一步就是解析配置并创建NamesrvController这一核心处理请求组件

  1. public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
  2. System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
  3. //PackageConflictDetect.detectFastjson();
  4. Options options = ServerUtil.buildCommandlineOptions(new Options());
  5. commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
  6. if (null == commandLine) {
  7. System.exit(-1);
  8. return null;
  9. }
  10. // 创建两个核心配置,后面要根据这两个配置创建NamesrvController
  11. final NamesrvConfig namesrvConfig = new NamesrvConfig();
  12. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  13. // netty监听9876端口
  14. nettyServerConfig.setListenPort(9876);
  15. if (commandLine.hasOption('c')) {
  16. String file = commandLine.getOptionValue('c');
  17. if (file != null) {
  18. InputStream in = new BufferedInputStream(new FileInputStream(file));
  19. properties = new Properties();
  20. properties.load(in);
  21. MixAll.properties2Object(properties, namesrvConfig);
  22. MixAll.properties2Object(properties, nettyServerConfig);
  23. namesrvConfig.setConfigStorePath(file);
  24. System.out.printf("load config properties file OK, %s%n", file);
  25. in.close();
  26. }
  27. }
  28. if (commandLine.hasOption('p')) {
  29. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
  30. MixAll.printObjectProperties(console, namesrvConfig);
  31. MixAll.printObjectProperties(console, nettyServerConfig);
  32. System.exit(0);
  33. }
  34. MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
  35. if (null == namesrvConfig.getRocketmqHome()) {
  36. System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
  37. System.exit(-2);
  38. }
  39. LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
  40. JoranConfigurator configurator = new JoranConfigurator();
  41. configurator.setContext(lc);
  42. lc.reset();
  43. configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
  44. log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
  45. MixAll.printObjectProperties(log, namesrvConfig);
  46. MixAll.printObjectProperties(log, nettyServerConfig);
  47. // 根据上面的配置创建NamesrvController
  48. final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
  49. // remember all configs to prevent discard
  50. controller.getConfiguration().registerConfig(properties);
  51. return controller;
  52. }

前面几行看不懂直接跳过,直接看这3行关键的代码:

  1. // 创建两个核心配置,后面要根据这两个配置创建NamesrvController
  2. final NamesrvConfig namesrvConfig = new NamesrvConfig();
  3. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  4. // netty监听9876端口
  5. nettyServerConfig.setListenPort(9876);

这里应该可以猜测是为创建NamesrvController提供一些配置,看看NamesrvConfig和NettyServerConfig是什么

  1. public class NamesrvConfig {
  2. private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
  3. // rocketmq的主目录,通过读取ROCKETMQ_HOME环境变量
  4. private String rocketmqHome = System.getProperty(
  5. MixAll.ROCKETMQ_HOME_PROPERTY,
  6. System.getenv(MixAll.ROCKETMQ_HOME_ENV));
  7. // kv配置文件的路径
  8. private String kvConfigPath = System.getProperty("user.home")
  9. + File.separator
  10. + "namesrv"
  11. + File.separator
  12. + "kvConfig.json";
  13. // namesrv自己的配置文件路径
  14. private String configStorePath = System.getProperty("user.home")
  15. + File.separator
  16. + "namesrv"
  17. + File.separator
  18. + "namesrv.properties";
  19. private String productEnvName = "center";
  20. private boolean clusterTest = false;
  21. private boolean orderMessageEnable = false;
  22. }

这里主要配置一些路径,对于理解主线流程帮助不大,可以跳过。

  1. public class NettyServerConfig implements Cloneable {
  2. // 监听端口,但nameserver改为了9876
  3. private int listenPort = 8888;
  4. // 工作线程
  5. private int serverWorkerThreads = 8;
  6. // netty的public线程池的线程数量
  7. private int serverCallbackExecutorThreads = 0;
  8. // 这是netty的IO线程池的线程数量,默认是3,这里的线程是负责解析网络请求的
  9. // 它这里的线程解析完网络请求之后,就会把请求转发给work线程来处理
  10. private int serverSelectorThreads = 3;
  11. private int serverOnewaySemaphoreValue = 256;
  12. private int serverAsyncSemaphoreValue = 64;
  13. // 如果一个网络连接空闲超过120s,就会被关闭
  14. private int serverChannelMaxIdleTimeSeconds = 120;
  15. private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
  16. private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
  17. private boolean serverPooledByteBufAllocatorEnable = true;
  18. /** * make make install * * * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \ * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd */
  19. private boolean useEpollNativeSelector = false;
  20. }

这是对NettyServer的一些配置信息,由于对netty不了解,所以这里也可以先跳过。

接下来的一些解析配置也不是很懂,就跳过了,直接看创建NamesrvController

  1. // 根据上面的配置创建NamesrvController
  2. final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

可以看到是根据上面的两个核心配置创建的

  1. public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
  2. this.namesrvConfig = namesrvConfig;
  3. this.nettyServerConfig = nettyServerConfig;
  4. this.kvConfigManager = new KVConfigManager(this);
  5. this.routeInfoManager = new RouteInfoManager();
  6. this.brokerHousekeepingService = new BrokerHousekeepingService(this);
  7. this.configuration = new Configuration(
  8. log,
  9. this.namesrvConfig, this.nettyServerConfig
  10. );
  11. this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
  12. }

创建的同时会对里面的一些管理器组件进行创建,供后续使用。

看入口方法,NameSrvController创建完成后就该进行初始化并启动,创建netty服务器,请求处理器,后台定时任务等。

  1. public static NamesrvController start(final NamesrvController controller) throws Exception {
  2. if (null == controller) {
  3. throw new IllegalArgumentException("NamesrvController is null");
  4. }
  5. // 进行初始化,创建netty服务器,请求处理器,后台定时任务等
  6. boolean initResult = controller.initialize();
  7. if (!initResult) {
  8. controller.shutdown();
  9. System.exit(-3);
  10. }
  11. Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
  12. @Override
  13. public Void call() throws Exception {
  14. controller.shutdown();
  15. return null;
  16. }
  17. }));
  18. // 其实就是启动netty服务器
  19. controller.start();
  20. return controller;
  21. }

这里面关键步骤就两步,初始化和启动,先看如何初始化NamesrvController:

  1. public boolean initialize() {
  2. // 无关紧要,跳过
  3. this.kvConfigManager.load();
  4. // 创建netty server
  5. this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  6. // 默认请求处理器的线程池
  7. this.remotingExecutor =
  8. Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
  9. // 注册默认请求处理器
  10. this.registerProcessor();
  11. // 每隔10s定时扫描broker状态的,看是否还存活着
  12. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  13. @Override
  14. public void run() {
  15. NamesrvController.this.routeInfoManager.scanNotActiveBroker();
  16. }
  17. }, 5, 10, TimeUnit.SECONDS);
  18. // 忽略不重要代码
  19. return true;
  20. }

这里面分为4步:
1、创建netty server,毕竟namesrv要作为服务端来接受broker,producer以及consumer的请求的,所以使用之前创建好的配置来创建netty server

  1. public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
  2. final ChannelEventListener channelEventListener) {
  3. super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
  4. this.serverBootstrap = new ServerBootstrap();
  5. this.nettyServerConfig = nettyServerConfig;
  6. this.channelEventListener = channelEventListener;
  7. // 忽略看不懂的代码
  8. }

这里面最重要的一行代码就是this.serverBootstrap = new ServerBootstrap()。ServerBootstrap是netty里面的一个核心类,可以看作是netty的服务器,在监听9876端口。
2、创建一个默认请求处理器的线程池,用来处理没有专门处理器的请求,至于何谓专门,下面会介绍,但在namesrv中,就这一个处理器线程池,所以其实在namesrv,所有的请求都是由该线程池处理的。
3、注册默认请求处理器,跟上一步说明一样,namesrv只有这一个处理器。

  1. private void registerProcessor() {
  2. if (namesrvConfig.isClusterTest()) {
  3. this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
  4. this.remotingExecutor);
  5. } else {
  6. // 向netty服务器注册默认的请求处理组件
  7. this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
  8. }
  9. }

4、创建一个定时器,每隔10s定时扫描broker状态的,看是否还存活着,挂了就移除相关broker的信息

  1. /** * 扫描发生故障的broker,即默认120s没有更新心跳时间,则表示该broker挂了,需要将该broker路由信息移除 */
  2. public void scanNotActiveBroker() {
  3. // 迭代brokerLiveTable,判断每个broker的心跳时间跟当前时间的间隔
  4. Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
  5. while (it.hasNext()) {
  6. Entry<String, BrokerLiveInfo> next = it.next();
  7. long last = next.getValue().getLastUpdateTimestamp();
  8. // 默认超过120s就表示broker挂了
  9. if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
  10. RemotingUtil.closeChannel(next.getValue().getChannel());
  11. it.remove();
  12. log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
  13. // 移除与该broker所有的信息,有点复杂,只知道移除所有有关broker信息就行了
  14. this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
  15. }
  16. }
  17. }

这里面的一些实例变量等后面解析broker发送心跳时再具体说明。

最后就是启动NamesrvController这一组件:

  1. public void start() throws Exception {
  2. this.remotingServer.start();
  3. if (this.fileWatchService != null) {
  4. this.fileWatchService.start();
  5. }
  6. }

其实就是启动Netty服务器,至于启动细节,看不懂。

好了,到这里,namesrv服务就算是启动成功了,总的来说,namesrv的代码还是比较简单的。

总结一下,就是先解析相关配置,创建namesrvConfig和nettyServerConfig这两个核心配置,根据这两个配置创建NamesrvController这一 核心组件,创建出来以后对该组件进行初始化,包括创建netty服务器,请求处理器,请求线程池,后台定时任务等,最后启动NamesrvController,其实就是启动netty服务器。
在这里插入图片描述
下面就要开始研究NameServer启动成功后,Broker是如何启动的,如果向NameServer进行注册,如何进行心跳,NameServer是如何管理Broker的。

参考资料
《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》

发表评论

表情:
评论列表 (有 0 条评论,81人围观)

还没有评论,来说两句吧...

相关阅读