The Developer’s Cry

Yet another blog by a hobbyist programmer

Rust In Practice - Parallel programming

Any modern systems programming language supports threads, and Rust is no different. However, it is not very obvious how parallel programming is meant to be done in Rust. The standard library includes threads, mutexes, and even an “mpsc” (multiple producer, single consumer) channel, but there are some gotchas to using these correctly.

What kind of program are we making, anyway? I have a directory tree walker that traverses the filesystem. While doing so, it makes checksums of files. Now I want to do this in parallel over multiple filesystems, starting at each filesystem mountpoint.

You may wonder why I don’t let the main thread traverse the filesystem, and checksum individual files in parallel instead. The reason is that checksumming is I/O bound; a single checksumming thread can easily utilize the full disk bandwidth, while computing the checksum doesn’t take a lot of CPU at all. Modern hashing algorithms are extremely fast; they spend most of their time waiting on I/O. If you parallelize the I/O within the same disk device, then you end up only thrashing the disk. This holds for spinning disks as well as flash drives; you get no speedup because the device cannot physically go any faster than it already does. Therefore we parallelize over different mountpoints, and pray that they are on different disk devices. [Note that they may still be partitions on the same drive; I didn’t go so far as to actually check that.]

Threading

In a more abstract form: we have a bunch of work (filesystem mountpoints), and divide that work over a number of parallel threads. The work items are written to a queue; each worker thread consumes one item from the queue at a time. The results are written to a shared space in memory, which is guarded by a mutex. When all work items have been processed, we let the threads join and exit the program.

Now, let’s implement that in Rust. First off, we will spawn a number of threads; we want to have one worker thread per CPU core. Afterwards, we want to join all threads. That part of the code looks like this:

let num_threads = thread::available_parallelism()
    .expect("failed to determine number of CPU cores")
    .get() as u32;

let mut workers = Vec::new();

for worker_id in 1..=num_threads {
    let worker = thread::spawn(
        ... // TODO add thread code here
    );
    workers.push(worker);
}

for worker in workers {
    worker.join()
        .expect("[main] failed to join thread");
}
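To try the spawn-and-join skeleton on its own, here is a minimal sketch that compiles with the standard library only. The helper name run_workers and the dummy computation inside the thread are my own placeholders, not part of the tree walker; the point is just that join() hands back whatever the thread closure returned.

```rust
use std::thread;

/// Spawn `num_threads` workers, join them all, and collect their results.
/// `run_workers` is a hypothetical helper used only for this sketch.
fn run_workers(num_threads: u32) -> Vec<u32> {
    // the vector must be mutable, because we push handles onto it
    let mut workers = Vec::new();

    for worker_id in 1..=num_threads {
        // placeholder work: each thread just returns a number
        let worker = thread::spawn(move || worker_id * 10);
        workers.push(worker);
    }

    // join in spawn order; join() yields the closure's return value
    workers
        .into_iter()
        .map(|w| w.join().expect("[main] failed to join thread"))
        .collect()
}

fn main() {
    let num_threads = thread::available_parallelism()
        .expect("failed to determine number of CPU cores")
        .get() as u32;

    let results = run_workers(num_threads);
    println!("{} workers finished", results.len());
}
```

Because the handles are joined in the order they were spawned, the results come back in worker order, no matter which thread happened to finish first.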

Channeling

Before spawning the threads, we want to set up a work queue and fill it with the items that are to be processed in parallel. The workers will grab items from this queue. Notice this is a “single producer, multiple consumer” type of problem.

The Rust standard library includes an “mpsc” channel (multiple producer, single consumer). What we need is quite the opposite. Where is the “spmc” in the standard library? Unfortunately, there is none. We need to import an external crate for that; the crossbeam crate contains a bunch of tooling for parallel programming, and in particular we will be using a crossbeam::channel as work queue. Its channels are multiple producer, multiple consumer, which covers our single-producer case.

A channel can have a limited capacity (a bounded channel), or it can have a dynamic capacity (unbounded channel). The channel acts like a pipe that you can transmit messages through, and thus it has a sending and a receiving side. This is typically coded as follows:

let (tx, rx) = crossbeam::channel::unbounded::<WorkItem>();

for item in work_items {
    tx.send(item)
        .expect("[main] send error");
}

// the main thread is done sending; close the sending side
drop(tx);

for worker_id in 1..=num_threads {
    // each worker uses a clone of the receiving side
    let rx = rx.clone();

    let worker = thread::spawn(
        ... // TODO add thread code here
            // TODO rx.recv() work item
    );
    ...
}

The above code shows that each worker thread clones the receiving end of the channel. Mind that Rust ensures memory safety via ownership; each thread owns its own handle to the receiving side, while all handles refer to the same channel, so every work item is delivered to exactly one worker.
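The difference between a bounded and an unbounded channel is easy to see in a small experiment. The sketch below deliberately uses the standard library's mpsc channels as a stand-in (sync_channel for bounded, channel for unbounded), so that it runs without any external crate; crossbeam offers the same two flavors. The function names fill_bounded and fill_unbounded are made up for this example.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Count how many items a bounded channel of `capacity` accepts
/// before `try_send` reports that the buffer is full.
fn fill_bounded(capacity: usize) -> usize {
    // `_rx` keeps the receiving side alive; nothing is ever received
    let (tx, _rx) = mpsc::sync_channel::<usize>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted) {
            Ok(()) => accepted += 1,
            // buffer is full: a blocking send() would wait here
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Queue `count` items on an unbounded channel while nobody is
/// receiving, then drain the channel and return how many came out.
fn fill_unbounded(count: usize) -> usize {
    let (tx, rx) = mpsc::channel::<usize>();
    for item in 0..count {
        // never blocks; the queue simply grows
        tx.send(item).expect("send error");
    }
    // close the sending side so that the iterator below terminates
    drop(tx);
    let drained = rx.iter().count();
    drained
}

fn main() {
    println!("bounded(3) accepted {} items", fill_bounded(3));
    println!("unbounded drained {} items", fill_unbounded(1000));
}
```

For the tree walker the number of work items is small (one per mountpoint), so an unbounded channel is a reasonable choice; a bounded one mostly matters when the producer can outrun the consumers.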

We will implement the worker code as a closure, which is an inline function passed as an argument to thread::spawn(). Rust has a particular syntax for capturing variables; “move ||” tells the compiler that any variables used in the closure are moved into it and are from then on owned by the closure. Again, remember that Rust guarantees memory safety by tracking ownership.

let worker = thread::spawn(move || {
    // the worker loops until there are
    // no more work items in the channel
    loop {
        // take work item from the channel
        match rx.recv() {
            Ok(item) => {
                worker_main(worker_id, item);
            },
            Err(_) => break,
        }
    }
    eprintln!("[{}] exiting", worker_id);
});
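If you want to play with this worker loop without pulling in crossbeam, the following is a std-only sketch of the same work-queue pattern. Note that it swaps in a different technique: the single mpsc receiver is shared between the workers behind an Arc<Mutex<...>> instead of being cloned. The helper process_all and the "work" of squaring numbers are placeholders of mine.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: spread `items` over `num_threads` workers and
/// return the sum of the squares that the workers computed.
fn process_all(items: Vec<u64>, num_threads: u32) -> u64 {
    let (tx, rx) = mpsc::channel::<u64>();

    for item in items {
        tx.send(item).expect("[main] send error");
    }
    // the main thread is done sending; close the sending side
    drop(tx);

    // std's Receiver can not be cloned, so the workers share it
    let rx = Arc::new(Mutex::new(rx));
    let mut workers = Vec::new();

    for _worker_id in 1..=num_threads {
        let rx = Arc::clone(&rx);
        workers.push(thread::spawn(move || {
            let mut local_sum = 0u64;
            loop {
                // the lock is held only for this one statement
                let next = rx.lock().expect("unable to lock receiver").recv();
                match next {
                    Ok(item) => local_sum += item * item,
                    // channel is empty and closed: no more work
                    Err(_) => break,
                }
            }
            local_sum
        }));
    }

    workers
        .into_iter()
        .map(|w| w.join().expect("[main] failed to join thread"))
        .sum()
}

fn main() {
    let total = process_all((1..=10).collect(), 4);
    println!("sum of squares: {}", total);
}
```

A caveat of this variant is that recv() is called while the lock is held, so the other workers wait on the mutex rather than on the channel. That is harmless here because all items are queued up front, but it is one reason why a channel with cloneable receivers is the nicer tool.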

Sharing and locking

There is one more thing to add: we wish to collect results. The worker should put the checksums into a shared dictionary that is indexed by filename. So, we will have a HashMap guarded by a mutex. In many languages a mutex is a separate object; in Rust the Mutex is a wrapper type that contains the instance it guards. So, the hash-map variable is a Mutex that contains a HashMap. On top of that, each thread must have its own reference to the map; therefore we clone an atomically refcounted shared pointer (Arc).

let sum_by_name = Arc::new(Mutex::new(HashMap::new()));

for worker_id in 1..=num_threads {
    ...

    let mut sum_by_name = Arc::clone(&sum_by_name);

    let task = thread::spawn(
        ...

            worker_main(worker_id, item, &mut sum_by_name);

        ...
    );
}

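The type of the hash-map passed to the worker main function becomes a horrifying &mut Arc<Mutex<HashMap<K,V>>>. Strictly speaking the &mut is not required: the Mutex already provides the mutable access through lock(), so a shared reference to it would do as well. Hooray for LSP tools such as rust-analyzer.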
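The signature can be tamed a little. As a sketch, assuming the worker only ever needs to lock the map, the function can take a plain &Mutex<HashMap<K,V>>; a reference to the Arc coerces to that automatically. The function record_sum below is a hypothetical stand-in for worker_main, and it stores the worker id where the real program would store a checksum.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for worker_main(). A shared reference is
// enough, because the Mutex hands out mutable access via lock().
fn record_sum(worker_id: u32, name: String, sum_by_name: &Mutex<HashMap<String, u32>>) {
    let mut guard = sum_by_name.lock().expect("unable to lock mutex");
    // placeholder value: the worker id instead of a real checksum
    guard.insert(name, worker_id);
}

/// Let every worker record one entry; return the number of entries.
fn count_recorded(num_threads: u32) -> usize {
    let sum_by_name: Arc<Mutex<HashMap<String, u32>>> =
        Arc::new(Mutex::new(HashMap::new()));
    let mut workers = Vec::new();

    for worker_id in 1..=num_threads {
        // no `mut` needed on the cloned Arc
        let sum_by_name = Arc::clone(&sum_by_name);
        workers.push(thread::spawn(move || {
            let name = format!("file-{}", worker_id);
            // &Arc<Mutex<..>> coerces to &Mutex<..> through Deref
            record_sum(worker_id, name, &sum_by_name);
        }));
    }

    for worker in workers {
        worker.join().expect("[main] failed to join thread");
    }

    let num_files = sum_by_name.lock().expect("unable to lock mutex").len();
    num_files
}

fn main() {
    println!("{} files", count_recorded(4));
}
```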

If you want to use the hash-map, then you must lock the mutex first. As long as the map lives inside the Mutex, this is true even if all workers have already exited:

let num_files = sum_by_name.lock()
    .expect("unable to lock mutex")
    .len();
println!("{} files", num_files);

Or, if you need to hold the lock for a longer while:

let mut guard = sum_by_name.lock()
    .expect("unable to lock mutex");

for record in records {
    guard.insert(record.name.clone(), record.sum);
}

drop(guard);
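Instead of calling drop(), the guard can also be confined to a block, so that the lock is released at the closing brace. And once every worker has been joined and only one Arc handle is left, the map can be taken out of its wrappers altogether. This goes a step beyond the code above, so take it as a sketch: collect_lengths is a made-up helper, and the length of the name stands in for a real checksum.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: one thread per name, each recording the
/// length of the name (a placeholder for a real checksum).
fn collect_lengths(names: Vec<String>) -> HashMap<String, usize> {
    let shared: Arc<Mutex<HashMap<String, usize>>> =
        Arc::new(Mutex::new(HashMap::new()));
    let mut workers = Vec::new();

    for name in names {
        let shared = Arc::clone(&shared);
        workers.push(thread::spawn(move || {
            // do the "expensive" part without holding the lock
            let len = name.len();
            {
                let mut guard = shared.lock().expect("unable to lock mutex");
                guard.insert(name, len);
            } // guard goes out of scope here, releasing the lock
        }));
    }

    for worker in workers {
        worker.join().expect("[main] failed to join thread");
    }

    // every worker's clone of the Arc has been dropped by now, so we
    // are the sole owner and can unwrap the Arc and then the Mutex
    Arc::try_unwrap(shared)
        .expect("map is still shared")
        .into_inner()
        .expect("mutex was poisoned")
}

fn main() {
    let names = vec!["a".to_string(), "bcd".to_string()];
    let map = collect_lengths(names);
    println!("{} files", map.len());
}
```

After this, the result is an ordinary HashMap again and no further locking is needed.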

Closing

As we saw, Rust has a pretty unique way of doing parallelism. And as usual while we’re still learning Rust, it’s very much a game of “how do I …?”.

The Rust way is a little quirky, and I can’t help but mention how the equivalent in Go is so much easier. I was unpleasantly surprised that Rust does not have proper channels in the standard library, and one needs to grab an external crate for that (crossbeam is excellent, by the way).

The key aspect of parallel Rust is that, to guarantee memory safety, each thread owns the data it works with, and anything shared between threads has to be wrapped explicitly (here in an Arc and a Mutex). What you get in return is unparalleled performance (pun intended).