|
| 1 | +import Foundation |
| 2 | +import MongoSwift |
| 3 | +import NIO |
| 4 | + |
| 5 | +// swiftlint:disable force_unwrapping |
| 6 | + |
| 7 | +/// Examples used for the MongoDB documentation on Causal Consistency. |
| 8 | +/// - SeeAlso: https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#examples |
| 9 | +private func causalConsistency() throws { |
| 10 | + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) |
| 11 | + let client1 = try MongoClient(using: elg) |
| 12 | + defer { |
| 13 | + client1.syncShutdown() |
| 14 | + try? elg.syncShutdownGracefully() |
| 15 | + } |
| 16 | + |
| 17 | + // Start Causal Consistency Example 1 |
| 18 | + let s1 = client1.startSession(options: ClientSessionOptions(causalConsistency: true)) |
| 19 | + let currentDate = Date() |
| 20 | + var dbOptions = DatabaseOptions( |
| 21 | + readConcern: ReadConcern(.majority), |
| 22 | + writeConcern: try WriteConcern(w: .majority, wtimeoutMS: 1000) |
| 23 | + ) |
| 24 | + let items = client1.db("test", options: dbOptions).collection("items") |
| 25 | + let result1 = items.updateOne( |
| 26 | + filter: ["sku": "111", "end": .null], |
| 27 | + update: ["$set": ["end": .datetime(currentDate)]], |
| 28 | + session: s1 |
| 29 | + ).flatMap { _ in |
| 30 | + items.insertOne(["sku": "nuts-111", "name": "Pecans", "start": .datetime(currentDate)], session: s1) |
| 31 | + } |
| 32 | + // End Causal Consistency Example 1 |
| 33 | + |
| 34 | + let client2 = try MongoClient(using: elg) |
| 35 | + |
| 36 | + // Start Causal Consistency Example 2 |
| 37 | + let options = ClientSessionOptions(causalConsistency: true) |
| 38 | + let result2: EventLoopFuture<Void> = client2.withSession(options: options) { s2 in |
| 39 | + // The cluster and operation times are guaranteed to be non-nil since we already used s1 for operations above. |
| 40 | + s2.advanceClusterTime(to: s1.clusterTime!) |
| 41 | + s2.advanceOperationTime(to: s1.operationTime!) |
| 42 | + |
| 43 | + dbOptions.readPreference = ReadPreference(.secondary) |
| 44 | + let items2 = client2.db("test", options: dbOptions).collection("items") |
| 45 | + |
| 46 | + return items2.find(["end": .null], session: s2).flatMap { cursor in |
| 47 | + cursor.forEach { item in |
| 48 | + print(item) |
| 49 | + } |
| 50 | + } |
| 51 | + } |
| 52 | + // End Causal Consistency Example 2 |
| 53 | +} |
| 54 | + |
| 55 | +/// Examples used for the MongoDB documentation on Change Streams. |
| 56 | +/// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/ |
| 57 | +private func changeStreams() throws { |
| 58 | + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) |
| 59 | + let client = try MongoClient(using: elg) |
| 60 | + let db = client.db("example") |
| 61 | + |
| 62 | + // The following examples assume that you have connected to a MongoDB replica set and have |
| 63 | + // accessed a database that contains an inventory collection. |
| 64 | + |
| 65 | + do { |
| 66 | + // Start Changestream Example 1 |
| 67 | + let inventory = db.collection("inventory") |
| 68 | + |
| 69 | + // Option 1: retrieve next document via next() |
| 70 | + let next = inventory.watch().flatMap { cursor in |
| 71 | + cursor.next() |
| 72 | + } |
| 73 | + |
| 74 | + // Option 2: register a callback to execute for each document |
| 75 | + let result = inventory.watch().flatMap { cursor in |
| 76 | + cursor.forEach { event in |
| 77 | + // process event |
| 78 | + print(event) |
| 79 | + } |
| 80 | + } |
| 81 | + // End Changestream Example 1 |
| 82 | + } |
| 83 | + |
| 84 | + do { |
| 85 | + // Start Changestream Example 2 |
| 86 | + let inventory = db.collection("inventory") |
| 87 | + |
| 88 | + // Option 1: use next() to iterate |
| 89 | + let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) |
| 90 | + .flatMap { changeStream in |
| 91 | + changeStream.next() |
| 92 | + } |
| 93 | + |
| 94 | + // Option 2: register a callback to execute for each document |
| 95 | + let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) |
| 96 | + .flatMap { changeStream in |
| 97 | + changeStream.forEach { event in |
| 98 | + // process event |
| 99 | + print(event) |
| 100 | + } |
| 101 | + } |
| 102 | + // End Changestream Example 2 |
| 103 | + } |
| 104 | + |
| 105 | + do { |
| 106 | + // Start Changestream Example 3 |
| 107 | + let inventory = db.collection("inventory") |
| 108 | + |
| 109 | + inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) |
| 110 | + .flatMap { changeStream in |
| 111 | + changeStream.next().map { _ in |
| 112 | + changeStream.resumeToken |
| 113 | + }.always { _ in |
| 114 | + _ = changeStream.kill() |
| 115 | + } |
| 116 | + }.flatMap { resumeToken in |
| 117 | + inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in |
| 118 | + newStream.forEach { event in |
| 119 | + // process event |
| 120 | + print(event) |
| 121 | + } |
| 122 | + } |
| 123 | + } |
| 124 | + // End Changestream Example 3 |
| 125 | + } |
| 126 | + |
| 127 | + do { |
| 128 | + // Start Changestream Example 4 |
| 129 | + let pipeline: [Document] = [ |
| 130 | + ["$match": ["fullDocument.username": "alice"]], |
| 131 | + ["$addFields": ["newField": "this is an added field!"]] |
| 132 | + ] |
| 133 | + let inventory = db.collection("inventory") |
| 134 | + |
| 135 | + // Option 1: use next() to iterate |
| 136 | + let next = inventory.watch(pipeline, withEventType: Document.self).flatMap { changeStream in |
| 137 | + changeStream.next() |
| 138 | + } |
| 139 | + |
| 140 | + // Option 2: register a callback to execute for each document |
| 141 | + let result = inventory.watch(pipeline, withEventType: Document.self).flatMap { changeStream in |
| 142 | + changeStream.forEach { event in |
| 143 | + // process event |
| 144 | + print(event) |
| 145 | + } |
| 146 | + } |
| 147 | + // End Changestream Example 4 |
| 148 | + } |
| 149 | +} |
0 commit comments