Skip to content

Commit 9328bed

Browse files
committed
create PushMatch assertion
1 parent 23a2ae9 commit 9328bed

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

Src/FluentAssertions.Reactive/ReactiveAssertions.cs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Linq.Expressions;
45
using System.Reactive;
56
using System.Reactive.Linq;
67
using System.Reactive.Threading.Tasks;
@@ -9,6 +10,7 @@
910
using FluentAssertions.Execution;
1011
using FluentAssertions.Primitives;
1112
using FluentAssertions.Specialized;
13+
using JetBrains.Annotations;
1214
using Microsoft.Reactive.Testing;
1315

1416
namespace FluentAssertions.Reactive
@@ -248,6 +250,98 @@ public AndConstraint<ReactiveAssertions<TPayload>> NotComplete(TimeSpan timeout,
248250
public AndConstraint<ReactiveAssertions<TPayload>> NotComplete(string because = "", params object[] becauseArgs)
249251
=> NotComplete(TimeSpan.FromMilliseconds(100), because, becauseArgs);
250252

253+
254+
/// <summary>
255+
/// Asserts that at least one notification matching <paramref name="predicate"/> was pushed to the <see cref="FluentTestObserver{TPayload}"/>
256+
/// within the specified <paramref name="timeout"/>.<br />
257+
/// This includes any previously recorded notifications since it has been created or cleared.
258+
/// </summary>
259+
/// <param name="predicate">A predicate to match the items in the collection against.</param>
260+
/// <param name="timeout">the maximum time to wait for the notification to arrive</param>
261+
/// <param name="because">
262+
/// A formatted phrase as is supported by <see cref="string.Format(string,object[])" /> explaining why the assertion
263+
/// is needed. If the phrase does not start with the word <i>because</i>, it is prepended automatically.
264+
/// </param>
265+
/// <param name="becauseArgs">
266+
/// Zero or more objects to format using the placeholders in <paramref name="because"/>.
267+
/// </param>
268+
/// <exception cref="ArgumentNullException"><paramref name="predicate"/> is <c>null</c>.</exception>
269+
public AndConstraint<ReactiveAssertions<TPayload>> PushMatch([NotNull] Expression<Func<TPayload, bool>> predicate, TimeSpan timeout, string because = "", params object[] becauseArgs)
270+
{
271+
if (predicate == null) throw new ArgumentNullException(nameof(predicate));
272+
273+
IList<TPayload> notifications = new List<TPayload>();
274+
AssertionScope assertion = Execute.Assertion
275+
.WithExpectation("Expected {context:observable} {0} to push an item matching {1}{reason}", Subject, predicate.Body)
276+
.BecauseOf(because, becauseArgs);
277+
278+
try
279+
{
280+
Func<TPayload, bool> func = predicate.Compile();
281+
notifications = Observer.RecordedNotificationStream
282+
.Select(r => r.Value)
283+
.Dematerialize()
284+
.Where(func)
285+
.Take(1)
286+
.Timeout(timeout)
287+
.Catch<TPayload, TimeoutException>(exception => Observable.Empty<TPayload>())
288+
.ToList()
289+
.ToTask()
290+
.ExecuteInDefaultSynchronizationContext();
291+
}
292+
catch (Exception e)
293+
{
294+
if (e is AggregateException aggregateException)
295+
e = aggregateException.InnerException;
296+
assertion.FailWith(", but it failed with a {0}.", e);
297+
}
298+
299+
assertion
300+
.ForCondition(notifications.Any())
301+
.FailWith(" within {0}.", timeout);
302+
303+
return new AndConstraint<ReactiveAssertions<TPayload>>(this);
304+
}
305+
306+
/// <inheritdoc cref="PushMatch"/>
307+
public async Task<AndConstraint<ReactiveAssertions<TPayload>>> PushMatchAsync([NotNull] Expression<Func<TPayload, bool>> predicate, TimeSpan timeout,
308+
string because = "", params object[] becauseArgs)
309+
{
310+
if (predicate == null)
311+
throw new ArgumentNullException(nameof(predicate));
312+
313+
IList<TPayload> notifications = new List<TPayload>();
314+
AssertionScope assertion = Execute.Assertion
315+
.WithExpectation("Expected {context:observable} {0} to push an item matching {1}{reason}", Subject, predicate.Body)
316+
.BecauseOf(because, becauseArgs);
317+
318+
try
319+
{
320+
Func<TPayload, bool> func = predicate.Compile();
321+
notifications = await Observer.RecordedNotificationStream
322+
.Select(r => r.Value)
323+
.Dematerialize()
324+
.Where(func)
325+
.Take(1)
326+
.Timeout(timeout)
327+
.Catch<TPayload, TimeoutException>(exception => Observable.Empty<TPayload>())
328+
.ToList()
329+
.ToTask().ConfigureAwait(false);
330+
}
331+
catch (Exception e)
332+
{
333+
if (e is AggregateException aggregateException)
334+
e = aggregateException.InnerException;
335+
assertion.FailWith(", but it failed with a {0}.", e);
336+
}
337+
338+
assertion
339+
.ForCondition(notifications.Any())
340+
.FailWith(" within {0}.", timeout);
341+
342+
return new AndWhichConstraint<ReactiveAssertions<TPayload>, IEnumerable<TPayload>>(this, notifications);
343+
}
344+
251345
protected Task<IList<Recorded<Notification<TPayload>>>> GetRecordedNotifications(TimeSpan timeout) =>
252346
Observer.RecordedNotificationStream
253347
.TakeUntil(recorded => recorded.Value.Kind == NotificationKind.OnError)

0 commit comments

Comments
 (0)