Skip to content

Commit 9fadece

Browse files
committed
Added a new overload for .Synchronize() utilizing the new .NET 9 Lock class.
1 parent 98adf33 commit 9fadece

File tree

4 files changed

+44
-12
lines changed

4 files changed

+44
-12
lines changed

Rx.NET/Source/Directory.Build.targets

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
<PropertyGroup Condition="$(TargetFramework.StartsWith('net6.0-windows')) or $(TargetFramework.StartsWith('net8.0-windows')) or $(TargetFramework.StartsWith('net9.0-windows'))">
2222
<DefineConstants>$(DefineConstants);HAS_WINRT;HAS_WINFORMS;HAS_WPF;HAS_DISPATCHER;DESKTOPCLR;WINDOWS;CSWINRT</DefineConstants>
2323
</PropertyGroup>
24+
<PropertyGroup Condition="$(TargetFramework.StartsWith('net9.0'))">
25+
<DefineConstants>$(DefineConstants);HAS_LOCK_CLASS</DefineConstants>
26+
</PropertyGroup>
2427

2528
<ItemGroup Condition="('$(TargetFramework)' == 'net472' or '$(TargetFramework)' == 'uap10.0.18362' or '$(TargetFramework)' == 'netstandard2.0') and $(IsPackable)">
2629
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />

Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT License.
3-
// See the LICENSE file in the project root for more information.
3+
// See the LICENSE file in the project root for more information.
44

55
namespace System.Reactive.Concurrency
66
{
7-
internal sealed class Synchronize<TSource> : Producer<TSource, Synchronize<TSource>._>
7+
internal sealed class Synchronize<TSource, TGate> : Producer<TSource, Synchronize<TSource, TGate>._>
8+
where TGate : notnull, new()
89
{
910
private readonly IObservable<TSource> _source;
10-
private readonly object? _gate;
11+
private readonly TGate? _gate;
1112

12-
public Synchronize(IObservable<TSource> source, object gate)
13+
public Synchronize(IObservable<TSource> source, TGate gate)
1314
{
1415
_source = source;
1516
_gate = gate;
@@ -26,12 +27,12 @@ public Synchronize(IObservable<TSource> source)
2627

2728
internal sealed class _ : IdentitySink<TSource>
2829
{
29-
private readonly object _gate;
30+
private readonly TGate _gate;
3031

31-
public _(Synchronize<TSource> parent, IObserver<TSource> observer)
32+
public _(Synchronize<TSource, TGate> parent, IObserver<TSource> observer)
3233
: base(observer)
3334
{
34-
_gate = parent._gate ?? new object();
35+
_gate = parent._gate ?? new TGate();
3536
}
3637

3738
public override void OnNext(TSource value)

Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT License.
3-
// See the LICENSE file in the project root for more information.
3+
// See the LICENSE file in the project root for more information.
44

55
using System.ComponentModel;
66
using System.Reactive.Disposables;
@@ -229,7 +229,7 @@ public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> sou
229229
throw new ArgumentNullException(nameof(source));
230230
}
231231

232-
return new Synchronize<TSource>(source);
232+
return new Synchronize<TSource, object>(source);
233233
}
234234

235235
/// <summary>
@@ -252,9 +252,34 @@ public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> sou
252252
throw new ArgumentNullException(nameof(gate));
253253
}
254254

255-
return new Synchronize<TSource>(source, gate);
255+
return new Synchronize<TSource, object>(source, gate);
256256
}
257257

258+
#if HAS_LOCK_CLASS
259+
/// <summary>
260+
/// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
261+
/// </summary>
262+
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
263+
/// <param name="source">Source sequence.</param>
264+
/// <param name="gate">Gate object to synchronize each observer call on.</param>
265+
/// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
266+
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is <c>null</c>.</exception>
267+
public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, Lock gate)
268+
{
269+
if (source == null)
270+
{
271+
throw new ArgumentNullException(nameof(source));
272+
}
273+
274+
if (gate == null)
275+
{
276+
throw new ArgumentNullException(nameof(gate));
277+
}
278+
279+
return new Synchronize<TSource, Lock>(source, gate);
280+
}
281+
#endif
282+
258283
#endregion
259284
}
260285
}

Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/SynchronizationTest.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT License.
3-
// See the LICENSE file in the project root for more information.
3+
// See the LICENSE file in the project root for more information.
44

55
using System;
66
using System.Reactive.Concurrency;
@@ -37,7 +37,10 @@ public void Synchronization_Synchronize_ArgumentChecking()
3737
{
3838
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(default(IObservable<int>)));
3939
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(default(IObservable<int>), new object()));
40-
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, null));
40+
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, null as object));
41+
#if HAS_LOCK_CLASS
42+
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, null as Lock));
43+
#endif
4144
}
4245

4346
private class MySyncCtx : SynchronizationContext

0 commit comments

Comments
 (0)