Stash for restarting Akka Typed Actors

Marek Scholle
Jun 11, 2018


This began as an article on primitive asynchronous await-future-completion support in Akka Typed actors, but the prerequisite part on stashing turned out to be long enough for a post of its own. I will write the rest if this gets some attention 🙂

Akka actors are push-based: an actor is a black box that is pushed a message from its mailbox, processes that message, and returns control to the scheduler. There is no way to tell the scheduler that an actor can’t handle a message for some internal reason; an actor must always handle every message pushed to it.

A typical example is an actor that acts as a processor for data (e.g. from Kafka) that have to be persisted (e.g. to Redis):

  • on actor start, we read previously persisted data (if any); the persisted state then becomes part of the in-memory actor state
  • on each message, we update the internal actor state and persist it; the key requirement here is that the next message must not be processed until the result of the previous processing is known to be persisted

(Say we can’t or don’t want to use Akka Streams for some reason.) The persistence operation is typically represented by a Future, and its completion is signalled to the actor as a message; usually there is something like self ! Completed in the future’s onComplete continuation. What we want is to temporarily suspend message processing in the actor until it receives the completion message (Completed). As noted above, we can’t, so there has to be some kind of queue. The machinery developed below can be used to implement a generic solution that (almost) hides the completion message and the delayed processing of messages received in the meantime.
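To make the pattern concrete, here is a minimal sketch without Akka. Everything in it is an assumption made for illustration: a LinkedBlockingQueue plays the mailbox, Thread.sleep inside a Future plays the persistence call, and the names CompletionDemo, Data, pending and startPersist are hypothetical. It only shows the shape of the problem: the completion comes back as an ordinary message, and data arriving in the meantime has to wait in a queue.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

object CompletionDemo {
  sealed trait Msg
  final case class Data(value: Int) extends Msg
  case object Completed extends Msg // sent by the future's continuation

  def run(inputs: List[Int]): List[String] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val mailbox = new LinkedBlockingQueue[Msg]()
    inputs.foreach(v => mailbox.put(Data(v)))

    val pending = mutable.Queue.empty[Int]       // data received while a write is in flight
    val log = mutable.ListBuffer.empty[String]
    var inFlight: Option[Int] = None
    var done = 0

    def startPersist(v: Int): Unit = {
      log += s"persisting $v"
      inFlight = Some(v)
      // Stand-in for the real persistence call; the continuation only posts a
      // message, much like `self ! Completed` would.
      Future(Thread.sleep(20)).onComplete(_ => mailbox.put(Completed))
    }

    while (done < inputs.size) {
      mailbox.take() match {
        case Data(v) if inFlight.isEmpty => startPersist(v)
        case Data(v)                     => pending.enqueue(v) // must wait its turn
        case Completed =>
          inFlight.foreach(v => log += s"persisted $v")
          inFlight = None
          done += 1
          if (pending.nonEmpty) startPersist(pending.dequeue())
      }
    }
    log.toList
  }
}
```

Calling CompletionDemo.run(List(1, 2, 3)) yields "persisting 1", "persisted 1", "persisting 2", and so on: no value is persisted before the previous write has been confirmed.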

Classical Akka actors (hereinafter referred to as Untyped) can mix in stash support, which lets an actor stash away messages so that they are not pushed to it again until it tells the framework to do so. For messages that arrive in the meantime we say “no, this is not the message I’m waiting for, please keep it for me”, and when the awaited completion message arrives, “I’m ready now, please push me the messages you’ve kept for me”. The solution uses some internal magic (see the documentation) and, in my opinion, breaks the purity of the actor model, in which the framework is only a substrate on which actors live and work. Like every prefabricated solution, it may not compose well with the client’s needs. On the other hand, it has the useful property that stashed messages are kept outside the actor, so they are not lost when the actor crashes and restarts to its initial behavior.

The recent Akka Typed actors are a great step towards a composable, functional approach in which the protocol between actor and scheduler is dead simple: the actor state, as seen from the scheduler, is only a behavior (effectively a function object that captures the actor state) that, when passed a message from the mailbox, returns another behavior. No magic, no mix-ins, no mandatory inheritance from Actor, no become/unbecome side effects on the actor’s this.
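The "behavior in, behavior out" protocol can be modelled in a few lines of plain Scala. This is a toy model, not the Akka Typed API: the Behavior trait, counter and run below are hypothetical names, and the "scheduler" is just a fold over a list of messages.

```scala
object BehaviorModel {
  // Toy stand-in for a typed behavior: handle one message, return the next behavior.
  trait Behavior[T] { def receive(msg: T): Behavior[T] }

  // A running total whose state lives only in the captured argument `total`.
  def counter(total: Int, report: Int => Unit): Behavior[Int] = new Behavior[Int] {
    def receive(msg: Int): Behavior[Int] = {
      val next = total + msg
      report(next)
      counter(next, report) // new state = new behavior, nothing is mutated
    }
  }

  // Toy "scheduler": push messages one by one, always keeping the returned behavior.
  def run[T](initial: Behavior[T], messages: List[T]): Behavior[T] =
    messages.foldLeft(initial)((behavior, msg) => behavior.receive(msg))
}
```

Feeding 1, 2, 3 to counter(0, report) reports 1, 3, 6. The only thing the scheduler ever holds is the current behavior, which is exactly why a restart back to the initial behavior forgets everything the actor has captured since.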

There is stash support in Akka Typed, but it is only a rather clever queue that lives (in the docs example) inside the actor and allows it to process multiple messages at once. If the actor fails while processing the stash, the remaining messages are lost.

Before we continue: the problem applies only if the behavior is wrapped in a restarting supervisor. (Akka Typed stops an actor on failure by default, which is perhaps the most notable difference from Akka Untyped.) If an actor stops, the messages still in its mailbox are lost, and that is outside our control, so our problem is only to implement a stash that survives restarts.

To see the problem, let’s write down a behavior that captures the essence of the Akka Typed stash example:

If we run this behavior under restarting strategy

we get (relevant parts of log):

20:26:45.291 INFO - creating stash
20:26:45.293 INFO - A1
20:26:45.294 INFO - waiting for 1 second
20:26:45.297 INFO - stashing Log(B1)
20:26:45.297 INFO - stashing Log(C1)
20:26:46.316 INFO - awaited future completed, unstashing
20:26:46.323 INFO - B1
20:26:46.324 INFO - C1
20:26:47.294 INFO - A2
20:26:47.294 INFO - waiting for 1 second
20:26:47.294 INFO - stashing Log(B2)
20:26:47.295 INFO - stashing Fail
20:26:47.295 INFO - stashing Log(C2)
20:26:48.316 INFO - awaited future completed, unstashing
20:26:48.316 INFO - B2
20:26:48.322 ERROR - failed on demand
20:26:48.323 INFO - creating stash
20:26:49.296 INFO - A3

The restarting supervisor wrapper doesn’t know that stashing happens inside, so if a failure bubbles up to it, it happily restarts the actor to its initial behavior with an empty stash (hence the second creating stash line in the log). The actor fails while C2 is still unprocessed in the buffer, and thus C2 is lost.

What can we do? Obviously, only one stash instance should be created per actor. This means that the actor should not create its own stash; the stash must be created outside, when the behavior is being spawned, and captured by the behavior implementation. We strive for a general, typesafe solution that lets an actor be implemented as a behavior that the client may wrap in Behaviors.supervise.

The basic idea is to do something like

(buffer becomes a parameter) spawned as

(This actually works, C2 is not lost.) There are only two changes:

  1. we pass an already created buffer into the behavior, so the same instance is used after the actor restarts,
  2. the initial behavior has changed to buffer.unstashAll(ctx, active), so if the actor fails during stash processing, it resumes processing the stash after the restart.
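The effect of these two changes can be reproduced in a toy model that does not depend on Akka. All names here (RestartModel, Stash, Await, run) are hypothetical, the Stash class is a plain queue rather than Akka’s StashBuffer, and the supervisor is simply a try/catch that re-creates the initial behavior. The model replays the A2, B2, Fail, C2 part of the log above, once with the stash created by the initial behavior and once with the stash passed in from outside.

```scala
import scala.collection.mutable

object RestartModel {
  sealed trait Command
  final case class Log(text: String) extends Command
  case object Await extends Command     // hypothetical: start waiting for a future
  case object Completed extends Command // the awaited future has completed
  case object Fail extends Command

  trait Behavior { def receive(msg: Command): Behavior }

  final class Stash {
    private val queue = mutable.Queue.empty[Command]
    def stash(msg: Command): Unit = queue.enqueue(msg)
    // Replays the messages stashed so far; if one of them throws,
    // the messages behind it stay in the queue.
    def unstashAll(behavior: Behavior): Behavior = {
      var current = behavior
      var remaining = queue.size
      while (remaining > 0) { remaining -= 1; current = current.receive(queue.dequeue()) }
      current
    }
  }

  def active(stash: Stash, out: mutable.Buffer[String]): Behavior = new Behavior {
    def receive(msg: Command): Behavior = msg match {
      case Log(text) =>
        out += text
        this
      case Await     => waiting(stash, out)
      case Completed => this
      case Fail      => throw new RuntimeException("failed on demand")
    }
  }

  def waiting(stash: Stash, out: mutable.Buffer[String]): Behavior = new Behavior {
    def receive(msg: Command): Behavior = msg match {
      case Completed => stash.unstashAll(active(stash, out))
      case other =>
        stash.stash(other)
        this
    }
  }

  // Toy restarting supervisor: on failure, start again from the initial behavior.
  def run(initial: () => Behavior, messages: List[Command]): Unit = {
    var current = initial()
    for (m <- messages)
      current = try current.receive(m) catch { case _: RuntimeException => initial() }
  }

  val script: List[Command] =
    List(Log("A2"), Await, Log("B2"), Fail, Log("C2"), Completed, Log("A3"))

  // Stash created by the initial behavior: a restart starts with a fresh, empty stash.
  def stashCreatedInside(): List[String] = {
    val out = mutable.ListBuffer.empty[String]
    run(() => active(new Stash, out), script)
    out.toList
  }

  // Stash created once outside; the initial behavior first drains what is left in it.
  def stashPassedIn(): List[String] = {
    val out = mutable.ListBuffer.empty[String]
    val stash = new Stash
    run(() => stash.unstashAll(active(stash, out)), script)
    out.toList
  }
}
```

In this model stashCreatedInside() returns List("A2", "B2", "A3"), so C2 is lost, while stashPassedIn() returns List("A2", "B2", "C2", "A3"). The sketch ignores the case where draining the stash during a restart fails again; a real supervisor would have to handle that too.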

That’s the whole idea, and you could stop reading now if the solution didn’t have one fundamental flaw: behavior(buffer) is a broken Behavior instance that must be spawned only once, otherwise two or more running actors would share the same stash. If a client passes behavior(buffer) to innocent code that uses it to spawn several actors, the application easily turns into a mess, and the compiler can’t warn us that something is wrong.

With some type kung-fu we can avoid this. We need a type that allows a client to use behavior in only one spawn statement. Let’s invent this type!

The original implementation of the (initial) behavior returned Behavior[Command]; now it will return SpawnableBehavior[Command]. A “spawnable” behavior takes care of whatever is necessary when the behavior is spawned into an actor. In our case, that is creating an instance of the stash buffer:

Note that this is in principle the same as the first version with the broken Behavior, only made safe with new types. The implementation of our example changes to

Notice how buffer is introduced in a user-friendly withStash block, and how we write active (not buffer.unstashAll) as the initial behavior, just as in the very first example, the one that doesn’t work when wrapped in a restarting supervisor. Wrapping and spawning this behavior in ActorTestKit is easy enough:

The only problem is that if unstashAll, called after a start or restart, produces Behaviors.stopped under restarting supervision, the actor remains active. I consider this a bug in Akka Typed and have already created an issue. (There is a way to work around this problem, but I don’t want to present it now.)
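Stepping back to the SpawnableBehavior idea itself, the property we are after can be shown in a toy model that does not use Akka. The types below (SpawnableModel, Stash, Actor, spawn, stashEverything) are hypothetical and much simpler than a real implementation; withStash here only mirrors the name used above. The point is that a SpawnableBehavior is not a behavior but a recipe: the stash is allocated when the recipe is spawned, so every actor gets its own stash and keeps it across restarts.

```scala
import scala.collection.mutable

object SpawnableModel {
  trait Behavior[T] { def receive(msg: T): Behavior[T] }
  final class Stash[T] { val items: mutable.Queue[T] = mutable.Queue.empty[T] }

  // A recipe, not a behavior: it cannot handle messages until it is spawned.
  final class SpawnableBehavior[T](val initial: Stash[T] => Behavior[T])

  def withStash[T](initial: Stash[T] => Behavior[T]): SpawnableBehavior[T] =
    new SpawnableBehavior[T](initial)

  final class Actor[T](spawnable: SpawnableBehavior[T]) {
    val stash = new Stash[T] // allocated once per spawned actor
    private var current: Behavior[T] = spawnable.initial(stash)
    def tell(msg: T): Unit = { current = current.receive(msg) }
    // A restart goes back to the initial behavior but keeps the same stash.
    def restart(): Unit = { current = spawnable.initial(stash) }
  }

  def spawn[T](spawnable: SpawnableBehavior[T]): Actor[T] = new Actor[T](spawnable)

  // Example behavior that simply stashes whatever it receives.
  def stashEverything[T](stash: Stash[T]): Behavior[T] = new Behavior[T] {
    def receive(msg: T): Behavior[T] = { stash.items.enqueue(msg); this }
  }
}
```

Spawning the same recipe twice gives two actors with independent stashes, which is exactly what passing behavior(buffer) around could not guarantee, and restarting one of them leaves its stashed messages in place.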

The next part will show how withStash can help reduce the boilerplate of defining various waiting behaviors. If you want it, please 👏
