Skip to content

Commit 9cc8548

Browse files
authored
Add explicit stop method to Auditable (#5413)
1 parent 0387d63 commit 9cc8548

File tree

3 files changed

+85
-7
lines changed

3 files changed

+85
-7
lines changed

src/Elasticsearch.Net/Auditing/Auditable.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ public Auditable(AuditEvent type, List<Audit> auditTrail, IDateTimeProvider date
2121
_dateTimeProvider = dateTimeProvider;
2222
var started = _dateTimeProvider.Now();
2323

24-
_audit = new Audit(type, started);
25-
_audit.Node = node;
24+
_audit = new Audit(type, started) { Node = node };
2625
auditTrail.Add(_audit);
2726
var diagnosticName = type.GetAuditDiagnosticEventName();
2827
_activity = diagnosticName != null ? DiagnosticSource.Diagnose(diagnosticName, _audit) : null;
@@ -43,9 +42,11 @@ public string Path
4342
set => _audit.Path = value;
4443
}
4544

45+
public void Stop() => _audit.Ended = _dateTimeProvider.Now();
46+
4647
public void Dispose()
4748
{
48-
_audit.Ended = _dateTimeProvider.Now();
49+
_audit.Ended = _audit.Ended == default ? _dateTimeProvider.Now() : _audit.Ended;
4950
_activity?.Dispose();
5051
}
5152
}

src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public TResponse CallElasticsearch<TResponse>(RequestData requestData)
161161
var response = _connection.Request<TResponse>(requestData);
162162
d.EndState = response.ApiCall;
163163
response.ApiCall.AuditTrail = AuditTrail;
164+
audit.Stop();
164165
ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCall, response);
165166
if (!response.ApiCall.Success) audit.Event = requestData.OnFailureAuditEvent;
166167
return response;
@@ -186,6 +187,7 @@ public async Task<TResponse> CallElasticsearchAsync<TResponse>(RequestData reque
186187
var response = await _connection.RequestAsync<TResponse>(requestData, cancellationToken).ConfigureAwait(false);
187188
d.EndState = response.ApiCall;
188189
response.ApiCall.AuditTrail = AuditTrail;
190+
audit.Stop();
189191
ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCall, response);
190192
if (!response.ApiCall.Success) audit.Event = requestData.OnFailureAuditEvent;
191193
return response;
@@ -380,6 +382,7 @@ public void Ping(Node node)
380382
{
381383
var response = _connection.Request<VoidResponse>(pingData);
382384
d.EndState = response;
385+
audit.Stop();
383386
ThrowBadAuthPipelineExceptionWhenNeeded(response);
384387
//ping should not silently accept bad but valid http responses
385388
if (!response.Success)
@@ -408,6 +411,7 @@ public async Task PingAsync(Node node, CancellationToken cancellationToken)
408411
{
409412
var response = await _connection.RequestAsync<VoidResponse>(pingData, cancellationToken).ConfigureAwait(false);
410413
d.EndState = response;
414+
audit.Stop();
411415
ThrowBadAuthPipelineExceptionWhenNeeded(response);
412416
//ping should not silently accept bad but valid http responses
413417
if (!response.Success)
@@ -438,7 +442,7 @@ public void Sniff()
438442
audit.Path = requestData.PathAndQuery;
439443
var response = _connection.Request<SniffResponse>(requestData);
440444
d.EndState = response;
441-
445+
audit.Stop();
442446
ThrowBadAuthPipelineExceptionWhenNeeded(response);
443447
//sniff should not silently accept bad but valid http responses
444448
if (!response.Success)
@@ -475,7 +479,7 @@ public async Task SniffAsync(CancellationToken cancellationToken)
475479
audit.Path = requestData.PathAndQuery;
476480
var response = await _connection.RequestAsync<SniffResponse>(requestData, cancellationToken).ConfigureAwait(false);
477481
d.EndState = response;
478-
482+
audit.Stop();
479483
ThrowBadAuthPipelineExceptionWhenNeeded(response);
480484
//sniff should not silently accept bad but valid http responses
481485
if (!response.Success)
@@ -564,8 +568,8 @@ private RequestData CreatePingRequestData(Node node)
564568
EnableHttpPipelining = RequestConfiguration?.EnableHttpPipelining ?? _settings.HttpPipeliningEnabled,
565569
ForceNode = RequestConfiguration?.ForceNode
566570
};
567-
IRequestParameters requestParameters = new RootNodeInfoRequestParameters();
568-
requestParameters.RequestConfiguration = requestOverrides;
571+
572+
IRequestParameters requestParameters = new RootNodeInfoRequestParameters { RequestConfiguration = requestOverrides };
569573

570574
var data = new RequestData(HttpMethod.HEAD, string.Empty, null, _settings, requestParameters, _memoryStreamFactory) { Node = node };
571575
return data;
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using System;
6+
using System.Collections.Concurrent;
7+
using System.Collections.Generic;
8+
using System.Diagnostics;
9+
using System.Threading.Tasks;
10+
using Elastic.Elasticsearch.Xunit.XunitPlumbing;
11+
using Elasticsearch.Net;
12+
using Elasticsearch.Net.Diagnostics;
13+
using Xunit;
14+
15+
namespace Tests.Reproduce
16+
{
17+
public class GitHubIssue5363
18+
{
19+
internal class TestDiagnosticListener : IObserver<DiagnosticListener>, IDisposable
20+
{
21+
private ConcurrentBag<IDisposable> Disposables { get; } = new();
22+
23+
public Action<IApiCallDetails> OnEnded { get; }
24+
25+
public TestDiagnosticListener(Action<IApiCallDetails> onEnded) => OnEnded = onEnded;
26+
27+
public void OnError(Exception error) { }
28+
public void OnCompleted() { }
29+
30+
public void OnNext(DiagnosticListener value) =>
31+
TrySubscribe(DiagnosticSources.RequestPipeline.SourceName,
32+
() => new RequestPipelineDiagnosticObserver(null, v => OnEnded(v.Value)), value);
33+
34+
private void TrySubscribe(string sourceName, Func<IObserver<KeyValuePair<string, object>>> listener, DiagnosticListener value)
35+
{
36+
if (value.Name != sourceName)
37+
return;
38+
var d = value.Subscribe(listener());
39+
40+
Disposables.Add(d);
41+
}
42+
43+
public void Dispose()
44+
{
45+
foreach (var d in Disposables)
46+
{
47+
d.Dispose();
48+
}
49+
}
50+
}
51+
52+
[U]
53+
public async Task DiagnosticListener_AuditTrailIsValid()
54+
{
55+
using var listener = new TestDiagnosticListener(data =>
56+
{
57+
var auditTrailEvent = data.AuditTrail[0];
58+
59+
Assert.True(auditTrailEvent.Ended != default);
60+
});
61+
62+
using var foo = DiagnosticListener.AllListeners.Subscribe(listener);
63+
64+
var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
65+
var settings = new ConnectionConfiguration(connectionPool, new InMemoryConnection());
66+
67+
var client = new ElasticLowLevelClient(settings);
68+
var person = new { Id = "1" };
69+
70+
await client.IndexAsync<BytesResponse>("test-index", PostData.Serializable(person));
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)