这个方法就是更新客户端本地的注册表信息。
4.2 服务注册
// 如果进行服务注册的话 clientConfig.shouldEnforceRegistrationAtInit() 默认false
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
// todo 进行服务注册
if (!register()) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
如果在这里进行服务注册的话,需要配置文件中增加下面配置(默认是false):
eureka.client.should-enforce-registration-at-init: true
所以在这里是没有服务注册的,那么服务注册是在哪里呢?在会面分析续约定时任务时完成了服务注册,不过,我们在这里也看一下服务注册的代码:
boolean register() throws Throwable {
EurekaHttpResponse httpResponse;
try {
// todo 进行服务注册
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
}
...
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
接下来看:
@Override
public EurekaHttpResponse register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
Response response = null;
try {
Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
addExtraProperties(resourceBuilder);
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.accept(MediaType.APPLICATION_JSON)
.acceptEncoding("gzip")
.post(Entity.json(info));
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
服务注册:POST请求,path为:“apps/" + appName
4.3 定时任务
initScheduledTasks();
初始化定时任务。我们分别看一下:
4.3.1 定时更新客户端注册表任务
private void initScheduledTasks() {
// todo 拉取注册表 增量拉取定时任务
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// 拉取间隔 默认是30s
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
// todo 放入定时任务,默认30s执行一次
// 在这里看只有一个任务,在任务完成的时候会重新开启一个新的任务,可以点进去看看
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
}
默认每隔30s 增量拉取注册表信息。拉取注册表信息,最终还是走我们上面介绍的fetchRegistry 方法。
我们看一下com.netflix.discovery.TimedSupervisorTask#run:
@Override
public void run() {
Future<?> future = null;
try {
// 使用Future,可以设定子线程的超时时间,这样当前线程就不用无限等待了
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
// 阻塞 获取任务的执行结果
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
// delay是个很有用的变量,后面会用到,这里记得每次执行任务成功都会将delay重置
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
// 任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间
long newDelay = Math.min(maxDelay, currentDelay * 2);
// 设置为最新的值,考虑到多线程,所以用了CAS
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
// 一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略,就会将调度器停掉
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
// 一旦出现未知的异常,就停掉调度器
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
// 这里任务要么执行完毕,要么发生异常,都用cancel方法来清理任务;
if (future != null) {
future.cancel(true);
}
// 只要调度器没有停止,就再指定等待时间之后在执行一次同样的任务
if (!scheduler.isShutdown()) {
// todo 下一次时间 再次执行这个任务
//这里就是周期性任务的原因:只要没有停止调度器,就再创建一次性任务,执行时间时delay的值,
//假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为50秒(构造方法的入参expBackOffBound)
//如果最近一次任务没有超时,那么就在30秒后开始新任务,
//如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个乘以二的操作,乘以二后的60秒超过了最大间隔50秒)
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}