关于拉取Apollo配置中心Namespace逻辑探析

doMore 26 2025-03-07

Apollo 简介

Apollo(阿波罗)是一款可靠的分布式配置管理中心,诞生于携程框架研发部,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性,适用于微服务配置管理场景。

项目结构

  • Config Service提供配置的读取、推送等功能,服务对象是Apollo客户端。
  • Admin Service提供配置的修改、发布等功能,服务对象是Apollo Portal(管理界面)。
  • Config Service和Admin Service都是多实例、无状态部署,所以需要将自己注册到Eureka中并保持心跳。
  • Client通过域名访问Meta Server获取Config Service服务列表(IP+Port),而后直接通过IP+Port访问服务,同时在Client侧会做load balance、错误重试。
  • Portal通过域名访问Meta Server获取Admin Service服务列表(IP+Port),而后直接通过IP+Port访问服务,同时在Portal侧会做load balance、错误重试。

注意:本次所谈内容默认已经成功部署 Apollo server 端,并且在项目中也成功结合了 Apollo Client。

Client 拉取

  1. 入口类 com.ctrip.framework.apollo.spring.boot.ApolloApplicationContextInitializer 。该类由 spring.factories 进行加载,实现了 org.springframework.boot.env.EnvironmentPostProcessor ,所以在 spring 环境变量 Enviroment 加载的时候 会执行方法 org.springframework.boot.env.EnvironmentPostProcessor#postProcessEnvironment 。

  @Override
  public void postProcessEnvironment(ConfigurableEnvironment configurableEnvironment, SpringApplication springApplication) {

    // should always initialize system properties like app.id in the first place
    initializeSystemProperty(configurableEnvironment);

    Boolean eagerLoadEnabled = configurableEnvironment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_EAGER_LOAD_ENABLED, Boolean.class, false);

    //EnvironmentPostProcessor should not be triggered if you don't want Apollo Loading before Logging System Initialization
    // apollo.bootstrap.eagerLoad.enabled 是否 直接加载配置
    if (!eagerLoadEnabled) {
      return;
    }

    Boolean bootstrapEnabled = configurableEnvironment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED, Boolean.class, false);

    if (bootstrapEnabled) {
      DeferredLogger.enable();
      // 远程配置的加载
      initialize(configurableEnvironment);
    }

  }


protected void initialize(ConfigurableEnvironment environment) {

    if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
      //already initialized, replay the logs that were printed before the logging system was initialized
      DeferredLogger.replayTo();
      return;
    }
   // 从环境变量获取 apollo.bootstrap.namespaces 的值,可以获取到填写的application
    String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION);
    logger.debug("Apollo bootstrap namespaces: {}", namespaces);
    // 因为namespaces可以配置多个,这里根据逗号转为集合列表
    List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);

    CompositePropertySource composite;
    final ConfigUtil configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    if (configUtil.isPropertyNamesCacheEnabled()) {
      composite = new CachedCompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
    } else {
      composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
    }
    for (String namespace : namespaceList) {
      // 获取config,这个config很关键,里面就包含了namespace文件的内容,下面要分析这个方法
      Config config = ConfigService.getConfig(namespace);
      // 根据config转换为spring environment需要的propertySource
      composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
    }
    // 加到所有PropertySources最前面,这里可以看到优先级就是最高的了
    environment.getPropertySources().addFirst(composite);
  }


需要注意的是,很多时候项目中会使用 @EnableApolloConfig 去配置 Namespace ,该注解的配置内容会由 @Import(ApolloConfigRegistrar.class) 进行加载。 在上述 environment.getProperty 时已经转换完成。

  1. 获取 Config 内容
  • com.ctrip.framework.apollo.ConfigService#getConfig
    • com.ctrip.framework.apollo.internals.DefaultConfigManager#getConfig
      • com.ctrip.framework.apollo.spi.DefaultConfigFactory#create

  @Override
  public Config create(String namespace) {
    ConfigFileFormat format = determineFileFormat(namespace);

    ConfigRepository configRepository = null;

    // although ConfigFileFormat.Properties are compatible with themselves we
    // should not create a PropertiesCompatibleFileConfigRepository for them
    // calling the method `createLocalConfigRepository(...)` is more suitable
    // for ConfigFileFormat.Properties
    if (ConfigFileFormat.isPropertiesCompatible(format) &&
        format != ConfigFileFormat.Properties) {
      configRepository = createPropertiesCompatibleFileConfigRepository(namespace, format);
    } else {
      // 获取配置信息
      configRepository = createConfigRepository(namespace);
    }

    logger.debug("Created a configuration repository of type [{}] for namespace [{}]",
        configRepository.getClass().getName(), namespace);
    // 这个是关键点,大部分核心代码都在这里
    return this.createRepositoryConfig(namespace, configRepository);
  }

  ConfigRepository createConfigRepository(String namespace) {
    // 判断是否配置只走本地缓存文件的方式,这里默认走这个分析
    if (m_configUtil.isPropertyFileCacheEnabled()) {
      return createLocalConfigRepository(namespace);
    }
    return createRemoteConfigRepository(namespace);
  }

    LocalFileConfigRepository createLocalConfigRepository(String namespace) {
    if (m_configUtil.isInLocalMode()) {
      logger.warn(
          "==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
          namespace);
      return new LocalFileConfigRepository(namespace);
    }
    // 这创建了一个LocalFileConfigRepository,里面又包装了一个RemoteConfigRepository,
    return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
  }

  RemoteConfigRepository createRemoteConfigRepository(String namespace) {
    return new RemoteConfigRepository(namespace);
  }


  protected Config createRepositoryConfig(String namespace, ConfigRepository configRepository) {
    return new DefaultConfig(namespace, configRepository);
  }

    public DefaultConfig(String namespace, ConfigRepository configRepository) {
    m_namespace = namespace;
    m_resourceProperties = loadFromResource(m_namespace);
    m_configRepository = configRepository;
    m_configProperties = new AtomicReference<>();
    m_warnLogRateLimiter = RateLimiter.create(0.017); // 1 warning log output per minute
    initialize();
  }


  private void initialize() {
    try {
        // 获取远程的配置
      updateConfig(m_configRepository.getConfig(), m_configRepository.getSourceType());
    } catch (Throwable ex) {
      Tracer.logError(ex);
      logger.warn("Init Apollo Local Config failed - namespace: {}, reason: {}.",
          m_namespace, ExceptionUtil.getDetailMessage(ex));
    } finally {
      //register the change listener no matter config repository is working or not
      //so that whenever config repository is recovered, config could get changed
      // 增加监听器
      // 这里发现对传进来的LocalFileConfigRepository调用了addChangeListener,同样的方式,
      // 让当前类来监听LocalFileConfigRepository,
      // 所以监听可以传递到当前类DefaultConfig的onRepositoryChange方法了
      m_configRepository.addChangeListener(this);
    }
  }

// com.ctrip.framework.apollo.internals.RemoteConfigRepository#getConfig

  @Override
  public Properties getConfig() {
    // 没有缓存信息就去同步远程配置
    if (m_configCache.get() == null) {
      this.sync();
    }
    return transformApolloConfigToProperties(m_configCache.get());
  }


  @Override
  protected synchronized void sync() {
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");

    try {
      // 获取当前的ApolloConfig
      ApolloConfig previous = m_configCache.get();
      // 获取远程的ApolloConfig
      // 该方法就是根据 appId、namespace、secret 等信息去请求 /configs/{{appId}}/{{clusterName}}/{{namespace}} 借口获取相应的配置信息,并且有重试机制等 。
      // appId 在没有指定的情况在 会有一个默认值 ApolloNoAppIdPlaceHolder
      ApolloConfig current = loadApolloConfig();

      //reference equals means HTTP 304
      if (previous != current) {
        logger.debug("Remote Config refreshed!");
        // 如果本地和远程不一样,则更新
        m_configCache.set(current);
        // (this.getConfig()是根据ApolloConfig获取一个Properties)
        this.fireRepositoryChange(m_namespace, this.getConfig());
      }

      if (current != null) {
        Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
                current.getReleaseKey());
      }

      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      transaction.setStatus(ex);
      throw ex;
    } finally {
      transaction.complete();
    }
  }

Server 端接口

  1. 入口接口 com.ctrip.framework.apollo.configservice.controller.ConfigController#queryConfig 请求地址 -> /configs/{{appId}}/{{clusterName}}/{{namespace}}

@GetMapping(value = "/{appId}/{clusterName}/{namespace:.+}")
  public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
                                  @PathVariable String namespace,
                                  @RequestParam(value = "dataCenter", required = false) String dataCenter,
                                  @RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,
                                  @RequestParam(value = "ip", required = false) String clientIp,
                                  @RequestParam(value = "label", required = false) String clientLabel,
                                  @RequestParam(value = "messages", required = false) String messagesAsString,
                                  HttpServletRequest request, HttpServletResponse response) throws IOException {
    String originalNamespace = namespace;
    //strip out .properties suffix
    namespace = namespaceUtil.filterNamespaceName(namespace);
    //fix the character case issue, such as FX.apollo <-> fx.apollo
    namespace = namespaceUtil.normalizeNamespace(appId, namespace);

    if (Strings.isNullOrEmpty(clientIp)) {
      clientIp = tryToGetClientIp(request);
    }

    ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);

    List<Release> releases = Lists.newLinkedList();

    String appClusterNameLoaded = clusterName;
    // 提供了 APPID 的根据 APPID 去加载
    if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
      Release currentAppRelease = configService.loadConfig(appId, clientIp, clientLabel, appId, clusterName, namespace,
          dataCenter, clientMessages);

      if (currentAppRelease != null) {
        releases.add(currentAppRelease);
        //we have cluster search process, so the cluster name might be overridden
        appClusterNameLoaded = currentAppRelease.getClusterName();
      }
    }

    //if namespace does not belong to this appId, should check if there is a public configuration
    // 如果当前 namespace 不属于 指定的 appId,则检查 namespace 是不是一个 public,是一个 public 也可以正常加载
    if (!namespaceBelongsToAppId(appId, namespace)) {
      Release publicRelease = this.findPublicConfig(appId, clientIp, clientLabel, clusterName, namespace,
          dataCenter, clientMessages);
      if (Objects.nonNull(publicRelease)) {
        releases.add(publicRelease);
      }
    }

    if (releases.isEmpty()) {
      response.sendError(HttpServletResponse.SC_NOT_FOUND,
          String.format(
              "Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
              appId, clusterName, originalNamespace));
      Tracer.logEvent("Apollo.Config.NotFound",
          assembleKey(appId, clusterName, originalNamespace, dataCenter));
      return null;
    }

    auditReleases(appId, clusterName, dataCenter, clientIp, releases);

    String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
            .collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));

    if (mergedReleaseKey.equals(clientSideReleaseKey)) {
      // Client side configuration is the same with server side, return 304
      response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
      Tracer.logEvent("Apollo.Config.NotModified",
          assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
      return null;
    }

    ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
        mergedReleaseKey);
    apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));

    Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
        originalNamespace, dataCenter));
    return apolloConfig;
  }@GetMapping(value = "/{appId}/{clusterName}/{namespace:.+}")
  public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
                                  @PathVariable String namespace,
                                  @RequestParam(value = "dataCenter", required = false) String dataCenter,
                                  @RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,
                                  @RequestParam(value = "ip", required = false) String clientIp,
                                  @RequestParam(value = "label", required = false) String clientLabel,
                                  @RequestParam(value = "messages", required = false) String messagesAsString,
                                  HttpServletRequest request, HttpServletResponse response) throws IOException {
    String originalNamespace = namespace;
    //strip out .properties suffix
    namespace = namespaceUtil.filterNamespaceName(namespace);
    //fix the character case issue, such as FX.apollo <-> fx.apollo
    namespace = namespaceUtil.normalizeNamespace(appId, namespace);

    if (Strings.isNullOrEmpty(clientIp)) {
      clientIp = tryToGetClientIp(request);
    }

    ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);

    List<Release> releases = Lists.newLinkedList();

    String appClusterNameLoaded = clusterName;
    // 提供了 APPID 的根据 APPID 去加载
    if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
      Release currentAppRelease = configService.loadConfig(appId, clientIp, clientLabel, appId, clusterName, namespace,
          dataCenter, clientMessages);

      if (currentAppRelease != null) {
        releases.add(currentAppRelease);
        //we have cluster search process, so the cluster name might be overridden
        appClusterNameLoaded = currentAppRelease.getClusterName();
      }
    }

    //if namespace does not belong to this appId, should check if there is a public configuration
    // 如果当前 namespace 不属于 指定的 appId,则检查 namespace 是不是一个 public,是一个 public 也可以正常加载
    if (!namespaceBelongsToAppId(appId, namespace)) {
      Release publicRelease = this.findPublicConfig(appId, clientIp, clientLabel, clusterName, namespace,
          dataCenter, clientMessages);
      if (Objects.nonNull(publicRelease)) {
        releases.add(publicRelease);
      }
    }

    if (releases.isEmpty()) {
      response.sendError(HttpServletResponse.SC_NOT_FOUND,
          String.format(
              "Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
              appId, clusterName, originalNamespace));
      Tracer.logEvent("Apollo.Config.NotFound",
          assembleKey(appId, clusterName, originalNamespace, dataCenter));
      return null;
    }

    auditReleases(appId, clusterName, dataCenter, clientIp, releases);

    String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
            .collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));

    if (mergedReleaseKey.equals(clientSideReleaseKey)) {
      // Client side configuration is the same with server side, return 304
      response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
      Tracer.logEvent("Apollo.Config.NotModified",
          assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
      return null;
    }

    ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
        mergedReleaseKey);
    apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));

    Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
        originalNamespace, dataCenter));
    return apolloConfig;
  }

总结

在指定了 appId 的情况下,没有指定 namespace 获取不到配置,制定了 namespace,但是不属于当前 appId 的,则看是不是 public ,如果是 public 也可以正常获取配置信息。