SpringCloud客户端负载均衡——Ribbon

Ribbon——A ribbon is a long, narrow piece of cloth that you use for tying things together or as a decoration.

成都创新互联公司主营陇川网站建设的网络公司,主营网站建设方案,app软件开发公司,陇川h5小程序定制开发搭建,陇川网站营销推广欢迎陇川等地区企业咨询

Ribbon是一个工具类框架,不需要独立部署。

负载均衡设备/负载均衡软件模块都会维护一个可用的服务清单,通过心跳检测来剔除故障节点,保证清单中都是可用节点。

客户端负载均衡,由客户端节点维护要访问的服务清单,服务清单来自于注册中心。

如前所示,使用客户端负载均衡调用分两步:

1. 服务提供者注册到服务中心。

2. 服务消费者通过标有@LoadBalanced注解的RestTemplate进行服务调用。

在service-consumer服务中,通过调用RestTemplate的getForEntity方法,GET调用hello-service的/hello接口。

RestTemplate

GET

RestTemplate有两类GET实现:getForEntity和getForObject。

getForEntity()有三个重载实现,均返回ResponseEntity,

// url为请求地址,responseType为响应体body的类型,uriVariables为url参数

// uriVariables配合url中的占位符进行动态传参,如:

// entity = getForEntity("http://user-serivce/user?name={1}", String.class, "John");,将John传给参数name

public ResponseEntity getForEntity(String url, Class responseType, Object... uriVariables) throws RestClientException {

RequestCallback requestCallback = acceptHeaderRequestCallback(responseType); // new AcceptHeaderRequestCallback(responseType)

ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); // new ResponseEntityResponseExtractor<>(responseType)

return nonNull(execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables));

}

// uriVariables为Map类型,key需要与url中的占位符对应,如:

// params.put("myname", "John");

// entity = getForEntity("http://user-service/user?name={myname}", String.class, params); 将key为myname对应的value——John传给name

public ResponseEntity getForEntity(String url, Class responseType, Map uriVariables);

// 使用URI对象代替url和uriVariables

public ResponseEntity getForEntity(URI url, Class responseType);

// 使用:

ResponseEntity entity = restTemplate.getForEntity(url, String.class, "John");

String body = entity.getBody();

ResponseEntity entity = restTemplate.getForEntity(url, User.class, "John");

User body = entity.getBody();

getForObject()也有三个重载实现,传入execute方法的不是ResponseExtractor,而是HttpMessageConverterExtractor,返回的则是对象类型,三个重载和getForEntity的三个重载关系类似:

public T getForObject(String url, Class responseType, Object... uriVariables) throws RestClientException {

RequestCallback requestCallback = acceptHeaderRequestCallback(responseType); // new AcceptHeaderRequestCallback(responseType)

HttpMessageConverterExtractor responseExtractor = new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);

return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);

}

public T getForObject(String url, Class responseType, Map uriVariables);

public T getForObject(URI url, Class responseType);

// 使用:

String result = restTemplate.getForObject(url, String.class, "John");

User user = restTempleate.getForObject(url, User.class, "John");

// 使用getForObject可以省略从response中获取body的步骤

POST

RestTemplate有三类POST实现:postForEntity和postForObject,postForLocation。

postForEntity()有三个重载实现,均返回ResponseEntity,

// 相较于getForEntity,新增参数Object request,reqeust如果是HttpEntity对象,RestTemplate将其当作完整的http请求对象处理,request中包含了header和body的内容。如果request是普通对象,RestTemplate将其转换为HttpEntity来处理,request作为body。

// if (request instanceof HttpEntity){this.requestEntity = (HttpEntity) request; }

// else if (requestBody != null) { this.requestEntity = new HttpEntity<>(request); }

// else { this.requestEntity = HttpEntity.EMPTY; }

// 使用:

// User user = new User("didi", 30);

// entity = postForEntity("http://user-serivce/user", user, String.class, "John");,将John传给参数name

public ResponseEntity postForEntity(String url, Object request, Class responseType, Object... uriVariables) throws RestClientException {

RequestCallback requestCallback = httpEntityCallback(request, responseType); // new HttpEntityRequestCallback(requestBody, responseType)

ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); // new ResponseEntityResonseExtracor<>(responseType)

return nonNull(execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables));

}

public ResponseEntity postForEntity(String url, Object request, Class responseType, Map uriVariables);

public ResponseEntity postForEntity(URI url, Object request, Class responseType);

postForObject()也有三个重载实现,传入execute方法的不是ResponseExtractor,而是HttpMessageConverterExtractor,返回的则是对象类型,三个重载和postForEntity的三个重载关系类似:

public T postForObject(String url, Object request, Class responseType, Object... uriVariables) throws RestClientException {

RequestCallback requestCallback = httpEntityCallback(request, responseType);

HttpMessageConverterExtractor responseExtractor = new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);

return execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables);

}

public T postForObject(String url, Object request, Class responseType, Map uriVariables);

public T postForObject(URI url, Object request, Class responseType);

postForLocation()用于发送post请求,返回新资源的URI,有三个重载实现,均返回URI对象,

public URI postForLocation(String url, Object request, Object... uriVariables) throws RestClientException {

RequestCallback requestCallback = httpEntityCallback(request); // new HttpEntityRequestCallback(request, null)

HttpHeaders headers = execute(url, HttpMethod.POST, requestCallback, headersExtractor(), uriVariables); // new HeadersExtractor()

return (headers != null ? headers.getLocation() : null);

}

public URI postForLocation(String url, Object request, Map uriVariables);

public URI postForLocation(URI url, Object ruquest);

execute

RestTemplate中,不同的请求方式,最终会调用到execute的三个重载实现上来

// ------------------------------------

public T execute(String url, HttpMethod method, RequestCallback requestCallback, ResponseExtractor responseExtractor, Object... uriVariables) {

URI expanded = getUriTemplateHandler().expand(url, uriVariables);

return doExecute(expanded, method, requestCallback, responseExtractor);

}

public T execute(String url, HttpMethod method, RequestCallback requestCallback, ResponseExtractor responseExtractor, Map uriVariables);

public T execute(URI url, HttpMethod method, RequestCallback requestCallback, ResponseExtractor responseExtractor);

execute()的三个重载实现,都会调用doExecute()方法,去执行请求

protected T doExecute(URI url, HttpMethod method, RequestCallback requestCallback, ResponseExtractor responseExtractor) throws RestClientException {

ClientHttpResponse response = null;

ClientHttpRequest request = createRequest(url, method);

if (requestCallback != null) { requestCallback.doWithRequest(request); }

response = request.execute();// 此处执行前会被拦截

handleResponse(url, method, response);

return (responseExtractor != null ? responseExtractor.extractData(response) : null);

}

doExecute()方法接收的参数中,有RequestCallback和ResponseExtractor。

RequestCallback

AcceptHeaderRequestCallback implements RequestCallback // AcceptHeaderRequestCallback用于GET请求

HttpEntityRequestCallback extends AcceptHeaderRequestCallback // HttpEntityRequestCallback 用于POST、PUT等请求

ResponseEntity

// ResponseEntity扩展自HttpEntity,增加了http的status(http请求状态码)

package org.springframework.http;

public class ResponseEntity extends HttpEntity {

private final Object status; // status为int或HttpStatus类型

// getter/setter...

}

// HttpEntity表示http的request或response的entity,包含headers(http请求的头信息)和body(http请求的请求体)

package org.springframework.http;

public class HttpEntity {

private final HttpHeaders headers;

private final T body;

// getter/setter...

}

@LoadBalanced

在服务消费者中,给RestTemplate添加了@LoadBalanced注解,根据注释,该注解用于标记RestTemplate使用LoadBalancerClient来配置,即客户端负载均衡器。

// Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.

public @interface LoadBalanced {}

LoadBalancerClient

客户端负载均衡器,具有如下能力:

// Represents a client-side load balancer. 即客户端负载均衡器

interface LoadBalancerClient extends ServiceInstanceChooser {

// 使用serviceId服务执行request请求

T execute(String serviceId, LoadBalancerRequest request) throws IOException;

T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request) throws IOException;

// 将逻辑服务名http://myservice/path/to/service 替换为host:port的形式

URI reconstructURI(ServiceInstance instance, URI original);

}

// 选择一个server用来发送请求的实现接口

interface ServiceInstanceChooser {

// 根据serviceId,从负载均衡器选择一个服务实例ServiceInstance

ServiceInstance choose(String serviceId);

}

LoadBalancerClient有一个实现类RibbonLoadBalancerClient。

在RestTemplate的doExecute()方法中,调用request.execute()之前,会被LoadBalancerInterceptor拦截。该拦截器中有一个LoadBalancerClient实例,此外该拦截器在LoadBalancerAutoConfiguration中被创建。而LoadBalancerAutoConfiguration有两个特殊的注解@ConditionalOnClass(RestTemplate.class)和@ConditionalOnBean(LoadBalancerClient.class),且注释明确说明LoadBalancerAutoConfiguration为Ribbon的自动化配置类。

spring-cloud-commons的loadbalancer包中的配置类,以2.1.2为例

LoadBalancerAutoConfiguration:创建LoadBalancerInterceptor、创建RestTemplateCustomizer(匿名内部类)、创建LoadBalancerRequestFactory、创建SmartInitializingSingleton(匿名内部类)

AsyncLoadBalancerAutoConfiguration:针对AsyncRestTemplate做的类似配置

spring-cloud-netflix-ribbon中的几个配置类,以2.1.2为例

RibbonClientConfiguration:创建IClientConfig、创建IRule,创建IPing,创建ServerList,创建ServerListUpdater,创建ILoadBalancer(使用ZoneAwareLoadBalancer实现),创建ServerListFilter、创建RibbonLoadBalancerContext,创建RetryHandler,创建ServerIntrospector

RibbonAutoConfiguration:创建HasFeatures,创建SpringClientFactory,创建LoadBalancerClient(使用RibbonLoadBalancerClient实现),创建LoadBalancedRetryFactory(使用RibbonLoadBalancedRetryFactory实现),创建PropertiesFactory,创建RibbonApplicationContextInitializer,创建RestTemplateCustomizer(使用匿名内部类),创建RibbonClientHttpRequestFactory

RestCilentRibbonConfiguration:创建RestClient

LoadBalancerAutoConfiguration

Ribbon的自动化配置类:

@Configuration

@ConditionalOnClass(RestTemplate.class) // 需要RestTemplate类在classpath中

@ConditionalOnBean(LoadBalancerClient.class) // 需要LoadBalancerClient的实现Bean在BeanFactory中

@EnableConfigurationProperties(LoadBalancerRetryProperties.class)

class LoadBalancerAutoConfiguration {

@LoadBalanced

@Autowired(required = false)

// 工程中注册的RestTemplate的Bean会在此被加载

private List restTemplates = Collections.emptyList();

@Autowired(required = false)

private List transformers = Collections.emptyList();

// 创建SmartInitializingSingleton的Bean,负责用每个Customizer去修饰每个RestTemplate

@Bean

public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(

ObjectProvider> restTemplateCustomizers) {

return () -> restTemplateCustomizers.ifAvailable(customizers -> {

for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {

for (RestTemplateCustomizer customizer : customizers) {

customizer.customize(restTemplate);

}

}

});

}

@Configuration

@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")

static class LoadBalancerInterceptorConfig {

// 创建拦截器Bean,入参为客户端负载均衡器

@Bean

public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {

return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);

}

// 创建一个RestTemplateCustomizer的Bean,负责将负载均衡拦截器加到入参RestTemplate的拦截器列表中,添加方式为get、add、set

@Bean

@ConditionalOnMissingBean

public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {

return restTemplate -> {

List list = new ArrayList<>(restTemplate.getInterceptors());

list.add(loadBalancerInterceptor);

restTemplate.setInterceptors(list);

}

}

}

}

LoadBalancerInterceptor

负载均衡拦截器,用于在请求最终执行前进行拦截,在拦截器的intercept()方法中,首先从request中获取服务名称serviceName,然后调用request工厂的createRequest()方法,创建一个负载均衡的request——LoadBalancerRequest实例,最后将其连同serviceName一起作为LoadBalancerClient的execute()方法的入参。

class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

LoadBalancerClient loadBalancer;

LoadBalancerRequestFactory requestFactory;

public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {

// 此处调用request的getURI方法,

final URI originalUri = request.getURI();

String serviceName = originalUri.getHost();

return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));

}

}

LoadBalancerRequestFactory——创建LoadBalancerRequest的工厂

public LoadBalancerRequest createRequest(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {

// 返回LoadBalancerRequest的匿名内部类

return instance -> {

// 该LoadBalancerRequest的匿名内部类实现,先创建一个ServiceRequestWrapper的request,然后调用execution的execute方法执行请求

HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);

return execution.execute(serviceRequest, body);

};

}

LoadBalancerRequest

LoadBalancerRequest使用该接口的apply()的方法,为request添加处理动作

interface LoadBalancerRequest {

T apply(ServiceInstance instance);

}

ServiceRequestWrapper

ServiceRequestWrapper继承自HttpRequestWrapper,HttpRequestWrapper对外提供了获取一个request的method、URI、headers、methodValue等信息的方法。

ServiceRequestWrapper改写了默认的getURI()方法,使用客户端负载均衡器LoadBalancerClient的重构URI的方法,将入参request的URI进行重构,其具体实现在LoadBalancerClient的实现类RibbonLoadBalancerClient中。

class ServiceRequestWrapper extends HttpRequestWrapper {

private final ServiceInstance instance;

private final LoadBalancerClient loadBalancer;

public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance, LoadBalancerClient loadBalancer) {}

@Override

public URI getURI() {

// 调用loadBalancer的reconstructURI方法,进行URI重构,改写成host:port的形式,具体实现在RibbonLoadBalancerClient中

return this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());

}

}

RibbonLoadBalancerClient

RibbonLoadBalancerClient实现了LoadBalancerClient和ServiceInstanceChooser中的execute、reconstructURI、choose方法,完成了请求执行、URI重构和选择服务实例的任务,execute

class RibbonLoadBalancerClient implements LoadBalancerClient {

SpringClientFactory clientFactory;

@Override

public URI reconstructURI(ServiceInstance instance, URI original) {

String serviceId = instance.getServiceId();

RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);

URI uri; Server server;

if (instance instanceof RibbonServer) {

server = ((RibbonServer) instance).getServer();

uri = updateToSecureConnectionIfNeeded(original, ribbonServer);

} else {

server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());

IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);

ServerIntrospector serverIntrospector = serverIntrospector(serviceId);

uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server);

}

// 用server中的host、port等替换原始uri

return context.reconstructURIWithServer(server, uri);

}

public ServiceInstance choose(String serviceId, Object hint) {

// 先调用getLoadBalancer方法,根据serviceId,获取一个ILoadBalancer

// 然后调用getServer(ILoadBalancer loadBalancer, Object hint),使用loadBalancer选择一个Server,hint默认为"default"

Server server = getServer(getLoadBalancer(serviceId), hint);

if (server == null) { return null; }

// 用入参serviceId、选择的Server,构造一个RibbonServer

return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server));

}

public T execute(String serviceId, LoadBalancerRequest request, Object hint) {

// 首先选择一个RibbonServer,该部分流程与choose相同

Server server = getServer(getLoadBalancer(serviceId), hint);

RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server));

// 调用execute的重载实现,执行请求

return execute(serviceId, ribbonServer, request);

}

public T execute(String serviceId, ServiceInstance serviceIntance, LoadBalancerRequest request) {

Server server = null;

if (serviceInstance instanceof RibbonServer) {

server = ((RibbonServer) serviceInstance).getServer();

}

RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);

RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

try { 无锡做人流手术多少钱 http://www.ytsg029.com/

// 调用apply,向服务实例发起请求

T returnVal = request.apply(serviceInstance);

statsRecorder.recordStats(returnVal);

return returnVal;

} catch (IOException ex) {

statsRecorder.recordStats(ex); throw ex;

} catch (Exception ex) {

statsRecorder.recordStats(ex); throw ex;

}

return null;

}

// Ribbon 实现了ServiceInstance接口,即服务实例接口

public static class RibbonServer implements ServiceInstance {

private final String serviceId;

private final Server server;

private final boolean secure;

private Map metadata;

// @Override方法

}

}

ILoadBalancer

在RibbonLoadBalancerClient的choose()和execute()方法中,都是通过调用ILoadBalancer的chooseServer()方法,来选择一个服务实例Server的,该ILoadBalancer接口是由Ribbon定义的。

在ILoadBalancer接口中,定义了软件负载均衡器的操作:一个服务实例的集合、标记一个服务停止、选择服务

package com.netflix.loadbalancer;

interface ILoadBalancer {

void addServers(List newServers); // 初始化、后续添加服务列表

Server chooseServer(Object key); // 从负载均衡器选择一个服务实例

void markServerDown(Server server); // 标记并通知某个服务实例已经停止

List getReachableServers(); // up/reachable状态的服务实例,可以提供正常服务

List getAllServers(); // 所有已知的服务实例,reachable/unreachable都包括

}

其中的Server,代表一个服务端节点,包含了一个服务的基本信息:host、port、scheme、id、zone、元数据等等。

在RibbonLoadBalancerClient的choose()和execute()方法中,通过getLoadBalancer()方法,来根据serviceId获取ILoadBalancer的实例,然后将其包装成RibbonServer。

配置类RibbonClientConfiguration创建ILoadBalancer时如果配置文件里有配置,则使用配置的实现,否则默认使用ZoneAwareLoadBalancer实现。

ClientHttpRequestExecution

RibbonLoadBalancerClient的execute()方法中,调用了入参LoadBalancerRequest的apply方法,execute()方法在LoadBlancerInterceptor的intercept方法中调用,并传入LoadBalancerRequestFactory.createRequest创建的LoadBalancerRequest实现,其实现中最终使用ClientHttpRequestExecution的execute方法执行请求。

总结

使用时,注册一个使用@LoadBalanced注解修饰的RestTemplate,在需要发起请求的地方调用RestTemplate的相应的请求方法,最终调用到其doExecute方法。

@LoadBalanced注解关联了LoadBalancerClient。

配置类LoadBalancerAutoConfiguration注册了如下Bean:

SmartInitializingSingleton:遍历restTemplates、遍历customizers,并customizer.customize(restTemplate)

LoadBalancerRequestFactory:使用LoadBalancerClient构造

LoadBalancerInterceptor:使用LoadBalancerClient和LoadBalancerRequestFactory创建

RestTemplateCustomizer:使用LoadBalancerInterceptor构造一个匿名类,将注册的LoadBalancerInterceptor添加进restTemplate的interceptors列表中

拦截器LoadBalancerInterceptor的intercept方法从原始请求中获取URI,然后使用LoadBalancerClient的execute方法执行请求,接收两个参数:serviceName即host和请求工厂创建的request

请求工厂LoadBalancerRequestFactory的createRequest方法,由原始请求创建一个LoadBalancerRequest的匿名实现

负载均衡请求LoadBalancerRequest接口只有apply方法,其匿名实现创建HttpRequest的实现类ServcieRequestWrapper的实例,然后由ClientHttpRequestExecution的execute方法执行请求,返回响应ClientHttpResponse

ServiceRequestWrapper重写了父类HttpRequestWrapper的getURI方法,返回LoadBalancerClient的reconstructURI方法重构的URI

ClientHttpRequestExecution的实现类是InterceptingClientHttpRequest的内部类IntercpetingRequestExecution,其execute方法遍历interceptors,如果有拦截器,就执行拦截方法,如果没有了,就执行请求。

在4中,LoadBalancerClient的execute方法执行请求,其实现类是RibbonLoadBalancerClient。execute 先通过serviceId获取ILoadBalancer,然后调用ILoadBalancer的chooseServer方法,选择一个Server,并将之转换成RibbonServer,RibbonServer是ServiceInstance的子类,最后调用LoadBalancerRequest的apply方法,执行请求,返回响应。


分享文章:SpringCloud客户端负载均衡——Ribbon
分享URL:http://ybzwz.com/article/gceech.html