注:本节未经校验,如有问题欢迎提issue
线路断路器用于提供稳定性并防止在分布式系统中的级联故障。它们应该结合在远程系统之间的接口使用明智的超时,以防止单个组件的故障拖垮所有组件。
作为一个例子,我们有一个web 应用程序与远程的第三方web服务进行交互。假如第三方已用完了他们的容量,他们的数据库也在高荷载作用下熔化。假设数据库在这种情况下失败,第三方 web 服务用了很长的时间来返回一个错误。这进一步使调用在长一段时间后失败。回到我们的 web 应用程序,用户已注意到其表单提交似乎比需要的使用了更长的时间。当然用户知道要做的是点击刷新按钮,将更多的请求添加到其已在运行的请求中。这最终导致 web 应用程序由于资源枯竭而失败。这将会影响所有用户,甚至是那些没有使用依赖于此第三方 web 服务的功能的。
为 web 服务的调用引入断路器会导致请求开始快速失败,让用户知道有什么地方不对劲,并且他们不需要刷新他们的请求。这还局限失效行为只影响到那些正在使用此第三方功能依赖的用户,因为没有资源枯竭,其他用户不再受影响。电路断路器还可以允许聪明的开发者来标记使用不可用功能的网站部分,或也许在断路器处于打开状态时,显示出一些适当的缓存内容。
Akka库提供名为 akka.pattern.CircuitBreaker
的断路器实现,具有如下所述的行为。
Closed
状态:callTimeout
的调用增加一个失败计数maxFailures
时,断路器跳入Open
状态Open
状态:CircuitBreakerOpenException
快速失败resetTimeout
,断路器进入Half-Open
状态Half-Open
状态:Open
状态一样快速失败Closed
状态Open
状态并经历另一个完整的resetTimeout
onOpen
、 onClose
和 onHalfOpen
提供回调ExecutionContext
中执行。下面是如何配置一个CircuitBreaker
:
import scala.concurrent.duration._
import akka.pattern.CircuitBreaker
import akka.pattern.pipe
import akka.actor.Actor
import akka.actor.ActorLogging
import scala.concurrent.Future
import akka.event.Logging
class DangerousActor extends Actor with ActorLogging {
import context.dispatcher
val breaker =
new CircuitBreaker(context.system.scheduler,
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 1.minute).onOpen(notifyMeOnOpen())
def notifyMeOnOpen(): Unit =
log.warning("My CircuitBreaker is now open, and will not close for one minute")
import akka.actor.UntypedActor;
import scala.concurrent.Future;
import akka.event.LoggingAdapter;
import scala.concurrent.duration.Duration;
import akka.pattern.CircuitBreaker;
import akka.event.Logging;
import static akka.pattern.Patterns.pipe;
import static akka.dispatch.Futures.future;
import java.util.concurrent.Callable;
public class DangerousJavaActor extends UntypedActor {
private final CircuitBreaker breaker;
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public DangerousJavaActor() {
this.breaker = new CircuitBreaker(
getContext().dispatcher(), getContext().system().scheduler(),
5, Duration.create(10, "s"), Duration.create(1, "m"))
.onOpen(new Runnable() {
public void run() {
notifyMeOnOpen();
}
});
}
public void notifyMeOnOpen() {
log.warning("My CircuitBreaker is now open, and will not close for one minute");
}
下面是如何将CircuitBreaker
用于保护一个异步调用,以及一个同步调用:
def dangerousCall: String = "This really isn't that dangerous of a call after all"
def receive = {
case "is my middle name" =>
breaker.withCircuitBreaker(Future(dangerousCall)) pipeTo sender()
case "block for me" =>
sender() ! breaker.withSyncCircuitBreaker(dangerousCall)
}
public String dangerousCall() {
return "This really isn't that dangerous of a call after all";
}
@Override
public void onReceive(Object message) {
if (message instanceof String) {
String m = (String) message;
if ("is my middle name".equals(m)) {
pipe(breaker.callWithCircuitBreaker(
new Callable<Future<String>>() {
public Future<String> call() throws Exception {
return future(
new Callable<String>() {
public String call() {
return dangerousCall();
}
}, getContext().dispatcher());
}
}), getContext().dispatcher()).to(getSender());
}
if ("block for me".equals(m)) {
getSender().tell(breaker
.callWithSyncCircuitBreaker(
new Callable<String>() {
@Override
public String call() throws Exception {
return dangerousCall();
}
}), getSelf());
}
}
}
注意
使用
CircuitBreaker
伴生对象的apply
或create
方法将返回在调用者的线程中执行回调的CircuitBreaker
。如果异步Future
不必要的时候,这可以是很有用的,例如仅调用同步的API。