Spring Cloud OpenFeign 重试机制及其源码分析
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
文章目录
Retryer接口
Retryer接口的实现主要是continueOrPropagate方法该方法主要用于判断是否可以重试具体的判断逻辑我们在下面介绍
void continueOrPropagate(RetryableException e);
Retryer clone(); //Retryer继承了Cloneable接口
OpenFeign中提供的Retryer接口的两个实现
- Retryer.DefaultOpenFeign中提供的默认的重试实现
- Retryer.NEVER_RETRY在无任何自定义配置的情况下OpenFeign使用该重试配置表示不重试。其配置在FeignClientsConfiguration中
@Bean
@ConditionalOnMissingBean
public Retryer feignRetryer() {
return Retryer.NEVER_RETRY;
}
我们可以自定义覆盖该配置来设置重试接口。
Retryer.Default
源码如下我们做了一些基本的说明
class Default implements Retryer {
private final int maxAttempts; // 最大尝试次数
private final long period; // 间隔时间
private final long maxPeriod; // 最大间隔时间
int attempt; // 当前尝试了多少次
long sleptForMillis; // 已休眠了多久
public Default() {
this(100, SECONDS.toMillis(1), 5);
}
public Default(long period, long maxPeriod, int maxAttempts) {
this.period = period;
this.maxPeriod = maxPeriod;
this.maxAttempts = maxAttempts;
this.attempt = 1;
}
protected long currentTimeMillis() {
return System.currentTimeMillis();
}
public void continueOrPropagate(RetryableException e) {
if (attempt++ >= maxAttempts) { // 超过最大尝试次数则直接抛出异常e
throw e;
}
long interval;// 实际的间隔时间
if (e.retryAfter() != null) {
// retryAfter是RetryableException抛出时设置的这个我们在将ErrorEncoder的时候再说
interval = e.retryAfter().getTime() - currentTimeMillis();
if (interval > maxPeriod) {
interval = maxPeriod;
}
if (interval < 0) {
return;
}
} else {
// retryAfter 为null将使用period 来计算间隔时间
interval = nextMaxInterval();
}
try {
Thread.sleep(interval);// 休眠interval毫秒
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
throw e;
}
sleptForMillis += interval;
}
long nextMaxInterval() {
long interval = (long) (period * Math.pow(1.5, attempt - 1));
return interval > maxPeriod ? maxPeriod : interval;
}
@Override
public Retryer clone() {
return new Default(period, maxPeriod, maxAttempts);
}
}
Retryer.Default的continueOrPropagate方法主要就是计算实际的间隔时间然后线程休眠对应的时间如果达到重试次数则把RetryableException抛出
重试的源码分析
我们在前文中已经提到每次的FeignClient调用主要就是SynchronousMethodHandler的invoke和executeAndDecode方法我们再次来看看这两个方法
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Options options = findOptions(argv);
Retryer retryer = this.retryer.clone(); // 克隆Retryer
while (true) {// 重试循环
try {
// 执行并decode
return executeAndDecode(template, options);
} catch (RetryableException e) {// catch RetryableException异常
try {
// 执行retryery接口的continueOrPropagate方法
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
// 如果continueOrPropagate接着抛出RetryableException 异常则不再重试
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
// 休眠之后重试
continue;
}
}
}
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 12
response = response.toBuilder()
.request(request)
.requestTemplate(template)
.build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
// 执行请求时如果出现IOException则认为是请求时网络波动等问题影响了请求尝试重试
// errorExecuting方法是FeignException类下静态方法
// import static feign.FeignException.errorExecuting;
// 该方法直接抛出RetryableException
throw errorExecuting(request, e);
}
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (decoder != null)
return decoder.decode(response, metadata.returnType());
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
// 处理请求结果这里涉及到结果的Decoder可查看我们的上一篇文章 OpenFeign的基本配置
// 这里面除了我们上一篇文档的Decoder还有一个特殊的DecoderErrorDecoder
asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
metadata.returnType(),
elapsedTime);
try {
if (!resultFuture.isDone())
throw new IllegalStateException("Response handling not done");
return resultFuture.join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause != null)
throw cause;
throw e;
}
}
asyncResponseHandler.handleResponse源码解析
void handleResponse(CompletableFuture<Object> resultFuture,
String configKey,
Response response,
Type returnType,
long elapsedTime) {
// copied fairly liberally from SynchronousMethodHandler
boolean shouldClose = true;
try {
if (logLevel != Level.NONE) {
response = logger.logAndRebufferResponse(configKey, logLevel, response,
elapsedTime);
}
if (Response.class == returnType) {// 如果请求的结果类型就是Response则直接返回即可
if (response.body() == null) {
resultFuture.complete(response);
} else if (response.body().length() == null
|| response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
shouldClose = false;
resultFuture.complete(response);
} else {
// Ensure the response body is disconnected
final byte[] bodyData = Util.toByteArray(response.body().asInputStream());
resultFuture.complete(response.toBuilder().body(bodyData).build());
}
} else if (response.status() >= 200 && response.status() < 300) {
// 响应状态为200到300其实就是正确返回
if (isVoidType(returnType)) {// 如果返回结果为void
resultFuture.complete(null);
} else {// 执行我们的Decoder
final Object result = decode(response, returnType);
shouldClose = closeAfterDecode;
resultFuture.complete(result);
}
} else if (decode404 && response.status() == 404 && !isVoidType(returnType)) {
// 如果是404且不是void并且decode404的配置为true则执行Decoder
final Object result = decode(response, returnType);
shouldClose = closeAfterDecode;
resultFuture.complete(result);
} else {
如果decode404为false或者不属于以上的状态码如5xx异常则执行errorDecoder
resultFuture.completeExceptionally(errorDecoder.decode(configKey, response));
}
} catch (final IOException e) {
if (logLevel != Level.NONE) {
logger.logIOException(configKey, logLevel, e, elapsedTime);
}
// errorReading 抛出FeignException异常
resultFuture.completeExceptionally(errorReading(response.request(), response, e));
} catch (final Exception e) {
resultFuture.completeExceptionally(e);
} finally {
if (shouldClose) {
ensureClosed(response.body());
}
}
}
ErrorDecoder源码分析
ErrorDecoder只有一个方法
public Exception decode(String methodKey, Response response);
OpenFeign默认使用的ErrorDecoder是ErrorDecoder.Default
ErrorDecoder.Default 是ErrorDecoder的一个内部类
public static class Default implements ErrorDecoder {
// RetryAfterDecoder 也是ErrorDecoder的一个内部类其主要方法apply
private final RetryAfterDecoder retryAfterDecoder = new RetryAfterDecoder();
@Override
public Exception decode(String methodKey, Response response) {
// 获取当前的异常类根据不同的status返回不同的异常类)
FeignException exception = errorStatus(methodKey, response);
// 使用retryAfterDecoder获取响应头信息Retry-After的值
// Retry-After可以是数字比如100表示100秒之后重试
// Retry-After也可以是Date类型的字符串默认格式为new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss 'GMT'", US);
// 我们可以自定义ErrorDecoder然后自定义DateFormat来满足我们的日期格式
Date retryAfter = retryAfterDecoder.apply(firstOrNull(response.headers(), RETRY_AFTER));
if (retryAfter != null) {
// 如果retryAfter不为空则抛出RetryableException异常这样就会进入重试
return new RetryableException(
response.status(),
exception.getMessage(),
response.request().httpMethod(),
exception,
retryAfter,
response.request());
}
return exception;
// 如果retryAfter 为空则直接将请求结果FeignException 异常抛出而且不会进入重试
}
private <T> T firstOrNull(Map<String, Collection<T>> map, String key) {
if (map.containsKey(key) && !map.get(key).isEmpty()) {
return map.get(key).iterator().next();
}
return null;
}
}
总结
OpenFeign的重试机制基于Retryer接口其continueOrPropagate方法主要计算重试间隔的时间以及可以重试的次数。
OpenFeign默认使用Retryer.NEVER_RETRY不重试。我们可以设置Retryer.Default来开启重试
@Bean
@ConditionalOnMissingBean
public Retryer feignRetryer() {
return new Retryer.Default();// 默认period:100ms,maxPeriod:1000ms,maxAttempts:5
}
有两个地方会进入重试机制
- 进行Http调用时client.execute(request, options);如果出现IOException异常会自动进入重试。
- 出现服务端错误进入ErrorDecoder后如果响应设置了响应头Retry-After时才会进行重试。