Skip to content

Commit 982c5b0

Browse files
committed
Unsubscribe before re-subscribing to avoid error for double sub id
1 parent d9cc3f8 commit 982c5b0

File tree

1 file changed

+17
-9
lines changed

1 file changed

+17
-9
lines changed

src/Tibber.Sdk/RealTimeMeasurementListener.cs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,15 @@ private void UnsubscribeObserver(HomeStreamObserverCollection collection, IObser
172172
collection.Observers.Remove(observer);
173173
}
174174

175+
private async Task ResubscribeStream(Guid homeId, int subscriptionId, CancellationToken cancellationToken)
176+
{
177+
await UnsubscribeStream(subscriptionId, cancellationToken);
178+
await SubscribeStream(homeId, subscriptionId, cancellationToken);
179+
}
180+
175181
private async Task SubscribeStream(Guid homeId, int subscriptionId, CancellationToken cancellationToken)
176182
{
177-
Trace.WriteLine($"subscribe to {homeId}");
183+
Trace.WriteLine($"subscribe to home id {homeId} with subscription id {subscriptionId}");
178184

179185
await ExecuteStreamRequest(
180186
//$@"{{""payload"":{{""query"":""subscription{{testMeasurement(count:2, complete:false){{timestamp,power,powerReactive,powerProduction,powerProductionReactive,accumulatedConsumption,accumulatedConsumptionLastHour,accumulatedProduction,accumulatedProductionLastHour,accumulatedCost,accumulatedReward,currency,minPower,averagePower,maxPower,minPowerProduction,maxPowerProduction,voltagePhase1,voltagePhase2,voltagePhase3,currentL1,currentL2,currentL3,lastMeterConsumption,lastMeterProduction,powerFactor,signalStrength}}}}"",""variables"":{{}},""extensions"":{{}}}},""type"":""subscribe"",""id"":""{subscriptionId}""}}",
@@ -190,13 +196,14 @@ await ExecuteStreamRequest(
190196
private async Task UnsubscribeStream(int subscriptionId, CancellationToken cancellationToken)
191197
{
192198
Trace.WriteLine($"unsubscribe subscription with id {subscriptionId}");
193-
await ExecuteStreamRequest($@"{{""type"":""complete"",""id"":{subscriptionId}}}", cancellationToken);
199+
await ExecuteStreamRequest($@"{{""type"":""complete"",""id"":""{subscriptionId}""}}", cancellationToken);
194200
}
195201

196202
private Task ExecuteStreamRequest(string request, CancellationToken cancellationToken)
197203
{
198-
var stopSubscriptionRequest = new ArraySegment<byte>(Encoding.ASCII.GetBytes(request));
199-
return _wssClient.SendAsync(stopSubscriptionRequest, WebSocketMessageType.Text, true, cancellationToken);
204+
Trace.WriteLine($"send message; client state {_wssClient.State} {_wssClient.CloseStatus} {_wssClient.CloseStatusDescription} {request}");
205+
var requestBytes = new ArraySegment<byte>(Encoding.ASCII.GetBytes(request));
206+
return _wssClient.SendAsync(requestBytes, WebSocketMessageType.Text, true, cancellationToken);
200207
}
201208

202209
private async Task Initialize(Uri websocketSubscriptionUrl, CancellationToken cancellationToken)
@@ -217,7 +224,7 @@ private async Task Initialize(Uri websocketSubscriptionUrl, CancellationToken ca
217224

218225
Trace.WriteLine("web socket connected");
219226

220-
var connectionInitMessage = new WebSocketConnectionInitMessage{ Payload = connectionInitPayload };
227+
var connectionInitMessage = new WebSocketConnectionInitMessage { Payload = connectionInitPayload };
221228
var json = JsonConvert.SerializeObject(connectionInitMessage, TibberApiClient.JsonSerializerSettings);
222229
var init = new ArraySegment<byte>(Encoding.UTF8.GetBytes(json));
223230

@@ -254,6 +261,7 @@ private async void StartListening()
254261

255262
do
256263
{
264+
Trace.WriteLine($"receive message; client state {_wssClient.State} {_wssClient.CloseStatus} {_wssClient.CloseStatusDescription}");
257265
result = await _wssClient.ReceiveAsync(_receiveBuffer, _cancellationTokenSource.Token);
258266
var json = Encoding.ASCII.GetString(_receiveBuffer.Array, 0, result.Count);
259267
stringBuilder.Append(json);
@@ -286,7 +294,7 @@ private async void StartListening()
286294
if (!_cancellationTokenSource.IsCancellationRequested)
287295
{
288296
Trace.WriteLine("connection re-established; re-initialize data streams");
289-
SubscribeStreams(c => true);
297+
ResubscribeStreams(c => true);
290298
continue;
291299
}
292300
}
@@ -365,13 +373,13 @@ private async void StartListening()
365373
} while (!_cancellationTokenSource.IsCancellationRequested);
366374
}
367375

368-
private void SubscribeStreams(Func<HomeStreamObserverCollection, bool> predicate)
376+
private void ResubscribeStreams(Func<HomeStreamObserverCollection, bool> predicate)
369377
{
370378
lock (_homeObservables)
371379
{
372380
var subscriptionTask = (Task)Task.FromResult(0);
373381
foreach (var collection in _homeObservables.Values.Where(predicate))
374-
subscriptionTask = subscriptionTask.ContinueWith(_ => SubscribeStream(collection.Observable.HomeId, collection.Observable.SubscriptionId, _cancellationTokenSource.Token));
382+
subscriptionTask = subscriptionTask.ContinueWith(_ => ResubscribeStream(collection.Observable.HomeId, collection.Observable.SubscriptionId, _cancellationTokenSource.Token));
375383
}
376384
}
377385

@@ -465,7 +473,7 @@ private void CheckDataStreamAlive(object state)
465473
{
466474
var now = DateTimeOffset.UtcNow;
467475

468-
SubscribeStreams(
476+
ResubscribeStreams(
469477
c =>
470478
{
471479
var sinceLastMessageMs = (now - c.LastMessageReceivedAt).TotalMilliseconds;

0 commit comments

Comments
 (0)