Skip to content

Commit

Permalink
coro: s/ReturnType/@typeof(@call(...))/
Browse files Browse the repository at this point in the history
This lets deduce generic functions return value. ReturnType is still
used in few places as it would be hard to deduce the arguments in some
contexts ... 🤔
  • Loading branch information
Cloudef committed Sep 13, 2024
1 parent 54d7018 commit 24e2fb4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
9 changes: 4 additions & 5 deletions src/coro/Scheduler.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ const aio = @import("aio");
const io = @import("io.zig");
const Frame = @import("Frame.zig");
const Task = @import("Task.zig");
const ReturnType = @import("minilib").ReturnType;
const options = @import("../coro.zig").options;

allocator: std.mem.Allocator,
Expand Down Expand Up @@ -51,7 +50,6 @@ pub const SpawnOptions = struct {
/// Spawns a new task, the task may do local IO operations which will not block the whole process using the `io` namespace functions
/// Call `task.complete` to collect the result and free the stack
/// Or alternatively `task.cancel` to cancel the task
/// Normally one would use the `spawn` method, but in case a generic functions return type can't be deduced, use this any variant.
pub fn spawnAny(self: *@This(), Result: type, comptime func: anytype, args: anytype, opts: SpawnOptions) SpawnError!Task {
if (self.state == .tear_down) return error.Unexpected;

Expand All @@ -68,9 +66,10 @@ pub fn spawnAny(self: *@This(), Result: type, comptime func: anytype, args: anyt
/// Spawns a new task, the task may do local IO operations which will not block the whole process using the `io` namespace functions
/// Call `task.complete` to collect the result and free the stack
/// Or alternatively `task.cancel` to cancel the task
pub fn spawn(self: *@This(), comptime func: anytype, args: anytype, opts: SpawnOptions) SpawnError!Task.Generic(ReturnType(func)) {
var task = try self.spawnAny(ReturnType(func), func, args, opts);
return task.generic(ReturnType(func));
pub fn spawn(self: *@This(), comptime func: anytype, args: anytype, opts: SpawnOptions) SpawnError!Task.Generic(@TypeOf(@call(.auto, func, args))) {
const RT = @TypeOf(@call(.auto, func, args));
var task = try self.spawnAny(RT, func, args, opts);
return task.generic(RT);
}

/// Step the scheduler by a single step.
Expand Down
22 changes: 13 additions & 9 deletions src/coro/ThreadPool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ const Scheduler = @import("Scheduler.zig");
const Task = @import("Task.zig");
const Frame = @import("Frame.zig");
const DynamicThreadPool = @import("minilib").DynamicThreadPool;
const ReturnType = @import("minilib").ReturnType;
const ReturnTypeMixedWithErrorSet = @import("minilib").ReturnTypeMixedWithErrorSet;
const MixErrorUnionWithErrorSet = @import("minilib").MixErrorUnionWithErrorSet;

pool: DynamicThreadPool,
source: aio.EventSource,
Expand All @@ -33,9 +32,13 @@ pub const CancellationToken = struct {
canceled: bool = false,
};

fn entrypoint(self: *@This(), completed: *std.atomic.Value(bool), token: *CancellationToken, comptime func: anytype, res: anytype, args: anytype) void {
fn hasToken(comptime func: anytype) bool {
const fun_info = @typeInfo(@TypeOf(func)).Fn;
if (fun_info.params.len > 0 and fun_info.params[0].type.? == *const CancellationToken) {
return fun_info.params.len > 0 and fun_info.params[0].type.? == *const CancellationToken;
}

fn entrypoint(self: *@This(), completed: *std.atomic.Value(bool), token: *CancellationToken, comptime func: anytype, res: anytype, args: anytype) void {
if (comptime hasToken(func)) {
res.* = @call(.auto, func, .{token} ++ args);
} else {
res.* = @call(.auto, func, args);
Expand All @@ -48,12 +51,12 @@ fn entrypoint(self: *@This(), completed: *std.atomic.Value(bool), token: *Cancel
pub const YieldError = DynamicThreadPool.SpawnError;

/// Yield until `func` finishes on another thread
pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype, config: DynamicThreadPool.SpawnConfig) ReturnTypeMixedWithErrorSet(func, YieldError) {
pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype, config: DynamicThreadPool.SpawnConfig) MixErrorUnionWithErrorSet(@TypeOf(@call(.auto, func, if (comptime hasToken(func)) .{&CancellationToken{}} ++ args else args)), YieldError) {
var completed = std.atomic.Value(bool).init(false);
var res: ReturnType(func) = undefined;
_ = self.num_tasks.fetchAdd(1, .monotonic);
defer _ = self.num_tasks.fetchSub(1, .release);
var token: CancellationToken = .{};
var res: @TypeOf(@call(.auto, func, if (comptime hasToken(func)) .{&token} ++ args else args)) = undefined;
try self.pool.spawn(entrypoint, .{ self, &completed, &token, func, &res, args }, config);
while (!completed.load(.acquire)) {
const nerr = io.do(.{
Expand All @@ -76,19 +79,20 @@ pub fn yieldForCompletition(self: *@This(), func: anytype, args: anytype, config
}

/// Spawn a new coroutine which will immediately call `yieldForCompletition` for later collection of the result
/// Normally one would use the `spawnForCompletition` method, but in case a generic functions return type can't be deduced, use this any variant.
pub fn spawnAnyForCompletition(self: *@This(), scheduler: *Scheduler, Result: type, func: anytype, args: anytype, config: DynamicThreadPool.SpawnConfig) Scheduler.SpawnError!Task {
return scheduler.spawnAny(Result, yieldForCompletition, .{ self, func, args, config }, .{ .stack = .{ .managed = 1024 * 24 } });
}

/// Helper for getting the Task.Generic when using spawnForCompletition tasks.
pub fn Generic2(comptime func: anytype) type {
const ReturnTypeMixedWithErrorSet = @import("minilib").ReturnTypeMixedWithErrorSet;
return Task.Generic(ReturnTypeMixedWithErrorSet(func, YieldError));
}

/// Spawn a new coroutine which will immediately call `yieldForCompletition` for later collection of the result
pub fn spawnForCompletition(self: *@This(), scheduler: *Scheduler, func: anytype, args: anytype, config: DynamicThreadPool.SpawnConfig) Scheduler.SpawnError!Generic2(func) {
const Result = ReturnTypeMixedWithErrorSet(func, YieldError);
pub fn spawnForCompletition(self: *@This(), scheduler: *Scheduler, func: anytype, args: anytype, config: DynamicThreadPool.SpawnConfig) Scheduler.SpawnError!Task.Generic(MixErrorUnionWithErrorSet(@TypeOf(@call(.auto, func, if (comptime hasToken(func)) .{&CancellationToken{}} ++ args else args)), YieldError)) {
const RT = @TypeOf(@call(.auto, func, args));
const Result = MixErrorUnionWithErrorSet(RT, YieldError);
const task = try self.spawnAnyForCompletition(scheduler, Result, func, args, config);
return task.generic(Result);
}
Expand Down

0 comments on commit 24e2fb4

Please sign in to comment.