More than 1 year has passed since last update.
Rust 100 Ex 🏃【36/37】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~
前の記事
- 【0】 準備 ← 初回
- ...
- 【35】 非同期ランタイム・Futureトレイト ~非同期のお作法~ ← 前回
- 【36】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~ ← 今回
100 Exercise To Learn Rust 演習第36回になります、ついにラス前で、解説に対応した演習は今回が最後です!
今回は非同期特有の考え方や操作が中心となっているみたいですね、サクッと行きたいと思います!(事情につき3問あります)
8章の非同期について、全体的に筆者の理解度が低いです...他の本とかも読んでる最中...マサカリ大歓迎です!
記事は執筆時点での理解という感じです
今回の関連ページ
[08_futures/05_blocking] ブロッキングを回避すべし
問題はこちらです。
// TODO: the `echo` server uses non-async primitives.
// When running the tests, you should observe that it hangs, due to a
// deadlock between the caller and the server.
// Use `spawn_blocking` inside `echo` to resolve the issue.
use std::io::{Read, Write};
use tokio::net::TcpListener;
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
loop {
let (socket, _) = listener.accept().await?;
let mut socket = socket.into_std()?;
socket.set_nonblocking(false)?;
let mut buffer = Vec::new();
socket.read_to_end(&mut buffer)?;
socket.write_all(&buffer)?;
}
}
「ブロッキングI/Oを使用するとデッドロックすることを確かめて、 spawn_blocking を使って解消してください」という問題です。
解説
というわけで、 spawn_blocking を使うように書き換えましょう。
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
loop {
let (socket, _) = listener.accept().await?;
+ tokio::task::spawn_blocking(|| -> anyhow::Result<()> {
let mut socket = socket.into_std()?;
socket.set_nonblocking(false)?;
let mut buffer = Vec::new();
socket.read_to_end(&mut buffer)?;
socket.write_all(&buffer)?;
+ Ok(())
+ });
}
}
どう変わったのでしょうか...?ブロッキングとノンブロッキングバージョンでどう違うのでしょう...?解説を試みてみます。
非同期処理でブロックしないとは、「 .await でこまめに主導権をランタイムに返す」ということです。もし長時間ブロックするスレッドがあり、ずっと .await してくれない場合、tokioランタイムにはそのタスクをどうにかする術はありません!
この問題うまいなぁと思うのは、 #[tokio::test] マクロ以下なのでシングルスレッド処理になっているんですよね...よって問題では次のようにしてデッドロックが起こっています。
- tokioランタイムはシングルスレッドしか扱えない状況
-
.awaitが入るたびにタスクを お手玉 している
-
- ノンブロッキングI/Oの場合は、
read_to_end呼び出し時に.awaitできるから、そこでwrite_all側のタスクに切り替わるので、デッドロックしない - しかし、ブロッキングI/Oの場合は、
read_to_endでtokioランタイムにスレッド主導権が返らないから、ずっと待ちになってしまう、デッドロック!
※ 以下の図は概略です!
全部非同期の場合
今回のブロッキングI/O混入バージョン
では spawn_blocking でどう解決するのか...?このメソッドを使うことで、 重たいブロッキング処理専用のスレッドを建ててそちらで実行 されます!
演習回答バージョン
暗号計算等重い処理やブロッキングAPIしか提供されておらずそれを使わなければならない場合などに対しては、この spawn_blocking 専用スレッドを用いると良さそうです。
[08_futures/06_async_aware_primitives] 非同期プリミティブ (非同期用実装)
問題はこちらです。
/// TODO: the code below will deadlock because it's using std's channels,
/// which are not async-aware.
/// Rewrite it to use `tokio`'s channels primitive (you'll have to touch
/// the testing code too, yes).
///
/// Can you understand the sequence of events that can lead to a deadlock?
use std::sync::mpsc;
pub struct Message {
payload: String,
response_channel: mpsc::Sender<Message>,
}
/// Replies with `pong` to any message it receives, setting up a new
/// channel to continue communicating with the caller.
pub async fn pong(mut receiver: mpsc::Receiver<Message>) {
loop {
if let Ok(msg) = receiver.recv() {
println!("Pong received: {}", msg.payload);
let (sender, new_receiver) = mpsc::channel();
msg.response_channel
.send(Message {
payload: "pong".into(),
response_channel: sender,
})
.unwrap();
receiver = new_receiver;
}
}
}
#[cfg(test)]
mod tests {
use crate::{pong, Message};
use std::sync::mpsc;
#[tokio::test]
async fn ping() {
let (sender, receiver) = mpsc::channel();
let (response_sender, response_receiver) = mpsc::channel();
sender
.send(Message {
payload: "pong".into(),
response_channel: response_sender,
})
.unwrap();
tokio::spawn(pong(receiver));
let answer = response_receiver.recv().unwrap().payload;
assert_eq!(answer, "pong");
}
}
ブロッキングして到着を待つ std::sync::mpsc; だとデッドロックが起きるので、非同期用の tokio::sync::mpsc を使うように書き直そうという問題です。
解説
回答としては、 .await が使用可能な tokio::sync::mpsc に置き換えるだけです!
// use std::sync::mpsc;
+use tokio::sync::mpsc;
pub struct Message {
payload: String,
response_channel: mpsc::Sender<Message>,
}
/// Replies with `pong` to any message it receives, setting up a new
/// channel to continue communicating with the caller.
pub async fn pong(mut receiver: mpsc::Receiver<Message>) {
loop {
+ if let Some(msg) = receiver.recv().await {
println!("Pong received: {}", msg.payload);
let (sender, new_receiver) = mpsc::channel(1);
msg.response_channel
.send(Message {
payload: "pong".into(),
response_channel: sender,
})
+ .await
.unwrap();
receiver = new_receiver;
}
}
}
#[cfg(test)]
mod tests {
use crate::{pong, Message};
// use std::sync::mpsc;
+ use tokio::sync::mpsc;
#[tokio::test]
async fn ping() {
let (sender, receiver) = mpsc::channel(1);
let (response_sender, mut response_receiver) = mpsc::channel(1);
sender
.send(Message {
payload: "pong".into(),
response_channel: response_sender,
})
+ .await
.unwrap();
tokio::spawn(pong(receiver));
+ let answer = response_receiver.recv().await.unwrap().payload;
assert_eq!(answer, "pong");
}
}
「なぜデッドロックするかわかる?」と問われているので、先ほどと同様にシーケンス図に描き起こしてみようと思います。
std::sync::mpsc バージョン
即効でデッドロックしましたね... recv() は .await したいです!
tokioに置き換えましょう。
tokio::sync::mpsc バージョン
無事にピンポン 👁 :ping_pong:
できています。
ちなみに本エクササイズの Book の方の解説では std::sync::Mutex と tokio::sync::Mutex が取り上げられており、さらにstdの方を用いた場合のデッドロックの仕組みまで解説されています1。
解説側は Mutex 、 エクササイズ側は mpsc を題材にしたみたいですね。この2つにとどまらず、前の問題に出てきた非同期I/O (tokio::io) やTCPソケット等 (tokio::net) 等、tokioクレートには非同期用プリミティブがたくさん用意されており、また大体が標準ライブラリが持つAPIに似ていてそこに .await を付けられるようになっています。tokioクレート以外でも、例えばreqwestクレートはfeatureフラグで非同期・ブロッキングを切り替えられたりできて興味深いですね。
コストの面を考えると、何でもかんでも非同期プリミティブを使うべきではない (ブロッキングプリミティブを使えという意味ではありません)という言説2もあり、考えることが多い非同期ですが、クレートの対応具合や対応の仕方を見てみると面白そうです(Bookに忖度した感想)。
[08_futures/07_cancellation] タスクのキャンセル
問題はこちらです。主要な行をハイライトしておきます。
// TODO: fix the `assert_eq` at the end of the tests.
// Do you understand why that's the resulting output?
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
pub async fn run(listener: TcpListener, n_messages: usize, timeout: Duration) -> Vec<u8> {
let mut buffer = Vec::new();
for _ in 0..n_messages {
let (mut stream, _) = listener.accept().await.unwrap();
let _ = tokio::time::timeout(timeout, async {
stream.read_to_end(&mut buffer).await.unwrap();
})
.await;
}
buffer
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn ping() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let messages = vec!["hello", "from", "this", "task"];
let timeout = Duration::from_millis(20);
let handle = tokio::spawn(run(listener, messages.len(), timeout.clone()));
for message in messages {
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
let (_, mut writer) = socket.split();
let (beginning, end) = message.split_at(message.len() / 2);
// Send first half
writer.write_all(beginning.as_bytes()).await.unwrap();
tokio::time::sleep(timeout * 2).await;
writer.write_all(end.as_bytes()).await.unwrap();
// Close the write side of the socket
let _ = writer.shutdown().await;
}
let buffered = handle.await.unwrap();
let buffered = std::str::from_utf8(&buffered).unwrap();
+ assert_eq!(buffered, "");
}
}
最終回手前にして久々の「実行結果どうなる...?」問題ですね...!
解説
Book で解説されている通り、 .await がある場所でのみ tokio::time::timeout 等を利用して外から停止させることができます。裏を返すと前の問題のような .await せずブロッキングしてしまうようなタスクは停止できない ということですね。こまめな .await が大切です。
.await は入れ子構造になっています。タイムアウト時にランタイムにコントロールを返した .await の時点で停止されるため、それを考慮して回答します!
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
pub async fn run(listener: TcpListener, n_messages: usize, timeout: Duration) -> Vec<u8> {
let mut buffer = Vec::new();
for _ in 0..n_messages {
let (mut stream, _) = listener.accept().await.unwrap();
let _ = tokio::time::timeout(timeout, async {
// 最初を受け取ったところでread_to_end実装内部のawaitがyieldする
// そしてタイムアウトするので、前半だけ残る
stream.read_to_end(&mut buffer).await.unwrap();
})
.await;
}
buffer
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn ping() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let messages = vec!["hello", "from", "this", "task"];
let timeout = Duration::from_millis(20);
let handle = tokio::spawn(run(listener, messages.len(), timeout.clone()));
+ let mut answer = String::new();
for message in messages {
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
let (_, mut writer) = socket.split();
let (beginning, end) = message.split_at(message.len() / 2);
+ answer = format!("{}{}", answer, beginning);
// Send first half
writer.write_all(beginning.as_bytes()).await.unwrap();
tokio::time::sleep(timeout * 2).await;
writer.write_all(end.as_bytes()).await.unwrap();
// Close the write side of the socket
let _ = writer.shutdown().await;
}
let buffered = handle.await.unwrap();
let buffered = std::str::from_utf8(&buffered).unwrap();
- assert_eq!(buffered, "");
+ assert_eq!(buffered, &answer);
// 答えは、各文字列の前半分を足し合わせた
// hefrthta
}
}
read_to_end メソッドでの .await 中にタイムアウトが来るように設定されています。 read_to_end の内部でも .await が呼ばれてバッファに書き込みを行っていると予想でき、前半部分だけが書き込まれているだろうと踏まえて回答すればそれが正解です。ハードコードせずその意図を組み入れた回答をしてみました!
AsyncDropがほしい...!(ない)
Book にてキャンセルの話のついでにクリーンアップの話題が出ています。そして「不幸なことに、Rustは非同期クリーンアップ操作に関して明確な手段を提供していません」みたいなことが書かれています。
クリーンアップと言えば、そう、我々には Drop トレイトがあります(参考: 第12回)。しかしそのシグネチャには、 当然非同期的な要素は含まれません 。
pub trait Drop {
// Required method
fn drop(&mut self);
}
例えば tokio::runtime::Handle::current を使うことで無理やり非同期処理を実行しようとすることは可能ですが、クリーンアップの確実性の担保が難しいです。
Bookでは Drop について、ケースに合わせて例えば次のように実装すると良いと書かれています。
- ランタイムで新しいタスクを作成する (挙げたPoCはこの方法)
-
mpsc等を使ってメッセージを送り、別タスクにクリーンアップを任せる (良さそう...?) - 裏で別スレッドを立ち上げる
Bookの例にも挙げられているSQLトランザクションのabort等、確実に実行してほしくて、かつ非同期ではない、いつものRustなら Drop トレイトにまかせておけばおkな処理でも、上記に挙げたような手法を取るしかないのが現状です。
このような要望に答えてくれそうな AsynDrop を考案しているWG もあるらしいですが、一方で「 AsyncDrop みたいなものが要らないような実装を心がけろ (おそらく上述したような対処をしようの意)」という言説もあります...umm
直感的に、 drop メソッドは軽い処理であってほしいような気がするので、"重め"なことをするであろう AsyncDrop というのはその思想に反していそうではあります3。一方、Pythonの with や Goの defer のような機能が Drop に求められているのも事実です。
そのため筆者的には with や defer により近い、 Drop とは異なるが、 Drop と同様スコープが外れる時にRAIIライクなクリーンアップをしてくれるトレイトがほしいなぁとか思ったりします。 drop 実行前に実行されるイメージです。もしそんな感じの CleanUp トレイト(筆者命名)があれば、 AsyncCleanUp トレイトみたいなのはありなんじゃないでしょうか?(妄想)
さて、無事にここまでこれました。次回、いよいよ最終回です...!
次の記事: 【37】 Axumでクラサバ! ~最終回~
-
解説で登場するソースコード、
MutexGuardは!Sendなため一見するとコンパイルが通らなそうに見えるのですが、tokio::spawnとかで包まない場合コンパイル通りますね(包む場合はコンパイルエラー)...block_onはSendを要求しないからでしょうか...? ↩ -
そのため逆に言えば
mpscで外にクリーンアップを任せるのが筋の良い方法に見えます。ただ結局非同期mpscを使う都合上、考えることは多そうです。spawn_blockingで別スレッドを建ててblocking_sendとか...? ↩
Register as a new user and use Qiita more conveniently
- You get articles that match your needs
- You can efficiently read back useful information
- You can use dark theme
