Despite the term `async` and its association with asynchronous network I/O, this blog post argues that the Tokio runtime at the heart of the Rust async ecosystem is also a good choice for CPU-heavy jobs such as those found in analytics engines.
Rust has built-in support for an asynchronous (`async`) programming model, similar to languages such as JavaScript.
To take full advantage of multiple cores and async I/O, a runtime must be used, and while several alternatives are available to the Rust community, Tokio is the de facto standard. Tokio.rs describes it as: “an asynchronous runtime for the Rust programming language. It provides the building blocks needed for writing network applications.”
While this description emphasizes Tokio’s use for network communications, the runtime can be used for other purposes, as we will explore below.
`async` / `await`), and many relatively mature libraries for streams, async locking, channels, cancellation, etc.

Tokio as a task scheduler for CPU-heavy tasks, right? WROOOOOOOONG!
Runtime should never be used for CPU-bound tasks. The key point is actually that the same Runtime instance (the same thread pool) should not be used for both I/O and CPU, and we have subsequently clarified the intent of the docs (gory details on the PR).
As an aside, the Tokio docs suggest using Rayon for CPU-bound tasks. Rayon is a great choice for many applications, but it has no support for async, so if your code has to do any I/O at all, you will have to straddle the painful sync/async boundary. I also found it challenging to map onto Rayon a pull-based execution model, in which a task has to wait for all of its inputs to be ready before it can run.
/health. If that request sits on a task queue somewhere because Tokio is fully using your CPU efficiently to chew through reams of data-processing tasks, Kubernetes does not get the required “Everything is Fine” response and kills your process.
This line of reasoning results in the classic conclusion that since tail latencies are critical, you can’t use Tokio for CPU-heavy tasks.
However, as the Tokio docs counsel, what really matters for fully saturating the CPU without getting nuked by Kubernetes and friends is to use separate thread pools: one for “latency is important” tasks, like responding to `/health`, and one for CPU-heavy tasks. The optimal number of threads for each pool depends on your needs and is a good topic for a separate article.
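To make the two-pool idea concrete without pulling in Tokio, here is a minimal sketch that uses only the standard library. The `Pool` type, its `execute` method and the pool names are hypothetical illustrations, not code from this post. Each pool owns its own queue and worker threads, so a job that monopolizes the CPU pool cannot delay a `/health`-style job queued on the latency pool. With Tokio, the equivalent would be two separate `Runtime` instances, each with its own worker threads.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// A unit of work submitted to a pool (hypothetical type for this sketch).
type Work = Box<dyn FnOnce() + Send + 'static>;

/// A minimal fixed-size thread pool: `n` workers share one job queue.
pub struct Pool {
    sender: Option<Sender<Work>>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl Pool {
    pub fn new(name: &str, n: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Work>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..n)
            .map(|i| {
                let rx = Arc::clone(&receiver);
                thread::Builder::new()
                    .name(format!("{name}-{i}"))
                    .spawn(move || loop {
                        // The lock guard is a temporary, so it is released
                        // before the job runs and other workers can keep pulling.
                        let next = rx.lock().unwrap().recv();
                        match next {
                            Ok(work) => work(),
                            // All senders dropped: the pool is shutting down.
                            Err(_) => break,
                        }
                    })
                    .expect("failed to spawn worker thread")
            })
            .collect();
        Pool { sender: Some(sender), workers }
    }

    /// Queues `f` to run on one of this pool's workers.
    pub fn execute(&self, f: impl FnOnce() + Send + 'static) {
        if let Some(sender) = &self.sender {
            sender.send(Box::new(f)).expect("pool workers have exited");
        }
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        // Closing the queue makes each worker's `recv` fail, ending its loop.
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}
```

The test below makes the isolation visible with one worker per pool. The CPU worker is blocked until it is explicitly released, but the health check on the other pool still completes.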
If you think of a Tokio `Runtime` as a sophisticated thread pool, the idea of using different `Runtime` instances may seem more palatable. We demonstrate how to do so with the dedicated executor below.
.await,” as explained in Alice Ryhl’s post. This is to give the scheduler a chance to schedule something else, steal work, etc.
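What reaching an `.await` buys you can be sketched with the standard library alone. `YieldNow` below behaves like `tokio::task::yield_now`: it returns `Pending` once after scheduling its own wake-up, so the executor can run something else first. The `sum_with_yields` task, the chunk sizes and the tiny `block_on` driver are hypothetical helpers for illustration. In a Tokio program you would call `tokio::task::yield_now().await` and let the runtime drive the task.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Returns `Pending` exactly once (after scheduling its own wake-up), so the
/// executor gets a chance to run other work before this task continues.
pub struct YieldNow {
    yielded: bool,
}

pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Ask to be polled again, then hand control back to the executor.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical CPU-heavy task: sums `data`, reaching an `.await` after every
/// `chunk` items instead of looping over everything in one go.
/// Returns the sum and the number of yields. Panics if `chunk == 0`.
pub async fn sum_with_yields(data: &[u64], chunk: usize) -> (u64, usize) {
    let mut total = 0u64;
    let mut yields = 0;
    for part in data.chunks(chunk) {
        total += part.iter().sum::<u64>();
        yield_now().await;
        yields += 1;
    }
    (total, yields)
}

/// Wakes the executor thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor, only here so the sketch runs without Tokio.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

The chunk size is the knob that controls "how long" a task runs between yields. Smaller chunks give the scheduler more chances to interleave other tasks, but each extra yield costs some scheduling overhead.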
Of course, “a long time” depends on your application; Ryhl recommends 10 to 100 microseconds when optimizing for response tail latencies. I think 10 to 100 milliseconds is also fine for tasks when optimizing for CPU. However, since my estimated per-task Tokio overhead is in the ~10 nanoseconds range, it is nearly impossible to even measure Tokio Runtime overhead with 10 millisecond tasks.
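The overhead claim is easy to check with some back-of-the-envelope arithmetic. The sketch below assumes the ~10 ns per-task estimate above, and `overhead_fraction` is a hypothetical helper:

```rust
use std::time::Duration;

/// Fraction of total time spent on scheduling overhead when every task does
/// `work` worth of computation and pays `overhead` per task to the scheduler.
pub fn overhead_fraction(overhead: Duration, work: Duration) -> f64 {
    let overhead = overhead.as_secs_f64();
    overhead / (overhead + work.as_secs_f64())
}
```

With those numbers, a 10 ms task spends about one part in a million (0.0001%) of its time on scheduling. Even a 100 µs task spends only about 0.01%.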
Second, run your tasks on a separate Runtime instance. How do you do that? Glad you asked.
```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
// Assumes the parking_lot crate: its `lock()` returns the guard directly
use parking_lot::Mutex;

/// Runs futures (and any `tasks` that are `tokio::task::spawn`ed by
/// them) on a separate Tokio Executor
pub struct DedicatedExecutor {
    state: Arc<Mutex<State>>,
}

/// A boxed, type-erased future to run on the dedicated executor
struct Task {
    fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
}

impl Task {
    async fn run(self) {
        self.fut.await
    }
}

struct State {
    /// Channel for requests -- the dedicated executor takes requests
    /// from here and runs them.
    requests: Option<std::sync::mpsc::Sender<Task>>,
    /// Thread which has a different Tokio runtime
    /// installed and spawns tasks there
    thread: Option<std::thread::JoinHandle<()>>,
}

/// Lowers the OS priority of the calling thread. This helper is not shown
/// in the post; the body is a placeholder (the real thing is platform-specific).
fn set_current_thread_priority_low() {}

impl DedicatedExecutor {
    /// Creates a new `DedicatedExecutor` with a dedicated Tokio
    /// executor that is separate from the threadpool created via
    /// `#[tokio::main]`.
    pub fn new(thread_name: &str, num_threads: usize) -> Self {
        let thread_name = thread_name.to_string();
        let (tx, rx) = std::sync::mpsc::channel::<Task>();

        let thread = std::thread::spawn(move || {
            // Create a new Runtime to run tasks
            let runtime = tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .thread_name(&thread_name)
                .worker_threads(num_threads)
                // Lower OS priority of worker threads to prioritize main runtime
                .on_thread_start(move || set_current_thread_priority_low())
                .build()
                .expect("Creating Tokio runtime");

            // Pull task requests off the channel and spawn them on the
            // dedicated runtime. `rx.recv()` blocks only this thread, not
            // the runtime's worker threads; the loop ends once every
            // `Sender` has been dropped.
            runtime.block_on(async move {
                while let Ok(task) = rx.recv() {
                    tokio::task::spawn(async move {
                        task.run().await;
                    });
                }
            });
        });

        let state = State {
            requests: Some(tx),
            thread: Some(thread),
        };

        Self {
            state: Arc::new(Mutex::new(state)),
        }
    }
}
```
This code spawns a new `std::thread`, which creates a separate multi-threaded Tokio `Runtime` to run tasks. It then reads tasks from a channel and spawns them on the new `Runtime`.
Note: The new thread is key. Suppose you try to run the new `Runtime` (for example, by calling `block_on` on it) from the `#[tokio::main]` thread or from one of the threads Tokio has created. Tokio will panic, because a `Runtime` is already running on that thread.
Here is the corresponding code to send a task to this second Runtime.
```rust
// Assumes the `tracing` crate for `warn!` (the `log` crate's macro works the same way)
use tracing::warn;

impl DedicatedExecutor {
    /// Runs the specified Future (and any tasks it spawns) on the
    /// `DedicatedExecutor`.
    pub fn spawn<T>(&self, task: T) -> Job<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        // One-shot channel that carries the result back to the caller
        let (tx, rx) = tokio::sync::oneshot::channel();

        let fut = Box::pin(async move {
            let task_output = task.await;
            // The receiver is gone if the caller dropped the `Job`;
            // in that case the result is simply discarded
            let _ = tx.send(task_output);
        });

        let task = Task { fut };

        let state = self.state.lock();
        if let Some(requests) = &state.requests {
            // would fail if someone has started shutdown
            requests.send(task).ok();
        } else {
            warn!("tried to schedule task on an executor that was shutdown");
        }

        Job { rx }
    }
}
```
`spawn` returns a `Future` called `Job` that transfers the result from the dedicated executor back to the main executor. It looks like this:
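```rust
use pin_project::pin_project;
use std::task::{Context, Poll};
use tokio::sync::oneshot::{error::RecvError, Receiver};

/// Resolves to the output of a task run on the `DedicatedExecutor`
#[pin_project]
pub struct Job<T> {
    #[pin]
    rx: Receiver<T>,
}

impl<T> Future for Job<T> {
    /// `Err(RecvError)` means the task was dropped (for example, because
    /// the executor shut down) before it produced a result.
    type Output = Result<T, RecvError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        this.rx.poll(cx)
    }
}
```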
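Under the hood, `Job` is a thin wrapper around `tokio::sync::oneshot::Receiver`. The sketch below illustrates roughly what such a one-shot handoff does, using only std. The `oneshot`, `Sender`, `Receiver`, `Canceled` and `block_on` items are hypothetical stand-ins and are far less efficient than Tokio's implementation. The sending side stores the value and wakes whichever task is awaiting the receiver. If the sender is dropped without sending, the receiver resolves to an error, much as Tokio's `oneshot::Receiver` resolves to `Err(RecvError)`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// State shared by both halves of the channel.
struct Shared<T> {
    value: Option<T>,
    waker: Option<Waker>,
    sender_gone: bool,
}

/// Sending half, used on the dedicated executor.
pub struct Sender<T>(Arc<Mutex<Shared<T>>>);
/// Receiving half, a `Future` awaited on the main executor.
pub struct Receiver<T>(Arc<Mutex<Shared<T>>>);

/// Returned when the sender was dropped without sending a value.
#[derive(Debug, PartialEq)]
pub struct Canceled;

pub fn oneshot<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(Shared { value: None, waker: None, sender_gone: false }));
    (Sender(Arc::clone(&shared)), Receiver(shared))
}

impl<T> Sender<T> {
    /// Stores the value; `Drop`, which runs as `self` goes out of scope,
    /// then wakes the receiver.
    pub fn send(self, value: T) {
        self.0.lock().unwrap().value = Some(value);
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let mut shared = self.0.lock().unwrap();
        shared.sender_gone = true;
        if let Some(waker) = shared.waker.take() {
            waker.wake();
        }
    }
}

impl<T> Future for Receiver<T> {
    type Output = Result<T, Canceled>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared = self.0.lock().unwrap();
        if let Some(value) = shared.value.take() {
            Poll::Ready(Ok(value))
        } else if shared.sender_gone {
            Poll::Ready(Err(Canceled))
        } else {
            // Not ready yet: remember who to wake when the value arrives.
            shared.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Wakes the executor thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor, only here so the sketch runs without Tokio.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

Here a plain thread stands in for the dedicated executor. It computes a value and sends it, while the awaiting side sleeps until it is woken rather than spinning.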