ES5.4源码分析之启动流程

ES5.4源码分析之节点启动流程

入口

if [[ $DAEMONIZE = false ]]; then
  exec \
    "$JAVA" \
    "$XSHARE" \
    $ES_JAVA_OPTS \
    -Des.path.home="$ES_HOME" \
    -Des.path.conf="$ES_PATH_CONF" \
    -Des.distribution.flavor="$ES_DISTRIBUTION_FLAVOR" \
    -Des.distribution.type="$ES_DISTRIBUTION_TYPE" \
    -Des.bundled_jdk="$ES_BUNDLED_JDK" \
    -cp "$ES_CLASSPATH" \
    org.elasticsearch.bootstrap.Elasticsearch \
    "$@" <<<"$KEYSTORE_PASSWORD"
else
  exec \
    "$JAVA" \
    "$XSHARE" \
    $ES_JAVA_OPTS \
    -Des.path.home="$ES_HOME" \
    -Des.path.conf="$ES_PATH_CONF" \
    -Des.distribution.flavor="$ES_DISTRIBUTION_FLAVOR" \
    -Des.distribution.type="$ES_DISTRIBUTION_TYPE" \
    -Des.bundled_jdk="$ES_BUNDLED_JDK" \
    -cp "$ES_CLASSPATH" \
    org.elasticsearch.bootstrap.Elasticsearch \
    "$@" \
    <<<"$KEYSTORE_PASSWORD" &
  retval=$?
复制代码

流程图

启动流程.png

  1. 解析配置文件和命令参数
  2. 检查外部环境和内部环境 jvm版本、操作系统
  3. 初始化内部资源、创建内部模块
  4. 启动各个子模块和keep线程

org.elasticsearch.bootstrap.Elasticsearch

main

public static void main(final String[] args) throws Exception {
        // we want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
        // presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy)
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkPermission(Permission perm) {
                // grant all permissions so that we can later set the security manager to the one that we want
            }
        });
        LogConfigurator.registerErrorListener();
        final Elasticsearch elasticsearch = new Elasticsearch();
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        if (status != ExitCodes.OK) {
            exit(status);
        }
    }
复制代码

main(args, elasticsearch, Terminal.DEFAULT)调用Command类的main方法

Elasticsearch的构造方法

 Elasticsearch() {
        super("starts elasticsearch");
        versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
            "Prints elasticsearch version information and exits");
        daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
            "Starts Elasticsearch in the background")
            .availableUnless(versionOption);
        pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
            "Creates a pid file in the specified path on start")
            .availableUnless(versionOption)
            .withRequiredArg()
            .withValuesConvertedBy(new PathConverter());
        quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
            "Turns off standard ouput/error streams logging in console")
            .availableUnless(versionOption)
            .availableUnless(daemonizeOption);
    }
复制代码
  • 主要做了解析参数

org.elasticsearch.cli.Command#main

public final int main(String[] args, Terminal terminal) throws Exception {
    if (addShutdownHook()) {
        shutdownHookThread.set(new Thread(() -> {
            try {
                this.close();
            } catch (final IOException e) {
                try (
                    StringWriter sw = new StringWriter();
                    PrintWriter pw = new PrintWriter(sw)) {
                    e.printStackTrace(pw);
                    terminal.println(sw.toString());
                } catch (final IOException impossible) {
                    // StringWriter#close declares a checked IOException from the Closeable interface but the Javadocs for StringWriter
                    // say that an exception here is impossible
                    throw new AssertionError(impossible);
                }
            }
        }));
        Runtime.getRuntime().addShutdownHook(shutdownHookThread.get());
    }

    if (shouldConfigureLoggingWithoutConfig()) {
        // initialize default for es.logger.level because we will not read the log4j2.properties
        final String loggerLevel = System.getProperty("es.logger.level", Level.INFO.name());
        final Settings settings = Settings.builder().put("logger.level", loggerLevel).build();
        LogConfigurator.configureWithoutConfig(settings);
    }

    try {
        mainWithoutErrorHandling(args, terminal);
    } catch (OptionException e) {
        printHelp(terminal);
        terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
        return ExitCodes.USAGE;
    } catch (UserException e) {
        if (e.exitCode == ExitCodes.USAGE) {
            printHelp(terminal);
        }
        terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
        return e.exitCode;
    }
    return ExitCodes.OK;
}
复制代码
  • 采用模板方法

    • 增加线程的关闭钩子,关闭一些资源

    • 配置日志信息

    • mainWithoutErrorHandling 主要处理方法

      • void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {	
        		// 参数解析
            final OptionSet options = parser.parse(args);
        
            if (options.has(helpOption)) {
                printHelp(terminal);
                return;
            }
        
            if (options.has(silentOption)) {
                terminal.setVerbosity(Terminal.Verbosity.SILENT);
            } else if (options.has(verboseOption)) {
                terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
            } else {
                terminal.setVerbosity(Terminal.Verbosity.NORMAL);
            }
        		// 真正的执行
            execute(terminal, options);
        }
        复制代码
        • execute是一个抽象方法,由子类实现

          • 先看下 EnvironmentAwareCommand的exec

            • @Override
              protected void execute(Terminal terminal, OptionSet options) throws Exception {
                  final Map<String, String> settings = new HashMap<>();
                  for (final KeyValuePair kvp : settingOption.values(options)) {
                      if (kvp.value.isEmpty()) {
                          throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
                      }
                      if (settings.containsKey(kvp.key)) {
                          final String message = String.format(
                                  Locale.ROOT,
                                  "setting [%s] already set, saw [%s] and [%s]",
                                  kvp.key,
                                  settings.get(kvp.key),
                                  kvp.value);
                          throw new UserException(ExitCodes.USAGE, message);
                      }
                      settings.put(kvp.key, kvp.value);
                  }
              
                  putSystemPropertyIfSettingIsMissing(settings, "default.path.conf", "es.default.path.conf");
                  putSystemPropertyIfSettingIsMissing(settings, "default.path.data", "es.default.path.data");
                  putSystemPropertyIfSettingIsMissing(settings, "default.path.logs", "es.default.path.logs");
                  putSystemPropertyIfSettingIsMissing(settings, "path.conf", "es.path.conf");
                  putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
                  putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
                  putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
              		// 又由子类实现
                  execute(terminal, options, createEnv(terminal, settings));
              }
              复制代码
              • org.elasticsearch.bootstrap.Elasticsearch#execute

              • @Override
                protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
                    if (options.nonOptionArguments().isEmpty() == false) {
                        throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
                    }
                    if (options.has(versionOption)) {
                        if (options.has(daemonizeOption) || options.has(pidfileOption)) {
                            throw new UserException(ExitCodes.USAGE, "Elasticsearch version option is mutually exclusive with any other option");
                        }
                        terminal.println("Version: " + org.elasticsearch.Version.CURRENT
                                + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()
                                + ", JVM: " + JvmInfo.jvmInfo().version());
                        return;
                    }
                
                    final boolean daemonize = options.has(daemonizeOption);
                    final Path pidFile = pidfileOption.value(options);
                    final boolean quiet = options.has(quietOption);
                
                    try {
                        init(daemonize, pidFile, quiet, env);
                    } catch (NodeValidationException e) {
                        throw new UserException(ExitCodes.CONFIG, e.getMessage());
                    }
                }
                复制代码
                void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
                    throws NodeValidationException, UserException {
                    try {
                        Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
                    } catch (BootstrapException | RuntimeException e) {
                        // format exceptions to the console in a special way
                        // to avoid 2MB stacktraces from guice, etc.
                        throw new StartupException(e);
                    }
                }
                复制代码
                • Bootstrap#init

                  static void init(
                          final boolean foreground,
                          final Path pidFile,
                          final boolean quiet,
                          final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
                      // 
                      BootstrapInfo.init();
                  
                      INSTANCE = new Bootstrap();
                  
                      final SecureSettings keystore = loadSecureSettings(initialEnv);
                      Environment environment = createEnvironment(foreground, pidFile, keystore, initialEnv.settings());
                      try {
                          LogConfigurator.configure(environment);
                      } catch (IOException e) {
                          throw new BootstrapException(e);
                      }
                      checkForCustomConfFile();
                      checkConfigExtension(environment.configExtension());
                  
                      if (environment.pidFile() != null) {
                          try {
                              PidFile.create(environment.pidFile(), true);
                          } catch (IOException e) {
                              throw new BootstrapException(e);
                          }
                      }
                  
                      final boolean closeStandardStreams = (foreground == false) || quiet;
                      try {
                          if (closeStandardStreams) {
                              final Logger rootLogger = ESLoggerFactory.getRootLogger();
                              final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                              if (maybeConsoleAppender != null) {
                                  Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                              }
                              closeSystOut();
                          }
                  
                          // fail if somebody replaced the lucene jars
                          checkLucene();
                  
                          // install the default uncaught exception handler; must be done before security is
                          // initialized as we do not want to grant the runtime permission
                          // setDefaultUncaughtExceptionHandler
                          Thread.setDefaultUncaughtExceptionHandler(
                              new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));
                  
                          INSTANCE.setup(true, environment);
                  
                          try {
                              // any secure settings must be read during node construction
                              IOUtils.close(keystore);
                          } catch (IOException e) {
                              throw new BootstrapException(e);
                          }
                  
                          INSTANCE.start();
                  
                          if (closeStandardStreams) {
                              closeSysError();
                          }
                      } catch (NodeValidationException | RuntimeException e) {
                          // disable console logging, so user does not see the exception twice (jvm will show it already)
                          final Logger rootLogger = ESLoggerFactory.getRootLogger();
                          final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                          if (foreground && maybeConsoleAppender != null) {
                              Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                          }
                          Logger logger = Loggers.getLogger(Bootstrap.class);
                          if (INSTANCE.node != null) {
                              logger = Loggers.getLogger(Bootstrap.class, Node.NODE_NAME_SETTING.get(INSTANCE.node.settings()));
                          }
                          // HACK, it sucks to do this, but we will run users out of disk space otherwise
                          if (e instanceof CreationException) {
                              // guice: log the shortened exc to the log file
                              ByteArrayOutputStream os = new ByteArrayOutputStream();
                              PrintStream ps = null;
                              try {
                                  ps = new PrintStream(os, false, "UTF-8");
                              } catch (UnsupportedEncodingException uee) {
                                  assert false;
                                  e.addSuppressed(uee);
                              }
                              new StartupException(e).printStackTrace(ps);
                              ps.flush();
                              try {
                                  logger.error("Guice Exception: {}", os.toString("UTF-8"));
                              } catch (UnsupportedEncodingException uee) {
                                  assert false;
                                  e.addSuppressed(uee);
                              }
                          } else if (e instanceof NodeValidationException) {
                              logger.error("node validation exception\n{}", e.getMessage());
                          } else {
                              // full exception
                              logger.error("Exception", e);
                          }
                          // re-enable it if appropriate, so they can see any logging during the shutdown process
                          if (foreground && maybeConsoleAppender != null) {
                              Loggers.addAppender(rootLogger, maybeConsoleAppender);
                          }
                  
                          throw e;
                      }
                  }
                  复制代码
                  • init 是个空方法,方便子类实现一些初始化的操作

                  • INSTANCE = new Bootstrap(); 创建一个实例

                  • private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
                    Bootstrap() {
                        keepAliveThread = new Thread(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    keepAliveLatch.await();
                                } catch (InterruptedException e) {
                                    // bail out
                                }
                            }
                        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
                        keepAliveThread.setDaemon(false);
                        // keep this thread alive (non daemon thread) until we shutdown
                        Runtime.getRuntime().addShutdownHook(new Thread() {
                            @Override
                            public void run() {
                                keepAliveLatch.countDown();
                            }
                        });
                    }
                    复制代码
                    • keepThread本身不做什么具体工作。
                    • 主线程执行完启动流程会退出,keepthread线程是唯一的用户线程,作用是保持进程运行
                  • final SecureSettings keystore = loadSecureSettings(initialEnv); 加载安全配置

                  • private static SecureSettings loadSecureSettings(Environment initialEnv) throws BootstrapException {
                        final KeyStoreWrapper keystore;
                        try {
                            keystore = KeyStoreWrapper.load(initialEnv.configFile());
                        } catch (IOException e) {
                            throw new BootstrapException(e);
                        }
                        if (keystore == null) {
                            return null; // no keystore
                        }
                    
                        try {
                            keystore.decrypt(new char[0] /* TODO: read password from stdin */);
                        } catch (Exception e) {
                            throw new BootstrapException(e);
                        }
                        return keystore;
                    }
                    复制代码
                • KeyStoreWrapper.load

                • public static KeyStoreWrapper load(Path configDir) throws IOException {
                      Path keystoreFile = keystorePath(configDir);
                      if (Files.exists(keystoreFile) == false) {
                          return null;
                      }
                  		
                  		// 开始lucene 
                      SimpleFSDirectory directory = new SimpleFSDirectory(configDir);
                      try (IndexInput indexInput = directory.openInput(KEYSTORE_FILENAME, IOContext.READONCE)) {
                          ChecksumIndexInput input = new BufferedChecksumIndexInput(indexInput);
                          int formatVersion = CodecUtil.checkHeader(input, KEYSTORE_FILENAME, MIN_FORMAT_VERSION, FORMAT_VERSION);
                          byte hasPasswordByte = input.readByte();
                          boolean hasPassword = hasPasswordByte == 1;
                          if (hasPassword == false && hasPasswordByte != 0) {
                              throw new IllegalStateException("hasPassword boolean is corrupt: "
                                  + String.format(Locale.ROOT, "%02x", hasPasswordByte));
                          }
                          String type = input.readString();
                          String stringKeyAlgo = input.readString();
                          final String fileKeyAlgo;
                          if (formatVersion >= 2) {
                              fileKeyAlgo = input.readString();
                          } else {
                              fileKeyAlgo = NEW_KEYSTORE_FILE_KEY_ALGO;
                          }
                          final Map<String, KeyType> settingTypes;
                          if (formatVersion >= 2) {
                              settingTypes = input.readMapOfStrings().entrySet().stream().collect(Collectors.toMap(
                                  Map.Entry::getKey,
                                  e -> KeyType.valueOf(e.getValue())));
                          } else {
                              settingTypes = new HashMap<>();
                          }
                          byte[] keystoreBytes = new byte[input.readInt()];
                          input.readBytes(keystoreBytes, 0, keystoreBytes.length);
                          CodecUtil.checkFooter(input);
                          return new KeyStoreWrapper(formatVersion, hasPassword, type, stringKeyAlgo, fileKeyAlgo, settingTypes, keystoreBytes);
                      }
                  }
                  复制代码
                • Path keystoreFile = keystorePath(configDir)

            • 最核心的代码Node类

              • 构造方法

                • protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
                      final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
                      boolean success = false;
                      {
                          // use temp logger just to say we are starting. we can't use it later on because the node name might not be set
                          Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
                          logger.info("initializing ...");
                  
                      }
                      try {
                          Settings tmpSettings = Settings.builder().put(environment.settings())
                              .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
                  
                          tmpSettings = TribeService.processSettings(tmpSettings);
                  
                          // create the node environment as soon as possible, to recover the node id and enable logging
                          try {
                              nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
                              resourcesToClose.add(nodeEnvironment);
                          } catch (IOException ex) {
                              throw new IllegalStateException("Failed to create node environment", ex);
                          }
                          final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
                          Logger logger = Loggers.getLogger(Node.class, tmpSettings);
                          final String nodeId = nodeEnvironment.nodeId();
                          tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
                          if (DiscoveryNode.nodeRequiresLocalStorage(tmpSettings)) {
                              checkForIndexDataInDefaultPathData(tmpSettings, nodeEnvironment, logger);
                          }
                          // this must be captured after the node name is possibly added to the settings
                          final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
                          if (hadPredefinedNodeName == false) {
                              logger.info("node name [{}] derived from node ID [{}]; set [{}] to override", nodeName, nodeId, NODE_NAME_SETTING.getKey());
                          } else {
                              logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
                          }
                  
                          final JvmInfo jvmInfo = JvmInfo.jvmInfo();
                          logger.info(
                              "version[{}], pid[{}], build[{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
                              displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
                              jvmInfo.pid(),
                              Build.CURRENT.shortHash(),
                              Build.CURRENT.date(),
                              Constants.OS_NAME,
                              Constants.OS_VERSION,
                              Constants.OS_ARCH,
                              Constants.JVM_VENDOR,
                              Constants.JVM_NAME,
                              Constants.JAVA_VERSION,
                              Constants.JVM_VERSION);
                          logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
                          warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
                  
                          if (logger.isDebugEnabled()) {
                              logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
                                  environment.configFile(), Arrays.toString(environment.dataFiles()), environment.logsFile(), environment.pluginsFile());
                          }
                          // TODO: Remove this in Elasticsearch 6.0.0
                          if (JsonXContent.unquotedFieldNamesSet) {
                              DeprecationLogger dLogger = new DeprecationLogger(logger);
                              dLogger.deprecated("[{}] has been set, but will be removed in Elasticsearch 6.0.0",
                                  JsonXContent.JSON_ALLOW_UNQUOTED_FIELD_NAMES);
                          }
                  
                          this.pluginsService = new PluginsService(tmpSettings, environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
                          this.settings = pluginsService.updatedSettings();
                          localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
                  
                          // create the environment based on the finalized (processed) view of the settings
                          // this is just to makes sure that people get the same settings, no matter where they ask them from
                          this.environment = new Environment(this.settings);
                          Environment.assertEquivalent(environment, this.environment);
                  
                  
                          final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
                  
                          final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
                          resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
                          // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
                          DeprecationLogger.setThreadContext(threadPool.getThreadContext());
                          resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
                  
                          final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
                          final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
                          for (final ExecutorBuilder<?> builder : threadPool.builders()) {
                              additionalSettings.addAll(builder.getRegisteredSettings());
                          }
                          client = new NodeClient(settings, threadPool);
                          final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
                          final ScriptModule scriptModule = ScriptModule.create(settings, this.environment, resourceWatcherService,
                              pluginsService.filterPlugins(ScriptPlugin.class));
                          AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
                          additionalSettings.addAll(scriptModule.getSettings());
                          // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
                          // so we might be late here already
                          final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
                          scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
                          resourcesToClose.add(resourceWatcherService);
                          final NetworkService networkService = new NetworkService(settings,
                              getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
                          final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
                              localNodeFactory::getNode);
                          clusterService.addStateApplier(scriptModule.getScriptService());
                          resourcesToClose.add(clusterService);
                          final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,
                              scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
                          final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
                  
                          ModulesBuilder modules = new ModulesBuilder();
                          // plugin modules must be added here, before others or we can get crazy injection errors...
                          for (Module pluginModule : pluginsService.createGuiceModules()) {
                              modules.add(pluginModule);
                          }
                          final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
                          modules.add(new NodeModule(this, monitorService));
                          ClusterModule clusterModule = new ClusterModule(settings, clusterService,
                              pluginsService.filterPlugins(ClusterPlugin.class));
                          modules.add(clusterModule);
                          IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
                          modules.add(indicesModule);
                  
                          SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
                          CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
                              settingsModule.getClusterSettings());
                          resourcesToClose.add(circuitBreakerService);
                          ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                                  settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
                                  threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService);
                          modules.add(actionModule);
                          modules.add(new GatewayModule());
                  
                  
                          BigArrays bigArrays = createBigArrays(settings, circuitBreakerService);
                          resourcesToClose.add(bigArrays);
                          modules.add(settingsModule);
                          List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
                              NetworkModule.getNamedWriteables().stream(),
                              indicesModule.getNamedWriteables().stream(),
                              searchModule.getNamedWriteables().stream(),
                              pluginsService.filterPlugins(Plugin.class).stream()
                                  .flatMap(p -> p.getNamedWriteables().stream()),
                              ClusterModule.getNamedWriteables().stream())
                              .flatMap(Function.identity()).collect(Collectors.toList());
                          final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
                          NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
                              NetworkModule.getNamedXContents().stream(),
                              searchModule.getNamedXContents().stream(),
                              pluginsService.filterPlugins(Plugin.class).stream()
                                  .flatMap(p -> p.getNamedXContent().stream()),
                              ClusterModule.getNamedXWriteables().stream())
                              .flatMap(Function.identity()).collect(toList()));
                          final TribeService tribeService = new TribeService(settings, clusterService, nodeId, namedWriteableRegistry,
                              s -> newTribeClientNode(s, classpathPlugins));
                          resourcesToClose.add(tribeService);
                          modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
                          final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
                          final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
                              settingsModule.getClusterSettings(), analysisModule.getAnalysisRegistry(),
                              clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
                              threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
                              clusterService, client, metaStateService);
                  
                          Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
                              .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
                                                               scriptModule.getScriptService(), xContentRegistry).stream())
                              .collect(Collectors.toList());
                          final RestController restController = actionModule.getRestController();
                          final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
                                  threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, restController);
                          Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
                              pluginsService.filterPlugins(Plugin.class).stream()
                                  .map(Plugin::getCustomMetaDataUpgrader)
                                  .collect(Collectors.toList());
                          Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders =
                              pluginsService.filterPlugins(Plugin.class).stream()
                                  .map(Plugin::getIndexTemplateMetaDataUpgrader)
                                  .collect(Collectors.toList());
                          Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
                                  .map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
                          final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
                          new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
                          final Transport transport = networkModule.getTransportSupplier().get();
                          final TransportService transportService = newTransportService(settings, transport, threadPool,
                              networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
                          final SearchTransportService searchTransportService =  new SearchTransportService(settings,
                              transportService);
                          final Consumer<Binder> httpBind;
                          final HttpServerTransport httpServerTransport;
                          if (networkModule.isHttpEnabled()) {
                              httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
                              httpBind = b -> {
                                  b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
                              };
                          } else {
                              httpBind = b -> {
                                  b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
                              };
                              httpServerTransport = null;
                          }
                          final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService,
                              namedWriteableRegistry, networkService, clusterService, pluginsService.filterPlugins(DiscoveryPlugin.class));
                          NodeService nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
                              transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
                              httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
                  
                          modules.add(b -> {
                                  b.bind(NodeService.class).toInstance(nodeService);
                                  b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
                                  b.bind(PluginsService.class).toInstance(pluginsService);
                                  b.bind(Client.class).toInstance(client);
                                  b.bind(NodeClient.class).toInstance(client);
                                  b.bind(Environment.class).toInstance(this.environment);
                                  b.bind(ThreadPool.class).toInstance(threadPool);
                                  b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
                                  b.bind(TribeService.class).toInstance(tribeService);
                                  b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
                                  b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
                                  b.bind(BigArrays.class).toInstance(bigArrays);
                                  b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
                                  b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
                                  b.bind(IngestService.class).toInstance(ingestService);
                                  b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
                                  b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
                                  b.bind(MetaStateService.class).toInstance(metaStateService);
                                  b.bind(IndicesService.class).toInstance(indicesService);
                                  b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
                                      threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase()));
                                  b.bind(SearchTransportService.class).toInstance(searchTransportService);
                                  b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
                                          scriptModule.getScriptService()));
                                  b.bind(Transport.class).toInstance(transport);
                                  b.bind(TransportService.class).toInstance(transportService);
                                  b.bind(NetworkService.class).toInstance(networkService);
                                  b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
                                  b.bind(MetaDataIndexUpgradeService.class).toInstance(new MetaDataIndexUpgradeService(settings, xContentRegistry,
                                      indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders));
                                  b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
                                  b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
                                  {
                                      RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
                                      processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
                                      b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,
                                              indicesService, recoverySettings, clusterService));
                                      b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(settings, threadPool,
                                              transportService, recoverySettings, clusterService));
                                  }
                                  httpBind.accept(b);
                                  pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
                              }
                          );
                          injector = modules.createInjector();
                  
                          List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
                              .filter(p -> p instanceof LifecycleComponent)
                              .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
                          pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
                              .map(injector::getInstance).collect(Collectors.toList()));
                          resourcesToClose.addAll(pluginLifecycleComponents);
                          this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
                          client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
                                  () -> clusterService.localNode().getId());
                  
                          if (NetworkModule.HTTP_ENABLED.get(settings)) {
                              logger.debug("initializing HTTP handlers ...");
                              actionModule.initRestHandlers(() -> clusterService.state().nodes());
                          }
                          logger.info("initialized");
                  
                          success = true;
                      } catch (IOException ex) {
                          throw new ElasticsearchException("failed to bind service", ex);
                      } finally {
                          if (!success) {
                              IOUtils.closeWhileHandlingException(resourcesToClose);
                          }
                      }
                  }
                  复制代码
                  • 注:
                    • 构建各个模块
              • start方法

              public Node start() throws NodeValidationException {
                  if (!lifecycle.moveToStarted()) {
                      return this;
                  }
              
                  Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
                  logger.info("starting ...");
                  // hack around dependency injection problem (for now...)
                  injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
                  pluginLifecycleComponents.forEach(LifecycleComponent::start);
              
                  injector.getInstance(MappingUpdatedAction.class).setClient(client);
                  injector.getInstance(IndicesService.class).start();
                  injector.getInstance(IndicesClusterStateService.class).start();
                  injector.getInstance(IndicesTTLService.class).start();
                  injector.getInstance(SnapshotsService.class).start();
                  injector.getInstance(SnapshotShardsService.class).start();
                  injector.getInstance(RoutingService.class).start();
                  injector.getInstance(SearchService.class).start();
                  injector.getInstance(MonitorService.class).start();
              
                  final ClusterService clusterService = injector.getInstance(ClusterService.class);
              
                  final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
                  nodeConnectionsService.start();
                  clusterService.setNodeConnectionsService(nodeConnectionsService);
              
                  // TODO hack around circular dependencies problems
                  injector.getInstance(GatewayAllocator.class).setReallocation(clusterService, injector.getInstance(RoutingService.class));
              
                  injector.getInstance(ResourceWatcherService.class).start();
                  injector.getInstance(GatewayService.class).start();
                  Discovery discovery = injector.getInstance(Discovery.class);
                  clusterService.setDiscoverySettings(discovery.getDiscoverySettings());
                  clusterService.addInitialStateBlock(discovery.getDiscoverySettings().getNoMasterBlock());
                  clusterService.setClusterStatePublisher(discovery::publish);
              
                  // start before the cluster service since it adds/removes initial Cluster state blocks
                  final TribeService tribeService = injector.getInstance(TribeService.class);
                  tribeService.start();
              
                  // Start the transport service now so the publish address will be added to the local disco node in ClusterService
                  TransportService transportService = injector.getInstance(TransportService.class);
                  transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
                  transportService.start();
                  validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress(), pluginsService.filterPlugins(Plugin.class).stream()
                      .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
              
                  clusterService.addStateApplier(transportService.getTaskManager());
                  clusterService.start();
                  assert localNodeFactory.getNode() != null;
                  assert transportService.getLocalNode().equals(localNodeFactory.getNode())
                      : "transportService has a different local node than the factory provided";
                  assert clusterService.localNode().equals(localNodeFactory.getNode())
                      : "clusterService has a different local node than the factory provided";
                  // start after cluster service so the local disco is known
                  discovery.start();
                  transportService.acceptIncomingRequests();
                  discovery.startInitialJoin();
                  // tribe nodes don't have a master so we shouldn't register an observer         s
                  final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
                  if (initialStateTimeout.millis() > 0) {
                      final ThreadPool thread = injector.getInstance(ThreadPool.class);
                      ClusterState clusterState = clusterService.state();
                      ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
                      if (clusterState.nodes().getMasterNodeId() == null) {
                          logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
                          final CountDownLatch latch = new CountDownLatch(1);
                          observer.waitForNextChange(new ClusterStateObserver.Listener() {
                              @Override
                              public void onNewClusterState(ClusterState state) { latch.countDown(); }
              
                              @Override
                              public void onClusterServiceClose() {
                                  latch.countDown();
                              }
              
                              @Override
                              public void onTimeout(TimeValue timeout) {
                                  logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                                      initialStateTimeout);
                                  latch.countDown();
                              }
                          }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
              
                          try {
                              latch.await();
                          } catch (InterruptedException e) {
                              throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
                          }
                      }
                  }
              
              
                  if (NetworkModule.HTTP_ENABLED.get(settings)) {
                      injector.getInstance(HttpServerTransport.class).start();
                  }
              
                  if (WRITE_PORTS_FILE_SETTING.get(settings)) {
                      if (NetworkModule.HTTP_ENABLED.get(settings)) {
                          HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
                          writePortsFile("http", http.boundAddress());
                      }
                      TransportService transport = injector.getInstance(TransportService.class);
                      writePortsFile("transport", transport.boundAddress());
                  }
              
                  // start nodes now, after the http server, because it may take some time
                  tribeService.startNodes();
                  logger.info("started");
              
                  return this;
              }
              复制代码
              • 注:

                • 启动各个service服务

                  • TribeService#start

                  • TransportService#start

总结

  • 采用了google的轻量级ioc容器inject
  • 核心类org.elasticsearch.node.Node#Node
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享