Rust プログラミング 入門

Rust完全ガイド 第6回 並行処理の実装

現代のプログラミングにおいて、並行処理(Concurrency) は不可欠な技術です。
Rustでは、スレッドを使った並行処理、メッセージパッシングによるスレッド間通信、MutexRwLock を活用したデータの共有 など、高速かつ安全な並行プログラムを実装するための機能が提供されています。

一般的なプログラミング言語では、並行処理を扱う際に データ競合(Race Condition)デッドロック(Deadlock) などの問題が発生しやすくなります。
しかし、Rustは 所有権システムと型チェック によって、コンパイル時に安全性を保証し、スレッド安全なコードを実現できる という特徴を持っています。

本記事では、Rustのスレッド (std::thread) の基本、メッセージパッシング(mpsc)、共有メモリを安全に扱うための MutexRwLock について詳しく解説します。
並行処理の仕組みを理解し、効率的でバグの少ないマルチスレッドプログラムを実装するためのスキル を身につけましょう!

スレッド

コンピュータの処理速度を最大限に活用するためには、並行処理(Concurrency) を適切に扱うことが重要です。
Rustでは std::thread を使って簡単にスレッドを作成し、並行処理を実現できます。

しかし、並行処理には データ競合(Race Condition)やデッドロック(Deadlock) などの問題が伴います。
Rustの所有権システムを活用することで、これらの問題をコンパイル時に防ぎ、安全な並行プログラムを作成することが可能です。

この章では、Rustのスレッドの基本 (std::thread)、および join ハンドルを使ったスレッドの管理方法 を詳しく学びます。

Rustのスレッド (std::thread)

スレッドとは?

スレッドとは、プログラム内で並行して実行される軽量な処理単位 です。
Rustでは、標準ライブラリの std::thread を使って、新しいスレッドを簡単に作成できます。

スレッドの基本

Rustでスレッドを生成するには、thread::spawn() を使用します。

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..=5 {
            println!("スレッドのカウント: {}", i);
            thread::sleep(Duration::from_millis(500));
        }
    });

    for i in 1..=3 {
        println!("メインスレッドのカウント: {}", i);
        thread::sleep(Duration::from_millis(500));
    }
}

実行結果

メインスレッドのカウント: 1
スレッドのカウント: 1
メインスレッドのカウント: 2
スレッドのカウント: 2
メインスレッドのカウント: 3
スレッドのカウント: 3
スレッドのカウント: 4
スレッドのカウント: 5
  • thread::spawn(|| { ... }) によって、新しいスレッドを作成し、その中でループ処理を実行。
  • thread::sleep(Duration::from_millis(500)) により、スレッドごとの実行タイミングを調整。
  • メインスレッドと並行して、新しいスレッドが実行される。

スレッドの終了を待たずにプログラムが終了する問題

上記のコードでは、メインスレッドの処理が先に終了すると、新しいスレッドも途中で強制終了される可能性がある ため、意図しない動作になる場合があります。
この問題を防ぐには、join ハンドルを使ってスレッドの終了を待つ 必要があります。

joinハンドルによるスレッドの管理

join ハンドルとは?

Rustでは、スレッドを適切に管理するために JoinHandle という仕組みが提供されています。
thread::spawn() は、スレッドのハンドル(JoinHandle<T>)を返し、join() を呼ぶことで、スレッドの完了を待機することができます。

join を使ったスレッドの管理

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("スレッドのカウント: {}", i);
            thread::sleep(Duration::from_millis(500));
        }
    });

    handle.join().unwrap(); // スレッドの完了を待つ

    println!("メインスレッドが終了しました");
}

実行結果

スレッドのカウント: 1
スレッドのカウント: 2
スレッドのカウント: 3
スレッドのカウント: 4
スレッドのカウント: 5
メインスレッドが終了しました
  • thread::spawn()スレッドのハンドル(JoinHandle)を返す。
  • handle.join().unwrap(); を呼ぶことで、スレッドが終了するまでメインスレッドが待機する。
  • スレッドが完全に実行された後に、メインスレッドが終了する。

複数のスレッドを管理する

複数のスレッドを作成し、それぞれの終了を待つ場合は、ベクタに JoinHandle を格納し、すべてのスレッドを join() で待機する ことができます。

use std::thread;
use std::time::Duration;

fn main() {
    let mut handles = vec![];

    for i in 1..=3 {
        let handle = thread::spawn(move || {
            for j in 1..=5 {
                println!("スレッド {}: カウント {}", i, j);
                thread::sleep(Duration::from_millis(500));
            }
        });

        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("すべてのスレッドが完了しました");
}

実行結果

スレッド 1: カウント 1
スレッド 2: カウント 1
スレッド 3: カウント 1
スレッド 1: カウント 2
スレッド 2: カウント 2
スレッド 3: カウント 2
...
すべてのスレッドが完了しました
  • Vec<JoinHandle<()>> を用意し、各スレッドの JoinHandle をベクタに格納。
  • すべてのスレッドが join() を呼び出されるまでメインスレッドは待機。
  • スレッドが順番に処理され、すべてのスレッドの完了後にメインスレッドが終了する。

まとめ

  • Rustでは std::thread::spawn() を使って簡単にスレッドを作成できる。
  • メインスレッドが先に終了すると、スレッドも強制終了される可能性があるため、join() を使ってスレッドの終了を待つことが推奨される。
  • JoinHandle を使うことで、スレッドの管理を明確にし、メインスレッドが適切にスレッドの終了を待機できる。
  • 複数のスレッドを作成する場合は、ベクタに JoinHandle を格納し、すべてのスレッドを join() で待機する方法が一般的。

これで Rustのスレッド (std::thread) の基本と join ハンドルを用いたスレッドの管理方法 を学びました。
次回は、スレッド間で安全にデータをやり取りするための「メッセージパッシング(mpsc)」について詳しく解説します!

メッセージパッシング

並行処理を行う際に、複数のスレッド間で安全にデータをやり取りする必要があります。
Rustでは、「メッセージパッシング(Message Passing)」 という手法を用いて、スレッド間のデータ共有を安全に行うことができます。

一般的に、並行処理では 共有メモリ(Shared Memory) を使用する方法と、メッセージパッシング(Message Passing) を使用する方法の2つがあります。
Rustでは、安全性を重視し、「所有権を持つデータをメッセージとしてスレッド間で送受信する設計」 を推奨しています。

この章では、Rustのメッセージパッシングを実現する mpsc(マルチプロデューサ・シングルコンシューマ)、および sendrecv を用いたスレッド間通信 について詳しく学びます。

mpsc(マルチプロデューサ・シングルコンシューマ)

mpscとは?

Rustの標準ライブラリには、スレッド間通信を行うための 「チャネル(Channel)」 が用意されています。
チャネルには 「送信側(Producer)」「受信側(Consumer)」 があり、「送信側がデータを送信し、受信側がそれを受け取る」 という仕組みになっています。

Rustでは std::sync::mpsc(multi-producer, single-consumer) というモジュールを利用することで、複数のスレッド(Producer)から1つのスレッド(Consumer)にデータを送信するチャネル を作成できます。

基本的な mpsc の使用方法

まずは、1つのスレッドからメインスレッドにデータを送る基本的な例を見てみましょう。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel(); // チャネルを作成

    thread::spawn(move || {
        let message = String::from("こんにちは、Rust!");
        tx.send(message).unwrap(); // 送信
    });

    let received = rx.recv().unwrap(); // 受信
    println!("受信したメッセージ: {}", received);
}

コードのポイント

  • mpsc::channel()送信側(tx)と受信側(rx)を作成
  • tx.send(data) でデータを送信し、rx.recv() で受信する
  • 送信後に message の所有権が移動するため、送信後に message を使用することはできない。

recv()try_recv() の違い

  • recv() は、メッセージを受信するまでブロック(待機) する。
  • try_recv() は、すぐに結果を返す(メッセージがない場合は Err を返す)
match rx.try_recv() {
    Ok(msg) => println!("受信: {}", msg),
    Err(_) => println!("まだメッセージが届いていません"),
}

sendrecvによるスレッド間通信

複数のスレッドからメッセージを送る

Rustの mpscマルチプロデューサ・シングルコンシューマ(mpsc) なので、複数の送信側(Producer)を持つことができます。
tx.clone() を使うことで、複数のスレッドが同じチャネルにデータを送信可能です。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    for i in 1..=3 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            let message = format!("スレッド {} からのメッセージ", i);
            tx_clone.send(message).unwrap();
        });
    }

    for received in rx {
        println!("受信: {}", received);
    }
}

実行結果

受信: スレッド 1 からのメッセージ
受信: スレッド 2 からのメッセージ
受信: スレッド 3 からのメッセージ

コードのポイント

  • tx.clone() を使い、各スレッドが独立した送信ハンドルを持てるようにする。
  • for received in rx を使うと、受信チャネルが閉じられるまで自動でメッセージを受け取る。

非同期メッセージの受信

recv()ブロッキング処理(データが来るまで待機) ですが、
Rustでは iter() を使うことで、受信チャネルが閉じるまで待ち続ける処理を簡単に書くことができます。

for received in rx.iter() {
    println!("受信: {}", received);
}

この方法は、すべての送信者が終了すると自動的にループが終了 するため、プログラムをスムーズに終了させるのに役立ちます。

まとめ

  • Rustでは mpsc::channel() を使うことで、安全なメッセージパッシングによるスレッド間通信が可能。
  • send() を使ってデータを送信し、recv() または try_recv() で受信できる。
  • tx.clone() を使うことで、複数のスレッド(Producer)が1つのチャネル(Consumer)にデータを送ることが可能。
  • for received in rx を使うと、チャネルが閉じるまで自動的にメッセージを処理できる。

これで Rustのメッセージパッシング (mpsc) を使ったスレッド間通信の基本 を学びました。
次回は、スレッド間でメモリを共有しつつ、安全にアクセスを管理するための MutexRwLock の仕組み について詳しく解説していきます!

MutexRwLock

並行処理では、複数のスレッドが同じデータにアクセスする ことが頻繁にあります。
しかし、適切な管理を行わなければ、データ競合(Race Condition)や不正なメモリアクセス が発生し、プログラムの挙動が不安定になります。

Rustでは、安全な並行処理を実現するために Mutex<T>RwLock<T> といった同期プリミティブが提供されています。
これらを使うことで、複数のスレッドが同じデータを扱う際に、適切なロックを行いながらメモリの整合性を保つことができます。

この章では、共有メモリの管理を行う Mutex<T>、および Arc<T> との組み合わせによる安全なデータ共有 について学びます。

共有メモリの管理 (Mutex<T>)

Mutex<T> とは?

Mutex<T>「相互排他ロック(Mutual Exclusion)」 の略で、
複数のスレッドが同時に同じデータへアクセスするのを防ぐ仕組み です。

Rustでは、std::sync::Mutex を使うことで、スレッド間で安全にデータを共有できます。
ロックを取得したスレッドのみがデータにアクセスできるため、データ競合を防ぐことが可能です。

Mutex<T> の基本的な使い方

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5); // 共有データを持つ Mutex を作成

    {
        let mut num = m.lock().unwrap(); // ロックを取得
        *num += 1; // データを変更
    } // スコープを抜けるとロックが自動的に解放される

    println!("変更後の値: {:?}", m);
}

コードのポイント

  • Mutex::new(5)5 を格納した Mutex を作成する。
  • m.lock().unwrap() でロックを取得し、データへの可変アクセスを許可する。
  • スコープを抜けると自動でロックが解放されるため、安全にデータを管理できる。

しかし、このコードは シングルスレッド環境 では問題なく動作しますが、
複数のスレッドで同じデータを共有する場合には Arc<T> を組み合わせる必要があります。

Arc<T>との組み合わせ

Arc<T>とは?

Rustの所有権システムでは、1つの変数の所有者は1つのスレッドのみ です。
そのため、Mutex<T> を複数のスレッドで共有するには、Arc<T>(Atomic Reference Counting)を使用して所有権を複数のスレッドで共有可能にする必要 があります。

Arc<Mutex<T>> を使ったスレッド間データ共有

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0)); // 共有データを作成
    let mut handles = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter); // 参照カウントを増やす
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("最終カウント: {}", *counter.lock().unwrap());
}

コードのポイント

  • Arc::new(Mutex::new(0))スレッド間で共有可能なデータを作成 する。
  • Arc::clone(&counter) を使って、各スレッドに Mutex<T> への参照を渡す。
  • 各スレッドは counter.lock().unwrap() でロックを取得し、データを変更する。
  • スレッドの終了を join() で待機し、最終的なカウントを表示する。

デッドロック(Deadlock)に注意

Mutex<T> を使う際に注意すべきなのが デッドロック です。
デッドロックとは、2つ以上のスレッドがお互いのロックを待ち続け、処理が停止する現象 です。

デッドロックの例

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let lock1 = Arc::new(Mutex::new(()));
    let lock2 = Arc::new(Mutex::new(()));

    let l1 = Arc::clone(&lock1);
    let l2 = Arc::clone(&lock2);

    let handle1 = thread::spawn(move || {
        let _guard1 = l1.lock().unwrap();
        thread::sleep(Duration::from_millis(50)); // 少し待機
        let _guard2 = l2.lock().unwrap(); // ここでデッドロック発生
    });

    let handle2 = thread::spawn(move || {
        let _guard2 = l2.lock().unwrap();
        thread::sleep(Duration::from_millis(50)); // 少し待機
        let _guard1 = l1.lock().unwrap(); // ここでデッドロック発生
    });

    handle1.join().unwrap();
    handle2.join().unwrap();
}

デッドロックを防ぐ方法

  1. ロックの順番を統一する(すべてのスレッドで同じ順序でロックを取得する)。
  2. タイムアウト付きのロックを使う(一定時間ロックできなければ処理を中断する)。

まとめ

  • Mutex<T> はスレッド間で安全にデータを共有するための相互排他ロックを提供する。
  • lock().unwrap() を使うことで、ロックを取得しデータを変更できるが、スコープを抜けると自動で解放される。
  • 複数のスレッドで Mutex<T> を共有する場合は Arc<T> を使い、所有権を適切に管理する。
  • デッドロックを避けるために、ロックの順番を統一する、またはタイムアウト付きのロックを活用することが重要。

これで 第6回「並行処理の実装」 の学習が完了しました。
次回の 第7回では「ファイル処理とWeb開発」 について学びます。
Rustを使ったファイルの読み書きやエラーハンドリング、さらにWebフレームワークを活用したAPI開発の基本を学び、Rustをより実践的な領域で活用する方法を解説していきます!

  • この記事を書いた人

ふくまる

機械設計業をしていたが25歳でエンジニアになると決意して行動開始→ 26歳でエンジニアに転職→ 28歳でフリーランスエンジニアに→ 現在、34歳でフリーランス7年目 Go案件を受注中 Go,GCPが得意分野

-Rust, プログラミング, 入門