溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何進(jìn)行SpringCloud Gateway 全鏈路實(shí)現(xiàn)分析

發(fā)布時(shí)間:2021-11-15 16:40:58 來(lái)源:億速云 閱讀:154 作者:柒染 欄目:云計(jì)算

本篇文章為大家展示了如何進(jìn)行SpringCloud Gateway 全鏈路實(shí)現(xiàn)分析,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

1. 背景

隨著微服務(wù)架構(gòu)的流行,服務(wù)按照不同的維度進(jìn)行拆分,一次請(qǐng)求往往需要涉及到多個(gè)服務(wù)。而諸多的服務(wù)可能分布在了幾千臺(tái)服務(wù)器,橫跨多個(gè)不同的數(shù)據(jù)中心。為了快速定位和解決故障,應(yīng)用性能進(jìn)行分析,全鏈路監(jiān)控組件就在這樣的問(wèn)題背景下產(chǎn)生了。最出名的是谷歌公開(kāi)的論文提到的 Google Dapper。想要在這個(gè)上下文中理解分布式系統(tǒng)的行為,就需要監(jiān)控那些橫跨了不同的應(yīng)用、不同的服務(wù)器之間的關(guān)聯(lián)動(dòng)作。

1.1 全鏈路原理

通過(guò)業(yè)務(wù)調(diào)用過(guò)程中添加并傳遞調(diào)用鏈ID,實(shí)現(xiàn)應(yīng)用間生成鏈路數(shù)據(jù),最終串聯(lián)成一條完整的調(diào)用鏈。 其中整個(gè)調(diào)用過(guò)程中每個(gè)請(qǐng)求都要透?jìng)鱐xId、SpanId和pSpanId。

如何進(jìn)行SpringCloud Gateway 全鏈路實(shí)現(xiàn)分析

1.2 Spring Cloud Gateway

作為Spring Cloud官方推出的第二代網(wǎng)關(guān)框架,Spring cloud gateway是基于Spring 5.0、Spring Boot2.0和Reactor等技術(shù)開(kāi)發(fā)的網(wǎng)關(guān),采用了NIO模型進(jìn)行通信。

如何進(jìn)行SpringCloud Gateway 全鏈路實(shí)現(xiàn)分析

1.2.2 Mono與Flux

Mono表示的是包含 0 或者 1 個(gè)元素的異步序列,即要么成功發(fā)布元素,要么錯(cuò)誤

如何進(jìn)行SpringCloud Gateway 全鏈路實(shí)現(xiàn)分析

Flux和Mono之間可以相互轉(zhuǎn)換,比如對(duì)一個(gè) Flux 序列進(jìn)行計(jì)數(shù)操作,得到的結(jié)果是一個(gè) Mono對(duì)象,或者把兩個(gè) Mono 序列合并在一起,得到的是一個(gè) Flux 對(duì)象。

2. Spring Cloud Gateway不做監(jiān)控

Spring Cloud Gateway作為入口網(wǎng)關(guān),主要負(fù)責(zé)服務(wù)的路由轉(zhuǎn)發(fā)。如果網(wǎng)關(guān)沒(méi)有進(jìn)行監(jiān)控,則全鏈路會(huì)缺失網(wǎng)關(guān)節(jié)點(diǎn),直接展示為用戶訪問(wèn)后續(xù)應(yīng)用;不能有效定位用戶請(qǐng)求慢是網(wǎng)關(guān)問(wèn)題還是后續(xù)節(jié)點(diǎn)。

如何進(jìn)行SpringCloud Gateway 全鏈路實(shí)現(xiàn)分析

3.1.1 Spring Cloud Gateway的請(qǐng)求入口

org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#apply 先將接收到的HttpServerRequest或者最終需要返回的HttpServerResponse包裝轉(zhuǎn)換為ReactorServerHttpRequest和ReactorServerHttpResponse,再處理請(qǐng)求。

public Mono<Void> apply(HttpServerRequest request, HttpServerResponse response) {

NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(response.alloc());

ServerHttpRequest adaptedRequest;

ServerHttpResponse adaptedResponse;

try {

adaptedRequest = new ReactorServerHttpRequest(request, bufferFactory);

adaptedResponse = new ReactorServerHttpResponse(response, bufferFactory);

} catch (URISyntaxException ex) {

logger.error("Invalid URL ">

3.1.2 構(gòu)造網(wǎng)關(guān)上下文

org.springframework.web.server.adapter.HttpWebHandlerAdapter#handle

  • createExchange()構(gòu)造網(wǎng)關(guān)上下文ServerWebExchange

  • getDelegate()通過(guò)委托的方式獲取一系列需要處理的WebHandler

public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {

ServerWebExchange exchange = createExchange(request, response);

return getDelegate().handle(exchange)

.onErrorResume(ex -> handleFailure(request, response, ex))

.then(Mono.defer(response::setComplete));

}

 

protected ServerWebExchange createExchange(ServerHttpRequest request, ServerHttpResponse response) {

return new DefaultServerWebExchange(request, response, this.sessionManager,

getCodecConfigurer(), getLocaleContextResolver(), this.applicationContext);

}

3.1.3 進(jìn)入Filter鏈

org.springframework.cloud.gateway.handler.FilteringWebHandler#handle 獲得 GatewayFilter 數(shù)組,并根據(jù)獲得的 GatewayFilter 數(shù)組創(chuàng)建DefaultGatewayFilterChain,過(guò)濾處理請(qǐng)求。

public Mono<Void> handle(ServerWebExchange exchange) {

Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);

List<GatewayFilter> gatewayFilters = route.getFilters();

List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);

combined.addAll(gatewayFilters);

 

AnnotationAwareOrderComparator.sort(combined);

logger.debug("Sorted gatewayFilterFactories: "+ combined);

return new DefaultGatewayFilterChain(combined).filter(exchange);

}

3.1.4 執(zhí)行Filter鏈

org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain#filter 過(guò)濾器的鏈?zhǔn)秸{(diào)用

public Mono<Void> filter(ServerWebExchange exchange) {

return Mono.defer(() -> {

if (this.index < filters.size()) {

GatewayFilter filter = filters.get(this.index);

DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);

return filter.filter(exchange, chain);

} else {

return Mono.empty(); // complete

}

});

}

3.1.5 Gateway Filter適配器

org.springframework.cloud.gateway.handler.FilteringWebHandler$GatewayFilterAdapter#filter GatewayFilterAdapter是GlobalFilter過(guò)濾器的包裝類,最終委托Global Filter進(jìn)行執(zhí)行。

private static class GatewayFilterAdapter implements GatewayFilter {
	private final GlobalFilter delegate;
	public GatewayFilterAdapter(GlobalFilter delegate) {
		this.delegate = delegate;
	}

	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		return this.delegate.filter(exchange, chain);
	}
}

3.1.6 Netty路由網(wǎng)關(guān)過(guò)濾器

org.springframework.cloud.gateway.filter.NettyRoutingFilter#filter

如何進(jìn)行SpringCloud Gateway 全鏈路實(shí)現(xiàn)分析

GlobalFilter實(shí)現(xiàn)有很多,此處只分析NettyRoutingFIlter和NettyWriteResponseFilter。而NettyRoutingFilter負(fù)責(zé)使用 Netty HttpClient 代理對(duì)下游的請(qǐng)求。

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

// 獲得 requestUrl

URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

 

// 判斷是否能夠處理,http或https前綴

String scheme = requestUrl.getScheme();

if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {

return chain.filter(exchange);

}

// 設(shè)置已經(jīng)路由

setAlreadyRouted(exchange);

 

ServerHttpRequest request = exchange.getRequest();

// 創(chuàng)建Netty Request Method對(duì)象

final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());

final String url = requestUrl.toString();

HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(), exchange);

 

final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();

filtered.forEach(httpHeaders::set);

 

String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);

boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);

boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

// 請(qǐng)求后端服務(wù)

return this.httpClient.request(method, url, req -> {

final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach)

.headers(httpHeaders)

.chunkedTransfer(chunkedTransfer)

.failOnServerError(false)

.failOnClientError(false);

if (preserveHost) {

String host = request.getHeaders().getFirst(HttpHeaders.HOST);

proxyRequest.header(HttpHeaders.HOST, host);

}

return proxyRequest.sendHeaders() //發(fā)送請(qǐng)求頭

.send(request.getBody().map(dataBuffer -> // 發(fā)送請(qǐng)求Body

((NettyDataBuffer)dataBuffer).getNativeBuffer()));

}).doOnNext(res -> {

ServerHttpResponse response = exchange.getResponse();

// put headers and status so filters can modify the response

HttpHeaders headers = new HttpHeaders();

res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

exchange.getAttributes().put("original_response_content_type", headers.getContentType());

HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(

this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE);

response.getHeaders().putAll(filteredResponseHeaders);

 

HttpStatus status = HttpStatus.resolve(res.status().code());

if (status != null) {

response.setStatusCode(status);

} else if (response instanceof AbstractServerHttpResponse) {

// https://jira.spring.io/browse/SPR-16748

((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code());

} else {

throw new IllegalStateException("Unable to set status code on response: ">

3.1.7 Netty 回寫(xiě)響應(yīng)網(wǎng)關(guān)過(guò)濾器

org.springframework.cloud.gateway.filter.NettyWriteResponseFilter#filter NettyWriteResponseFilter 與NettyRoutingFilter成對(duì)出現(xiàn),負(fù)責(zé)將代理響應(yīng)寫(xiě)回網(wǎng)關(guān)客戶端響應(yīng)。

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

// then方法實(shí)現(xiàn)After Filter邏輯

return chain.filter(exchange).then(Mono.defer(() -> {

// 獲得Netty Response

HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);

if (clientResponse == null) {

return Mono.empty();

}

log.trace("NettyWriteResponseFilter start");

ServerHttpResponse response = exchange.getResponse();

// 將Netty Response回寫(xiě)給客戶端

NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();

//TODO: what if it's not netty

final Flux<NettyDataBuffer> body = clientResponse.receive()

.retain() // ByteBufFlux => ByteBufFlux

.map(factory::wrap); // ByteBufFlux => Flux<NettyDataBuffer> 

MediaType contentType = response.getHeaders().getContentType();

return (isStreamingMediaType(contentType) ?

response.writeAndFlushWith(body.map(Flux::just)) : 

response.writeWith(body));

}));

}

4. Spring Cloud Gateway進(jìn)行監(jiān)控

當(dāng)最終將網(wǎng)關(guān)也進(jìn)行監(jiān)控,可以全局的看到應(yīng)用請(qǐng)求流轉(zhuǎn),網(wǎng)關(guān)的請(qǐng)求業(yè)務(wù)分流,各應(yīng)用的調(diào)用負(fù)載情況。

如何進(jìn)行SpringCloud Gateway 全鏈路實(shí)現(xiàn)分析

上述內(nèi)容就是如何進(jìn)行SpringCloud Gateway 全鏈路實(shí)現(xiàn)分析,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI