Dispatchers
An Akka MessageDispatcher is what makes Akka Actors "tick"; it is the engine of the machine, so to speak.
All MessageDispatcher implementations are also an ExecutionContext, which means that they can be used to execute arbitrary code, for instance Futures.
Default dispatcher
Every ActorSystem will have a default dispatcher that will be used in case nothing else is configured for an Actor.
The default dispatcher can be configured, and is by default a Dispatcher with the specified default-executor.
If an ActorSystem is created with an ExecutionContext passed in, this ExecutionContext will be used as the default executor for all dispatchers in this ActorSystem. If no ExecutionContext is given, it will fall back to the executor specified in akka.actor.default-dispatcher.default-executor.fallback. By default this is a "fork-join-executor", which gives excellent performance in most cases.
Looking up a Dispatcher
Dispatchers implement the ExecutionContext interface and can thus be used to run Future invocations etc.
// this is scala.concurrent.ExecutionContext
// for use with Futures, Scheduler, etc.
final ExecutionContext ex = system.dispatchers().lookup("my-dispatcher");
Setting the dispatcher for an Actor
If you want to give your Actor a different dispatcher than the default, you need to do two things. The first is to configure the dispatcher:
my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutorService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}
Note
Note that parallelism-max does not set the upper bound on the total number of threads allocated by the ForkJoinPool. It specifically controls the number of hot threads the pool keeps running in order to reduce the latency of handling a new incoming task.
You can read more about parallelism in the JDK's ForkJoinPool documentation.
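The comments in the fork-join-executor section describe a small piece of arithmetic: scale the number of available processors by parallelism-factor, round up, then clamp the result between parallelism-min and parallelism-max. The following is a minimal sketch of that calculation in plain Java. The class and method names are hypothetical and are not part of Akka's API; the clamp order is an assumption based on the configuration comments.

```java
// Hypothetical helper, not Akka code: reproduces the sizing arithmetic
// described by the configuration comments.
public final class ParallelismSketch {

    // ceil(availableProcessors * factor), bounded below by min and above by max.
    static int scaledPoolSize(int min, double factor, int max, int availableProcessors) {
        int scaled = (int) Math.ceil(availableProcessors * factor);
        return Math.min(Math.max(scaled, min), max);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Same values as the my-dispatcher example: min = 2, factor = 2.0, max = 10
        System.out.println("parallelism = " + scaledPoolSize(2, 2.0, 10, cores));
    }
}
```

With the values from my-dispatcher, a 4-core machine would get a parallelism of 8, while a 16-core machine would be capped at 10.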
And here's another example that uses the "thread-pool-executor":
my-thread-pool-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutorService to use
  executor = "thread-pool-executor"
  # Configuration for the thread pool
  thread-pool-executor {
    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 2
    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 2.0
    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}
Note
The thread pool executor dispatcher is implemented using a java.util.concurrent.ThreadPoolExecutor.
You can read more about it in the JDK's ThreadPoolExecutor documentation.
For more options, see the default-dispatcher section of the Configuration.
Then you create the actor as usual and define the dispatcher in the deployment configuration.
ActorRef myActor =
system.actorOf(Props.create(MyUntypedActor.class),
"myactor");
akka.actor.deployment {
  /myactor {
    dispatcher = my-dispatcher
  }
}
An alternative to the deployment configuration is to define the dispatcher in code.
If you define the dispatcher in the deployment configuration, then that value will be used instead of the programmatically provided parameter.
ActorRef myActor =
system.actorOf(Props.create(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor3");
Note
The dispatcher you specify in withDispatcher and the dispatcher property in the deployment configuration are in fact paths into your configuration.
In this example it is a top-level section, but you could for instance put it in a sub-section, using periods to denote sub-sections, like this: "foo.bar.my-dispatcher"
Types of dispatchers
There are 3 different types of message dispatchers:
Dispatcher
- This is an event-based dispatcher that binds a set of Actors to a thread pool. It is the default dispatcher used if one is not specified.
- Sharability: Unlimited
- Mailboxes: Any, creates one per Actor
- Use cases: Default dispatcher, Bulkheading
- Driven by: java.util.concurrent.ExecutorService. Specify it with "executor", using "fork-join-executor", "thread-pool-executor" or the FQCN of an akka.dispatch.ExecutorServiceConfigurator
PinnedDispatcher
- This dispatcher dedicates a unique thread for each actor using it; i.e. each actor will have its own thread pool with only one thread in the pool.
- Sharability: None
- Mailboxes: Any, creates one per Actor
- Use cases: Bulkheading
- Driven by: Any akka.dispatch.ThreadPoolExecutorConfigurator, by default a "thread-pool-executor"
CallingThreadDispatcher
- This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads, but it can be used from different threads concurrently for the same actor. See CallingThreadDispatcher for details and restrictions.
- Sharability: Unlimited
- Mailboxes: Any, creates one per Actor per Thread (on demand)
- Use cases: Testing
- Driven by: The calling thread (duh)
More dispatcher configuration examples
Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO:
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 1
}
And then using it:
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class)
.withDispatcher("blocking-io-dispatcher"));
Configuring a PinnedDispatcher:
my-pinned-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}
And then using it:
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class)
.withDispatcher("my-pinned-dispatcher"));
Note that thread-pool-executor configuration as per the above my-thread-pool-dispatcher example is NOT applicable. This is because every actor will have its own thread pool when using PinnedDispatcher, and that pool will have only one thread.
Note that it's not guaranteed that the same thread is used over time, since the core pool timeout is used for PinnedDispatcher to keep resource usage down in case of idle actors. To use the same thread all the time you need to add thread-pool-executor.allow-core-timeout=off to the configuration of the PinnedDispatcher.
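The effect of the core pool timeout can be observed with a plain JDK ThreadPoolExecutor that has a single thread, which is roughly the kind of pool a PinnedDispatcher gives each actor. The sketch below is not Akka code: the class and method names are hypothetical, the 50 ms keep-alive and 500 ms idle period are arbitrary illustration values, and the mapping of allow-core-timeout onto ThreadPoolExecutor.allowCoreThreadTimeOut is an assumption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows how core thread timeout affects thread identity.
public final class CoreTimeoutSketch {

    // Runs two tasks on a one-thread pool with an idle gap in between and
    // reports whether both tasks ran on the same Thread instance.
    static boolean sameThreadAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        Callable<Thread> whoAmI = Thread::currentThread;
        try {
            Thread first = pool.submit(whoAmI).get();
            Thread.sleep(500); // idle for much longer than the keep-alive time
            Thread second = pool.submit(whoAmI).get();
            return first == second;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("timeout on,  same thread: " + sameThreadAfterIdle(true));
        System.out.println("timeout off, same thread: " + sameThreadAfterIdle(false));
    }
}
```

With the timeout enabled, the idle worker is retired and the second task runs on a fresh thread; with it disabled, the single core thread is kept alive and reused.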