
The 1🐝🏎️ Challenge in Zig and Rust


After completing the challenge in C++, I decided to redo it in Zig and Rust to better understand each language’s subtleties and trade-offs.

Starting with Rust.

I don’t have the dumb version of the solution in Rust, but straight away I noticed how easy it is to handle errors in Rust, simply because you have no other option.

Result<…> and Option<…> make things really elegant. Still, Rust does a pretty good job of hiding underlying behaviour such as memory allocations.
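Here is a small illustration of that. The helper parse_measurement is hypothetical and does not appear in the code below. Every fallible step in it uses the ? operator, so malformed input short-circuits to None instead of panicking. The second helper, parse_temperature (also hypothetical), shows the same operator propagating a Result error to the caller.

```rust
/// Splits a 1BRC-style line such as "Hamburg;12.0" into a station name and a
/// temperature in tenths of a degree. Each `?` returns `None` early on bad input.
fn parse_measurement(line: &str) -> Option<(&str, i32)> {
    let (station, temp) = line.split_once(';')?; // no ';' -> None
    if station.is_empty() {
        return None;
    }
    let value: f64 = temp.trim().parse().ok()?; // not a number -> None
    Some((station, (value * 10.0).round() as i32))
}

/// With `Result`, `?` hands the `ParseFloatError` back to the caller instead.
fn parse_temperature(text: &str) -> Result<f64, std::num::ParseFloatError> {
    let value = text.trim().parse::<f64>()?;
    Ok(value)
}
```

There is no way to forget the error path: the compiler forces a decision at every step, which is exactly the "no other option" point above.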

But then I benchmarked two multi-threaded solutions against each other:

- An optimized one that copies each location string only once, straight from the underlying buffer of std::io::BufReader into the std::collections::BTreeMap.
- An unoptimized one that first reads each line from that buffer into a std::string::String and then parses it, which means two copies.

To my surprise, the unoptimized solution was consistently ~3 seconds faster than the optimized one. On top of that, the unoptimized solution is sooo much nicer to read and, as a result, easier to maintain.
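The two reading strategies can be compared in a stripped-down sketch. All function names here are hypothetical, and the code only counts lines per station instead of computing min/mean/max.

- The read_line path copies every line into a reusable String first.
- The fill_buf path borrows the line straight out of the BufReader's internal buffer and then calls consume.
- When a line straddles a buffer refill, the fill_buf path has to fall back to read_until, which is where most of the extra complexity comes from.

```rust
use std::collections::BTreeMap;
use std::io::BufRead;

/// Counts lines per station. `to_string` makes one owned copy of the name per
/// line, because `BTreeMap::entry` needs an owned `String` key.
fn record(counts: &mut BTreeMap<String, usize>, line: &[u8]) {
    if let Some(pos) = line.iter().position(|&b| b == b';') {
        let station = std::str::from_utf8(&line[..pos]).expect("station names are UTF-8");
        *counts.entry(station.to_string()).or_insert(0) += 1;
    }
}

/// Two copies per line: `read_line` copies bytes out of the reader's internal
/// buffer into `line`, then `record` copies the name into a key.
fn count_with_read_line<R: BufRead>(mut reader: R) -> BTreeMap<String, usize> {
    let mut counts = BTreeMap::new();
    let mut line = String::new();
    while reader.read_line(&mut line).expect("read failed") > 0 {
        record(&mut counts, line.trim_end().as_bytes());
        line.clear();
    }
    counts
}

/// One copy for most lines: borrow the line directly from `fill_buf`, then
/// `consume` it. A line split across refills goes through `read_until`,
/// which copies it into `spill` first.
fn count_with_fill_buf<R: BufRead>(mut reader: R) -> BTreeMap<String, usize> {
    let mut counts = BTreeMap::new();
    let mut spill = Vec::new();
    loop {
        let buf = reader.fill_buf().expect("read failed");
        if buf.is_empty() {
            break; // EOF
        }
        match buf.iter().position(|&b| b == b'\n') {
            Some(pos) => {
                record(&mut counts, &buf[..pos]);
                reader.consume(pos + 1); // `buf` must not be touched after this
            }
            None => {
                spill.clear();
                reader.read_until(b'\n', &mut spill).expect("read failed");
                let line = spill.strip_suffix(b"\n").unwrap_or(&spill[..]);
                record(&mut counts, line);
            }
        }
    }
    counts
}
```

Both functions give the same result. Only the second one has to think about buffer boundaries and about when the borrow of buf ends.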

Optimized Code.

```rust
use std::{
    io::{BufRead, Read, Seek},
    thread::JoinHandle,
};

const NUM_THREADS: usize = 8;

struct LocationStats {
    min: i32,
    max: i32,
    sum: i64,
    freq: usize,
}

fn get_file_path() -> std::path::PathBuf {
    let args: std::vec::Vec<String> = std::env::args().collect();
    match args.len() {
        2 => std::path::PathBuf::from(&args[1]),
        _ => std::path::PathBuf::from("../data/measurements.txt"),
    }
}

fn print_result(
    m: &std::collections::BTreeMap<String, LocationStats>,
) -> Result<(), std::io::Error> {
    print!("{{");
    let mut it = m.iter().peekable();
    while let Some((location, stat)) = it.next() {
        let mut avg = stat.sum as f64 / stat.freq as f64 / 10.0;
        avg = (avg * 10.0).round() / 10.0;
        print!(
            "{}={:.1}/{:.1}/{:.1}",
            location,
            stat.min as f64 / 10.0,
            avg,
            stat.max as f64 / 10.0
        );
        if let Some(_) = it.peek() {
            print!(", ");
        }
    }
    print!("}}");
    Ok(())
}

fn parse_line(line: &[u8]) -> Option<(String, i32)> {
    let semicolon_pos = line.iter().position(|&b| b == b';')?;
    let location = &line[..semicolon_pos];
    let temp_bytes = &line[semicolon_pos + 1..];
    if location.is_empty() || temp_bytes.is_empty() {
        return None;
    }
    // Parse temperature from bytes
    let temp_str = std::str::from_utf8(temp_bytes).ok()?.trim();
    let temperature: f64 = temp_str.parse().ok()?;
    let temp_int = (temperature * 10.0).round() as i32;
    // Convert location bytes to String
    let location_str = std::str::from_utf8(location).ok()?.to_string();
    Some((location_str, temp_int))
}

fn update_map(
    main_map: &mut std::collections::BTreeMap<String, LocationStats>,
    batch_map: std::collections::BTreeMap<String, LocationStats>,
) {
    for (location, stats) in batch_map {
        let entry = main_map.entry(location).or_insert(LocationStats {
            min: std::i32::MAX,
            max: std::i32::MIN,
            sum: 0,
            freq: 0,
        });
        entry.min = std::cmp::min(entry.min, stats.min);
        entry.max = std::cmp::max(entry.max, stats.max);
        entry.sum += stats.sum;
        entry.freq += stats.freq;
    }
}

fn update_stats(
    m: &mut std::collections::BTreeMap<String, LocationStats>,
    location: String,
    temperature: i32,
) {
    let entry = m.entry(location).or_insert(LocationStats {
        min: std::i32::MAX,
        max: std::i32::MIN,
        sum: 0,
        freq: 0,
    });
    entry.min = std::cmp::min(entry.min, temperature);
    entry.max = std::cmp::max(entry.max, temperature);
    entry.sum += temperature as i64;
    entry.freq += 1;
}

fn skip_first_line(start: u64, f: &mut std::fs::File) -> u64 {
    assert!(start > 0);
    let mut skipped_bytes = 0u64;
    f.seek(std::io::SeekFrom::Start(start - 1)).unwrap();
    let mut prev_char = [0u8; 1];
    f.read_exact(&mut prev_char).unwrap();
    if prev_char[0] != b'\n' {
        // Skip rest of partial line and count the bytes
        loop {
            let mut byte = [0u8; 1];
            match f.read_exact(&mut byte) {
                Ok(()) => {
                    skipped_bytes += 1;
                    if byte[0] == b'\n' {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
    }
    skipped_bytes
}

fn process_batch(
    thread_idx: usize,
    file_path: &std::path::Path,
    start: u64,
    batch_bytes: u64,
) -> std::collections::BTreeMap<String, LocationStats> {
    let mut m = std::collections::BTreeMap::<String, LocationStats>::new();
    let mut f = std::fs::File::open(file_path).unwrap();
    let mut processed_bytes = 0u64;
    if thread_idx == 0 {
        f.seek(std::io::SeekFrom::Start(start)).unwrap();
    } else {
        processed_bytes += skip_first_line(start, &mut f);
    }
    let mut buf_reader = std::io::BufReader::new(f);
    loop {
        if processed_bytes >= batch_bytes {
            break;
        }
        let buf = match buf_reader.fill_buf() {
            Ok(buf) if buf.is_empty() => break, // EOF
            Ok(buf) => buf,
            Err(_) => break,
        };
        let line_end = buf.iter().position(|&b| b == b'\n');
        match line_end {
            Some(pos) => {
                let line = &buf[..pos];
                if !line.is_empty() {
                    if let Some((location, temperature)) = parse_line(line) {
                        update_stats(&mut m, location, temperature);
                    }
                }
                let bytes_consumed = pos + 1;
                buf_reader.consume(bytes_consumed);
                processed_bytes += bytes_consumed as u64;
            }
            // No newline
            // case 1: when internal buffer ends with a line split
            // case 2: EOF
            // just call read_until directly, it'll handle the buffer internally
            None => {
                let mut line_buf = Vec::new();
                match buf_reader.read_until(b'\n', &mut line_buf) {
                    Ok(0) | Ok(_) if line_buf.is_empty() => break,
                    Ok(n) => {
                        let line = if line_buf.ends_with(&[b'\n']) {
                            &line_buf[..line_buf.len() - 1]
                        } else {
                            &line_buf[..]
                        };
                        if !line.is_empty() {
                            if let Some((location, temperature)) = parse_line(line) {
                                update_stats(&mut m, location, temperature);
                            }
                        }
                        processed_bytes += n as u64;
                    }
                    Err(_) => break,
                }
            }
        }
    }
    m
}

fn process_in_batches(
    file_path: &std::path::Path,
    file_size: u64,
) -> std::collections::BTreeMap<String, LocationStats> {
    let batch_size = (file_size + NUM_THREADS as u64 - 1) / NUM_THREADS as u64;
    let mut handles: std::vec::Vec<JoinHandle<std::collections::BTreeMap<String, LocationStats>>> =
        Vec::with_capacity(NUM_THREADS);
    for i in 0..NUM_THREADS {
        let start = i as u64 * batch_size;
        let path = file_path.to_path_buf();
        handles.push(std::thread::spawn(move || {
            process_batch(i, &path, start, batch_size)
        }));
    }
    let mut m = std::collections::BTreeMap::<String, LocationStats>::new();
    for handle in handles {
        let batch_map = handle.join().unwrap();
        update_map(&mut m, batch_map);
    }
    m
}

fn process_in_single_batch(
    file_path: &std::path::Path,
    file_size: u64,
) -> std::collections::BTreeMap<String, LocationStats> {
    process_batch(0, file_path, 0, file_size)
}

fn process(file_path: &std::path::Path) -> std::collections::BTreeMap<String, LocationStats> {
    let f = std::fs::File::open(file_path).unwrap();
    let file_size = f.metadata().unwrap().len();
    if file_size > 4 * 1024 {
        process_in_batches(file_path, file_size)
    } else {
        process_in_single_batch(file_path, file_size)
    }
}

fn main() {
    let file_path = get_file_path();
    let m = process(&file_path);
    print_result(&m).unwrap();
}
```

Unoptimized Code.

```rust
use std::{
    io::{BufRead, Read, Seek},
    thread::JoinHandle,
};

const NUM_THREADS: usize = 8;

struct LocationStats {
    min: i32,
    max: i32,
    sum: i64,
    freq: usize,
}

fn get_file_path() -> std::path::PathBuf {
    let args: std::vec::Vec<String> = std::env::args().collect();
    match args.len() {
        2 => std::path::PathBuf::from(&args[1]),
        _ => std::path::PathBuf::from("../data/measurements.txt"),
    }
}

fn print_result(
    m: &std::collections::BTreeMap<String, LocationStats>,
) -> Result<(), std::io::Error> {
    print!("{{");
    let mut it = m.iter().peekable();
    while let Some((location, stat)) = it.next() {
        let mut avg = stat.sum as f64 / stat.freq as f64 / 10.0;
        avg = (avg * 10.0).round() / 10.0;
        print!(
            "{}={:.1}/{:.1}/{:.1}",
            location,
            stat.min as f64 / 10.0,
            avg,
            stat.max as f64 / 10.0
        );
        if let Some(_) = it.peek() {
            print!(", ");
        }
    }
    print!("}}");
    Ok(())
}

fn parse_line(line: &str) -> Option<(String, i32)> {
    let line = line.trim();
    if line.is_empty() {
        return None;
    }
    let semicolon_pos = line.find(';')?;
    let location = &line[..semicolon_pos];
    let temp_str = &line[semicolon_pos + 1..];
    if location.is_empty() || temp_str.is_empty() {
        return None;
    }
    let temperature: f64 = temp_str.parse().ok()?;
    let temp_int = (temperature * 10.0).round() as i32;
    let location_str = location.to_string();
    Some((location_str, temp_int))
}

fn update_map(
    main_map: &mut std::collections::BTreeMap<String, LocationStats>,
    batch_map: std::collections::BTreeMap<String, LocationStats>,
) {
    for (location, stats) in batch_map {
        let entry = main_map.entry(location).or_insert(LocationStats {
            min: std::i32::MAX,
            max: std::i32::MIN,
            sum: 0,
            freq: 0,
        });
        entry.min = std::cmp::min(entry.min, stats.min);
        entry.max = std::cmp::max(entry.max, stats.max);
        entry.sum += stats.sum;
        entry.freq += stats.freq;
    }
}

fn update_stats(
    m: &mut std::collections::BTreeMap<String, LocationStats>,
    location: String,
    temperature: i32,
) {
    let entry = m.entry(location).or_insert(LocationStats {
        min: std::i32::MAX,
        max: std::i32::MIN,
        sum: 0,
        freq: 0,
    });
    entry.min = std::cmp::min(entry.min, temperature);
    entry.max = std::cmp::max(entry.max, temperature);
    entry.sum += temperature as i64;
    entry.freq += 1;
}

fn skip_first_line(start: u64, fp: &std::path::Path) -> u64 {
    assert!(start > 0);
    let mut skipped_bytes = 0u64;
    let mut f = std::fs::File::open(fp).unwrap();
    f.seek(std::io::SeekFrom::Start(start - 1)).unwrap();
    let mut prev_char = [0u8; 1];
    f.read_exact(&mut prev_char).unwrap();
    if prev_char[0] != b'\n' {
        let mut buf_reader = std::io::BufReader::new(f);
        // read_until instead of read_line: `start` can fall in the middle of a
        // multi-byte UTF-8 character, which read_line would reject as invalid.
        let mut buf = std::vec::Vec::with_capacity(128);
        skipped_bytes += buf_reader.read_until(b'\n', &mut buf).unwrap() as u64;
    }
    skipped_bytes
}

fn process_batch(
    thread_idx: usize,
    file_path: &std::path::Path,
    start: u64,
    batch_bytes: u64,
) -> std::collections::BTreeMap<String, LocationStats> {
    let mut f = std::fs::File::open(file_path).unwrap();
    let mut processed_bytes = 0u64;
    if thread_idx == 0 {
        f.seek(std::io::SeekFrom::Start(start)).unwrap();
    } else {
        processed_bytes += skip_first_line(start, file_path);
        f.seek(std::io::SeekFrom::Start(start + processed_bytes))
            .unwrap();
    }
    let mut buf_reader = std::io::BufReader::new(f);
    let mut m = std::collections::BTreeMap::<String, LocationStats>::new();
    let mut line = std::string::String::with_capacity(128);
    loop {
        if processed_bytes >= batch_bytes {
            break;
        }
        line.clear();
        let bytes_read = buf_reader.read_line(&mut line).unwrap() as u64;
        if bytes_read == 0 {
            break; // EOF
        }
        processed_bytes += bytes_read;
        if let Some((location, temperature)) = parse_line(&line) {
            update_stats(&mut m, location, temperature);
        }
    }
    m
}

fn process_in_batches(
    file_path: &std::path::Path,
    file_size: u64,
) -> std::collections::BTreeMap<String, LocationStats> {
    let batch_size = (file_size + NUM_THREADS as u64 - 1) / NUM_THREADS as u64;
    let mut handles: std::vec::Vec<JoinHandle<std::collections::BTreeMap<String, LocationStats>>> =
        Vec::with_capacity(NUM_THREADS);
    for i in 0..NUM_THREADS {
        let start = i as u64 * batch_size;
        let path = file_path.to_path_buf();
        handles.push(std::thread::spawn(move || {
            process_batch(i, &path, start, batch_size)
        }));
    }
    let mut m = std::collections::BTreeMap::<String, LocationStats>::new();
    for handle in handles {
        let batch_map = handle.join().unwrap();
        update_map(&mut m, batch_map);
    }
    m
}

fn process_in_single_batch(
    file_path: &std::path::Path,
    file_size: u64,
) -> std::collections::BTreeMap<String, LocationStats> {
    process_batch(0, file_path, 0, file_size)
}

fn process(file_path: &std::path::Path) -> std::collections::BTreeMap<String, LocationStats> {
    let f = std::fs::File::open(file_path).unwrap();
    let file_size = f.metadata().unwrap().len();
    if file_size > 4 * 1024 {
        process_in_batches(file_path, file_size)
    } else {
        process_in_single_batch(file_path, file_size)
    }
}

fn main() {
    let file_path = get_file_path();
    let m = process(&file_path);
    print_result(&m).unwrap();
}
```

I understand that the optimised version can be made cleaner, but it is an absolute nightmare to refactor because of Rust’s ownership and borrowing rules.

Additionally, I was using std::str::from_utf8 to convert the location slice into an owned String. The docs also mention std::str::from_utf8_unchecked, which skips the UTF-8 validation and has to be called inside an unsafe {…} block. I tried both versions, and the unsafe one did save some time, but only ~2% if I’m being generous. Honestly, I think this kind of optimization depends on the context, and here I’m okay with the unsafe version.
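Here is a minimal side-by-side of the two conversions; both helper names are hypothetical. The checked version pays for a validation scan and reports bad input through its Result. The unchecked version skips the scan and moves that responsibility to the caller.

The safety argument written in the comment relies on two facts. First, the 1BRC rules specify station names as UTF-8. Second, cutting a UTF-8 buffer at ASCII bytes such as ; and \n cannot produce invalid UTF-8.

```rust
/// Safe path: `from_utf8` validates the bytes and reports invalid input
/// through its `Result`, turned into `None` here.
fn station_name_checked(bytes: &[u8]) -> Option<String> {
    std::str::from_utf8(bytes).ok().map(String::from)
}

/// Unchecked path: `from_utf8_unchecked` skips validation entirely.
///
/// # Safety
/// `bytes` must be valid UTF-8. 1BRC station names are UTF-8, and slicing a
/// UTF-8 buffer at ASCII bytes like `;` or `\n` keeps it valid, because ASCII
/// bytes never occur inside a multi-byte UTF-8 sequence.
unsafe fn station_name_unchecked(bytes: &[u8]) -> String {
    // SAFETY: upheld by the caller, see the contract above.
    unsafe { std::str::from_utf8_unchecked(bytes) }.to_string()
}
```

If the input ever broke that contract, the unchecked version would be undefined behaviour rather than an error. That trade-off is what a ~2% gain buys.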

I wasn’t expecting this behaviour from Rust, and I’m pleasantly surprised. My somewhat educated guess is that writing maintainable code and letting the compiler handle the optimization really pays off in Rust, at least for this workload. Nice.

Final Version

```rust
// Requires the memmap2 crate as a dependency.
const NUM_THREADS: usize = 8;

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
struct LocationEntry {
    location: String,
    min: i32,
    max: i32,
    sum: i64,
    freq: usize,
}

impl LocationEntry {
    pub fn new() -> Self {
        Self {
            location: std::string::String::new(),
            min: std::i32::MAX,
            max: std::i32::MIN,
            sum: 0,
            freq: 0,
        }
    }
}

struct FastMap {
    data: std::vec::Vec<LocationEntry>,
    mask: usize,
}

impl FastMap {
    fn hash(&self, key: &[u8]) -> usize {
        let mut hash = 0 as usize;
        for ch in key {
            // wrapping_* makes the intended modular arithmetic explicit;
            // plain `*` and `+` would panic on overflow in debug builds.
            hash = hash.wrapping_mul(1315423911).wrapping_add(*ch as usize);
        }
        hash
    }

    fn get_idx(&self, key: &[u8]) -> usize {
        let hash = self.hash(key);
        let mut idx = hash & self.mask;
        loop {
            let location = &self.data[idx].location;
            if location.len() <= 0 || location.as_bytes() == key {
                return idx;
            }
            idx = (idx + 1) & self.mask;
        }
    }

    fn sort_inplace(&mut self) {
        self.data.sort();
    }

    pub fn update(&mut self, location: &[u8], temperature: i32) {
        let idx = self.get_idx(location);
        if self.data[idx].location.len() <= 0 {
            self.data[idx].location = unsafe { std::str::from_utf8_unchecked(location).to_string() }
        }
        self.data[idx].freq += 1;
        self.data[idx].sum += temperature as i64;
        self.data[idx].min = std::cmp::min(self.data[idx].min, temperature);
        self.data[idx].max = std::cmp::max(self.data[idx].max, temperature);
    }

    pub fn update_batch(&mut self, other: &Self) {
        for i in 0..other.data.len() {
            let other_location = &other.data[i].location;
            if other_location.len() <= 0 {
                continue;
            }
            let this_idx = self.get_idx(other_location.as_bytes());
            if self.data[this_idx].location.len() <= 0 {
                self.data[this_idx].location = other_location.to_string();
            }
            self.data[this_idx].freq += other.data[i].freq;
            self.data[this_idx].sum += other.data[i].sum;
            self.data[this_idx].min = std::cmp::min(self.data[this_idx].min, other.data[i].min);
            self.data[this_idx].max = std::cmp::max(self.data[this_idx].max, other.data[i].max);
        }
    }

    pub fn print_sorted(&mut self) {
        self.sort_inplace();
        let mut it = self.data.iter().peekable();
        print!("{{");
        while let Some(location_entry) = it.next() {
            let location = &location_entry.location;
            if location.len() <= 0 {
                continue;
            }
            let mut avg = location_entry.sum as f64 / location_entry.freq as f64 / 10.0;
            avg = (avg * 10.0).round() / 10.0;
            print!(
                "{}={:.1}/{:.1}/{:.1}",
                location,
                location_entry.min as f64 / 10.0,
                avg,
                location_entry.max as f64 / 10.0
            );
            if let Some(_) = it.peek() {
                print!(", ");
            }
        }
        print!("}}");
    }

    pub fn new() -> Self {
        Self {
            data: vec![LocationEntry::new(); 1 << 14],
            mask: (1 << 14) - 1,
        }
    }
}

fn get_file_path() -> std::path::PathBuf {
    let args: std::vec::Vec<String> = std::env::args().collect();
    match args.len() {
        2 => std::path::PathBuf::from(&args[1]),
        _ => std::path::PathBuf::from("../data/measurements.txt"),
    }
}

fn skip_first_line(start: u64, mmap_f: &[u8]) -> u64 {
    assert!(start > 0);
    let prev_char = mmap_f[(start - 1) as usize];
    if prev_char == b'\n' {
        return 0;
    }
    let mut new_line_char_pos = start as usize;
    loop {
        if new_line_char_pos >= mmap_f.len() || mmap_f[new_line_char_pos] == b'\n' {
            break;
        }
        new_line_char_pos += 1;
    }
    new_line_char_pos as u64 - start + 1
}

fn parse_i32_from_byte_slice(slice: &[u8]) -> i32 {
    let mut pos = 0 as usize;
    let is_negative = if slice[0] == b'-' {
        pos += 1;
        true
    } else {
        false
    };
    let num = if slice[pos + 1] == b'.' {
        (slice[pos] - b'0') as i32 * 10 + (slice[pos + 2] - b'0') as i32
    } else {
        (slice[pos] - b'0') as i32 * 100
            + (slice[pos + 1] - b'0') as i32 * 10
            + (slice[pos + 3] - b'0') as i32
    };
    if is_negative {
        -num
    } else {
        num
    }
}

fn parse_line(line: &[u8]) -> Option<(&[u8], i32)> {
    if line.is_empty() {
        return None;
    }
    let mut semicol_pos = 0 as usize;
    loop {
        if line[semicol_pos] == b';' {
            break;
        }
        semicol_pos += 1;
    }
    let location = &line[..semicol_pos];
    let temperature_slice = &line[semicol_pos + 1..];
    if location.is_empty() || temperature_slice.is_empty() {
        return None;
    }
    // saves ~3s in ~34s total runtime.
    let temperature = parse_i32_from_byte_slice(temperature_slice);
    Some((location, temperature))
}

fn process_batch(thread_idx: usize, mmap_f: &[u8], start: u64, batch_bytes: u64) -> FastMap {
    let mut processed_bytes = 0 as u64;
    if thread_idx != 0 {
        processed_bytes += skip_first_line(start, mmap_f);
    }
    let mut m = FastMap::new();
    loop {
        if processed_bytes >= batch_bytes {
            break;
        }
        let line_start = (start + processed_bytes) as usize;
        if line_start >= mmap_f.len() {
            break;
        }
        let mut new_line_char_pos = start + processed_bytes;
        loop {
            if new_line_char_pos as usize >= mmap_f.len()
                || mmap_f[new_line_char_pos as usize] == b'\n'
            {
                break;
            }
            new_line_char_pos += 1;
        }
        let line_end = new_line_char_pos as usize;
        if let Some((location, temperature)) = parse_line(&mmap_f[line_start..line_end]) {
            m.update(location, temperature);
        }
        processed_bytes += (line_end - line_start + 1) as u64;
    }
    m
}

fn process_in_batches(file_path: &std::path::Path, file_size: u64) -> FastMap {
    // Create mmap once and share it across all threads
    let f = std::fs::File::open(file_path).unwrap();
    let mmap_f = unsafe { memmap2::Mmap::map(&f).unwrap() };
    let mmap_arc = std::sync::Arc::new(mmap_f);
    let batch_size = (file_size + NUM_THREADS as u64 - 1) / NUM_THREADS as u64;
    let mut handles: std::vec::Vec<std::thread::JoinHandle<FastMap>> =
        Vec::with_capacity(NUM_THREADS);
    for i in 0..NUM_THREADS {
        let start = i as u64 * batch_size;
        let mmap_clone = mmap_arc.clone();
        handles.push(std::thread::spawn(move || {
            process_batch(i, &mmap_clone, start, batch_size)
        }));
    }
    let mut m = FastMap::new();
    for handle in handles {
        let batch_map = handle.join().unwrap();
        m.update_batch(&batch_map);
    }
    m
}

fn process_in_single_batch(file_path: &std::path::Path, file_size: u64) -> FastMap {
    let f = std::fs::File::open(file_path).unwrap();
    let mmap_f = unsafe { memmap2::Mmap::map(&f).unwrap() };
    process_batch(0, &mmap_f, 0, file_size)
}

fn process(file_path: &std::path::Path) -> FastMap {
    let f = std::fs::File::open(file_path).unwrap();
    let file_size = f.metadata().unwrap().len();
    if file_size > 4 * 1024 {
        process_in_batches(file_path, file_size)
    } else {
        process_in_single_batch(file_path, file_size)
    }
}

fn main() {
    let file_path = get_file_path();
    let mut m = process(&file_path);
    m.print_sorted();
}
```
```
Time (mean ± σ): 32.062 s ± 0.048 s [User: 78.749 s, System: 12.305 s]
Range (min … max): 31.996 s … 32.126 s 10 runs
```

👑 Zig 👑

The single-threaded baseline version in Zig took ~53 seconds, which is pretty impressive, but I was left 🤯 after benchmarking the multi-threaded version.

```zig
const std = @import("std");

const FILE_PATH = "/root/code/1brc/data/measurements.txt";

const LocationStat = struct {
    min: i32,
    max: i32,
    freq: usize,
    sum: i64,
};

fn printResults(m: *std.StringHashMap(LocationStat), arena_alloc: std.mem.Allocator) !void {
    var keys_arr = try std.ArrayList([]const u8).initCapacity(arena_alloc, 2 * 1024 * 1024);
    var keyIter = m.keyIterator();
    while (keyIter.next()) |location_slice| {
        try keys_arr.append(arena_alloc, location_slice.*);
    }
    std.mem.sort([]const u8, keys_arr.items, {}, struct {
        fn lessThan(_: void, a: []const u8, b: []const u8) bool {
            return std.mem.order(u8, a, b) == .lt;
        }
    }.lessThan);
    var stdout_buffer: [2 * 1024 * 1024]u8 = undefined;
    var stdout_writer = std.fs.File.stdout().writer(&stdout_buffer);
    const stdout = &stdout_writer.interface;
    var first = true;
    try stdout.writeAll("{");
    for (keys_arr.items) |location| {
        const stat = m.get(location).?;
        const mean = @as(f64, @floatFromInt(stat.sum)) / @as(f64, @floatFromInt(stat.freq));
        if (!first) {
            try stdout.writeAll(", ");
        }
        first = false;
        try stdout.print("{s}={d:.1}/{d:.1}/{d:.1}", .{
            location,
            @as(f64, @floatFromInt(stat.min)) / 10.0,
            mean / 10.0,
            @as(f64, @floatFromInt(stat.max)) / 10.0,
        });
    }
    try stdout.writeAll("}\n");
    try stdout.flush();
}

fn parseLine(line: []const u8) !struct { []const u8, i32 } {
    const semicol_pos = std.mem.indexOfScalar(u8, line, ';').?;
    const temperature = try std.fmt.parseFloat(f32, line[semicol_pos + 1 ..]);
    // Round rather than truncate: @intFromFloat truncates toward zero, so an f32
    // product landing just below a whole number would lose a tenth. This
    // matches the .round() used in the Rust versions.
    return .{ line[0..semicol_pos], @as(i32, @intFromFloat(@round(temperature * 10.0))) };
}

fn updateMap(location_slice: []const u8, temperature: i32, m: *std.StringHashMap(LocationStat), arena_alloc: std.mem.Allocator) !void {
    const res = try m.getOrPut(location_slice);
    if (!res.found_existing) {
        const owned_ptr = try arena_alloc.dupe(u8, location_slice);
        res.key_ptr.* = owned_ptr;
        res.value_ptr.* = LocationStat{
            .min = temperature,
            .max = temperature,
            .freq = 1,
            .sum = temperature,
        };
    } else {
        res.value_ptr.min = @min(res.value_ptr.min, temperature);
        res.value_ptr.max = @max(res.value_ptr.max, temperature);
        res.value_ptr.sum += temperature;
        res.value_ptr.freq += 1;
    }
}

fn onebrc(file: std.fs.File, arena_alloc: std.mem.Allocator) !void {
    var file_buffer: [8192]u8 = undefined;
    var reader = file.reader(&file_buffer);
    var m = std.StringHashMap(LocationStat).init(arena_alloc);
    while (true) {
        const line = reader.interface.takeDelimiterExclusive('\n') catch |read_err| {
            if (read_err == error.EndOfStream) {
                break;
            }
            return read_err;
        };
        if (line.len == 0) continue;
        const location_slice, const temperature = try parseLine(line);
        try updateMap(location_slice, temperature, &m, arena_alloc);
    }
    try printResults(&m, arena_alloc);
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();
    var arena = std.heap.ArenaAllocator.init(allocator);
    defer arena.deinit();
    const arena_allocator = arena.allocator();
    const args = try std.process.argsAlloc(allocator);
    defer std.process.argsFree(allocator, args);
    var file_path: []const u8 = undefined;
    if (args.len < 2) {
        file_path = FILE_PATH;
    } else {
        file_path = args[1];
    }
    const file = try std.fs.cwd().openFile(file_path, .{});
    defer file.close();
    try onebrc(file, arena_allocator);
}
```

The multi-threaded version without:

  1. Mmap
  2. hand-rolled parser for temperature parsing
  3. custom hashmap with lightweight non-cryptographic hash function

is on par with the Rust and C++ versions that have all of the above optimizations.
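For reference, here is what item 2 amounts to. The sketch uses a hypothetical parse_tenths, a compact variant that differs from the branchy parse_i32_from_byte_slice in the final Rust version above.

It relies on the 1BRC temperature format: an optional minus sign, one or two integer digits, a dot, and exactly one fractional digit. Under that assumption it can simply accumulate every digit and ignore the dot. It does no validation. The second, also hypothetical, function is the std path (parse as f64, multiply by 10, round) that the hand-rolled parser replaces.

```rust
/// Hand-rolled fixed-point parse of "-?d?d.d" into tenths of a degree.
/// Assumes well-formed 1BRC input (exactly one fractional digit).
fn parse_tenths(bytes: &[u8]) -> i32 {
    let (negative, digits) = match bytes.split_first() {
        Some((&b'-', rest)) => (true, rest),
        _ => (false, bytes),
    };
    let mut value = 0i32;
    for &b in digits {
        if b != b'.' {
            value = value * 10 + i32::from(b - b'0');
        }
    }
    if negative { -value } else { value }
}

/// Reference path: the standard float parser plus rounding.
fn parse_tenths_std(text: &str) -> i32 {
    (text.parse::<f64>().expect("valid number") * 10.0).round() as i32
}
```

Because every valid temperature lies between -99.9 and 99.9, the two can be checked against each other exhaustively.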

If you haven’t followed the earlier code, or even if you have, let me remind you: implementing the multi-threaded version in C++ and Rust with the optimizations above is what brought my solution on par with the challenge-winning solution, which is in Java.

And here the Zig solution, without those optimizations, is able to keep up!

```zig
const std = @import("std");

const FILE_PATH = "/root/code/1brc/data/measurements.txt";
const NUM_THREADS = 8;

const LocationStat = struct {
    min: i32,
    max: i32,
    freq: usize,
    sum: i64,
};

fn printResults(m: *const std.StringHashMap(LocationStat), allocator: std.mem.Allocator) !void {
    var keys = std.ArrayList([]const u8){};
    defer keys.deinit(allocator);
    var key_iter = m.keyIterator();
    while (key_iter.next()) |key| {
        try keys.append(allocator, key.*);
    }
    std.mem.sort([]const u8, keys.items, {}, struct {
        fn lessThan(_: void, a: []const u8, b: []const u8) bool {
            return std.mem.order(u8, a, b) == .lt;
        }
    }.lessThan);
    var stdout_buf: [2 * 1024 * 1024]u8 = undefined;
    var stdout_writer = std.fs.File.stdout().writer(&stdout_buf);
    const writer = &stdout_writer.interface;
    try writer.writeAll("{");
    for (keys.items, 0..) |location, i| {
        const stat = m.get(location).?;
        const mean = @as(f64, @floatFromInt(stat.sum)) / @as(f64, @floatFromInt(stat.freq));
        if (i > 0) try writer.writeAll(", ");
        try writer.print("{s}={d:.1}/{d:.1}/{d:.1}", .{
            location,
            @as(f64, @floatFromInt(stat.min)) / 10.0,
            mean / 10.0,
            @as(f64, @floatFromInt(stat.max)) / 10.0,
        });
    }
    try writer.writeAll("}\n");
    try writer.flush();
}

fn parseLine(line: []const u8) !struct { []const u8, i32 } {
    const semicol_pos = std.mem.indexOfScalar(u8, line, ';').?;
    const temperature = try std.fmt.parseFloat(f32, line[semicol_pos + 1 ..]);
    // Round rather than truncate: @intFromFloat truncates toward zero, so an f32
    // product landing just below a whole number would lose a tenth.
    return .{ line[0..semicol_pos], @as(i32, @intFromFloat(@round(temperature * 10.0))) };
}

fn updateMap(location: []const u8, temperature: i32, m: *std.StringHashMap(LocationStat), allocator: std.mem.Allocator) !void {
    const entry = try m.getOrPut(location);
    if (!entry.found_existing) {
        entry.key_ptr.* = try allocator.dupe(u8, location);
        entry.value_ptr.* = .{
            .min = temperature,
            .max = temperature,
            .freq = 1,
            .sum = temperature,
        };
    } else {
        const stat = entry.value_ptr;
        stat.min = @min(stat.min, temperature);
        stat.max = @max(stat.max, temperature);
        stat.sum += temperature;
        stat.freq += 1;
    }
}

fn skipToNextLine(file: std.fs.File, start_pos: usize) !u64 {
    if (start_pos == 0) return 0;
    var buf: [1]u8 = undefined;
    var reader = file.reader(&buf);
    try reader.seekTo(start_pos - 1);
    const prev_char = try reader.interface.takeByte();
    if (prev_char == '\n') return 0;
    try reader.seekTo(start_pos);
    var bytes_skipped: u64 = 0;
    while (true) {
        const char = try reader.interface.takeByte();
        bytes_skipped += 1;
        if (char == '\n') break;
    }
    return bytes_skipped;
}

fn processBatch(
    thread_idx: usize,
    start_pos: usize,
    batch_size: u64,
    m: *std.StringHashMap(LocationStat),
    file: std.fs.File,
    arena_alloc: std.mem.Allocator,
) !void {
    var processed_bytes: u64 = 0;
    if (thread_idx > 0) {
        processed_bytes = try skipToNextLine(file, start_pos);
    }
    var file_buffer: [8192]u8 = undefined;
    var reader = file.reader(&file_buffer);
    try reader.seekTo(start_pos + processed_bytes);
    while (processed_bytes < batch_size) {
        const line = reader.interface.takeDelimiterExclusive('\n') catch |read_err| {
            if (read_err == error.EndOfStream) {
                break;
            }
            return read_err;
        };
        const location_slice, const temperature = try parseLine(line);
        try updateMap(location_slice, temperature, m, arena_alloc);
        processed_bytes += line.len + 1;
    }
}

fn accumulateBatchResults(m: *std.StringHashMap(LocationStat), batch_results: *[NUM_THREADS]std.StringHashMap(LocationStat)) !void {
    for (batch_results) |batch_map| {
        var key_iter = batch_map.keyIterator();
        while (key_iter.next()) |location_slice| {
            const result = try m.getOrPut(location_slice.*);
            if (!result.found_existing) {
                result.key_ptr.* = location_slice.*;
                result.value_ptr.* = batch_map.get(location_slice.*).?;
            } else {
                const batch_stat = batch_map.get(location_slice.*).?;
                result.value_ptr.min = @min(result.value_ptr.min, batch_stat.min);
                result.value_ptr.max = @max(result.value_ptr.max, batch_stat.max);
                result.value_ptr.sum += batch_stat.sum;
                result.value_ptr.freq += batch_stat.freq;
            }
        }
    }
}

fn processParallelInMultipleBatches(file_size: u64, file_path: []const u8) !void {
    const batch_size = (file_size + NUM_THREADS - 1) / NUM_THREADS;
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const gpa_alloc = gpa.allocator();
    var handles: [NUM_THREADS]std.Thread = undefined;
    var m: [NUM_THREADS]std.StringHashMap(LocationStat) = undefined;
    var arenas: [NUM_THREADS]std.heap.ArenaAllocator = undefined;
    var files: [NUM_THREADS]std.fs.File = undefined;
    for (0..NUM_THREADS) |i| {
        arenas[i] = std.heap.ArenaAllocator.init(gpa_alloc);
        m[i] = std.StringHashMap(LocationStat).init(arenas[i].allocator());
        files[i] = try std.fs.cwd().openFile(file_path, .{});
    }
    defer {
        for (0..NUM_THREADS) |i| {
            m[i].deinit();
            arenas[i].deinit();
            files[i].close();
        }
    }
    for (0..NUM_THREADS) |i| {
        const start_pos = i * batch_size;
        handles[i] = try std.Thread.spawn(.{}, processBatch, .{ i, @as(usize, start_pos), batch_size, &m[i], files[i], arenas[i].allocator() });
    }
    for (0..NUM_THREADS) |i| {
        handles[i].join();
    }
    var final_arena = std.heap.ArenaAllocator.init(gpa_alloc);
    defer final_arena.deinit();
    var final_map = std.StringHashMap(LocationStat).init(final_arena.allocator());
    defer final_map.deinit();
    try accumulateBatchResults(&final_map, &m);
    try printResults(&final_map, final_arena.allocator());
}

fn processInSingleBatch(batch_size: u64, file_path: []const u8) !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const gpa_alloc = gpa.allocator();
    var arena = std.heap.ArenaAllocator.init(gpa_alloc);
    defer arena.deinit();
    const arena_alloc = arena.allocator();
    var m = std.StringHashMap(LocationStat).init(arena_alloc);
    const file = try std.fs.cwd().openFile(file_path, .{});
    defer file.close();
    try processBatch(0, 0, batch_size, &m, file, arena_alloc);
    try printResults(&m, arena_alloc);
}

fn onebrc(file_path: []const u8) !void {
    const file = try std.fs.cwd().openFile(file_path, .{});
    defer file.close();
    const file_size = (try file.stat()).size;
    if (file_size > 4 * 1024) {
        try processParallelInMultipleBatches(file_size, file_path);
    } else {
        try processInSingleBatch(file_size, file_path);
    }
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();
    const args = try std.process.argsAlloc(allocator);
    defer std.process.argsFree(allocator, args);
    const file_path = if (args.len < 2) FILE_PATH else args[1];
    try onebrc(file_path);
}
```
```
Time (mean ± σ): 31.507 s ± 0.149 s [User: 78.501 s, System: 14.278 s]
Range (min … max): 31.415 s … 31.926 s 10 runs
```

I was surprised, to begin with, but I think I might have an idea of what’s going on, and if I’m right, it’s because of the arena allocator??
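To make the arena idea concrete, here is a deliberately tiny Rust analogue, not a port of Zig's std.heap.ArenaAllocator. The type NameArena is hypothetical. It appends every station name to one growing byte buffer and hands back an (offset, len) handle. Nothing is freed individually: all names go away together when the arena is dropped, which is how the Zig code uses its per-thread arenas. Whether this is actually what makes the Zig version fast is still only a guess.

```rust
/// A minimal bump-style store for station names: one contiguous buffer,
/// (offset, len) handles, and a single bulk free when the arena is dropped.
struct NameArena {
    bytes: Vec<u8>,
}

impl NameArena {
    fn with_capacity(capacity: usize) -> Self {
        Self { bytes: Vec::with_capacity(capacity) }
    }

    /// Copies `name` to the end of the buffer. There is no per-name heap
    /// allocation; the buffer only reallocates when it runs out of capacity.
    fn alloc(&mut self, name: &[u8]) -> (usize, usize) {
        let offset = self.bytes.len();
        self.bytes.extend_from_slice(name);
        (offset, name.len())
    }

    /// Resolves a handle back to the stored bytes.
    fn get(&self, (offset, len): (usize, usize)) -> &[u8] {
        &self.bytes[offset..offset + len]
    }
}
```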

I went on to try the “fully optimized” solution, which combines:

- a custom open-addressing HashMap with a non-cryptographic hash function, linear probing on collisions and a load factor well below 0.5;
- mmap;
- custom integer parsing.

For some reason, it did ever so slightly worse.
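The hash-map part of that description can be sketched as follows; ProbeMap and Stats are hypothetical names. The design has three parts:

- **Hash.** 64-bit FNV-1a, a well-known cheap non-cryptographic hash. It stands in for the multiplicative hash in the Rust FastMap above and is not necessarily what the Zig version used.
- **Indexing.** The table size is a power of two, so hash & mask replaces a modulo.
- **Load factor.** The map refuses to go above a 0.5 load factor instead of resizing. That keeps probe chains short and guarantees that probing always finds a free slot.

```rust
/// Per-station aggregate, temperatures in tenths of a degree.
#[derive(Clone, Debug)]
struct Stats {
    min: i32,
    max: i32,
    sum: i64,
    count: u64,
}

/// 64-bit FNV-1a: xor in each byte, then multiply by the FNV prime.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in bytes {
        h ^= u64::from(b);
        h = h.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    h
}

/// Open addressing with linear probing over a power-of-two table.
struct ProbeMap {
    keys: Vec<Option<Box<[u8]>>>,
    vals: Vec<Stats>,
    mask: usize,
    len: usize,
}

impl ProbeMap {
    fn with_capacity_pow2(bits: u32) -> Self {
        let cap = 1usize << bits;
        let empty = Stats { min: i32::MAX, max: i32::MIN, sum: 0, count: 0 };
        Self { keys: vec![None; cap], vals: vec![empty; cap], mask: cap - 1, len: 0 }
    }

    /// Slot holding `name`, or the first empty slot on its probe path.
    fn slot(&self, name: &[u8]) -> usize {
        let mut idx = fnv1a(name) as usize & self.mask;
        while let Some(key) = &self.keys[idx] {
            if &key[..] == name {
                break;
            }
            idx = (idx + 1) & self.mask; // collision: try the next slot
        }
        idx
    }

    fn record(&mut self, name: &[u8], temp: i32) {
        let idx = self.slot(name);
        if self.keys[idx].is_none() {
            assert!(2 * (self.len + 1) <= self.keys.len(), "load factor would exceed 0.5");
            self.keys[idx] = Some(name.into()); // copy the name only on first sight
            self.len += 1;
        }
        let s = &mut self.vals[idx];
        s.min = s.min.min(temp);
        s.max = s.max.max(temp);
        s.sum += i64::from(temp);
        s.count += 1;
    }

    fn get(&self, name: &[u8]) -> Option<&Stats> {
        let idx = self.slot(name);
        self.keys[idx].as_ref().map(|_| &self.vals[idx])
    }
}
```

Lookups take the borrowed &[u8] straight from the input, so the station name is only copied the first time it is seen.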

I was surprised again, but not so pleasantly this time. Why?? And what’s going on??

The custom HashMap and the integer parsing should definitely do better, so maybe the culprit is traversing the mmap’d file byte by byte, with the CPU having to deal with a ton of page faults.

So, as a sanity check, I wrote a version with the same HashMap and parsing optimisations but using buffered reads from the file.
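That buffered version isn't shown here. As a rough idea of what "buffered reads" can mean in this setting, here is a hypothetical Rust sketch, for_each_line, that does the chunking by hand:

- It reads the file in fixed-size chunks into one reusable buffer.
- It hands out every complete line as a borrowed slice.
- It moves the unfinished tail of a chunk to the front so the next read completes it.
- If a single line is longer than the buffer, it grows the buffer instead of stalling.

```rust
use std::io::Read;

/// Calls `on_line` for every line of `reader` (without the trailing '\n').
/// Bytes are copied into `buf` once by `read`; only the tail of a line that
/// straddles two reads is copied a second time by `copy_within`.
fn for_each_line<R: Read>(
    mut reader: R,
    chunk_size: usize,
    mut on_line: impl FnMut(&[u8]),
) -> std::io::Result<()> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut buf = vec![0u8; chunk_size];
    let mut filled = 0; // number of valid bytes at the start of `buf`
    loop {
        let n = reader.read(&mut buf[filled..])?;
        if n == 0 {
            // EOF: flush a last line that has no trailing newline.
            if filled > 0 {
                on_line(&buf[..filled]);
            }
            return Ok(());
        }
        filled += n;
        // Emit every complete line currently in the buffer.
        let mut start = 0;
        while let Some(pos) = buf[start..filled].iter().position(|&b| b == b'\n') {
            on_line(&buf[start..start + pos]);
            start += pos + 1;
        }
        // Move the unfinished tail to the front for the next read.
        buf.copy_within(start..filled, 0);
        filled -= start;
        // A line longer than the buffer: grow, otherwise the next read gets 0 bytes.
        if filled == buf.len() {
            buf.resize(buf.len() * 2, 0);
        }
    }
}
```

In a real solution each thread would run a loop like this over its own byte range. The reads are large and sequential, which the kernel's readahead handles well, instead of touching mapped pages one byte at a time.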

and… 🥁🥁🥁

```
Time (mean ± σ): 26.683 s ± 0.100 s [User: 60.712 s, System: 11.416 s]
Range (min … max): 26.492 s … 26.791 s 10 runs
```

Yup, it turns out that working with mmap across multiple threads does more harm than good here, because of page faults. But this is on my machine with just 16 GB of RAM (I’m poor), and I suspect mmap would come out ahead with more memory available.