In diesem Blogbeitrag zeige ich Ihnen, wie Sie einen exponentiellen Backoff-Algorithmus in Kombination mit einer Stopping-Supervisor-Strategie verwenden können. In vielen Fällen möchten Sie eine 'gefährliche Operation' wiederholen, etwas, das zum Absturz führen kann, z.B. ein Aufruf eines externen Dienstes. Die OneForOneStrategy undAllForOneStrategy Supervisor-Strategien haben zwei Argumente für ein Wiederholungsfenster: maxNrOfRetries und withinTimeRange können verwendet werden, um eine maximale Anzahl von Wiederholungen innerhalb eines maximalen Zeitraums festzulegen. Dieser Wiederholungsmechanismus versucht es so schnell wie möglich und wartet nicht zwischen den Wiederholungen. In manchen Fällen möchten Sie, dass der Wiederholungsversuch verzögert wird, damit der fehlgeschlagene Dienst nicht mit unnötigen Wiederholungen überlastet wird; er könnte damit beschäftigt sein, sich zu erholen und das könnte einige Zeit dauern. Anstatt den fehlgeschlagenen Dienst mit der gleichen Anfrage zu überlasten, möchten wir einen Algorithmus verwenden, der entscheidet, länger zu warten, wenn der gefährliche Vorgang immer wieder fehlschlägt.
Exponentieller abgeschnittener Backoff ist ein solcher Algorithmus, der auch bei der Vermeidung von Netzwerküberlastungen eingesetzt wird. Der Algorithmus ist recht einfach:
[scala]
case class ExponentialBackOff(slotTime: FiniteDuration, ceiling: Int = 10, stayAtCeiling: Boolean = false,
Steckplatz: Int = 1, rand: Random = new Random(), waitTime: FiniteDuration = Duration.Zero,
Wiederholungen: Int = 0, Wiederholungen: Int = 0, totalRetries: Long = 0) {
def isStarted = Wiederholungen > 0
def reset(): ExponentialBackOff = {
copy(slot = 1, waitTime = Duration.Zero, resets = resets + 1, retries = 0)
}
def nextBackOff: ExponentialBackOff = {
def time: FiniteDuration = slotTime * times
def times = {
val exp = rand.nextInt(slot + 1)
math.round(math.pow(2, exp) - 1)
}
wenn (Schlitz >= Decke && !stayAtCeiling) reset()
sonst {
val (newSlot, newWait: FiniteDuration) = if (slot >= Decke) {
(Decke, Zeit)
} sonst {
(Slot + 1, Zeit)
}
copy(slot = newSlot,
waitTime = newWait,
Wiederholungen = Wiederholungen + 1,
totalRetries = totalRetries + 1)
}
}
}
[/scala]
Ich habe mich dafür entschieden, den obigen Code unveränderlich zu machen, damit er sicherer in nebenläufigem Code verwendet werden kann. Unveränderlichkeit ist nicht immer notwendig, wenn Sie innerhalb von Akteuren verwendet werden, aber es ist besser, unveränderliche Datenstrukturen in Kombination mit var zu verwenden als veränderliche Datenstrukturen mit val, für den Fall, dass Sie sie auf irgendeine Weise mit anderen Akteuren teilen möchten.
Sowohl reset als auch nextBackOff geben einen neuen ExponentialBackOff zurück. Der Algorithmus schreitet zufällig durch eine Reihe von Slots voran. Jeder Slot hat eine zugehörige Wartezeit, mit jedem nächsten Slot wird die Wartezeit exponentiell erhöht. Wenn der BackOff zurückgesetzt wird, beginnt er wieder beim ersten Slot. Als Nächstes werden wir uns ansehen, wie wir diesen Algorithmus mit Akka-Aktoren verwenden können. Wir werden den Standardansatz "Let it crash" verwenden und einen Akteur überwachen, der möglicherweise abstürzen kann. Wenn der Akteur (ich nenne diesen Akteur den 'gefährlichen Akteur') bei der gefährlichen Operation abstürzt, wird sein Supervisor den Akteur anhalten, einen neuen Akteur erstellen und weiterhin möglicherweise fehlgeschlagene Nachrichten und nächste Nachrichten an den neuen Akteur senden. Der Supervisor wird als BackOffSender bezeichnet, der alle Nachrichten an einen untergeordneten Akteur (den DangerousActor ) weiterleitet. Der obige BackOffSender und der DangerousActor verwenden eine Reihe von Nachrichten zur Kommunikation. Alle diese Nachrichten werden in einem Objekt gespeichert, dem BackOffProtocol . Wenn Sie die Nachrichten in einem Protokollobjekt zusammenfassen, können Sie Nachrichten, die miteinander in Beziehung stehen, leicht finden. Es ist auch einfacher, innerhalb der Akteure zu sehen, welche Nachrichten verwendet werden und zu welchem 'Nachrichtenprotokoll' sie gehören.
[scala]
object BackOffProtocol {
/**
* Der Nachrichtentyp, der in der Domäne verwendet wird. Der BackOffSender leitet diesen Nachrichtentyp
* an seinen gefährlichen Child-Actor weiter, der damit eine gefährliche Operation durchführt.
*/
case class Msg(id: Long, data: String)
/**
* Wird zwischen dem Supervisor (dem BackOffSender) und dem gefährlichen Child-Actor verwendet, um den ursprünglichen Absender der Msg-Nachricht zu ermitteln.
* Wenn eine Nachricht eine Zeit lang fehlschlägt und nach einer Reihe von Wiederholungen erfolgreich ist, kann dem ursprünglichen Absender der
* Msg geantwortet werden.
*/
case class TrackedMsg(msg: Msg, sender: ActorRef)
/**
* Der gefährliche Akteur muss diese Nachricht an seinen Supervisor (den BackOffSender) senden, wenn er
* die Nachricht erfolgreich verarbeitet hat. Wenn der BackOffSender diese Nachricht erhält, setzt er den BackOff-Algorithmus zurück.
*/
case class Sent(id: Long)
}
[/scala]
Der BackOffSender erzeugt den dangerousActor aus einem Props-Objekt und stellt ihn wieder her. Akka verwendet die elterliche Aufsicht , was bedeutet, dass ein Elternteil seine Kinder beaufsichtigen muss. Das Props-Objekt enthält alle Informationen, um den gefährlichen Akteur zu erstellen. Es ermöglicht es dem BackOffSender auch, ein Kind zu erstellen, ohne zu wissen, wie oder welche Abhängigkeiten es benötigt.
Der folgende Code zeigt den BackOffSender.
[scala]
class BackOffSender(dangerousProps: Props, slotTime: FiniteDuration = 10 millis, ceiling: Int = 10, stayAtCeiling: Boolean = false) extends Actor with ActorLogging {
import BackOffProtocol._
var backOff = ExponentialBackOff(slotTime, ceiling, stayAtCeiling)
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
var dangerousActor = context.actorOf(dangerousProps)
context.watch(dangerousActor)
// Nachrichten, die (noch) nicht quittiert wurden
var possiblyFailed = Vector[TrackedMsg]()
var scheduleResult: Option[Cancellable] = None
implicit val ec = context.system.dispatcher
val Tick = "tick"
def receive = {
case msg: Msg =>
val trackedMsg = TrackedMsg(msg, sender)
possiblyFailed = possiblyFailed :+ trackedMsg
// nur sofort senden, wenn das BackOff noch nicht benötigt oder durch ein Ack zurückgesetzt wurde.
// ansonsten mit der durch backOff festgelegten Verzögerung einplanen.
if (!backOff.isStarted) dangerousActor.forward(trackedMsg)
if (backOff.isStarted && scheduleResult.isEmpty) scheduleResult = Some(context.system.scheduler.scheduleOnce(backOff.waitTime, self, Tick))
case Tick =>
possiblyFailed.foreach(trackedMsg ? dangerousActor.tell(trackedMsg, trackedMsg.sender))
scheduleResult = None
case Terminated(failedRef) = >
// neu erstellen und beobachten
dangerousActor = context.actorOf(dangerousProps)
context.watch(dangerousActor)
// wenn es fehlgeschlagene Nachrichten gab, planen Sie nach der nächsten Wartezeit im BackOff-Alg.
// wenn eine davon fehlschlägt, führt dies zu einem weiteren Abbruch und einer möglicherweise höheren Verzögerung
backOff = backOff.nextBackOff
scheduleResult = Some(context.system.scheduler.scheduleOnce(backOff.waitTime, self, Tick))
case Sent(idKey) =>
//Ack des gefährlichen Akteurs. Entfernen Sie die erfolgreiche Nachricht und setzen Sie das BackOff zurück.
possiblyFailed = possiblyFailed.filterNot(tracked ? tracked.msg.id == idKey)
backOff = backOff.reset()
}
}
[/scala]
Der BackOffSender speichert alle Nachrichten und ihre zugehörigen Absender, bevor er sie an den gefährlichen Akteur weiterleitet. Sobald der gefährliche Akteur die korrekte Verarbeitung einer Nachricht mit einer Gesendet-Nachricht bestätigt, wird sie aus der Liste der möglicherweise fehlgeschlagenen Nachrichten entfernt. Der BackOffSender verwendet die Stopp-Strategie anstelle der Standard-Neustart-Strategie. Anstatt den gefährlichen Akteur neu zu starten, hält er ihn an, wenn er abstürzt. Nachrichten an den gefährlichen Akteur werden über den BackOffSender weitergeleitet, so dass der gefährliche Akteur einfach angehalten und von Grund auf neu erstellt werden kann. Die ActorRef des gefährlichen Akteurs ändert sich, wenn er gestoppt und neu erstellt wird. Das ist anders als beim Neustart eines Akteurs mit der Anweisung Restart , bei der die ActorRef immer auf die richtige Akteursinstanz verweist. Da der BackOffSender zwischen dem gefährlichen Akteur und dem Client sitzt, der mit dem gefährlichen Akteur interagieren möchte, hat er mehr Möglichkeiten als nur den Neustart des Akteurs, er könnte sogar auf einen anderen Dienst zurückgreifen. Der BackOffSender ist sowohl der Supervisor als auch ein Monitor des gefährlichen Akteurs, er erstellt und überwacht das Child. Wenn das Kind beendet wird, erhältder BackOffSendereine Nachricht Terminated. Wenn dies geschieht, erstellt der BackOffSender den gefährlichen Akteur neu, beobachtet ihn und sendet eine verzögerte 'Tick'-Nachricht an sich selbst.
Die Verzögerung wird durch den Backoff-Algorithmus bestimmt. Wenn der BackOffSender einen Tick erhält, versucht er, alle möglicherweise fehlgeschlagenen Nachrichten erneut zu senden. Neue Nachrichten, die empfangen werden, während der gefährliche Akteur ausfällt, werden ebenfalls mit einer Verzögerung eingeplant. Wenn eine gesendete Bestätigung empfangen wird, wird der BackOff-Algorithmus zurückgesetzt und die nächsten Nachrichten werden sofort wieder an den gefährlichen Akteur weitergeleitet.
Das folgende Beispiel zeigt einen gefälschten DangerousActor, der im Einheitstest verwendet wird.
[scala]
/**
* Ein 'gefährlicher' Akteur, der eine gefährliche Ressource verwendet, um seine Arbeit zu erledigen.
* Dieser Akteur wird in dem Moment beendet, in dem er eine Exception wirft
*/
class DangerousActor(dangerousResource: DangerousResource) extends Actor with ActorLogging {
import BackOffProtocol._
def receive = {
case trackedMsg: TrackedMsg =>
// die gefährliche Operation durchführen
val response = dangerousResource.danger(trackedMsg.msg.data)
// dem Supervisor mitteilen, dass die gefährliche Operation erfolgreich war
context.parent ! Sent(trackedMsg.msg.id)
// antworte dem Absender mit dem Ergebnis
sender ! trackedMsg.msg.copy(data = response)
}
}
[/scala]
Das vorgestellte Beispiel hat noch ein paar Nachteile, deren Lösung ich dem Leser überlasse. Der possiblyFailed-Vektor kann ins Unendliche wachsen, wenn der gefährliche Akteur immer wieder scheitert. Eine gute Lösung hierfür ist, den possiblyFailed-Vektor zu maximieren und die ältesten Einträge nach dem LRU-Prinzip zu entfernen. Die Standard-Stoppstrategie verwendet die Vorgaben für die OneForOneStrategie, was bedeutet, dass sie es immer wieder versuchen wird. Wenn Sie den exponentiellen Backoff "abschneiden" möchten, können Sie diese Standardargumente ändern und Ihre eigene benutzerdefinierte Supervisor-Strategie erstellen, die auf der standardmäßigen stoppingStrategy basiert. Das vollständige Beispiel einschließlich Unit-Tests ist aufgithub verfügbar. Es gibt auch ein Beispiel für einen gefährlichen Akka Camel Producer , der zeigt, wie Sie sich bei einem Antwortstatus von 500 von einem http-Server zurückziehen können. Wenn Sie mehr über Akka und seine Fehlertoleranzfunktionen wissen möchten, lesen Sie die großartige Online-Dokumentation oder kaufen Sie (schamlos) Akka in Action bei manning.com , das als Early Access verfügbar ist! Kapitel 3 über Fehlertoleranz wird sehr bald verfügbar sein.
Verfasst von
Cristiana
Some bio goes here
Unsere Ideen
Weitere Blogs
Contact
Let’s discuss how we can support your journey.



