@@ -198,14 +198,27 @@ def next(self):
198198 all changes in the cursor. For example::
199199
200200 try:
201- with db.collection.watch(
202- [{'$match': {'operationType': 'insert'}}]) as stream:
201+ resume_token = None
202+ pipeline = [{'$match': {'operationType': 'insert'}}]
203+ with db.collection.watch(pipeline) as stream:
203204 for insert_change in stream:
204205 print(insert_change)
206+ resume_token = stream.resume_token
205207 except pymongo.errors.PyMongoError:
206208 # The ChangeStream encountered an unrecoverable error or the
207209 # resume attempt failed to recreate the cursor.
208- logging.error('...')
210+ if resume_token is None:
211+ # There is no usable resume token because there was a
212+ # failure during ChangeStream initialization.
213+ logging.error('...')
214+ else:
215+ # Use the interrupted ChangeStream's resume token to create
216+ # a new ChangeStream. The new stream will continue from the
217+ # last seen insert change without missing any events.
218+ with db.collection.watch(
219+ pipeline, resume_after=resume_token) as stream:
220+ for insert_change in stream:
221+ print(insert_change)
209222
210223 Raises :exc:`StopIteration` if this ChangeStream is closed.
211224 """
@@ -238,13 +251,17 @@ def try_next(self):
238251 with db.collection.watch() as stream:
239252 while stream.alive:
240253 change = stream.try_next()
254+ # Note that the ChangeStream's resume token may be updated
255+ # even when no changes are returned.
256+ print("Current resume token: %r" % (stream.resume_token,))
241257 if change is not None:
242- print(change)
243- elif stream.alive:
244- # We end up here when there are no recent changes.
245- # Sleep for a while to avoid flooding the server with
246- # getMore requests when no changes are available.
247- time.sleep(10)
258+ print("Change document: %r" % (change,))
259+ continue
260+ # We end up here when there are no recent changes.
261+ # Sleep for a while before trying again to avoid flooding
262+ # the server with getMore requests when no changes are
263+ # available.
264+ time.sleep(10)
248265
249266 If no change document is cached locally then this method runs a single
250267 getMore command. If the getMore yields any documents, the next
0 commit comments