-
Notifications
You must be signed in to change notification settings - Fork 159
Prepare connection / region pinning #450
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
hiroshihorie
wants to merge
26
commits into
main
Choose a base branch
from
hiroshi/prepare-connection
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 11 commits
Commits
Show all changes
26 commits
Select commit
Hold shift + click to select a range
356fefc
RegionUrlProvider
hiroshihorie eba6899
Tests
hiroshihorie 652b87f
Test cache interval
hiroshihorie 14c0ffe
Return socket url by default
hiroshihorie c7901f9
Merge branch 'main' into hiroshi/prepare-connection
hiroshihorie 6990f8f
Merge branch 'main' into hiroshi/prepare-connection
hiroshihorie 69cff3e
Fix compile
hiroshihorie 570da75
Merge branch 'main' into hiroshi/prepare-connection
hiroshihorie 1785583
Merge branch 'main' into hiroshi/prepare-connection
hiroshihorie 0799efe
Change to category
hiroshihorie 812a294
Optimize
hiroshihorie 9761f24
Remove sort
hiroshihorie 9f9e42b
Merge branch 'main' into hiroshi/prepare-connection
hiroshihorie 962c703
Merge branch 'main' into hiroshi/prepare-connection
hiroshihorie 59faa1f
Improvements
hiroshihorie d886ed6
Prepare
hiroshihorie 4bb928f
Update tests
hiroshihorie 02a6f47
Merge branch 'main' into hiroshi/prepare-connection
hiroshihorie 16a4a44
Merge branch 'main' into hiroshi/prepare-connection
hiroshihorie 216adfa
Merge branch 'main' into hiroshi/prepare-connection
hiroshihorie 0af7d12
Merge branch 'main' into hiroshi/prepare-connection
hiroshihorie f887ec2
Merge fixes
hiroshihorie d68c749
Changes
hiroshihorie 3b50244
Minor adjustments
hiroshihorie 4627d15
Prewarm url
hiroshihorie eba7b15
Fix error for non-cloud url
hiroshihorie File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| /* | ||
| * Copyright 2024 LiveKit | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| import Foundation | ||
|
|
||
| // MARK: - Room+Region | ||
|
|
||
| extension Room { | ||
| static let defaultCacheInterval: TimeInterval = 3000 | ||
|
|
||
| func resolveBestRegion() async throws -> RegionInfo { | ||
| try await requestRegionSettings() | ||
|
|
||
| let sortedByDistance = _regionState.remaining.sorted { $0.distance < $1.distance } | ||
| log("[Region] Remaining regions: \(String(describing: sortedByDistance))") | ||
|
|
||
| guard let selectedRegion = sortedByDistance.first else { | ||
| throw LiveKitError(.regionUrlProvider, message: "No more remaining regions.") | ||
| } | ||
|
|
||
| log("[Region] Resolved region: \(String(describing: selectedRegion))") | ||
|
|
||
| return selectedRegion | ||
| } | ||
|
|
||
| func add(failedRegion region: RegionInfo) { | ||
| _regionState.mutate { | ||
| $0.remaining.remove(region) | ||
| } | ||
| } | ||
|
|
||
| // MARK: - Private | ||
|
|
||
| private func requestRegionSettings() async throws { | ||
| let (serverUrl, token) = _state.read { ($0.url, $0.token) } | ||
|
|
||
| guard let serverUrl, let token else { | ||
| throw LiveKitError(.invalidState) | ||
| } | ||
|
|
||
| let shouldRequestRegionSettings = _regionState.read { | ||
| guard serverUrl == $0.url, let regionSettingsUpdated = $0.lastRequested else { return true } | ||
| let interval = Date().timeIntervalSince(regionSettingsUpdated) | ||
| log("[Region] Interval: \(String(describing: interval))") | ||
| return interval > Self.defaultCacheInterval | ||
| } | ||
|
|
||
| guard shouldRequestRegionSettings else { return } | ||
|
|
||
| // Ensure url is for cloud. | ||
| guard serverUrl.isCloud() else { | ||
| throw LiveKitError(.onlyForCloud) | ||
| } | ||
|
|
||
| // Make a request which ignores cache. | ||
| var request = URLRequest(url: serverUrl.regionSettingsUrl(), | ||
| cachePolicy: .reloadIgnoringLocalAndRemoteCacheData) | ||
|
|
||
| request.addValue("Bearer \(token)", forHTTPHeaderField: "authorization") | ||
|
|
||
| log("[Region] Requesting region settings...") | ||
|
|
||
| let (data, response) = try await URLSession.shared.data(for: request) | ||
| // Response must be a HTTPURLResponse. | ||
| guard let httpResponse = response as? HTTPURLResponse else { | ||
| throw LiveKitError(.regionUrlProvider, message: "Failed to fetch region settings") | ||
| } | ||
|
|
||
| // Check the status code. | ||
| guard httpResponse.isStatusCodeOK else { | ||
| log("[Region] Failed to fetch region settings, error: \(String(describing: httpResponse))", .error) | ||
| throw LiveKitError(.regionUrlProvider, message: "Failed to fetch region settings with status code: \(httpResponse.statusCode)") | ||
| } | ||
|
|
||
| do { | ||
| // Try to parse the JSON data. | ||
| let regionSettings = try Livekit_RegionSettings(jsonUTF8Data: data) | ||
| let allRegions = regionSettings.regions.compactMap { $0.toLKType() } | ||
|
|
||
| if allRegions.isEmpty { | ||
| throw LiveKitError(.regionUrlProvider, message: "Fetched region data is empty.") | ||
| } | ||
|
|
||
| log("[Region] all regions: \(String(describing: allRegions))") | ||
|
|
||
| _regionState.mutate { | ||
| $0.url = serverUrl | ||
| $0.all = Set(allRegions) | ||
| $0.remaining = Set(allRegions) | ||
| $0.lastRequested = Date() | ||
| } | ||
| } catch { | ||
| throw LiveKitError(.regionUrlProvider, message: "Failed to parse region settings with error: \(error)") | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -168,7 +168,16 @@ public class Room: NSObject, ObservableObject, Loggable { | |
| } | ||
| } | ||
|
|
||
| struct RegionState { | ||
| // Region | ||
| var url: URL? | ||
| var lastRequested: Date? | ||
| var all: Set<RegionInfo> = [] | ||
| var remaining: Set<RegionInfo> = [] | ||
| } | ||
|
|
||
| let _state: StateSync<State> | ||
| let _regionState = StateSync(RegionState()) | ||
|
|
||
| private let _sidCompleter = AsyncCompleter<Sid>(label: "sid", defaultTimeout: .resolveSid) | ||
|
|
||
|
|
@@ -315,28 +324,35 @@ public class Room: NSObject, ObservableObject, Loggable { | |
|
|
||
| try Task.checkCancellation() | ||
|
|
||
| _state.mutate { $0.connectionState = .connecting } | ||
| _state.mutate { | ||
| $0.url = url | ||
| $0.token = token | ||
| $0.connectionState = .connecting | ||
| } | ||
|
|
||
| do { | ||
| try await fullConnectSequence(url, token) | ||
|
|
||
| // Connect sequence successful | ||
| log("Connect sequence completed") | ||
| while true { | ||
| let region = try await resolveBestRegion() | ||
|
||
|
|
||
| // Final check if cancelled, don't fire connected events | ||
| try Task.checkCancellation() | ||
|
|
||
| // update internal vars (only if connect succeeded) | ||
| _state.mutate { | ||
| $0.url = url | ||
| $0.token = token | ||
| $0.connectionState = .connected | ||
| do { | ||
| try await fullConnectSequence(region.url, token) | ||
| // Connect sequence successful | ||
| log("Connect sequence completed") | ||
| // Final check if cancelled, don't fire connected events | ||
| try Task.checkCancellation() | ||
| _state.mutate { $0.connectionState = .connected } | ||
| break // Exit loop on successful connection | ||
| } catch { | ||
| log("Connect failed with region: \(region)") | ||
| add(failedRegion: region) | ||
| // Prepare for next connect attempt. | ||
| await cleanUp(isFullReconnect: true) | ||
| } | ||
| } | ||
|
|
||
| } catch { | ||
| log("Failed to resolve a region or connect: \(error)") | ||
| await cleanUp(withError: error) | ||
| // Re-throw error | ||
| throw error | ||
| throw error // Re-throw the original error | ||
| } | ||
|
|
||
| log("Connected to \(String(describing: self))", .info) | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| /* | ||
| * Copyright 2024 LiveKit | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| import Foundation | ||
|
|
||
| @objc | ||
| public class RegionInfo: NSObject { | ||
| let regionId: String | ||
| let url: URL | ||
| let distance: Int64 | ||
|
|
||
| init?(region: String, url: String, distance: Int64) { | ||
| guard let url = URL(string: url) else { return nil } | ||
| regionId = region | ||
| self.url = url | ||
| self.distance = distance | ||
| } | ||
|
|
||
| // MARK: - Equal | ||
|
|
||
| override public func isEqual(_ object: Any?) -> Bool { | ||
| guard let other = object as? Self else { return false } | ||
| return regionId == other.regionId | ||
| } | ||
|
|
||
| override public var hash: Int { | ||
| var hasher = Hasher() | ||
| hasher.combine(regionId) | ||
| return hasher.finalize() | ||
| } | ||
|
|
||
| // | ||
|
|
||
| override public var description: String { | ||
| "RegionInfo(id: \(regionId), url: \(url), distance: \(distance))" | ||
| } | ||
| } | ||
|
|
||
| extension Livekit_RegionInfo { | ||
| func toLKType() -> RegionInfo? { | ||
| RegionInfo(region: region, | ||
| url: url, | ||
| distance: distance) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| /* | ||
| * Copyright 2024 LiveKit | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| @testable import LiveKit | ||
| import XCTest | ||
|
|
||
| class RegionUrlProviderTests: XCTestCase { | ||
| func testResolveUrl() async throws { | ||
| let testCacheInterval: TimeInterval = 3 | ||
| // Test data. | ||
| let testRegionSettings = Livekit_RegionSettings.with { | ||
| $0.regions.append(Livekit_RegionInfo.with { | ||
| $0.region = "otokyo1a" | ||
| $0.url = "https://example.otokyo1a.production.livekit.cloud" | ||
| $0.distance = 32838 | ||
| }) | ||
| $0.regions.append(Livekit_RegionInfo.with { | ||
| $0.region = "dblr1a" | ||
| $0.url = "https://example.dblr1a.production.livekit.cloud" | ||
| $0.distance = 6_660_301 | ||
| }) | ||
| $0.regions.append(Livekit_RegionInfo.with { | ||
| $0.region = "dsyd1a" | ||
| $0.url = "https://example.dsyd1a.production.livekit.cloud" | ||
| $0.distance = 7_823_582 | ||
| }) | ||
| } | ||
|
|
||
| let provider = RegionUrlProvider(url: "wss://test.livekit.cloud", token: "", cacheInterval: testCacheInterval) | ||
|
|
||
| // See if request should be initiated. | ||
| XCTAssert(provider.shouldRequestRegionSettings(), "Should require to request region settings") | ||
|
|
||
| // Set test data. | ||
| provider.set(regionSettings: testRegionSettings) | ||
|
|
||
| // See if request is not required to be initiated. | ||
| XCTAssert(!provider.shouldRequestRegionSettings(), "Should require to request region settings") | ||
|
|
||
| let attempt1 = try await provider.nextBestRegionUrl() | ||
| print("Next url: \(String(describing: attempt1))") | ||
| XCTAssert(attempt1 == URL(string: testRegionSettings.regions[0].url)?.toSocketUrl()) | ||
|
|
||
| let attempt2 = try await provider.nextBestRegionUrl() | ||
| print("Next url: \(String(describing: attempt2))") | ||
| XCTAssert(attempt2 == URL(string: testRegionSettings.regions[1].url)?.toSocketUrl()) | ||
|
|
||
| let attempt3 = try await provider.nextBestRegionUrl() | ||
| print("Next url: \(String(describing: attempt3))") | ||
| XCTAssert(attempt3 == URL(string: testRegionSettings.regions[2].url)?.toSocketUrl()) | ||
|
|
||
| let attempt4 = try await provider.nextBestRegionUrl() | ||
| print("Next url: \(String(describing: attempt4))") | ||
| XCTAssert(attempt4 == nil) | ||
|
|
||
| // Simulate cache time elapse. | ||
| await asyncSleep(for: testCacheInterval) | ||
|
|
||
| // After cache time elapsed, should require to request region settings again. | ||
| XCTAssert(provider.shouldRequestRegionSettings(), "Should require to request region settings") | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| /* | ||
| * Copyright 2024 LiveKit | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| import Foundation | ||
|
|
||
| func asyncSleep(for duration: TimeInterval) async { | ||
| let nanoseconds = UInt64(duration * Double(NSEC_PER_SEC)) | ||
| try? await Task.sleep(nanoseconds: nanoseconds) | ||
| } |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.