@@ -45,6 +45,7 @@ use crate::{
4545 client:: session:: TransactionState ,
4646 coll:: options:: Hint ,
4747 collation:: Collation ,
48+ db:: options:: RunCursorCommandOptions ,
4849 error:: { ErrorKind , Result } ,
4950 gridfs:: options:: { GridFsDownloadByNameOptions , GridFsUploadOptions } ,
5051 options:: {
@@ -288,6 +289,7 @@ impl<'de> Deserialize<'de> for Operation {
288289 "deleteOne" => deserialize_op :: < DeleteOne > ( definition. arguments ) ,
289290 "find" => deserialize_op :: < Find > ( definition. arguments ) ,
290291 "createFindCursor" => deserialize_op :: < CreateFindCursor > ( definition. arguments ) ,
292+ "createCommandCursor" => deserialize_op :: < CreateCommandCursor > ( definition. arguments ) ,
291293 "aggregate" => deserialize_op :: < Aggregate > ( definition. arguments ) ,
292294 "distinct" => deserialize_op :: < Distinct > ( definition. arguments ) ,
293295 "countDocuments" => deserialize_op :: < CountDocuments > ( definition. arguments ) ,
@@ -314,6 +316,7 @@ impl<'de> Deserialize<'de> for Operation {
314316 "createCollection" => deserialize_op :: < CreateCollection > ( definition. arguments ) ,
315317 "dropCollection" => deserialize_op :: < DropCollection > ( definition. arguments ) ,
316318 "runCommand" => deserialize_op :: < RunCommand > ( definition. arguments ) ,
319+ "runCursorCommand" => deserialize_op :: < RunCursorCommand > ( definition. arguments ) ,
317320 "endSession" => deserialize_op :: < EndSession > ( definition. arguments ) ,
318321 "assertSessionTransactionState" => {
319322 deserialize_op :: < AssertSessionTransactionState > ( definition. arguments )
@@ -1561,7 +1564,9 @@ impl TestOperation for CreateCollection {
15611564 . create_collection ( & self . collection , self . options . clone ( ) )
15621565 . await ?;
15631566 }
1564- Ok ( None )
1567+ Ok ( Some ( Entity :: Collection (
1568+ database. collection ( & self . collection ) ,
1569+ ) ) )
15651570 }
15661571 . boxed ( )
15671572 }
@@ -1644,6 +1649,109 @@ impl TestOperation for RunCommand {
16441649 }
16451650}
16461651
1652+ #[ derive( Debug , Deserialize ) ]
1653+ #[ serde( rename_all = "camelCase" , deny_unknown_fields) ]
1654+ pub ( super ) struct RunCursorCommand {
1655+ command : Document ,
1656+ // We don't need to use this field, but it needs to be included during deserialization so that
1657+ // we can use the deny_unknown_fields tag.
1658+ #[ serde( rename = "commandName" ) ]
1659+ _command_name : String ,
1660+
1661+ #[ serde( flatten) ]
1662+ options : RunCursorCommandOptions ,
1663+ session : Option < String > ,
1664+ }
1665+
1666+ impl TestOperation for RunCursorCommand {
1667+ fn execute_entity_operation < ' a > (
1668+ & ' a self ,
1669+ id : & ' a str ,
1670+ test_runner : & ' a TestRunner ,
1671+ ) -> BoxFuture < ' a , Result < Option < Entity > > > {
1672+ async move {
1673+ let command = self . command . clone ( ) ;
1674+ let db = test_runner. get_database ( id) . await ;
1675+ let options = self . options . clone ( ) ;
1676+
1677+ let result = match & self . session {
1678+ Some ( session_id) => {
1679+ with_mut_session ! ( test_runner, session_id, |session| async {
1680+ let mut cursor = db
1681+ . run_cursor_command_with_session( command, options, session)
1682+ . await ?;
1683+ cursor. stream( session) . try_collect:: <Vec <_>>( ) . await
1684+ } )
1685+ . await ?
1686+ }
1687+ None => {
1688+ let cursor = db. run_cursor_command ( command, options) . await ?;
1689+ cursor. try_collect :: < Vec < _ > > ( ) . await ?
1690+ }
1691+ } ;
1692+
1693+ Ok ( Some ( bson:: to_bson ( & result) ?. into ( ) ) )
1694+ }
1695+ . boxed ( )
1696+ }
1697+ }
1698+
1699+ #[ derive( Debug , Deserialize ) ]
1700+ #[ serde( rename_all = "camelCase" , deny_unknown_fields) ]
1701+ pub struct CreateCommandCursor {
1702+ command : Document ,
1703+ // We don't need to use this field, but it needs to be included during deserialization so that
1704+ // we can use the deny_unknown_fields tag.
1705+ #[ serde( rename = "commandName" ) ]
1706+ _command_name : String ,
1707+
1708+ #[ serde( flatten) ]
1709+ options : RunCursorCommandOptions ,
1710+ session : Option < String > ,
1711+ }
1712+
1713+ impl TestOperation for CreateCommandCursor {
1714+ fn execute_entity_operation < ' a > (
1715+ & ' a self ,
1716+ id : & ' a str ,
1717+ test_runner : & ' a TestRunner ,
1718+ ) -> BoxFuture < ' a , Result < Option < Entity > > > {
1719+ async move {
1720+ let command = self . command . clone ( ) ;
1721+ let db = test_runner. get_database ( id) . await ;
1722+ let options = self . options . clone ( ) ;
1723+
1724+ match & self . session {
1725+ Some ( session_id) => {
1726+ let mut ses_cursor = None ;
1727+ with_mut_session ! ( test_runner, session_id, |session| async {
1728+ ses_cursor = Some (
1729+ db. run_cursor_command_with_session( command, options, session)
1730+ . await ,
1731+ ) ;
1732+ } )
1733+ . await ;
1734+ let test_cursor = TestCursor :: Session {
1735+ cursor : ses_cursor. unwrap ( ) . unwrap ( ) ,
1736+ session_id : session_id. clone ( ) ,
1737+ } ;
1738+ Ok ( Some ( Entity :: Cursor ( test_cursor) ) )
1739+ }
1740+ None => {
1741+ let doc_cursor = db. run_cursor_command ( command, options) . await ?;
1742+ let test_cursor = TestCursor :: Normal ( Mutex :: new ( doc_cursor) ) ;
1743+ Ok ( Some ( Entity :: Cursor ( test_cursor) ) )
1744+ }
1745+ }
1746+ }
1747+ . boxed ( )
1748+ }
1749+
1750+ fn returns_root_documents ( & self ) -> bool {
1751+ false
1752+ }
1753+ }
1754+
16471755#[ derive( Debug , Deserialize ) ]
16481756#[ serde( rename_all = "camelCase" , deny_unknown_fields) ]
16491757pub ( super ) struct EndSession { }
0 commit comments