Skip to content

Commit 9f92c3f

Browse files
committed
+Distinct
1 parent 66fce5a commit 9f92c3f

File tree

5 files changed

+235
-0
lines changed

5 files changed

+235
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ finally
6363
- `Collect` - collect items into a custom collection and emit the collection at the end
6464
- `ConcatMap` - concatenate in order the inner async sequences mapped from the main sequence
6565
- `ConcatWith` - concatenate in order with another async sequence
66+
- `Distinct` - makes sure only distinct elements get relayed
67+
- `DistinctUntilChanged` - relays an element only if it is distinct from the previous item
6668
- `Debounce` - wait a bit after each item and emit them if no newer item arrived from the source
6769
- `DefaultIfEmpty` - return a fallback value if the source async sequence turns out to be empty
6870
- `DoOnNext` - execute an action when an item becomes available

async-enumerable-dotnet-test/AsyncEnumerableTest.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ static AsyncEnumerableTest()
212212
Defaults.Add(typeof(Func<long, Exception, bool>), (Func<long, Exception, bool>)((v, w) => false));
213213
Defaults.Add(typeof(Func<long, Exception, Task<bool>>), (Func<long, Exception, Task<bool>>)((v, w) => Task.FromResult(false)));
214214
Defaults.Add(typeof(Func<long, Task<bool>>), (Func<long, Task<bool>>)(v => Task.FromResult(false)));
215+
Defaults.Add(typeof(Func<ISet<int>>), (Func<ISet<int>>)(() => null));
215216

216217
Defaults.Add(typeof(Func<IAsyncEnumerable<int>, IAsyncEnumerable<int>>), (Func<IAsyncEnumerable<int>, IAsyncEnumerable<int>>)(w => w));
217218

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright (c) David Karnok & Contributors.
2+
// Licensed under the Apache 2.0 License.
3+
// See LICENSE file in the project root for full license information.
4+
5+
using Xunit;
6+
using async_enumerable_dotnet;
7+
using System.Collections.Generic;
8+
9+
namespace async_enumerable_dotnet_test
10+
{
11+
public class DistinctTest
12+
{
13+
[Fact]
14+
public async void Empty()
15+
{
16+
await AsyncEnumerable.Empty<int>()
17+
.Distinct()
18+
.AssertResult();
19+
}
20+
21+
[Fact]
22+
public async void Normal()
23+
{
24+
await AsyncEnumerable.Range(1, 5)
25+
.Distinct()
26+
.AssertResult(1, 2, 3, 4, 5);
27+
}
28+
29+
[Fact]
30+
public async void Redundant()
31+
{
32+
await AsyncEnumerable.FromArray(1, 2, 3, 2, 1, 4, 5, 1, 5)
33+
.Distinct()
34+
.AssertResult(1, 2, 3, 4, 5);
35+
}
36+
37+
[Fact]
38+
public async void KeySelector()
39+
{
40+
await AsyncEnumerable.Range(1, 5)
41+
.Distinct(v => v % 3)
42+
.AssertResult(1, 2, 3);
43+
}
44+
45+
[Fact]
46+
public async void EqualityComparer()
47+
{
48+
await AsyncEnumerable.Range(1, 5)
49+
.Distinct(EqualityComparer<int>.Default)
50+
.AssertResult(1, 2, 3, 4, 5);
51+
}
52+
53+
[Fact]
54+
public async void KeySelector_EqualityComparer()
55+
{
56+
await AsyncEnumerable.Range(1, 5)
57+
.Distinct(v => v % 3, EqualityComparer<long>.Default)
58+
.AssertResult(1, 2, 3);
59+
}
60+
61+
[Fact]
62+
public async void Custom_Set()
63+
{
64+
await AsyncEnumerable.Range(1, 5)
65+
.Distinct(v => (v % 3), () => new HashSet<long>())
66+
.AssertResult(1, 2, 3);
67+
}
68+
}
69+
}

async-enumerable-dotnet/AsyncEnumerable.cs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1616,5 +1616,85 @@ public static IAsyncEnumerable<TCollection> Buffer<TSource, TOther, TCollection>
16161616
return new BufferBoundaryExact<TSource, TOther, TCollection>(source, boundary, collectionSupplier, maxSize);
16171617
}
16181618

1619+
/// <summary>
1620+
/// Checks if the source items have been seen before by checking if
1621+
/// they are already successfully added to a HashSet or not, ensuring
1622+
/// only distinct source items pass through.
1623+
/// </summary>
1624+
/// <typeparam name="TSource">The element type of the source and result.</typeparam>
1625+
/// <param name="source">The source to have distinct items only of.</param>
1626+
/// <returns>The new IAsyncEnumerable sequence.</returns>
1627+
public static IAsyncEnumerable<TSource> Distinct<TSource>(this IAsyncEnumerable<TSource> source)
1628+
{
1629+
return Distinct(source, v => v, () => new HashSet<TSource>());
1630+
}
1631+
1632+
/// <summary>
1633+
/// Checks if the source items have been seen before by checking if
1634+
/// they are already successfully added to a HashSet or not, ensuring
1635+
/// only distinct source items pass through.
1636+
/// </summary>
1637+
/// <typeparam name="TSource">The element type of the source and result.</typeparam>
1638+
/// <param name="source">The source to have distinct items only of.</param>
1639+
/// <param name="comparer">The comparer for comparing items in the HashSet</param>
1640+
/// <returns>The new IAsyncEnumerable sequence.</returns>
1641+
public static IAsyncEnumerable<TSource> Distinct<TSource>(this IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
1642+
{
1643+
RequireNonNull(comparer, nameof(comparer));
1644+
return Distinct(source, v => v, () => new HashSet<TSource>(comparer));
1645+
}
1646+
1647+
/// <summary>
1648+
/// Checks if the source items have been seen before by checking if
1649+
/// a key extracted from such items is in a HashSet or not, ensuring
1650+
/// only distinct source items pass through.
1651+
/// </summary>
1652+
/// <typeparam name="TSource">The element type of the source and result.</typeparam>
1653+
/// <typeparam name="TKey">The key type used for comparing items.</typeparam>
1654+
/// <param name="source">The source to have distinct items only of.</param>
1655+
/// <param name="keySelector">The function that receives the source item and should return a key value for distinct comparison.</param>
1656+
/// <returns>The new IAsyncEnumerable sequence.</returns>
1657+
public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector)
1658+
{
1659+
return Distinct(source, keySelector, () => new HashSet<TKey>());
1660+
}
1661+
1662+
/// <summary>
1663+
/// Checks if the source items have been seen before by checking if
1664+
/// a key extracted from such items is in a HashSet or not, ensuring
1665+
/// only distinct source items pass through.
1666+
/// </summary>
1667+
/// <typeparam name="TSource">The element type of the source and result.</typeparam>
1668+
/// <typeparam name="TKey">The key type used for comparing items.</typeparam>
1669+
/// <param name="source">The source to have distinct items only of.</param>
1670+
/// <param name="keySelector">The function that receives the source item and should return a key value for distinct comparison.</param>
1671+
/// <param name="keyComparer">The comparer for comparing keys in the HashSet</param>
1672+
/// <returns>The new IAsyncEnumerable sequence.</returns>
1673+
public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> keyComparer)
1674+
{
1675+
RequireNonNull(keyComparer, nameof(keyComparer));
1676+
return Distinct(source, keySelector, () => new HashSet<TKey>(keyComparer));
1677+
}
1678+
1679+
/// <summary>
1680+
/// Checks if the source items have been seen before by checking
1681+
/// a key extracted from such items agains a set of keys and
1682+
/// drops such items, ensuring that only distinct source items pass
1683+
/// through.
1684+
/// </summary>
1685+
/// <typeparam name="TSource">The element type of the source and result.</typeparam>
1686+
/// <typeparam name="TKey">The key type used for comparing items.</typeparam>
1687+
/// <param name="source">The source to have distinct items only of.</param>
1688+
/// <param name="keySelector">The function that receives the source item and should return a key value for distinct comparison.</param>
1689+
/// <param name="setSupplier">The function that should provide a set implementation whose <see cref="ISet{T}.Add(T)"/> method's return
1690+
/// value decided is the source item should pass or not.</param>
1691+
/// <returns>The new IAsyncEnumerable sequence</returns>
1692+
public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<ISet<TKey>> setSupplier)
1693+
{
1694+
RequireNonNull(source, nameof(source));
1695+
RequireNonNull(keySelector, nameof(keySelector));
1696+
RequireNonNull(setSupplier, nameof(setSupplier));
1697+
return new Distinct<TSource, TKey>(source, keySelector, setSupplier);
1698+
}
16191699
}
16201700
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright (c) David Karnok & Contributors.
2+
// Licensed under the Apache 2.0 License.
3+
// See LICENSE file in the project root for full license information.
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using System.Threading.Tasks;
8+
9+
namespace async_enumerable_dotnet.impl
10+
{
11+
internal sealed class Distinct<TSource, TKey> : IAsyncEnumerable<TSource>
12+
{
13+
private readonly IAsyncEnumerable<TSource> _source;
14+
15+
private readonly Func<TSource, TKey> _keySelector;
16+
17+
private readonly Func<ISet<TKey>> _collectionSupplier;
18+
19+
public Distinct(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<ISet<TKey>> collectionSupplier)
20+
{
21+
_source = source;
22+
_keySelector = keySelector;
23+
_collectionSupplier = collectionSupplier;
24+
}
25+
26+
public IAsyncEnumerator<TSource> GetAsyncEnumerator()
27+
{
28+
ISet<TKey> collection;
29+
30+
try
31+
{
32+
collection = _collectionSupplier();
33+
}
34+
catch (Exception ex)
35+
{
36+
return new Error<TSource>.ErrorEnumerator(ex);
37+
}
38+
return new DistinctEnumerator(_source.GetAsyncEnumerator(), _keySelector, collection);
39+
}
40+
41+
private sealed class DistinctEnumerator : IAsyncEnumerator<TSource>
42+
{
43+
private readonly IAsyncEnumerator<TSource> _source;
44+
45+
private readonly Func<TSource, TKey> _keySelector;
46+
47+
private ISet<TKey> _collection;
48+
49+
public TSource Current => _source.Current;
50+
51+
public DistinctEnumerator(IAsyncEnumerator<TSource> source, Func<TSource, TKey> keySelector, ISet<TKey> collection)
52+
{
53+
_source = source;
54+
_keySelector = keySelector;
55+
_collection = collection;
56+
}
57+
58+
public ValueTask DisposeAsync()
59+
{
60+
_collection = default;
61+
return _source.DisposeAsync();
62+
}
63+
64+
public async ValueTask<bool> MoveNextAsync()
65+
{
66+
for (; ; )
67+
{
68+
if (await _source.MoveNextAsync())
69+
{
70+
if (_collection.Add(_keySelector(_source.Current)))
71+
{
72+
return true;
73+
}
74+
}
75+
else
76+
{
77+
return false;
78+
}
79+
}
80+
}
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)