探秘微服务治理之Spring Cloud Netflix Eureka

团队介绍

光大科技智能云计算部云计算团队云管项目组,致力于金融行业多云管理平台的咨询、设计、研发、部署及实施工作。我团队在云计算、虚拟化、存储、云管理平台建设领域拥有多名经验丰富的技术专家。将不定期与大家分享原创技术文章和相关实践经验,期待与大家共同探讨和进步。

探秘微服务治理之Spring Cloud Netflix Eureka

说起微服务框架,可能大家首先想到的就是Spring Cloud的大家族了。微服务简单理解就是很小、功能相对集中的服务。在微服务的思想下,原有功能都集中在一起实现的业务,现在拆分出来很多的服务模块,那服务之间的互相调用就不可避免,同时服务的高可用也要能够有所保证。要解决上述需要,少不了微服务中至关重要的内容,即服务治理。

服务治理包括三大组件:服务注册中心、服务的注册、服务的发现。举个例子:就好比我们租房,房东呢,想要把房子租出去;房客想要租房。房东把自己的房子信息挂在租房平台上,房客通过租房平台了解到出租房屋的信息,进而可以给房东打电话租房。

探秘微服务治理之Spring Cloud Netflix Eureka

这里呢租房平台就相当于服务注册中心,房东在平台上登记出租房屋相当于房屋的注册,房客从平台上获取出租信息相当于服务的发现。进入到程序的世界,Spring Cloud Eureka组件就提供了服务治理的功能。Eureka的服务端就是注册中心;客户端既可以是消费者也可以是提供者,承担着服务的注册和发现。

探秘微服务治理之Spring Cloud Netflix Eureka

简单总结各种解决要提供的主要功能:

服务端:

  1. 接收注册。

  2. 接收续期。

  3. 提供服务下线。

  4. 提供服务列表

客户端:

  1. 注册服务。

  2. 续期。

  3. 获取服务列表。

  4. 维持心跳。

大体上了解之后,我们就来探索一下在底层是如何实现的。

搭建Eureka环境(单实例环境)

基于Springboot我们可以轻松的搭建Eureka的客户端和服务端。

一、服务端搭建:

引入依赖:

<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency>

主类上添加注解:

@SpringBootApplication@EnableEurekaServerpublic class EurekaServer_6001 {    public static void main(String[] args) {        SpringApplication.run(EurekaServer_6001.class, args);    }}

在application.properties中相关Eureka的配置:

eureka:  instance:    hostname: eurekapeer1 #eureka的注册实例名称  client:    register-with-eureka: false # 服务注册,不让自己注册到eureka中    fetch-registry: false # 服务的发现 , 自己不从服务中发现信息    service-url:      defaultZone: http://localhost:6001/eureka/

启动服务,使用浏览器访问配置的端口,可以看到Eureka的服务端的页面:

探秘微服务治理之Spring Cloud Netflix Eureka

二、客户端搭建:

引入依赖:

<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>

主类添加注解:

@EnableDiscoveryClient@SpringBootApplicationpublic class ManagerZuulFilter {    public static void main(String[] args) {        SpringApplication.run(ManagerZuulFilter.class,args);    }}

在application.properties中相关Eureka的配置:

eureka:  client:    register-with-eureka: true #服务注册开关    fetch-registry: true #服务发现开关    serviceUrl:       defaultZone: http://localhost:6001/eureka/

启动服务,刷新服务端Eureka的页面,看到客户端服务注册到Eureka的页面。

探秘微服务治理之Spring Cloud Netflix Eureka

关闭客户端服务,服务端Eureka的管理页面,客户端的服务从列表中下线。

源码详解

一、客户端源码分析

客户端服务的治理包括:服务的注册、服务的续期、服务的下线、服务列表的发现等。

Eureka的注册和发现的内容在DiscoveryClient类中。

@Inject    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {        //...省略一些初始化的内容       //不注册到Eureka server并且不从Eureka server中获取服务列表,相当于单节点的Eureka服务端        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {            logger.info(\"Client configured to neither register nor query for data.\");            this.scheduler = null;            this.heartbeatExecutor = null;            this.cacheRefreshExecutor = null;            this.eurekaTransport = null;            this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), this.clientConfig.getRegion());            DiscoveryManager.getInstance().setDiscoveryClient(this);            DiscoveryManager.getInstance().setEurekaClientConfig(config);            this.initTimestampMs = System.currentTimeMillis();            logger.info(\"Discovery Client initialized at timestamp {} with initial instances count: {}\", this.initTimestampMs, this.getApplications().size());        } else {            try {                //注册到Eureka并且从Eureka中获取服务列表的逻辑处理                this.scheduler = Executors.newScheduledThreadPool(2, (new ThreadFactoryBuilder()).setNameFormat(\"DiscoveryClient-%d\").setDaemon(true).build());                //创建心跳定时执行任务                this.heartbeatExecutor = new ThreadPoolExecutor(1, this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat(\"DiscoveryClient-HeartbeatExecutor-%d\").setDaemon(true).build());                //创建刷新缓存定时执行任务                this.cacheRefreshExecutor = new ThreadPoolExecutor(1, this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat(\"DiscoveryClient-CacheRefreshExecutor-%d\").setDaemon(true).build());             //注册前处理器,可以在服务注册到Eureka之前做些操作            if (this.preRegistrationHandler != null) {                this.preRegistrationHandler.beforeRegistration();            }            //是否强制初始阶段就注册到Eureka            if (this.clientConfig.shouldRegisterWithEureka() && this.clientConfig.shouldEnforceRegistrationAtInit()) {                try {                    if (!this.register()) {                        throw new IllegalStateException(\"Registration error at startup. Invalid server response.\");                    }                } catch (Throwable var8) {                    logger.error(\"Registration error at startup: {}\", var8.getMessage());                    throw new IllegalStateException(var8);                }            }
//执行定时任务 this.initScheduledTasks();
try { Monitors.registerObject(this); } catch (Throwable var7) { logger.warn(\"Cannot register timers\", var7); }
DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); this.initTimestampMs = System.currentTimeMillis(); logger.info(\"Discovery Client initialized at timestamp {} with initial instances count: {}\", this.initTimestampMs, this.getApplications().size()); } }
//初始化定时任务private void initScheduledTasks() { int renewalIntervalInSecs; int expBackOffBound; //定时获取服务列表任务开启 if (this.clientConfig.shouldFetchRegistry()) { renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds(); expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); this.scheduler.schedule(new TimedSupervisorTask(\"cacheRefresh\", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS); } //注册到Eureka,心跳定时任务开始。要让Eureka Server知道客户端还活着 if (this.clientConfig.shouldRegisterWithEureka()) { renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info(\"Starting heartbeat executor: renew interval is: {}\", renewalIntervalInSecs); this.scheduler.schedule(new TimedSupervisorTask(\"heartbeat\", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS); this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); this.statusChangeListener = new StatusChangeListener() { public String getId() { return \"statusChangeListener\"; }
public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) { DiscoveryClient.logger.info(\"Saw local status change event {}\", statusChangeEvent); } else { DiscoveryClient.logger.warn(\"Saw local status change event {}\", statusChangeEvent); }
DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate(); } }; if (this.clientConfig.shouldOnDemandUpdateStatusChange()) { this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener); } this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info(\"Not registering with Eureka server per configuration\"); }
}
//注册发送,REST请求,注册服务到Eureka Server中。boolean register() throws Throwable { logger.info(\"DiscoveryClient_{}: registering service...\", this.appPathIdentifier);
EurekaHttpResponse httpResponse; try { httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo); } catch (Exception var3) { logger.warn(\"DiscoveryClient_{} - registration failed {}\", new Object[]{this.appPathIdentifier, var3.getMessage(), var3}); throw var3; }
if (logger.isInfoEnabled()) { logger.info(\"DiscoveryClient_{} - registration status: {}\", this.appPathIdentifier, httpResponse.getStatusCode()); }
return httpResponse.getStatusCode() == 204; }

查看线程,发现确实有上述几类线程。

二、服务端源码分析

服务端的主要逻辑就是:接收客户端的注册/续期/下线等请求、提供服务列表、维护注册信息等。

#eureka server端的功能代码主要是在eureka-core包中主要涉及类和接口有: 

  • EurekaServerBootstrap

  • InstanceRegistry

下面主要为大家讲一下客户端注册在服务端如何处理:eureka-server包下org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap初始化eurekserver相关的内容:

public class EurekaServerBootstrap {    public void contextInitialized(ServletContext context) {        try {            this.initEurekaEnvironment();//初始化eureka的部署环境,默认test            this.initEurekaServerContext();//初始化eureka server的上下文            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);        } catch (Throwable var3) {            log.error(\"Cannot bootstrap eureka server :\", var3);            throw new RuntimeException(\"Cannot bootstrap eureka server :\", var3);        }    }}

eureka-server包下的org.springframework.cloud.netflix.eureka.server.InstanceRegistry中方法为跟服务治理有关系的内容。

public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware {    public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {        this.handleRegistration(info, leaseDuration, isReplication);        super.register(info, leaseDuration, isReplication);//带有租期的注册,默认租期90s    }}
public abstract class AbstractInstanceRegistry implements InstanceRegistry { //存放注册到eureka server上的服务的数据接口 //第一层的String key是指服务名 //第二层的String key是指每个服务的instanceId:默认hostmame:applicationName:port //最里面的InstanceInfo就是一个服务的详细信息 private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap(); //客户端注册到服务端的具体处理过程 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { this.read.lock(); //该Map存储的就是eureka server上的服务信息 //根据应用名获取该应用名下的列表 Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName()); EurekaMonitors.REGISTER.increment(isReplication); if (gMap == null) { //没有注册过,则新创建 ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap(); gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } //通过instanceId获取具体的服务 Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId()); if (existingLease != null && existingLease.getHolder() != null) { Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug(\"Existing lease found (existing={}, provided={}\", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn(\"There is an existing lease and the existing lease\'s dirty timestamp {} is greater than the one that is being registered {}\", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn(\"Using the existing instanceInfo instead of the new instanceInfo as the registrant\"); registrant = (InstanceInfo)existingLease.getHolder(); } } else { synchronized(this.lock) { if (this.expectedNumberOfRenewsPerMin > 0) { this.expectedNumberOfRenewsPerMin += 2; this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfRenewsPerMin * this.serverConfig.getRenewalPercentThreshold()); } }
logger.debug(\"No previous lease information found; it is new registration\"); } //创建新的租期信息 Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } //注册服务或者更新服务 ((Map)gMap).put(registrant.getId(), lease); synchronized(this.recentRegisteredQueue) { this.recentRegisteredQueue.add(new Pair(System.currentTimeMillis(), registrant.getAppName() + \"(\" + registrant.getId() + \")\")); }
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug(\"Found overridden status {} for instance {}. Checking to see if needs to be add to the overrides\", registrant.getOverriddenStatus(), registrant.getId()); if (!this.overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info(\"Not found overridden id {} and hence adding it\", registrant.getId()); this.overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = (InstanceStatus)this.overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info(\"Storing overridden status {} from map\", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); }
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); }
registrant.setActionType(ActionType.ADDED); this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info(\"Registered instance {}/{} with status {} (replication={})\", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication}); } finally { this.read.unlock(); } }}

配置类

常用配置说明:客户端、服务端、以及实例配置可以直接在配置类的属性中查到:

客户端配置:前缀为eureka.client

org.springframework.cloud.netflix.eureka.EurekaClientConfigBean

服务端配置:前缀为eureka.server

org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean

实例配置:前缀eureka.instance

org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean

常用配置

注册中心配置:

#关闭注册中心的保护机制,Eureka 会统计15分钟之内心跳失败的比例低于85%将会触发保护机制,不剔除服务提供者eureka.server.enable-self-preservation = false

服务实例配置:

#不使用主机名来定义注册中心的地址,而使用IP地址的形式eureka.instance.prefer-ip-address = true

服务注册类配置:

#获取服务注册列表eureka.client.fetch-registery = true#注册服务到Eureka Servereureka.client.register-with-eureka = true#从Eureka服务器端获取注册信息的间隔时间,单位:秒eureka.client.registery-fetch-interval-seconds = 30#读取EurekaServer信息的超时时间,单位:秒eureka.client.eureka-server-read-timeout-seconds = 8#连接EurekaServer的超时时间,单位:秒eureka.client.eureka-server-connect-timeout-seconds = 5

通用配置:

eureka.client.service-url.defaultZone = http://localhost:8761/eureka

Spring cloud Eureka中还提供了其他的一些功能,比如认证和限流的内容,有兴趣的可以了解一下。除Eureka提供的服务治理外,还有其他的框架也可以实现服务治理,如Dubbo+Zookeeper,有兴趣可以对比看一下。

框架可以算是一种工具,了解工具的内部是为了更好的让工具为我们服务。优秀框架的设计思想是我们要学习的重点,抽丝剥茧之后,原理其实很基础。

往期回顾

1、CDN加速原理

2、mybatis框架的插件机制

3、玩转Selenium——Web页面自动化实战

4、自定义控制器开发(一)

欢迎关注EBCloud!

作者|董月

原创文章,作者:EBCloud,如若转载,请注明出处:https://www.sudun.com/ask/32880.html

(0)
EBCloud's avatarEBCloud
上一篇 2024年4月2日 下午3:28
下一篇 2024年4月2日 下午3:28

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注