Skip to content

Commit

Permalink
Fallback: cleanups and kludge for darwin /dev/tty
Browse files Browse the repository at this point in the history
  • Loading branch information
Cloudef committed Jun 26, 2024
1 parent 78b1358 commit be8e2b3
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 72 deletions.
121 changes: 59 additions & 62 deletions src/aio/Fallback.zig
Original file line number Diff line number Diff line change
@@ -1,53 +1,44 @@
const builtin = @import("builtin");
const std = @import("std");
const posix = @import("common/posix.zig");
const aio = @import("../aio.zig");
const Operation = @import("ops.zig").Operation;
const Pool = @import("common/types.zig").Pool;
const FixedArrayList = @import("common/types.zig").FixedArrayList;
const posix = @import("common/posix.zig");
const DoubleBufferedFixedArrayList = @import("common/types.zig").DoubleBufferedFixedArrayList;

// This mess of a code just shows how much io_uring was needed

fn debug(comptime fmt: []const u8, args: anytype) void {
if (@import("builtin").is_test) {
std.debug.print("fallback: " ++ fmt ++ "\n", args);
} else {
if (comptime !aio.options.debug) return;
const log = std.log.scoped(.fallback);
log.debug(fmt, args);
}
}

pub const EventSource = posix.EventSource;

const Result = struct { failure: Operation.Error, id: u16 };

source: EventSource,
tpool: *std.Thread.Pool,
ops: Pool(Operation.Union, u16),
next: []u16,
readiness: []posix.Readiness,
link_lock: std.DynamicBitSetUnmanaged,
started: std.DynamicBitSetUnmanaged,
pending: std.DynamicBitSetUnmanaged,
pfd: FixedArrayList(posix.pollfd, u32),
prev_id: ?u16 = null, // for linking operations
finished: FixedArrayList(Result, u16),
finished_mutex: std.Thread.Mutex = .{},
// copied on completition
finished_copy: FixedArrayList(Result, u16),
next: []u16, // linked operation, points to self if none
readiness: []posix.Readiness, // readiness fd that gets polled before we perform the operation
link_lock: std.DynamicBitSetUnmanaged, // operation is waiting for linked operation to finish first
pending: std.DynamicBitSetUnmanaged, // operation is pending on readiness fd (poll)
started: std.DynamicBitSetUnmanaged, // operation has been queued, it's being performed if pending is false
pfd: FixedArrayList(posix.pollfd, u32), // current fds that we must poll for wakeup
tpool: *std.Thread.Pool, // thread pool for performing operations, not all operations will be performed here
source: EventSource, // when threads finish, they signal it using this event source
finished: DoubleBufferedFixedArrayList(Result, u16), // operations that are finished, double buffered to be thread safe

pub fn isSupported(_: []const type) bool {
return true; // very optimistic :D
}

fn minThreads() u32 {
// might need this on BSD too.
// it's not a great solution, but this at least lets apps that poll /dev/tty
// work with one dedicated thread given for it.
// unfortunately pselect/select which will work on /dev/tty are just too annoying
// to try and kludge into this backend.
return if (builtin.target.isDarwin()) 2 else 1;
}

pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
var source = try EventSource.init();
errdefer source.deinit();
var tpool = try allocator.create(std.Thread.Pool);
tpool.init(.{ .allocator = allocator, .n_jobs = aio.options.num_threads }) catch |err| return switch (err) {
error.LockedMemoryLimitExceeded, error.ThreadQuotaExceeded => error.SystemResources,
else => |e| e,
};
var ops = try Pool(Operation.Union, u16).init(allocator, n);
errdefer ops.deinit(allocator);
const next = try allocator.alloc(u16, n);
Expand All @@ -56,28 +47,35 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
errdefer allocator.free(readiness);
var link_lock = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n);
errdefer link_lock.deinit(allocator);
var started = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n);
errdefer started.deinit(allocator);
var pending = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n);
errdefer pending.deinit(allocator);
var started = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n);
errdefer started.deinit(allocator);
var pfd = try FixedArrayList(posix.pollfd, u32).init(allocator, n + 1);
errdefer pfd.deinit(allocator);
var finished = try FixedArrayList(Result, u16).init(allocator, n);
var tpool = try allocator.create(std.Thread.Pool);
errdefer allocator.destroy(tpool);
const thread_count: u32 = aio.options.num_threads orelse @intCast(@max(minThreads(), std.Thread.getCpuCount() catch 1));
tpool.init(.{ .allocator = allocator, .n_jobs = thread_count }) catch |err| return switch (err) {
error.LockedMemoryLimitExceeded, error.ThreadQuotaExceeded => error.SystemResources,
else => |e| e,
};
errdefer tpool.deinit();
var source = try EventSource.init();
errdefer source.deinit();
var finished = try DoubleBufferedFixedArrayList(Result, u16).init(allocator, n);
errdefer finished.deinit(allocator);
var finished_copy = try FixedArrayList(Result, u16).init(allocator, n);
errdefer finished_copy.deinit(allocator);
return .{
.source = source,
.tpool = tpool,
.ops = ops,
.next = next,
.readiness = readiness,
.link_lock = link_lock,
.started = started,
.pending = pending,
.started = started,
.pfd = pfd,
.tpool = tpool,
.source = source,
.finished = finished,
.finished_copy = finished_copy,
};
}

Expand All @@ -88,14 +86,13 @@ pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
while (iter.next()) |e| uopUnwrapCall(e.v, posix.closeReadiness, .{self.readiness[e.k]});
self.ops.deinit(allocator);
allocator.free(self.next);
allocator.free(self.readiness);
self.link_lock.deinit(allocator);
self.started.deinit(allocator);
self.pending.deinit(allocator);
allocator.free(self.readiness);
self.started.deinit(allocator);
self.pfd.deinit(allocator);
self.finished.deinit(allocator);
self.finished_copy.deinit(allocator);
self.source.deinit();
self.finished.deinit(allocator);
self.* = undefined;
}

Expand Down Expand Up @@ -144,7 +141,7 @@ pub fn queue(self: *@This(), comptime len: u16, work: anytype, cb: ?aio.Dynamic.
} else {
var ids: std.BoundedArray(u16, len) = .{};
errdefer for (ids.constSlice()) |id| self.removeOp(id);
inline for (&work.ops) |*op| ids.append(try self.queueOperation(op)) catch unreachable;
inline for (&work.ops) |*op| ids.append(try self.queueOperation(op)) catch return error.SubmissionQueueFull;
if (cb) |f| for (ids.constSlice()) |id| f(self.ops.nodes[id].used, @enumFromInt(id));
}
}
Expand Down Expand Up @@ -207,15 +204,13 @@ pub fn immediate(comptime len: u16, work: anytype) aio.Error!u16 {
}

fn finish(self: *@This(), id: u16, failure: Operation.Error) void {
defer self.source.notify();
self.finished_mutex.lock();
defer self.finished_mutex.unlock();
debug("finish: {} {}", .{ id, failure });
for (self.finished.items[0..self.finished.len]) |*i| if (i.id == id) {
i.* = .{ .id = id, .failure = failure };
return;
};
// for (self.finished.items[0..self.finished.len]) |*i| if (i.id == id) {
// i.* = .{ .id = id, .failure = failure };
// return;
// };
self.finished.add(.{ .id = id, .failure = failure }) catch unreachable;
self.source.notify();
}

fn cancel(self: *@This(), id: u16) enum { in_progress, not_found, ok } {
Expand Down Expand Up @@ -346,16 +341,9 @@ fn completition(op: anytype, self: *@This(), res: Result) void {
}

fn handleFinished(self: *@This(), cb: ?aio.Dynamic.CompletionCallback) aio.CompletionResult {
{
self.finished_mutex.lock();
defer self.finished_mutex.unlock();
@memcpy(self.finished_copy.items[0..self.finished.items.len], self.finished.items[0..self.finished.items.len]);
self.finished_copy.len = self.finished.len;
self.finished.reset();
}

const finished = self.finished.swap();
var num_errors: u16 = 0;
for (self.finished_copy.items[0..self.finished_copy.len]) |res| {
for (finished) |res| {
if (res.failure != error.Success) {
debug("complete: {}: {} [FAIL] {}", .{ res.id, std.meta.activeTag(self.ops.nodes[res.id].used), res.failure });
} else {
Expand All @@ -374,8 +362,7 @@ fn handleFinished(self: *@This(), cb: ?aio.Dynamic.CompletionCallback) aio.Compl
self.removeOp(res.id);
if (cb) |f| f(uop, @enumFromInt(res.id), res.failure != error.Success);
}

return .{ .num_completed = self.finished_copy.len, .num_errors = num_errors };
return .{ .num_completed = @intCast(finished.len), .num_errors = num_errors };
}

fn uopUnwrapCall(uop: *Operation.Union, comptime func: anytype, args: anytype) @typeInfo(@TypeOf(func)).Fn.return_type.? {
Expand All @@ -384,3 +371,13 @@ fn uopUnwrapCall(uop: *Operation.Union, comptime func: anytype, args: anytype) @
}
unreachable;
}

fn debug(comptime fmt: []const u8, args: anytype) void {
if (@import("builtin").is_test) {
std.debug.print("fallback: " ++ fmt ++ "\n", args);
} else {
if (comptime !aio.options.debug) return;
const log = std.log.scoped(.fallback);
log.debug(fmt, args);
}
}
18 changes: 14 additions & 4 deletions src/aio/common/posix.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const Operation = @import("../ops.zig").Operation;
const windows = @import("windows.zig");

pub const RENAME_NOREPLACE = 1 << 0;
pub const PIDFD_NONBLOCK = @as(usize, 1 << @bitOffsetOf(std.posix.O, "NONBLOCK"));
pub const O_NONBLOCK = @as(usize, 1 << @bitOffsetOf(std.posix.O, "NONBLOCK"));

const EventFd = struct {
fd: std.posix.fd_t,
Expand Down Expand Up @@ -275,8 +275,18 @@ pub inline fn openReadiness(op: anytype) OpenReadinessError!Readiness {
return switch (comptime Operation.tagFromPayloadType(@TypeOf(op.*))) {
.nop => .{},
.fsync => .{},
.write => .{ .fd = op.file.handle, .mode = .out },
.read => .{ .fd = op.file.handle, .mode = .in },
.write => blk: {
if (builtin.target.isDarwin() and std.posix.isatty(op.file.handle)) {
return .{}; // nice :D will block one thread
}
break :blk .{ .fd = op.file.handle, .mode = .out };
},
.read => blk: {
if (builtin.target.isDarwin() and std.posix.isatty(op.file.handle)) {
return .{}; // nice :D will block one thread
}
break :blk .{ .fd = op.file.handle, .mode = .in };
},
.accept, .recv => .{ .fd = op.socket, .mode = .in },
.socket, .connect => .{},
.send => .{ .fd = op.socket, .mode = .out },
Expand All @@ -301,7 +311,7 @@ pub inline fn openReadiness(op: anytype) OpenReadinessError!Readiness {
if (builtin.target.os.tag == .windows) {
@panic("fixme");
} else if (comptime @hasDecl(std.posix.system, "pidfd_open")) {
const res = std.posix.system.pidfd_open(op.child, PIDFD_NONBLOCK);
const res = std.posix.system.pidfd_open(op.child, O_NONBLOCK);
const e = std.posix.errno(res);
if (e != .SUCCESS) return switch (e) {
.INVAL, .SRCH => unreachable,
Expand Down
52 changes: 46 additions & 6 deletions src/aio/common/types.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ pub fn FixedArrayList(T: type, SZ: type) type {
items: []T,
len: SZ = 0,

pub const Error = error{
OutOfMemory,
};
pub const Error = error{OutOfMemory};

pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() {
return .{ .items = try allocator.alloc(T, n) };
Expand All @@ -30,6 +28,50 @@ pub fn FixedArrayList(T: type, SZ: type) type {
};
}

pub fn DoubleBufferedFixedArrayList(T: type, SZ: type) type {
return struct {
mutex: std.Thread.Mutex = .{},
safe: FixedArrayList(T, SZ),
copy: []T align(std.atomic.cache_line),

pub const Error = error{OutOfMemory};

pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() {
var safe = try FixedArrayList(T, SZ).init(allocator, n);
errdefer safe.deinit(allocator);
const copy = try allocator.alloc(T, n);
errdefer allocator.free(copy);
return .{ .safe = safe, .copy = copy };
}

pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
self.safe.deinit(allocator);
allocator.free(self.copy);
self.* = undefined;
}

pub fn add(self: *@This(), item: T) Error!void {
self.mutex.lock();
defer self.mutex.unlock();
try self.safe.add(item);
}

pub fn reset(self: *@This()) void {
self.mutex.lock();
defer self.mutex.unlock();
self.safe.reset();
}

pub fn swap(self: *@This()) []const T {
self.mutex.lock();
defer self.mutex.unlock();
defer self.safe.reset();
@memcpy(self.copy[0..self.safe.len], self.safe.items[0..self.safe.len]);
return self.copy[0..self.safe.len];
}
};
}

pub fn Pool(T: type, SZ: type) type {
return struct {
pub const Node = union(enum) { free: ?SZ, used: T };
Expand All @@ -38,9 +80,7 @@ pub fn Pool(T: type, SZ: type) type {
num_free: SZ = 0,
num_used: SZ = 0,

pub const Error = error{
OutOfMemory,
};
pub const Error = error{OutOfMemory};

pub fn init(allocator: std.mem.Allocator, n: SZ) Error!@This() {
return .{ .nodes = try allocator.alloc(Node, n) };
Expand Down

0 comments on commit be8e2b3

Please sign in to comment.