从实现原理来讲,Nacos 为什么这么强?

作者:微信小助手

发布时间:2022-07-08T12:30:10

今天来分享一下Nacos注册中心的底层原理,从服务注册到服务发现,非常细致

1. Nacos介绍

再讲Nacos之前,先来讲一下服务注册和发现。我们知道,现在微服务架构是目前开发的一个趋势。服务消费者要去调用多个服务提供者组成的集群。这里需要做到以下几点:

  1. 服务消费者 需要在本地配置文件中维护服务提供者集群的每个节点的请求地址
  2. 服务提供者集群中如果某个节点宕机, 服务消费者的本地配置中需要同步删除这个节点的请求地址,防止请求发送到已经宕机的节点上造成请求失败。

因此需要引入服务注册中心,它具有以下几个功能:

  1. 服务地址的管理。
  2. 服务注册。
  3. 服务动态感知。

而Nacos致力于解决微服务中的统一配置,服务注册和发现等问题。Nacos集成了注册中心和配置中心。其相关特性包括:

1.服务发现和服务健康监测

Nacos支持基于DNS和RPC的服务发现,即服务消费者可以使用DNS或者HTTP的方式来查找和发现服务。Nacos提供对服务的实时的健康检查,阻止向不健康的主机或者服务实例发送请求。Nacos支持传输层(Ping/TCP)、应用层(HTTP、Mysql)的健康检查。

2.动态配置服务

动态配置服务可以以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置。

3.动态DNS服务

支持权重路由,让开发者更容易的实现中间层的负载均衡、更灵活的路由策略、流量控制以及DNS解析服务。

4.服务和元数据管理

Nacos允许开发者从微服务平台建设的视角来管理数据中心的所有服务和元数据。如:服务的生命周期、静态依赖分析、服务的健康状态、服务的流量管理、路由和安全策略等。

2. Nacos注册中心实现原理分析

2.1 Nacos架构图

以下是Nacos的架构图:其中分为这么几个模块:

  • Provider APP:服务提供者。

  • Consumer APP:服务消费者。

  • Name Server:通过Virtual IP或者DNS的方式实现Nacos高可用集群的服务路由。

  • Nacos Server:Nacos服务提供者。

    • OpenAPI:功能访问入口。
    • Config Service、Naming Service:Nacos提供的配置服务、名字服务模块。
    • Consistency Protocol:一致性协议,用来实现Nacos集群节点的数据同步,使用Raft算法实现。
    • 其中包含:
  • Nacos Console:Nacos控制台。

小总结

  • 服务提供者通过VIP(Virtual IP)访问Nacos Server高可用集群,基于OpenAPI完成服务的注册和服务的查询。
  • Nacos Server的底层则通过数据一致性算法(Raft)来完成节点的数据同步。

2.2 注册中心的原理

这里对其原理做一个大致的介绍,在后文则从源码角度进行分析。

首先,服务注册的功能体现在:

  • 服务实例启动时注册到服务注册表、关闭时则注销( 服务注册)。
  • 服务消费者可以通过查询服务注册表来获得可用的实例( 服务发现)。
  • 服务注册中心需要调用服务实例的健康检查API来验证其是否可以正确的处理请求( 健康检查)。

Nacos服务注册和发现的实现原理的图如下:

3. Nacos源码分析

前提(在本地或者虚机上先启动好Nacos) 这一部分从2个角度来讲Nacos是如何实现的:

  • 服务注册。
  • 服务发现

3.1 Nacos服务注册

首先看下一个包:spring-cloud-commons这个ServiceRegistry接口是SpringCloud提供的服务注册的标准,集成到SpringCloud中实现服务注册的组件,都需要实现这个接口。来看下它的结构:

public interface ServiceRegistry<R extends Registration{
    void register(R registration);

    void deregister(R registration);

    void close();

    void setStatus(R registration, String status);

    <T> getStatus(R registration);
}

那么对于Nacos而言,该接口的实现类是NacosServiceRegistry,该类在这个pom包下:

再回过头来看spring-cloud-commons包:spring.factories主要是包含了自动装配的配置信息,如图:在我之前的文章里我有提到过,在spring.factories中配置EnableAutoConfiguration的内容后,项目在启动的时候,会导入相应的自动配置类,那么也就允许对该类的相关属性进行一个自动装配。那么显然,在这里导入了AutoServiceRegistrationAutoConfiguration这个类,而这个类顾名思义是服务注册相关的配置类

该类的完整代码如下:

@Configuration(
    proxyBeanMethods = false
)
@Import({AutoServiceRegistrationConfiguration.class})
@ConditionalOnProperty(
    value 
= {"spring.cloud.service-registry.auto-registration.enabled"},
    matchIfMissing = true
)
public class AutoServiceRegistrationAutoConfiguration {
    @Autowired(
        required = false
    )
    private AutoServiceRegistration autoServiceRegistration;
    @Autowired
    private AutoServiceRegistrationProperties properties;

    public AutoServiceRegistrationAutoConfiguration() {
    }

    @PostConstruct
    protected void init() {
        if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
            throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
        }
    }
}

这里做一个分析,AutoServiceRegistrationAutoConfiguration中注入了AutoServiceRegistration实例,该类的关系图如下:我们先来看一下这个抽象类AbstractAutoServiceRegistration

public abstract class AbstractAutoServiceRegistration<R extends Registrationimplements AutoServiceRegistration
ApplicationContextAware
ApplicationListener<WebServerInitializedEvent
{
 public void onApplicationEvent(WebServerInitializedEvent event) {
     this.bind(event);
 }
}

这里实现了ApplicationListener接口,并且传入了WebServerInitializedEvent作为泛型,啥意思嘞,意思是:

  • NacosAutoServiceRegistration监听 WebServerInitializedEvent事件。
  • 也就是WebServer初始化完成后,会调用对应的事件绑定方法,调用 onApplicationEvent(),该方法最终调用 NacosServiceRegistryregister()方法( NacosServiceRegistry实现了Spring的一个服务注册标准接口)。

对于register()方法,主要调用的是Nacos Client SDK中的NamingService下的registerInstance()方法完成服务的注册

public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
    } else {
        String serviceId = registration.getServiceId();
        String group = this.nacosDiscoveryProperties.getGroup();
        Instance instance = this.getNacosInstanceFromRegistration(registration);

        try {
            this.namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished"new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
        } catch (Exception var6) {
            log.error("nacos registry, {} register failed...{},"new Object[]{serviceId, registration.toString(), var6});
            ReflectionUtils.rethrowRuntimeException(var6);
        }

    }
}

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
        // 1.addBeatInfo()负责创建心跳信息实现健康监测。因为Nacos Server必须要确保注册的服务实例是健康的。
        // 而心跳监测就是服务健康监测的一种手段。
        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
 // 2.registerService()实现服务的注册
    this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

再来看一下心跳监测的方法addBeatInfo()

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }

    this.dom2Beat.put(key, beatInfo);
    // 通过schedule()方法,定时的向服务端发送一个数据包,然后启动一个线程不断地检测服务端的回应。
    // 如果在指定的时间内没有收到服务端的回应,那么认为服务器出现了故障。
    // 参数1:可以说是这个实例的相关信息。
    // 参数2:一个long类型的时间,代表从现在开始推迟执行的时间,默认是5000
    // 参数3:时间的单位,默认是毫秒,结合5000即代表每5秒发送一次心跳数据包
    this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}