From 72590e48b391d658fa831dab18b51d6cb79e48b9 Mon Sep 17 00:00:00 2001 From: Techatrix Date: Mon, 10 Nov 2025 02:53:08 +0100 Subject: [PATCH 1/2] std.Io.Threaded: fix incorrect alignment of trailing data in closure --- lib/std/Io/Threaded.zig | 52 +++++++++++++++++++++--------------- lib/std/Io/Threaded/test.zig | 50 ++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 21 deletions(-) diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 11f9b149caf5..0d6192dd9870 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -388,7 +388,8 @@ const AsyncClosure = struct { reset_event: ResetEvent, select_condition: ?*ResetEvent, context_alignment: std.mem.Alignment, - result_offset: usize, + result_alignment: std.mem.Alignment, + result_offset_before_padding: usize, const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent)); @@ -420,12 +421,12 @@ const AsyncClosure = struct { fn resultPointer(ac: *AsyncClosure) [*]u8 { const base: [*]u8 = @ptrCast(ac); - return base + ac.result_offset; + return @ptrFromInt(ac.result_alignment.forward(@intFromPtr(base + ac.result_offset_before_padding))); } fn contextPointer(ac: *AsyncClosure) [*]u8 { const base: [*]u8 = @ptrCast(ac); - return base + ac.context_alignment.forward(@sizeOf(AsyncClosure)); + return @ptrFromInt(ac.context_alignment.forward(@intFromPtr(base + @sizeOf(AsyncClosure)))); } fn waitAndFree(ac: *AsyncClosure, gpa: Allocator, result: []u8) void { @@ -436,7 +437,9 @@ const AsyncClosure = struct { fn free(ac: *AsyncClosure, gpa: Allocator, result_len: usize) void { const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac); - gpa.free(base[0 .. ac.result_offset + result_len]); + const result_offset_with_padding = ac.result_alignment.forward(ac.result_offset_before_padding); + const allocated_size = result_offset_with_padding + result_len; + gpa.free(base[0..allocated_size]); } }; @@ -460,13 +463,16 @@ fn async( }; }; const gpa = t.allocator; - const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); - const result_offset = result_alignment.forward(context_offset + context.len); - const n = result_offset + result.len; - const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch { + const context_offset_before_padding = @alignOf(AsyncClosure) + @sizeOf(AsyncClosure); + const context_offset_with_padding = context_alignment.forward(context_offset_before_padding); + const result_offset_before_padding = context_offset_with_padding + context.len; + const result_offset_with_padding = result_alignment.forward(result_offset_before_padding); + const allocated_size = result_offset_with_padding + result.len; + const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), allocated_size) catch { start(context.ptr, result.ptr); return null; - })); + }; + const ac: *AsyncClosure = @ptrCast(@alignCast(ac_bytes)); ac.* = .{ .closure = .{ @@ -476,7 +482,8 @@ fn async( }, .func = start, .context_alignment = context_alignment, - .result_offset = result_offset, + .result_alignment = result_alignment, + .result_offset_before_padding = result_offset_before_padding, .reset_event = .unset, .select_condition = null, }; @@ -531,10 +538,12 @@ fn concurrent( const t: *Threaded = @ptrCast(@alignCast(userdata)); const cpu_count = t.cpu_count catch 1; const gpa = t.allocator; - const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); - const result_offset = result_alignment.forward(context_offset + context.len); - const n = result_offset + result_len; - const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch + const context_offset_before_padding = @alignOf(AsyncClosure) + @sizeOf(AsyncClosure); + const context_offset_with_padding = context_alignment.forward(context_offset_before_padding); + const result_offset_before_padding = context_offset_with_padding + context.len; + const result_offset_with_padding = result_alignment.forward(result_offset_before_padding); + const allocated_size = result_offset_with_padding + result_len; + const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), allocated_size) catch return error.ConcurrencyUnavailable; const ac: *AsyncClosure = @ptrCast(@alignCast(ac_bytes)); @@ -546,7 +555,8 @@ fn concurrent( }, .func = start, .context_alignment = context_alignment, - .result_offset = result_offset, + .result_alignment = result_alignment, + .result_offset_before_padding = result_offset_before_padding, .reset_event = .unset, .select_condition = null, }; @@ -580,6 +590,8 @@ fn concurrent( return @ptrCast(ac); } +/// Trailing data: +/// 1. context const GroupClosure = struct { closure: Closure, t: *Threaded, @@ -621,17 +633,15 @@ const GroupClosure = struct { gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]); } - fn contextOffset(context_alignment: std.mem.Alignment) usize { - return context_alignment.forward(@sizeOf(GroupClosure)); - } - fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize { - return contextOffset(context_alignment) + context_len; + const context_offset_before_padding = @alignOf(GroupClosure) + @sizeOf(GroupClosure); + const context_offset_with_padding = context_alignment.forward(context_offset_before_padding); + return context_offset_with_padding + context_len; } fn contextPointer(gc: *GroupClosure) [*]u8 { const base: [*]u8 = @ptrCast(gc); - return base + contextOffset(gc.context_alignment); + return @ptrFromInt(gc.context_alignment.forward(@intFromPtr(base + @sizeOf(GroupClosure)))); } const sync_is_waiting: usize = 1 << 0; diff --git a/lib/std/Io/Threaded/test.zig b/lib/std/Io/Threaded/test.zig index ef24b25f3496..b9872f8b19f2 100644 --- a/lib/std/Io/Threaded/test.zig +++ b/lib/std/Io/Threaded/test.zig @@ -56,3 +56,53 @@ test "concurrent vs concurrent prevents deadlock via oversubscription" { getter.await(io); putter.await(io); } + +fn paramWithExtraAlignment(param: Align64) void { + assert(param.data == 3); +} + +fn returnValueWithExtraAlignment() Align64 { + return .{ .data = 5 }; +} + +const Align64 = struct { + data: u8 align(64), +}; + +test "async closure where result or context has extra alignment" { + // A fixed buffer allocator is used instead of `std.testing.allocator` to + // not get memory that has better alignment than requested. + var buffer: [1024]u8 align(64) = undefined; + var fba: std.heap.FixedBufferAllocator = .init(buffer[1..]); + + var threaded: std.Io.Threaded = .init(fba.allocator()); + defer threaded.deinit(); + const io = threaded.io(); + + { + var future = io.async(paramWithExtraAlignment, .{.{ .data = 3 }}); + future.await(io); + } + + { + var future = io.async(returnValueWithExtraAlignment, .{}); + const result = future.await(io); + try std.testing.expectEqual(5, result.data); + } +} + +test "group closure where context has extra alignment" { + // A fixed buffer allocator is used instead of `std.testing.allocator` to + // not get memory that has better alignment than requested. + var buffer: [1024]u8 align(64) = undefined; + var fba: std.heap.FixedBufferAllocator = .init(buffer[1..]); + + var threaded: std.Io.Threaded = .init(fba.allocator()); + defer threaded.deinit(); + const io = threaded.io(); + + var group: std.Io.Group = .init; + defer group.cancel(io); + + group.async(io, paramWithExtraAlignment, .{.{ .data = 3 }}); +} From f90001dbdf1ca1919d2e647d4ddee69036d5aef0 Mon Sep 17 00:00:00 2001 From: Techatrix Date: Sun, 9 Nov 2025 21:43:20 +0100 Subject: [PATCH 2/2] std.Io: fix calls on functions that return an array type --- lib/std/Io.zig | 14 +++++++------- lib/std/Io/Threaded/test.zig | 16 ++++++++++++++++ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 810a7a8102b2..ffc614841787 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -990,7 +990,7 @@ pub fn Future(Result: type) type { /// Idempotent. Not threadsafe. pub fn cancel(f: *@This(), io: Io) Result { const any_future = f.any_future orelse return f.result; - io.vtable.cancel(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result)); + io.vtable.cancel(io.userdata, any_future, @ptrCast(&f.result), .of(Result)); f.any_future = null; return f.result; } @@ -998,7 +998,7 @@ pub fn Future(Result: type) type { /// Idempotent. Not threadsafe. pub fn await(f: *@This(), io: Io) Result { const any_future = f.any_future orelse return f.result; - io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result)); + io.vtable.await(io.userdata, any_future, @ptrCast(&f.result), .of(Result)); f.any_future = null; return f.result; } @@ -1034,7 +1034,7 @@ pub const Group = struct { @call(.auto, function, args_casted.*); } }; - io.vtable.groupAsync(io.userdata, g, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start); + io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start); } /// Blocks until all tasks of the group finish. During this time, @@ -1111,7 +1111,7 @@ pub fn Select(comptime U: type) type { } }; _ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic); - s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start); + s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&args), .of(Args), TypeErased.start); } /// Blocks until another task of the select finishes. @@ -1539,9 +1539,9 @@ pub fn async( var future: Future(Result) = undefined; future.any_future = io.vtable.async( io.userdata, - @ptrCast((&future.result)[0..1]), + @ptrCast(&future.result), .of(Result), - @ptrCast((&args)[0..1]), + @ptrCast(&args), .of(Args), TypeErased.start, ); @@ -1580,7 +1580,7 @@ pub fn concurrent( io.userdata, @sizeOf(Result), .of(Result), - @ptrCast((&args)[0..1]), + @ptrCast(&args), .of(Args), TypeErased.start, ); diff --git a/lib/std/Io/Threaded/test.zig b/lib/std/Io/Threaded/test.zig index b9872f8b19f2..48e6d0d576a9 100644 --- a/lib/std/Io/Threaded/test.zig +++ b/lib/std/Io/Threaded/test.zig @@ -106,3 +106,19 @@ test "group closure where context has extra alignment" { group.async(io, paramWithExtraAlignment, .{.{ .data = 3 }}); } + +fn returnArray() [32]u8 { + return @splat(5); +} + +test "async on function with array parameter or return type" { + var threaded: std.Io.Threaded = .init(std.testing.allocator); + defer threaded.deinit(); + const io = threaded.io(); + + var future = io.async(returnArray, .{}); + const result = future.await(io); + for (result) |actual| { + try std.testing.expectEqual(5, actual); + } +}