Java:使用spring-cloud-gateway获取请求和响应报文体的几种小技巧
使用spring-cloud-gateway获取请求和响应报文体的几种小技巧
·
在使用spring-cloud-gateway开发网关应用时,经常会需要获取请求报文和响应报文,一般常用的方法是分别构建请求装饰器和响应装饰器,在装饰器中获取到报文体,然后生成新的报文体提供给装饰器作为返回值,代码如下:
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class TestFilter1 implements GlobalFilter, Ordered {
private final static Logger LOG = LoggerFactory.getLogger(TestFilter1.class);
public final static String ATTR_CACHE_REQT_BODY_BUF = "_CACHE_REQT_BODY_BUF_";
public final static String ATTR_CACHE_RESP_BODY_BUF = "_CACHE_RESP_BODY_BUF_";
@Override
public int getOrder() {
return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Mono<DataBuffer> bodyMono = DataBufferUtils.join(body).map(dataBuffer -> {
// 解析响应报文
byte[] respBodyBuf = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(respBodyBuf);
DataBufferUtils.release(dataBuffer);
// 响应报文放入缓存中
exchange.getAttributes().put(ATTR_CACHE_RESP_BODY_BUF, respBodyBuf);
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer newDataBuffer = dataBufferFactory.wrap(respBodyBuf);
return newDataBuffer;
});
return super.writeWith(bodyMono);
}
};
ServerHttpRequestDecorator requestDecorator = new ServerHttpRequestDecorator(request) {
@Override
public Flux<DataBuffer> getBody() {
return DataBufferUtils.join(request.getBody()).map(dataBuffer -> {
// 解析请求报文
byte[] reqtBodyBuf = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(reqtBodyBuf);
DataBufferUtils.release(dataBuffer);
// 将请求报文放入缓存中
exchange.getAttributes().put(ATTR_CACHE_REQT_BODY_BUF, reqtBodyBuf);
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer newDataBuffer = dataBufferFactory.wrap(reqtBodyBuf);
return newDataBuffer;
}).flux();
}
};
return chain.filter(exchange.mutate().request(requestDecorator).response(responseDecorator).build())
.then(Mono.fromRunnable(() -> {
byte[] reqtBytes = (byte[]) exchange.getAttribute(ATTR_CACHE_REQT_BODY_BUF);
if (reqtBytes != null) {
// TODO 处理请求报文
}
byte[] respBytes = (byte[]) exchange.getAttribute(ATTR_CACHE_RESP_BODY_BUF);
if (respBytes != null) {
// TODO 处理响应报文
}
}));
}
}
以上代码虽然可以获取到请求和响应报文体,但都只能在这个filter的post阶段结束时获取到,对于只想打印报文日志的需求是可以满足的。但实际运用中,请求报文体很可能想在这个filter的pre阶段就获取到并做一些处理,并根据处理结果决定流程是否要继续。
下面这段代码就提供了以上功能。
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class TestFilter2 implements GlobalFilter, Ordered {
private final static Logger LOG = LoggerFactory.getLogger(TestFilter2.class);
public final static String ATTR_CACHE_REQT_BODY_BUF = "_CACHE_REQT_BODY_BUF_";
public final static String ATTR_CACHE_RESP_BODY_BUF = "_CACHE_RESP_BODY_BUF_";
@Override
public int getOrder() {
return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
Mono<ServerWebExchange> exchangeMono = DataBufferUtils.join(request.getBody()).map(dataBuffer -> {
// 解析请求报文
byte[] reqtBodyBuf = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(reqtBodyBuf);
DataBufferUtils.release(dataBuffer);
// 将请求报文放入缓存中
exchange.getAttributes().put(ATTR_CACHE_REQT_BODY_BUF, reqtBodyBuf);
ServerHttpRequest requestDecorator = createServerHttpRequest(exchange, reqtBodyBuf);
ServerHttpResponse responseDecorator = createServerHttpResponse(exchange);
ServerWebExchange newExchange = exchange.mutate().request(requestDecorator).response(responseDecorator)
.build();
return newExchange;
}).switchIfEmpty(Mono.just(exchange.mutate().response(createServerHttpResponse(exchange)).build()));
return exchangeMono.flatMap(newExchange -> {
byte[] reqtBytes = (byte[]) newExchange.getAttribute(ATTR_CACHE_REQT_BODY_BUF);
if (reqtBytes != null) {
//TODO 处理请求报文
}
return chain.filter(newExchange).then(Mono.fromRunnable(() -> {
byte[] respBytes = (byte[]) newExchange.getAttribute(ATTR_CACHE_RESP_BODY_BUF);
if (respBytes != null) {
//TODO 处理响应报文
}
}));
});
}
private ServerHttpResponse createServerHttpResponse(ServerWebExchange exchange) {
ServerHttpResponse response = exchange.getResponse();
ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Mono<DataBuffer> bodyMono = DataBufferUtils.join(body).map(dataBuffer -> {
// 解析响应报文
byte[] respBodyBuf = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(respBodyBuf);
DataBufferUtils.release(dataBuffer);
// 响应报文放入缓存中
exchange.getAttributes().put(ATTR_CACHE_RESP_BODY_BUF, respBodyBuf);
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer newDataBuffer = dataBufferFactory.wrap(respBodyBuf);
return newDataBuffer;
});
return super.writeWith(bodyMono);
}
};
return responseDecorator;
}
private ServerHttpRequest createServerHttpRequest(ServerWebExchange exchange, byte[] reqtBodyBuf) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpRequestDecorator requestDecorator = new ServerHttpRequestDecorator(request) {
@Override
public Flux<DataBuffer> getBody() {
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer newDataBuffer = dataBufferFactory.wrap(reqtBodyBuf);
return Flux.just(newDataBuffer);
}
};
return requestDecorator;
}
}
上面的代码获取请求报文体的时机在本filter的pre阶段执行完成之前。
另外,需要说明的是上述代码不会把chunked形式的报文中的分隔符获取到,这些分隔符已经由spring-cloud-gateway底层处理掉了,代码中处理的DataBuffer都是不包含这些分隔符的。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)