在使用spring-cloud-gateway开发网关应用时,经常会需要获取请求报文和响应报文,一般常用的方法是分别构建请求装饰器和响应装饰器,在装饰器中获取到报文体,然后生成新的报文体提供给装饰器作为返回值,代码如下:


import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.web.server.ServerWebExchange;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TestFilter1 implements GlobalFilter, Ordered {

	private final static Logger LOG = LoggerFactory.getLogger(TestFilter1.class);

	public final static String ATTR_CACHE_REQT_BODY_BUF = "_CACHE_REQT_BODY_BUF_";

	public final static String ATTR_CACHE_RESP_BODY_BUF = "_CACHE_RESP_BODY_BUF_";

	@Override
	public int getOrder() {
		return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
	}

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

		ServerHttpRequest request = exchange.getRequest();
		ServerHttpResponse response = exchange.getResponse();

		ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(response) {

			@Override
			public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {

				Mono<DataBuffer> bodyMono = DataBufferUtils.join(body).map(dataBuffer -> {

					// 解析响应报文
					byte[] respBodyBuf = new byte[dataBuffer.readableByteCount()];
					dataBuffer.read(respBodyBuf);
					DataBufferUtils.release(dataBuffer);

					// 响应报文放入缓存中
					exchange.getAttributes().put(ATTR_CACHE_RESP_BODY_BUF, respBodyBuf);

					DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
					DataBuffer newDataBuffer = dataBufferFactory.wrap(respBodyBuf);

					return newDataBuffer;
				});

				return super.writeWith(bodyMono);
			}
		};

		ServerHttpRequestDecorator requestDecorator = new ServerHttpRequestDecorator(request) {

			@Override
			public Flux<DataBuffer> getBody() {

				return DataBufferUtils.join(request.getBody()).map(dataBuffer -> {

					// 解析请求报文
					byte[] reqtBodyBuf = new byte[dataBuffer.readableByteCount()];
					dataBuffer.read(reqtBodyBuf);
					DataBufferUtils.release(dataBuffer);

					// 将请求报文放入缓存中
					exchange.getAttributes().put(ATTR_CACHE_REQT_BODY_BUF, reqtBodyBuf);

					DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
					DataBuffer newDataBuffer = dataBufferFactory.wrap(reqtBodyBuf);

					return newDataBuffer;
				}).flux();

			}
		};

		return chain.filter(exchange.mutate().request(requestDecorator).response(responseDecorator).build())
				.then(Mono.fromRunnable(() -> {

					byte[] reqtBytes = (byte[]) exchange.getAttribute(ATTR_CACHE_REQT_BODY_BUF);
					if (reqtBytes != null) {
						// TODO 处理请求报文
					}

					byte[] respBytes = (byte[]) exchange.getAttribute(ATTR_CACHE_RESP_BODY_BUF);
					if (respBytes != null) {
						// TODO 处理响应报文
					}
				}));

	}

}

以上代码虽然可以获取到请求和响应报文体,但二者都只能在这个filter的post阶段(即chain.filter(...)执行完成之后)才能拿到。对于只想打印报文日志的需求,这样是可以满足的;但在实际运用中,往往希望在这个filter的pre阶段就获取到请求报文体并做一些处理,再根据处理结果决定流程是否要继续。

       下面这段代码就提供了以上功能。


import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.web.server.ServerWebExchange;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Global gateway filter that reads the whole request body before invoking
 * the rest of the chain, so the body can be inspected in this filter's pre
 * phase. The response body is captured through a response decorator and can
 * be inspected once the chain has completed.
 *
 * <p>Both bodies are stored as {@code byte[]} exchange attributes under
 * {@link #ATTR_CACHE_REQT_BODY_BUF} and {@link #ATTR_CACHE_RESP_BODY_BUF}.
 */
public class TestFilter2 implements GlobalFilter, Ordered {

	private final static Logger LOG = LoggerFactory.getLogger(TestFilter2.class);

	/** Exchange attribute key under which the request body bytes are stored. */
	public final static String ATTR_CACHE_REQT_BODY_BUF = "_CACHE_REQT_BODY_BUF_";

	/** Exchange attribute key under which the response body bytes are stored. */
	public final static String ATTR_CACHE_RESP_BODY_BUF = "_CACHE_RESP_BODY_BUF_";

	/**
	 * Orders this filter one position ahead of {@link NettyWriteResponseFilter}.
	 *
	 * @return the order value of the write-response filter minus one
	 */
	@Override
	public int getOrder() {
		return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
	}

	/**
	 * Joins the request body, caches it, and continues the chain with a
	 * decorated exchange that replays the cached body and captures the
	 * response body.
	 *
	 * @param exchange the current server exchange
	 * @param chain    the remaining gateway filter chain
	 * @return a {@code Mono} that completes after the chain and the post-chain hook
	 */
	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

		Mono<ServerWebExchange> exchangeMono = DataBufferUtils.join(exchange.getRequest().getBody())
				.map(dataBuffer -> {

					// Copy the request body out of the joined buffer.
					byte[] reqtBodyBuf = readAndRelease(dataBuffer);

					// Cache the request body on the exchange.
					exchange.getAttributes().put(ATTR_CACHE_REQT_BODY_BUF, reqtBodyBuf);

					return exchange.mutate()
							.request(createServerHttpRequest(exchange, reqtBodyBuf))
							.response(createServerHttpResponse(exchange))
							.build();
				})
				// The join produced nothing (no request body): only the response
				// needs decorating. Built lazily so it is skipped when a body exists.
				.switchIfEmpty(Mono.fromSupplier(
						() -> exchange.mutate().response(createServerHttpResponse(exchange)).build()));

		return exchangeMono.flatMap(newExchange -> {
			byte[] reqtBytes = newExchange.getAttribute(ATTR_CACHE_REQT_BODY_BUF);
			if (reqtBytes != null) {
				// TODO process the request body
			}

			return chain.filter(newExchange).then(Mono.fromRunnable(() -> {

				byte[] respBytes = newExchange.getAttribute(ATTR_CACHE_RESP_BODY_BUF);
				if (respBytes != null) {
					// TODO process the response body
				}
			}));
		});

	}

	/**
	 * Copies all readable bytes out of the buffer and releases it.
	 *
	 * @param dataBuffer the buffer to drain; released even if the copy fails
	 * @return the bytes that were readable in the buffer
	 */
	private static byte[] readAndRelease(DataBuffer dataBuffer) {
		try {
			byte[] bytes = new byte[dataBuffer.readableByteCount()];
			dataBuffer.read(bytes);
			return bytes;
		} finally {
			DataBufferUtils.release(dataBuffer);
		}
	}

	/**
	 * Creates a response decorator whose {@code writeWith} joins the outgoing
	 * body, stores its bytes under {@link #ATTR_CACHE_RESP_BODY_BUF} and writes
	 * an equivalent buffer to the wrapped response.
	 *
	 * @param exchange the exchange whose response is decorated
	 * @return the decorating response
	 */
	private ServerHttpResponse createServerHttpResponse(ServerWebExchange exchange) {

		return new ServerHttpResponseDecorator(exchange.getResponse()) {

			@Override
			public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {

				Mono<DataBuffer> bodyMono = DataBufferUtils.join(body).map(dataBuffer -> {

					// Copy the response body out of the joined buffer.
					byte[] respBodyBuf = readAndRelease(dataBuffer);

					// Cache the response body on the exchange.
					exchange.getAttributes().put(ATTR_CACHE_RESP_BODY_BUF, respBodyBuf);

					// Use the response's own factory instead of allocating a new one per body.
					return bufferFactory().wrap(respBodyBuf);
				});

				return super.writeWith(bodyMono);
			}
		};
	}

	/**
	 * Creates a request decorator whose {@code getBody} replays the given bytes.
	 *
	 * @param exchange    the exchange whose request is decorated
	 * @param reqtBodyBuf the previously read request body
	 * @return the decorating request
	 */
	private ServerHttpRequest createServerHttpRequest(ServerWebExchange exchange, byte[] reqtBodyBuf) {

		DataBufferFactory dataBufferFactory = exchange.getResponse().bufferFactory();

		return new ServerHttpRequestDecorator(exchange.getRequest()) {

			@Override
			public Flux<DataBuffer> getBody() {
				// Wrap per subscription so every subscriber gets a buffer with a
				// fresh read position.
				return Flux.defer(() -> Flux.just(dataBufferFactory.wrap(reqtBodyBuf)));
			}
		};
	}

}

上面的代码在本filter的pre阶段(即调用chain.filter(...)之前)就已经读取并缓存了请求报文体,因此可以在请求继续向后传递之前对其进行处理。

       另外,需要说明的是上述代码不会把chunked形式的报文中的分隔符获取到,这些分隔符已经由spring-cloud-gateway底层处理掉了,代码中处理的DataBuffer都是不包含这些分隔符的。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐