Hystrix源码分析

前言 在微服务架构中,我们通常把一个系统分拆成若干服务单元,各单元之间通过远程调用的方式实现相互依赖,但是如果因为网络原因或是服务自身出现问题使得调用方对外服务出现延迟,如果在高并发的情况下可能会因为任务积压导致服务崩溃。

Spring Cloud Hystrix 实现了断路器、线程隔离等一系列服务保护功能,具备服务降级、服务熔断、线程和信号隔离、请求缓存等强大功能。

工作流程 当我们使用 Hystrix 来包装你请求依赖服务时大体有如下的流程,

这里写图片描述

1,构建 HystrixCommand 或者 HystrixObservableCommand 对象; 2,执行命令; 3,结果是否被缓存; 4,请求线路(类似电路)是否是开路; 5,线程池/请求队列/信号量占满时会发生什么; 6,使用 HystrixObservableCommand.construct() 还是 HystrixCommand.run(); 7,计算链路健康度; 8,失败回退逻辑; 9,返回正常回应。 构建 HystrixCommand 或者 HystrixObservableCommand 对象 我们可以使用如下代码创建上述实例对象,

//用在依赖的服务返回单个操作结果的时候
HystrixCommand command = new HystrixCommand(arg1, arg2);

//用在依赖的服务返回多个操作结果的时候
HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);

执行命令 Hystrix 命令提供四种方式(HystrixCommand 支持所有四种方式,而 HystrixObservableCommand 仅支持后两种方式)来执行所包装的请求:

execute() —— 阻塞,当依赖服务响应(或者抛出异常/超时)时,返回结果 queue() —— 返回 Future 对象,通过该对象异步得到返回结果 observe() —— 返回 Observable 对象,立即发出请求,在依赖服务响应(或者抛出异常/超时)时,通过注册的 Subscriber 得到返回结果 toObservable() —— 返回 Observable 对象,但只有在订阅该对象时,才会发出请求,然后在依赖服务响应(或者抛出异常/超时)时,通过注册的 Subscriber 得到返回结果 在内部实现中,execute() 是同步调用,内部会调用 queue().get() 方法。queue() 内部会调用 toObservable().toBlocking().toFuture()。也就是说,HystrixCommand 内部均通过一个 Observable 的实现来执行请求,即使这些命令本来是用来执行同步返回回应这样的简单逻辑。

从上图中也可以看出,所有请求的调用,最后都会执行 HystrixObservableCommand# toObservable() 方法,这就是本文的切入点。先来看一下源码:

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    //doOnCompleted handler already did all of the SUCCESS work
    //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
    final Action0 terminateCommandCleanup = new Action0() {

        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                handleCommandEnd(true); //user code did run
            }
        }
    };

    //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
    final Action0 unsubscribeCommandCleanup = new Action0() {
        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                if (!_cmd.executionResult.containsTerminalEvent()) {
                    _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                    _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                            .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                }
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                if (!_cmd.executionResult.containsTerminalEvent()) {
                    _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                    _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                            .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                }
                handleCommandEnd(true); //user code did run
            }
        }
    };

    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            return applyHystrixSemantics(_cmd);
        }
    };

    final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
        @Override
        public R call(R r) {
            R afterFirstApplication = r;

            try {
                afterFirstApplication = executionHook.onComplete(_cmd, r);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
            }

            try {
                return executionHook.onEmit(_cmd, afterFirstApplication);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
                return afterFirstApplication;
            }
        }
    };

    final Action0 fireOnCompletedHook = new Action0() {
        @Override
        public void call() {
            try {
                executionHook.onSuccess(_cmd);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
            }
        }
    };

    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
             /* this is a stateful object so can only be used once */
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }

            commandStartTimestamp = System.currentTimeMillis();

            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }

            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

满眼的内部类,如果不知道 RxJava 的话可能会完全不知所云,这就是所谓的响应式编程,跟传统的命令式编程差异还是挺大的。其实,如果仔细观察,最核心的逻辑便是 Observable#defer() 方法,当存在subscriber时,便会调用Func0#call() 方法。下面就来仔细分析一下。

结果是否被缓存 final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey();

if (requestCacheEnabled) {
    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
    if (fromCache != null) {
        isResponseFromCache = true;
        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
    }
 }

如果当前命令的请求缓存功能启用并且命中该缓存,那缓存的结果立即以 Observable 对象形式返回。

请求线路(类似电路)是否是开路 如果缓存功能未打开,或者缓存未命中,则进入了真正的执行流程了。

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
    executionHook.onStart(_cmd);

    /* determine if we're allowed to execute */
    if (circuitBreaker.allowRequest()) {
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };

        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };

        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}

如果 circuitBreaker 是打开的,即不接收请求,那么就会转接到fallback处理逻辑;如果是关闭的,则进入后续环节。

线程池/请求队列/信号量占满时会发生什么 if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { //当semaphore不足时,调用fallback处理,此处的semaphore 视不同的隔离策略而定 return handleSemaphoreRejectionViaFallback(); }

如果和当前需要执行的命令相关联的线程池和请求队列(或者信号量,如果不使用线程池),Hystrix 将不会执行这个命令,而是直接使用FallBack 失败回退逻辑。

使用 HystrixObservableCommand.construct() 还是 HystrixCommand.run() private Observable executeCommandAndObserve(final AbstractCommand _cmd) { ……

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    ……
}

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }

    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}

真正对请求的处理逻辑委托给了 AbstractCommand#getExecutionObservable() 方法,该方法是一个抽象方法,由子类实现具体逻辑,因此分别看一下 HystrixCommand 和 HystrixObservableCommand 中的具体实现。

//HystrixCommand 中的实现
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                //调用 自身的 run 方法,并将结果通过just方法立即发射出去。
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}

​ ////HystrixObservableCommand 中的实现 ​ final protected Observable getExecutionObservable() { ​ //依赖于自身 construct 方法实现 ​ return construct(); ​ }

计算链路健康度 Hystrix 会将请求成功,失败,被拒绝或超时信息报告给熔断器,熔断器维护一些用于统计数据用的计数器。

这些计数器产生的统计数据使得熔断器在特定的时刻,能短路某个依赖服务的后续请求,直到恢复期结束,若恢复期结束根据统计数据熔断器判定线路仍然未恢复健康,熔断器会再次关闭线路。

失败回退逻辑 当使用 HystrixCommand 时,通过实现 HystrixCommand.getFallback() 返回失败回退时的回应。

当使用 HystrixObservableCommand 时,通过实现 HystrixObservableCommand.resumeWithFallback() 返回 Observable 对象来通知 observers 失败回退时的回应。

返回正常回应 若命令成功被执行,Hystrix 将回应返回给调用方,或者通过 Observable 的形式返回。

总结

本文从源码角度大致梳理了 Hystrix 服务容错机制的内部实现,希望能对读者有所帮助。

版权声明:本文为CSDN博主「懋为」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/tjreal/article/details/80115256

这些信息有用吗?
Do you have any suggestions for improvement?

Thanks for your feedback!