@@ -64,14 +64,17 @@ protected function getNextAvailableJobAndReserve($queue)
6464 $ job = $ this ->database ->getCollection ($ this ->table )->findOneAndUpdate (
6565 [
6666 'queue ' => $ this ->getQueue ($ queue ),
67- 'reserved ' => 0 ,
67+ 'reserved ' => [ ' $ne ' => 1 ] ,
6868 'available_at ' => ['$lte ' => Carbon::now ()->getTimestamp ()],
6969 ],
7070 [
7171 '$set ' => [
7272 'reserved ' => 1 ,
7373 'reserved_at ' => Carbon::now ()->getTimestamp (),
7474 ],
75+ '$inc ' => [
76+ 'attempts ' => 1 ,
77+ ],
7578 ],
7679 [
7780 'returnDocument ' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER ,
@@ -94,24 +97,15 @@ protected function getNextAvailableJobAndReserve($queue)
9497 protected function releaseJobsThatHaveBeenReservedTooLong ($ queue )
9598 {
9699 $ expiration = Carbon::now ()->subSeconds ($ this ->retryAfter )->getTimestamp ();
97- $ now = time ();
98100
99101 $ reserved = $ this ->database ->collection ($ this ->table )
100102 ->where ('queue ' , $ this ->getQueue ($ queue ))
101- ->where (function ($ query ) use ($ expiration , $ now ) {
102- // Check for available jobs
103- $ query ->where (function ($ query ) use ($ now ) {
104- $ query ->whereNull ('reserved_at ' );
105- $ query ->where ('available_at ' , '<= ' , $ now );
106- });
107-
108- // Check for jobs that are reserved but have expired
109- $ query ->orWhere ('reserved_at ' , '<= ' , $ expiration );
110- })->get ();
103+ ->whereNotNull ('reserved_at ' )
104+ ->where ('reserved_at ' , '<= ' , $ expiration )
105+ ->get ();
111106
112107 foreach ($ reserved as $ job ) {
113- $ attempts = $ job ['attempts ' ] + 1 ;
114- $ this ->releaseJob ($ job ['_id ' ], $ attempts );
108+ $ this ->releaseJob ($ job ['_id ' ], $ job ['attempts ' ]);
115109 }
116110 }
117111
0 commit comments