错误现象

UT005023: Exception handling request to /actuator/hystrix.stream
github上的issue

解决方案

查看github代码发现在master分支上已经将这个问题修复,但是当前maven中央仓库最新jar包1.5.18中并没有修复。
解决方案一: 自己打一个jar包覆盖掉原来的jar包
我是自己打了包一个1.5.19版本的jar包上传到了公司私有的maven仓库

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
      <groupId>com.netflix.hystrix</groupId>
      <artifactId>hystrix-metrics-event-stream</artifactId>
      <version>1.5.19</version>
</dependency>      

解决方案二:
改用jetty容器,原因是org.eclipse.jetty.server.ResponseWriter所有方法里面加了同步锁,注意这里我没有去实践,还是要基于事实说话更好一些

分析原因

hystrix-metrics-event-stream1.5.18版本的源码HystrixSampleSseServlet.java
问题导致的原因就是:observeOn(Schedulers.io())开辟了子线程对监控数据做打印,主线程while循环里面打印ping, 两个线程中都有对同一个writer做writer.print()和writer.checkError()操作, 不同线程操作导致了输出Buffer异常,master分支上的代码解决方案其实就是添加一把同步锁synchronized (responseWriteLock)。

 sampleSubscription = sampleStream
                        .observeOn(Schedulers.io())
                        .subscribe(new Subscriber<String>() {
                            @Override
                            public void onCompleted() {
                                logger.error("HystrixSampleSseServlet: ({}) received unexpected OnCompleted from sample stream", getClass().getSimpleName());
                                moreDataWillBeSent.set(false);
                            }

                            @Override
                            public void onError(Throwable e) {
                                moreDataWillBeSent.set(false);
                            }

                            @Override
                            public void onNext(String sampleDataAsString) {
                             // 这里是RxJava的子线程不断的打印data
                                if (sampleDataAsString != null) {
                                    writer.print("data: " + sampleDataAsString + "\n\n");
                                    // explicitly check for client disconnect - PrintWriter does not throw exceptions
                                    if (writer.checkError()) {
                                        moreDataWillBeSent.set(false);
                                    }
                                    writer.flush();
                                }
                            }
                        });
                // 这里是主线程做while循环,不断的打印ping
                while (moreDataWillBeSent.get() && !isDestroyed) {
                    try {
                        Thread.sleep(pausePollerThreadDelayInMs);
                        //in case stream has not started emitting yet, catch any clients which connect/disconnect before emits start
                        writer.print("ping: \n\n");
                        // explicitly check for client disconnect - PrintWriter does not throw exceptions
                        if (writer.checkError()) {
                            moreDataWillBeSent.set(false);
                        }
                        writer.flush();
                    } catch (InterruptedException e) {
                        moreDataWillBeSent.set(false);
                    }
                }

为什么用jetty容器的时候不会产生异常

因为org.eclipse.jetty.server.ResponseWriter所有方法本身就加了同步锁
关键代码如下:

package org.eclipse.jetty.server;

public class ResponseWriter extends PrintWriter {
    
    public boolean checkError() {
        synchronized(this.lock) {
            return this._ioException != null || super.checkError();
        }
    }

    public void flush() {
        try {
            synchronized(this.lock) {
                this.isOpen();
                this.out.flush();
            }
        } catch (Throwable var4) {
            this.setError(var4);
        }

    }

    public void write(int c) {
        try {
            synchronized(this.lock) {
                this.isOpen();
                this.out.write(c);
            }
        } catch (InterruptedIOException var5) {
            LOG.debug(var5);
            Thread.currentThread().interrupt();
        } catch (IOException var6) {
            this.setError(var6);
        }

    }

    public void write(char[] buf, int off, int len) {
        try {
            synchronized(this.lock) {
                this.isOpen();
                this.out.write(buf, off, len);
            }
        } catch (InterruptedIOException var7) {
            LOG.debug(var7);
            Thread.currentThread().interrupt();
        } catch (IOException var8) {
            this.setError(var8);
        }

    }

    public void write(String s, int off, int len) {
        try {
            synchronized(this.lock) {
                this.isOpen();
                this.out.write(s, off, len);
            }
        } catch (InterruptedIOException var7) {
            LOG.debug(var7);
            Thread.currentThread().interrupt();
        } catch (IOException var8) {
            this.setError(var8);
        }

    }
}

writer.flush()这一行代码在tomcat和undertow容器中理论上可以删除

因为在writer.checkError()方法里面其实都已经调用了this.flush()方法org.apache.catalina.connector.CoyoteWriter#checkError

package org.apache.catalina.connector;

public class CoyoteWriter extends PrintWriter {
  
    public boolean checkError() {
        this.flush();
        return this.error;
    }
}

io.undertow.servlet.spec.ServletPrintWriterDelegate#checkError

package io.undertow.servlet.spec;

public final class ServletPrintWriterDelegate extends PrintWriter {
   
    public boolean checkError() {
        return this.servletPrintWriter.checkError();
    }
}

public class ServletPrintWriter {
    
    public boolean checkError() {
        this.flush();
        return this.error;
    }
}

hystrix源码编译采坑

1.工程必须是git工程,直接从github上下载的包无法编译,需要git init初始成为git目录,建议直接git clone git@github.com:Netflix/Hystrix.git
2. 我用了gradle4.0
3.nebula.netflixoss’ version 版本改为 ‘4.1.0’
4.改用阿里的maven仓库
下面是我的build.gradle

buildscript {
    repositories {
         maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
    }
    dependencies {
         classpath 'com.netflix.nebula:gradle-extra-configurations-plugin:3.0.3'
    }
}

plugins {
    id 'nebula.netflixoss' version '4.1.0'
    id 'me.champeau.gradle.jmh' version '0.3.1'
    id 'net.saliman.cobertura' version '2.2.8'
}

ext {
    githubProjectName = rootProject.name
    project.version='1.5.19'
}

allprojects {
    repositories {
         maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
    }

    apply plugin: 'net.saliman.cobertura'
}

subprojects {
    apply plugin: 'nebula.netflixoss'
    apply plugin: 'java'
    apply plugin: 'nebula.provided-base'
	apply plugin: 'nebula.compile-api'
	
    sourceCompatibility = 1.8
    targetCompatibility = 1.8



    group = "com.netflix.${githubProjectName}"

    eclipse {
        classpath {
            // include 'provided' dependencies on the classpath
            plusConfigurations += [configurations.provided]
            downloadSources = true
            downloadJavadoc = true
        }
    }

    idea {
        module {
            // include 'provided' dependencies on the classpath
            scopes.COMPILE.plus += [configurations.provided]
        }
    }
}

mvn install和mvn deploy

mvn install:install-file
-Dfile=\data\hystrix\hystrix-contrib\hystrix-metrics-event-stream\build\libs\hystrix-metrics-event-stream-1.5.19.jar
-DgroupId=com.netflix.hystrix
-DartifactId=hystrix-metrics-event-stream
-Dversion=1.5.19
-Dpackaging=jar

mvn deploy:deploy-file
-Dfile=\data\hystrix\hystrix-contrib\hystrix-metrics-event-stream\build\libs\hystrix-metrics-event-stream-1.5.19.jar
-DgroupId=com.netflix.hystrix
-DartifactId=hystrix-metrics-event-stream
-Dversion=1.5.19
-Dpackaging=jar
-DrepositoryId=custom-releases
-Durl=http://host:port/nexus/content/repositories/custom_releases

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐