Stash for restarting Akka Typed Actors
This began as an article on basic support for asynchronously awaiting future completion in Akka Typed actors, but the stash prerequisite turned out to be long enough for a post of its own. I will write the rest if this gets some attention 🙂
Akka actors are push-based: an actor is a black box that is pushed a message from its mailbox, processes the message and returns control to the scheduler. There is no way to let the scheduler know that an actor can’t handle a message for some internal reason; an actor must always handle the message pushed to it.
A typical example is an actor that acts as a processor for data (e.g. from Kafka) that have to be persisted (e.g. to Redis):
- on actor start, we read previously persisted data (if any); the persisted state then becomes part of the in-memory actor state
- on each message, we update the internal actor state and persist it; the key requirement here is that the next message must not be processed until the result of the previous processing is known to be persisted
(Say we can’t or don’t want to use Akka Streams for some reason.) The persistence operation is typically represented by a Future, and its completion is signalled to the actor as a message; usually there is something like self ! Completed in the future’s onComplete continuation. What we want is to temporarily suspend message processing in the actor until it receives the completion message (Completed). As noted above, we can’t, so there has to be some kind of queue. The machinery developed below can be used to implement a generic solution that (almost) hides the completion message and the delayed processing of messages received in the meantime.
Classic Akka actors (hereinafter referred to as Untyped) may mix in stash support, which allows stashing away messages so that they are not pushed to the actor until we tell the framework to: for messages arriving in the meantime we say “no, this is not the message I’m waiting for, please keep it for me”, and then, when the awaited completion message arrives, “I’m ready now, please push me the messages you’ve kept for me”. The solution relies on some internal magic (see the documentation) and in my opinion breaks the purity of the actor model, in which the framework is only a substrate on which actors live and work. Like every prefabricated solution, it may not compose well with the client’s needs. On the other hand, it has the useful property that stashed messages are kept outside the actor, so they are not lost when the actor crashes and restarts with its initial behavior.
The more recent Akka Typed actors are a great step towards a composable, functional approach in which the protocol between the actor and the scheduler is dead simple: the actor state as seen from the scheduler is only a behavior (effectively a function object that captures the actor state) that, when passed a message from the mailbox, returns another behavior. No magic, no mix-ins, no mandatory inheritance from Actor, no become/unbecome side effects on the actor’s this.
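The protocol can be modelled in a few lines of plain Scala to make the “behavior is a function object” idea concrete. This is a toy sketch, not Akka’s API: Behavior, receiveMessage (named after Akka’s Behaviors.receiveMessage), counter, Get and run are hypothetical, and run plays the scheduler that pushes one mailbox message at a time into the current behavior.

```scala
object BehaviorModel {
  // Toy version of the protocol: a behavior is a function object that,
  // given a message, returns the behavior for the next message.
  trait Behavior[T] { def receive(msg: T): Behavior[T] }

  def receiveMessage[T](f: T => Behavior[T]): Behavior[T] = new Behavior[T] {
    def receive(msg: T): Behavior[T] = f(msg)
  }

  sealed trait Msg
  case object Increment extends Msg
  final case class Get(reply: Int => Unit) extends Msg

  // State is captured by the closure, not stored in a mutable field:
  // every Increment returns a new behavior that closes over n + 1.
  def counter(n: Int): Behavior[Msg] = receiveMessage[Msg] {
    case Increment  => counter(n + 1)
    case Get(reply) => reply(n); counter(n)
  }

  // Toy "scheduler": pushes each mailbox message into the current behavior
  // and keeps whatever behavior it returns.
  def run[T](initial: Behavior[T], mailbox: Seq[T]): Behavior[T] =
    mailbox.foldLeft(initial)((behavior, msg) => behavior.receive(msg))
}
```

Running counter(0) over two Increment messages and then sending Get to the resulting behavior reports 2; no field was mutated, the count lives in the closure of the behavior returned last.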
There is stash support in Akka Typed too, but it is only a rather clever queue that lives inside the actor (in the documentation example) and allows processing multiple stashed messages at once. If the actor fails while processing the stash, the unprocessed messages are lost.
Before we continue: the problem applies only if the behavior is wrapped in a restarting supervisor (Akka Typed by default stops an actor on failure, which is perhaps the most notable difference from Akka Untyped, where the default is to restart). If an actor stops, any messages still in its mailbox are lost anyway and are outside our control, so our only problem is to implement a stash that survives restarts.
To see the problem, let’s write down a behavior that captures the essence of the Akka Typed stash example:
If we run this behavior under a restarting strategy
we get (relevant parts of the log):
20:26:45.291 INFO - creating stash
20:26:45.293 INFO - A1
20:26:45.294 INFO - waiting for 1 second
20:26:45.297 INFO - stashing Log(B1)
20:26:45.297 INFO - stashing Log(C1)
20:26:46.316 INFO - awaited future completed, unstashing
20:26:46.323 INFO - B1
20:26:46.324 INFO - C1
20:26:47.294 INFO - A2
20:26:47.294 INFO - waiting for 1 second
20:26:47.294 INFO - stashing Log(B2)
20:26:47.295 INFO - stashing Fail
20:26:47.295 INFO - stashing Log(C2)
20:26:48.316 INFO - awaited future completed, unstashing
20:26:48.316 INFO - B2
20:26:48.322 ERROR - failed on demand
20:26:48.323 INFO - creating stash
20:26:49.296 INFO - A3
The restarting supervisor doesn’t know that stashing happens inside the actor, so when a failure bubbles up to it, it happily restarts the actor with its initial behavior and a new, empty stash (note the second creating stash line in the log). The actor fails while C2 is still unprocessed in the old buffer, and thus C2 is lost.
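The failure mode can be reproduced without an actor system. The toy model below is plain Scala and only a sketch: Behavior, StashBuffer, runRestarting and the Wait/Completed messages are hypothetical stand-ins (Wait and Completed replace the one-second future), and runRestarting mimics a restart strategy by re-running its setup function on an exception.

```scala
import scala.collection.mutable

object StashLostOnRestart {
  trait Behavior[T] { def receive(msg: T): Behavior[T] }
  def receiveMessage[T](f: T => Behavior[T]): Behavior[T] = new Behavior[T] {
    def receive(msg: T): Behavior[T] = f(msg)
  }

  // Toy bounded FIFO standing in for a stash buffer.
  final class StashBuffer[T](capacity: Int) {
    private val queue = mutable.Queue.empty[T]
    def stash(msg: T): Unit = { require(queue.size < capacity, "stash is full"); queue.enqueue(msg) }
    // Feeds the messages stashed so far into `behavior`, one by one. Each message is
    // dequeued before it is processed, so if processing throws, the rest stays queued.
    def unstashAll(behavior: Behavior[T]): Behavior[T] = {
      var current = behavior
      for (_ <- 0 until queue.size) current = current.receive(queue.dequeue())
      current
    }
  }

  // Toy restarting supervisor: on an exception it re-runs `setup`,
  // i.e. starts over from the initial behavior.
  def runRestarting[T](setup: () => Behavior[T], mailbox: Seq[T]): Unit = {
    var current = setup()
    for (msg <- mailbox)
      current = try current.receive(msg) catch { case _: RuntimeException => setup() }
  }

  sealed trait Command
  final case class Log(text: String) extends Command
  case object Wait extends Command      // stands in for "start a future and await it"
  case object Completed extends Command // stands in for the future's completion message
  case object Fail extends Command

  val processed = mutable.ListBuffer.empty[String]

  // Flawed shape: the behavior creates its own buffer, so every (re)start
  // gets a new, empty one ("creating stash" in the log).
  def behavior(): Behavior[Command] = {
    val buffer = new StashBuffer[Command](capacity = 100)
    def active: Behavior[Command] = receiveMessage[Command] {
      case Log(text) => processed += text; active
      case Wait      => waiting
      case Completed => active
      case Fail      => throw new RuntimeException("failed on demand")
    }
    def waiting: Behavior[Command] = receiveMessage[Command] {
      case Completed => buffer.unstashAll(active)
      case other     => buffer.stash(other); waiting
    }
    active
  }
}
```

Feeding runRestarting(() => behavior(), ...) the messages Log("A2"), Wait, Log("B2"), Fail, Log("C2"), Completed, Log("A3") leaves processed as A2, B2, A3: the restart creates a second buffer, and C2 disappears with the first one.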
What can we do? Clearly, only one stash instance should be created per actor. This means that the actor itself should not create its stash; the stash must be created outside, when the behavior is being spawned, and captured by the behavior implementation. We strive for a general, type-safe solution that allows the actor to be implemented as a behavior which the client may wrap in Behaviors.supervise.
The basic idea is to do something like
(buffer becomes a parameter) spawned as
(This actually works: C2 is not lost.) There are only two changes:
- we pass an already created buffer into the behavior, so the same instance is used after the actor restarts;
- the initial behavior has changed to buffer.unstashAll(ctx, active), so if the actor fails during stash processing, it resumes processing the remaining stashed messages after the restart.
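Here are the two changes applied to a plain-Scala toy model. All names (Behavior, StashBuffer, runRestarting, spawnAndRun, Wait, Completed) are hypothetical stand-ins rather than Akka’s API; Wait and Completed simulate the awaited future, and runRestarting re-runs its setup function on an exception, as a restart strategy would.

```scala
import scala.collection.mutable

object StashSurvivesRestart {
  trait Behavior[T] { def receive(msg: T): Behavior[T] }
  def receiveMessage[T](f: T => Behavior[T]): Behavior[T] = new Behavior[T] {
    def receive(msg: T): Behavior[T] = f(msg)
  }

  // Toy bounded FIFO standing in for a stash buffer; unstashing dequeues
  // each message before processing it, so a failure leaves the rest queued.
  final class StashBuffer[T](capacity: Int) {
    private val queue = mutable.Queue.empty[T]
    def stash(msg: T): Unit = { require(queue.size < capacity, "stash is full"); queue.enqueue(msg) }
    def unstashAll(behavior: Behavior[T]): Behavior[T] = {
      var current = behavior
      for (_ <- 0 until queue.size) current = current.receive(queue.dequeue())
      current
    }
  }

  // Toy restarting supervisor: on an exception it re-runs `setup`.
  def runRestarting[T](setup: () => Behavior[T], mailbox: Seq[T]): Unit = {
    var current = setup()
    for (msg <- mailbox)
      current = try current.receive(msg) catch { case _: RuntimeException => setup() }
  }

  sealed trait Command
  final case class Log(text: String) extends Command
  case object Wait extends Command
  case object Completed extends Command
  case object Fail extends Command

  val processed = mutable.ListBuffer.empty[String]

  // Change 1: the buffer is a parameter, so restarts reuse the same instance.
  // Change 2: the initial behavior drains the buffer, so leftovers run after a restart.
  def behavior(buffer: StashBuffer[Command]): Behavior[Command] = {
    def active: Behavior[Command] = receiveMessage[Command] {
      case Log(text) => processed += text; active
      case Wait      => waiting
      case Completed => active
      case Fail      => throw new RuntimeException("failed on demand")
    }
    def waiting: Behavior[Command] = receiveMessage[Command] {
      case Completed => buffer.unstashAll(active)
      case other     => buffer.stash(other); waiting
    }
    buffer.unstashAll(active)
  }

  // "Spawning": the buffer is created exactly once, outside the restartable part.
  def spawnAndRun(mailbox: Seq[Command]): Unit = {
    val buffer = new StashBuffer[Command](capacity = 100)
    runRestarting(() => behavior(buffer), mailbox)
  }
}
```

With the mailbox Log("A2"), Wait, Log("B2"), Fail, Log("C2"), Completed, Log("A3"), spawnAndRun processes A2, B2, C2 and A3: the restart re-enters behavior(buffer), whose first step drains C2 from the surviving buffer.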
That’s the whole idea, and you could stop reading now if the solution didn’t have one fundamental flaw: behavior(buffer) is a broken Behavior instance that must be spawned only once, otherwise two or more running actors would share the same stash. If a client passes behavior(buffer) to innocent code that uses it to spawn several actors, the application easily turns into a mess, and the compiler can’t warn us that something is wrong.
With some type kung-fu we can avoid this. We need a type that allows a client to use the behavior in only one spawn statement. Let’s invent this type!
The original implementation of the (initial) behavior returned Behavior[Command]; now it will return SpawnableBehavior[Command]. A “spawnable” behavior takes care of whatever is necessary when the behavior is spawned into an actor. In our case, that is the act of creating an instance of the stash buffer:
Note that this is in principle the same as the first version with the broken Behavior, only made safe with new types. The implementation of our example changes to
Notice how buffer is introduced in the user-friendly withStash block and how we write active (not buffer.unstashAll) as the initial behavior, just as in the very first example that doesn’t work when wrapped in a restarting supervisor. Wrapping and spawning this behavior in ActorTestKit is easy enough:
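To see how such a type can rule out the shared-stash mistake, here is a plain-Scala toy model. The names SpawnableBehavior and withStash follow the text, but the implementation is an assumption-laden sketch, not the article’s code or Akka’s API; restartOnFailure, spawn, Actor, supervise and logger are hypothetical. Rather than counting spawns, it makes sharing impossible: the only way to turn a SpawnableBehavior into something running is spawn, which creates a fresh buffer every time, and restarts happen inside that, so they reuse the buffer.

```scala
import scala.collection.mutable

object SpawnOnce {
  trait Behavior[T] { def receive(msg: T): Behavior[T] }
  def receiveMessage[T](f: T => Behavior[T]): Behavior[T] = new Behavior[T] {
    def receive(msg: T): Behavior[T] = f(msg)
  }

  // Toy bounded FIFO standing in for a stash buffer.
  final class StashBuffer[T](capacity: Int) {
    private val queue = mutable.Queue.empty[T]
    def stash(msg: T): Unit = { require(queue.size < capacity, "stash is full"); queue.enqueue(msg) }
    def unstashAll(behavior: Behavior[T]): Behavior[T] = {
      var current = behavior
      for (_ <- 0 until queue.size) current = current.receive(queue.dequeue())
      current
    }
  }

  // Toy restarting supervisor as a behavior wrapper: on an exception it re-runs `setup`.
  def supervise[T](setup: () => Behavior[T]): Behavior[T] = {
    def wrap(current: Behavior[T]): Behavior[T] = new Behavior[T] {
      def receive(msg: T): Behavior[T] =
        wrap(try current.receive(msg) catch { case _: RuntimeException => setup() })
    }
    wrap(setup())
  }

  // Hypothetical SpawnableBehavior: no public way to extract a Behavior from it.
  final class SpawnableBehavior[T] private (
      capacity: Int,
      body: StashBuffer[T] => Behavior[T],
      restart: Boolean) {
    def restartOnFailure: SpawnableBehavior[T] = new SpawnableBehavior[T](capacity, body, restart = true)

    private[SpawnOnce] def instantiate(): Behavior[T] = {
      val buffer = new StashBuffer[T](capacity)         // once per spawn, never on restart
      val setup = () => buffer.unstashAll(body(buffer)) // every (re)start drains leftovers first
      if (restart) supervise(setup) else setup()
    }
  }

  object SpawnableBehavior {
    def withStash[T](capacity: Int)(body: StashBuffer[T] => Behavior[T]): SpawnableBehavior[T] =
      new SpawnableBehavior[T](capacity, body, restart = false)
  }

  // Stand-in for a running actor: tell pushes one message into the current behavior.
  final class Actor[T] private[SpawnOnce] (initial: Behavior[T]) {
    private var current = initial
    def tell(msg: T): Unit = { current = current.receive(msg) }
  }

  def spawn[T](spawnable: SpawnableBehavior[T]): Actor[T] = new Actor(spawnable.instantiate())

  sealed trait Command
  final case class Log(text: String) extends Command
  case object Wait extends Command
  case object Completed extends Command
  case object Fail extends Command

  // The example written against withStash: the initial behavior is plain `active`.
  def logger(out: mutable.Buffer[String]): SpawnableBehavior[Command] =
    SpawnableBehavior.withStash[Command](capacity = 100) { buffer =>
      def active: Behavior[Command] = receiveMessage[Command] {
        case Log(text) => out += text; active
        case Wait      => waiting
        case Completed => active
        case Fail      => throw new RuntimeException("failed on demand")
      }
      def waiting: Behavior[Command] = receiveMessage[Command] {
        case Completed => buffer.unstashAll(active)
        case other     => buffer.stash(other); waiting
      }
      active
    }
}
```

Spawning logger(out).restartOnFailure and sending the failing sequence processes A2, B2, C2 and A3, and two actors spawned from the same SpawnableBehavior value keep separate stashes. In real Akka Typed, the restart wrapping would be done with Behaviors.supervise(...).onFailure[Exception](SupervisorStrategy.restart) rather than the toy supervise.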
The only remaining problem is that if unstashAll, called after start or restart, produces Behaviors.stopped under restarting supervision, the actor remains active. I consider this a bug in Akka Typed and have already created an issue. (There is a way to overcome this problem, but I don’t want to present it now.)
The next part will show how withStash may help reduce the boilerplate of defining various waiting behaviors. If you want it, please 👏