This article takes a look at SkyWalking's spring-cloud-gateway-plugin.

NettyRoutingFilterInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/NettyRoutingFilterInstrumentation.java

public class NettyRoutingFilterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("filter").and(takesArgumentWithType(0, "org.springframework.web.server.ServerWebExchange"));
                }

                @Override
                public String getMethodsInterceptor() {
                    return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor";
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }

    @Override
    public ClassMatch enhanceClass() {
        return byName("org.springframework.cloud.gateway.filter.NettyRoutingFilter");
    }

    @Override
    protected final String[] witnessClasses() {
        return new String[]{"org.springframework.cloud.gateway.handler.FilteringWebHandler", "reactor.netty.http.client.HttpClientOperations"};
    }
}
  • NettyRoutingFilterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor to intercept the filter method of org.springframework.cloud.gateway.filter.NettyRoutingFilter whose first parameter is an org.springframework.web.server.ServerWebExchange. Its witnessClasses method lists FilteringWebHandler and HttpClientOperations.
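To make the matcher expression above more concrete, here is a minimal sketch of the same idea using plain Java reflection instead of the agent's matcher API. It assumes that takesArgumentWithType compares the parameter at the given index by its class name; the types SketchExchange and SketchRoutingFilter and the helper names are hypothetical stand-ins, not part of SkyWalking or Spring.

```java
import java.lang.reflect.Method;

class MethodMatchSketch {
    // Mirrors named(name).and(takesArgumentWithType(index, typeName)):
    // the method name must match and the parameter at `index` is compared by class name.
    static boolean matches(Method m, String name, int index, String typeName) {
        if (!m.getName().equals(name)) {
            return false;
        }
        Class<?>[] params = m.getParameterTypes();
        return params.length > index && params[index].getName().equals(typeName);
    }

    // Counts the declared methods of `type` that satisfy the matcher.
    static int countMatches(Class<?> type, String name, int index, String typeName) {
        int count = 0;
        for (Method m : type.getDeclaredMethods()) {
            if (matches(m, name, index, typeName)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countMatches(SketchRoutingFilter.class, "filter", 0,
            SketchExchange.class.getName()));
    }
}

// Hypothetical stand-ins for ServerWebExchange and NettyRoutingFilter.
interface SketchExchange {}

class SketchRoutingFilter {
    public void filter(SketchExchange exchange, Object chain) {}
    public void filter(String notAnExchange) {}
    public int getOrder() { return 0; }
}
```

Only the filter overload whose first parameter is the exchange type is selected; the other overload and getOrder are ignored.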

NettyRoutingFilterInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/NettyRoutingFilterInterceptor.java

public class NettyRoutingFilterInterceptor implements InstanceMethodsAroundInterceptor {

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
        if (instance != null) {
            SWTransmitter swTransmitter = (SWTransmitter) instance.getSkyWalkingDynamicField();
            ContextManager.getRuntimeContext().put(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER, swTransmitter);
        }
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        if (ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER) != null) {
            ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
        }
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
    }

    public static EnhancedInstance getInstance(Object o) {
        EnhancedInstance instance = null;
        if (o instanceof ServerWebExchangeDecorator) {
            instance = getEnhancedInstance((ServerWebExchangeDecorator) o);
        } else if (o instanceof DefaultServerWebExchange) {
            instance = (EnhancedInstance) o;
        }
        return instance;
    }

    private static EnhancedInstance getEnhancedInstance(ServerWebExchangeDecorator serverWebExchangeDecorator) {
        Object o = serverWebExchangeDecorator.getDelegate();
        if (o instanceof ServerWebExchangeDecorator) {
            return getEnhancedInstance((ServerWebExchangeDecorator) o);
        } else if (o instanceof DefaultServerWebExchange) {
            return (EnhancedInstance) o;
        } else if (o == null) {
            throw new NullPointerException("The expected class DefaultServerWebExchange is null");
        } else {
            throw new RuntimeException("Unknown parameter types:" + o.getClass());
        }
    }
}
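  • NettyRoutingFilterInterceptor implements the InstanceMethodsAroundInterceptor interface. Its beforeMethod reads the SWTransmitter from the exchange's dynamic field and puts it into ContextManager.getRuntimeContext() under the key Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER; its afterMethod calls ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER) if the key is still present. The static getInstance helper unwraps any ServerWebExchangeDecorator layers until it reaches the underlying DefaultServerWebExchange.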
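The decorator unwrapping done by getInstance can be illustrated with a small self-contained sketch. All types here (SketchWebExchange, SketchBaseExchange, SketchExchangeDecorator) are hypothetical stand-ins for the Spring classes, and the sketch simplifies the original in two ways: it loops instead of recursing, and it returns null for unknown delegates where the plugin throws.

```java
class UnwrapSketch {
    // Peels off decorator layers until a non-decorator is reached.
    // Returns the base exchange, or null if the innermost object is of another type.
    static SketchBaseExchange unwrap(SketchWebExchange exchange) {
        SketchWebExchange current = exchange;
        while (current instanceof SketchExchangeDecorator) {
            current = ((SketchExchangeDecorator) current).getDelegate();
        }
        return current instanceof SketchBaseExchange ? (SketchBaseExchange) current : null;
    }

    public static void main(String[] args) {
        SketchBaseExchange base = new SketchBaseExchange();
        SketchWebExchange wrapped = new SketchExchangeDecorator(new SketchExchangeDecorator(base));
        System.out.println(unwrap(wrapped) == base);
    }
}

// Hypothetical stand-in for ServerWebExchange.
interface SketchWebExchange {}

// Hypothetical stand-in for DefaultServerWebExchange; dynamicField plays the
// role of the field that the agent adds to enhanced classes.
class SketchBaseExchange implements SketchWebExchange {
    Object dynamicField;
}

// Hypothetical stand-in for ServerWebExchangeDecorator.
class SketchExchangeDecorator implements SketchWebExchange {
    private final SketchWebExchange delegate;

    SketchExchangeDecorator(SketchWebExchange delegate) {
        this.delegate = delegate;
    }

    SketchWebExchange getDelegate() {
        return delegate;
    }
}
```

The point of the unwrapping is that only the innermost exchange carries the dynamic field, so filters that wrap the exchange would otherwise hide it from the interceptor.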

HttpClientOperationsInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/HttpClientOperationsInstrumentation.java

public class HttpClientOperationsInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("headers").and(takesArgumentWithType(0, "io.netty.handler.codec.http.HttpHeaders"));
                }

                @Override
                public String getMethodsInterceptor() {
                    return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor";
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            },
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("send").and(takesArgumentWithType(0, "org.reactivestreams.Publisher"));
                }

                @Override
                public String getMethodsInterceptor() {
                    return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor";
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            },
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("status");
                }

                @Override
                public String getMethodsInterceptor() {
                    return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor";
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            },
        };
    }

    @Override
    public ClassMatch enhanceClass() {
        return byName("reactor.netty.http.client.HttpClientOperations");
    }
}
  • HttpClientOperationsInstrumentation extends ClassInstanceMethodsEnhancePluginDefine and enhances reactor.netty.http.client.HttpClientOperations.
  • It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor to intercept the headers method whose first parameter is an io.netty.handler.codec.http.HttpHeaders.
  • It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor to intercept the send method whose first parameter is an org.reactivestreams.Publisher.
  • It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor to intercept the status method.

HttpClientOperationsHeadersInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsHeadersInterceptor.java

public class HttpClientOperationsHeadersInterceptor implements InstanceMethodsAroundInterceptor {

    private static final ILog logger = LogManager.getLogger(HttpClientOperationsHeadersInterceptor.class);

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        Object transmitter = ((EnhancedInstance) allArguments[0]).getSkyWalkingDynamicField();
        if (transmitter != null) {
            objInst.setSkyWalkingDynamicField(transmitter);
            ((EnhancedInstance) allArguments[0]).setSkyWalkingDynamicField(null);
        }
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
    }
}
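  • HttpClientOperationsHeadersInterceptor implements the InstanceMethodsAroundInterceptor interface. Its afterMethod reads the transmitter from the HttpHeaders argument, moves it onto the HttpClientOperations instance with objInst.setSkyWalkingDynamicField(transmitter), and then clears the field on the argument.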
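The hand-over in afterMethod is essentially a "move" of one field between two enhanced objects. A minimal sketch, assuming a hypothetical SketchEnhanced class in place of the agent's EnhancedInstance:

```java
class FieldTransferSketch {
    // Moves the dynamic field from `headers` to `operations` and clears the source,
    // so the transmitter has exactly one owner at any time.
    static void afterHeaders(SketchEnhanced operations, SketchEnhanced headers) {
        Object transmitter = headers.getDynamicField();
        if (transmitter != null) {
            operations.setDynamicField(transmitter);
            headers.setDynamicField(null);
        }
    }

    public static void main(String[] args) {
        SketchEnhanced operations = new SketchEnhanced();
        SketchEnhanced headers = new SketchEnhanced();
        headers.setDynamicField("transmitter");
        afterHeaders(operations, headers);
        System.out.println(operations.getDynamicField() + " / " + headers.getDynamicField());
    }
}

// Hypothetical stand-in for an object enhanced by the agent.
class SketchEnhanced {
    private Object dynamicField;

    Object getDynamicField() {
        return dynamicField;
    }

    void setDynamicField(Object value) {
        this.dynamicField = value;
    }
}
```

If the headers object carries no transmitter, the operations object is left untouched, which matches the null check in the interceptor.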

HttpClientOperationsSendInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsSendInterceptor.java

public class HttpClientOperationsSendInterceptor implements InstanceMethodsAroundInterceptor {

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
        if (transmitter != null) {
            HttpClientRequest request = (HttpClientRequest) objInst;

            HttpHeaders header = request.requestHeaders();
            ChannelOperations channelOpt = (ChannelOperations) objInst;
            InetSocketAddress remote = (InetSocketAddress) (channelOpt.channel().remoteAddress());
            String peer = remote.getHostName() + ":" + remote.getPort();

            AbstractSpan span = ContextManager.createExitSpan(transmitter.getOperationName(), peer);
            ContextManager.continued(transmitter.getSnapshot());
            ContextCarrier contextCarrier = new ContextCarrier();
            ContextManager.inject(contextCarrier);

            span.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY);
            Tags.URL.set(span, peer + request.uri());
            Tags.HTTP.METHOD.set(span, request.method().name());
            SpanLayer.asHttp(span);

            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                header.set(next.getHeadKey(), next.getHeadValue());
            }
            transmitter.setSpanGateway(span.prepareForAsync());
            ContextManager.stopSpan(span);
        }
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().errorOccurred().log(t);
    }
}
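  • HttpClientOperationsSendInterceptor implements the InstanceMethodsAroundInterceptor interface. When a transmitter is present, its beforeMethod creates an exit span for the remote peer, continues the trace from the transmitter's snapshot, sets the component, URL and HTTP method on the span, and writes the ContextCarrier items into the request headers. It then stores span.prepareForAsync() on the transmitter and calls ContextManager.stopSpan(span). Its handleMethodException calls ContextManager.activeSpan().errorOccurred().log(t).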
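The header injection loop in beforeMethod walks a linked list whose first node is only a starting point, which is why it advances before reading. A minimal sketch of that traversal follows; SketchCarrierItem is a hypothetical stand-in for CarrierItem, a plain Map replaces the Netty HttpHeaders, and the header names are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CarrierInjectSketch {
    // Copies every item after the head node into the header map,
    // using the same "advance, then read" loop shape as the interceptor.
    static Map<String, String> inject(SketchCarrierItem head, Map<String, String> headers) {
        SketchCarrierItem cursor = head;
        while (cursor.hasNext()) {
            cursor = cursor.next();
            headers.put(cursor.key, cursor.value);
        }
        return headers;
    }

    public static void main(String[] args) {
        SketchCarrierItem second = new SketchCarrierItem("x-trace-b", "2", null);
        SketchCarrierItem first = new SketchCarrierItem("x-trace-a", "1", second);
        SketchCarrierItem head = new SketchCarrierItem("", "", first);
        System.out.println(inject(head, new LinkedHashMap<String, String>()));
    }
}

// Hypothetical stand-in for a carrier item: a singly linked node with a key and a value.
class SketchCarrierItem {
    final String key;
    final String value;
    private final SketchCarrierItem next;

    SketchCarrierItem(String key, String value, SketchCarrierItem next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }

    boolean hasNext() {
        return next != null;
    }

    SketchCarrierItem next() {
        return next;
    }
}
```

Because the loop advances first, the head node's own key and value are never written to the headers.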

HttpClientOperationsStatusInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsStatusInterceptor.java

public class HttpClientOperationsStatusInterceptor implements InstanceMethodsAroundInterceptor {

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {

        SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
        if (transmitter != null) {
            HttpResponseStatus response = (HttpResponseStatus) ret;
            if (response.code() >= 400) {
                Tags.STATUS_CODE.set(transmitter.getSpanGateway().errorOccurred(), String.valueOf(response.code()));
            }
            transmitter.getSpanGateway().asyncFinish();
            objInst.setSkyWalkingDynamicField(null);
        }
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().errorOccurred().log(t);
    }
}
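  • HttpClientOperationsStatusInterceptor implements the InstanceMethodsAroundInterceptor interface. Its afterMethod fetches the transmitter; if response.code() is greater than or equal to 400 it marks the gateway span as an error and sets the status code tag. It then calls transmitter.getSpanGateway().asyncFinish() and clears the dynamic field. Its handleMethodException calls ContextManager.activeSpan().errorOccurred().log(t).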
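The status handling can be sketched in isolation as follows. SketchSpan is a hypothetical stand-in that only records what was done to it; it is not the SkyWalking AbstractSpan API.

```java
class StatusHandlingSketch {
    // Marks the span as failed and tags the status code for 4xx/5xx responses,
    // then finishes the span in every case.
    static void onStatus(SketchSpan span, int code) {
        if (code >= 400) {
            span.errorOccurred().statusTag = String.valueOf(code);
        }
        span.asyncFinish();
    }

    public static void main(String[] args) {
        SketchSpan span = new SketchSpan();
        onStatus(span, 503);
        System.out.println(span.error + " " + span.statusTag + " " + span.finished);
    }
}

// Hypothetical recording span used only for this illustration.
class SketchSpan {
    boolean error;
    boolean finished;
    String statusTag;

    SketchSpan errorOccurred() {
        error = true;
        return this;
    }

    void asyncFinish() {
        finished = true;
    }
}
```

Note that a successful response still finishes the span; only the error flag and the tag depend on the status code.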

FilteringWebHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/FilteringWebHandlerInstrumentation.java

public class FilteringWebHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("handle");
                }

                @Override
                public String getMethodsInterceptor() {
                    return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor";
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }

    @Override
    public ClassMatch enhanceClass() {
        return byName("org.springframework.cloud.gateway.handler.FilteringWebHandler");
    }

}
  • FilteringWebHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor to intercept the handle method of org.springframework.cloud.gateway.handler.FilteringWebHandler.

FilteringWebHandlerInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/FilteringWebHandlerInterceptor.java

public class FilteringWebHandlerInterceptor implements InstanceMethodsAroundInterceptor {

    private static final String SPRING_CLOUD_GATEWAY_ROUTE_PREFIX = "GATEWAY/";

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
        if (instance == null) {
            return;
        }
        ContextSnapshot contextSnapshot = (ContextSnapshot) instance.getSkyWalkingDynamicField();
        if (contextSnapshot == null) {
            return;
        }

        ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
        String operationName = SPRING_CLOUD_GATEWAY_ROUTE_PREFIX;
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        operationName = operationName + route.getId();
        SWTransmitter transmitter = new SWTransmitter(contextSnapshot, operationName);
        instance.setSkyWalkingDynamicField(transmitter);
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
        if (instance == null) {
            return ret;
        }
        SWTransmitter swTransmitter = (SWTransmitter) instance.getSkyWalkingDynamicField();
        if (swTransmitter == null) {
            return ret;
        }
        Mono mono = (Mono) ret;
        return mono.doFinally(d -> {
            ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
            HttpStatus statusCode = exchange.getResponse().getStatusCode();
            if (statusCode == HttpStatus.TOO_MANY_REQUESTS) {
                AbstractSpan localSpan = ContextManager.createLocalSpan(swTransmitter.getOperationName());
                Tags.STATUS_CODE.set(localSpan, statusCode.toString());
                SpanLayer.asHttp(localSpan);
                localSpan.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY);
                ContextManager.continued(swTransmitter.getSnapshot());
                ContextManager.stopSpan(localSpan);
            }
        });
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
    }

}
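  • FilteringWebHandlerInterceptor implements the InstanceMethodsAroundInterceptor interface. Its beforeMethod reads the ContextSnapshot from the exchange's dynamic field, builds the operationName as "GATEWAY/" plus the route id taken from the exchange, creates an SWTransmitter from both, and stores it with instance.setSkyWalkingDynamicField(transmitter). Its afterMethod fetches the SWTransmitter and registers a doFinally callback on the returned Mono. When the response status is HttpStatus.TOO_MANY_REQUESTS, that callback creates a local span tagged with the status code, calls ContextManager.continued(swTransmitter.getSnapshot()), and then calls ContextManager.stopSpan(localSpan).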
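The shape of this interceptor (compute an operation name up front, then attach a completion callback that only records something for status 429) can be sketched without Reactor. The sketch below swaps in java.util.concurrent.CompletableFuture.whenComplete for Mono.doFinally, which is only a rough analogue since it does not cover cancellation; the class name, the list of recorded strings, and the route id are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntSupplier;

class RouteSpanSketch {
    static final String ROUTE_PREFIX = "GATEWAY/";
    static final int TOO_MANY_REQUESTS = 429;

    // Builds the operation name the same way as beforeMethod: prefix plus route id.
    static String operationName(String routeId) {
        return ROUTE_PREFIX + routeId;
    }

    // Attaches a completion callback that reads the final status lazily
    // and records a "span" only when the request was rate limited.
    static CompletableFuture<Void> handle(CompletableFuture<Void> downstream, String routeId,
                                          IntSupplier finalStatus, List<String> recordedSpans) {
        String operation = operationName(routeId);
        return downstream.whenComplete((ignored, error) -> {
            int status = finalStatus.getAsInt();
            if (status == TOO_MANY_REQUESTS) {
                recordedSpans.add(operation + " status=" + status);
            }
        });
    }

    public static void main(String[] args) {
        List<String> spans = new ArrayList<>();
        handle(CompletableFuture.<Void>completedFuture(null), "demo-route", () -> 429, spans).join();
        System.out.println(spans);
    }
}
```

Reading the status inside the callback rather than before it matters, because the response status is only known once the filter chain has finished.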

DefaultHttpHeadersInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/DefaultHttpHeadersInstrumentation.java

public class DefaultHttpHeadersInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[] {
            new ConstructorInterceptPoint() {
                @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
                    return takesArgumentWithType(0, "io.netty.handler.codec.DefaultHeaders");
                }

                @Override public String getConstructorInterceptor() {
                    return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor";
                }
            }
        };
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[0];
    }

    @Override
    public ClassMatch enhanceClass() {
        return byName("io.netty.handler.codec.http.DefaultHttpHeaders");
    }
}
  • DefaultHttpHeadersInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor to intercept the constructor of io.netty.handler.codec.http.DefaultHttpHeaders whose first parameter is an io.netty.handler.codec.DefaultHeaders. It defines no instance method intercept points.

DefaultHttpHeadersInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/DefaultHttpHeadersInterceptor.java

public class DefaultHttpHeadersInterceptor implements InstanceConstructorInterceptor {
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        Object transmitter = ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
        if (transmitter != null) {
            objInst.setSkyWalkingDynamicField(transmitter);
            ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
        }
    }
}
  • DefaultHttpHeadersInterceptor implements the InstanceConstructorInterceptor interface. Its onConstruct method takes the transmitter out of ContextManager.getRuntimeContext(), stores it on the newly constructed headers object (objInst), and then removes it from the runtime context.
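Taken together with NettyRoutingFilterInterceptor, this forms a put, take, and clean-up hand-over through the runtime context. The following sketch models that under the assumption that the runtime context behaves like a per-thread map; the class, the key string, and the method names are hypothetical and only mirror the roles of the three interceptor callbacks.

```java
import java.util.HashMap;
import java.util.Map;

class RuntimeContextHandoffSketch {
    // Hypothetical key; the plugin uses Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER.
    static final String KEY = "sketch-gateway-transmitter";

    // Assumption: one context map per thread.
    private static final ThreadLocal<Map<String, Object>> CONTEXT =
        ThreadLocal.withInitial(HashMap::new);

    // Role of NettyRoutingFilterInterceptor.beforeMethod: publish the transmitter.
    static void beforeFilter(Object transmitter) {
        if (transmitter != null) {
            CONTEXT.get().put(KEY, transmitter);
        }
    }

    // Role of DefaultHttpHeadersInterceptor.onConstruct: take it exactly once.
    static Object onHeadersConstructed() {
        return CONTEXT.get().remove(KEY);
    }

    // Role of NettyRoutingFilterInterceptor.afterMethod: clean up if nobody took it.
    static void afterFilter() {
        CONTEXT.get().remove(KEY);
    }

    public static void main(String[] args) {
        beforeFilter("transmitter");
        System.out.println(onHeadersConstructed());
        System.out.println(onHeadersConstructed());
        afterFilter();
    }
}
```

Because the first headers object constructed during filter takes the value out, headers created later on the same thread do not pick up a stale transmitter.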

Summary

SkyWalking's spring-cloud-gateway-plugin consists mainly of four instrumentation classes: NettyRoutingFilterInstrumentation, HttpClientOperationsInstrumentation, FilteringWebHandlerInstrumentation, and DefaultHttpHeadersInstrumentation.

doc

  • NettyRoutingFilterInstrumentation
  • HttpClientOperationsInstrumentation
  • FilteringWebHandlerInstrumentation
  • DefaultHttpHeadersInstrumentation