File tree Expand file tree Collapse file tree 5 files changed +126
-0
lines changed Expand file tree Collapse file tree 5 files changed +126
-0
lines changed Original file line number Diff line number Diff line change 22
33use Illuminate \Support \ServiceProvider ;
44use Jenssegers \Mongodb \Eloquent \Model ;
5+ use Jenssegers \Mongodb \Queue \MongoConnector ;
56
67class MongodbServiceProvider extends ServiceProvider
78{
@@ -20,10 +21,18 @@ public function boot()
2021 */
2122 public function register ()
2223 {
24+ // Add database driver.
2325 $ this ->app ->resolving ('db ' , function ($ db ) {
2426 $ db ->extend ('mongodb ' , function ($ config ) {
2527 return new Connection ($ config );
2628 });
2729 });
30+
31+ // Add connector for queue support.
32+ $ this ->app ->resolving ('queue ' , function ($ queue ) {
33+ $ queue ->addConnector ('mongodb ' , function () {
34+ return new MongoConnector ($ this ->app ['db ' ]);
35+ });
36+ });
2837 }
2938}
Original file line number Diff line number Diff line change 1+ <?php namespace Jenssegers \Mongodb \Queue ;
2+
3+ use Illuminate \Database \ConnectionResolverInterface ;
4+ use Illuminate \Queue \Connectors \ConnectorInterface ;
5+ use Illuminate \Support \Arr ;
6+
7+ class MongoConnector implements ConnectorInterface
8+ {
9+ /**
10+ * Database connections.
11+ *
12+ * @var \Illuminate\Database\ConnectionResolverInterface
13+ */
14+ protected $ connections ;
15+
16+ /**
17+ * Create a new connector instance.
18+ *
19+ * @param \Illuminate\Database\ConnectionResolverInterface $connections
20+ * @return void
21+ */
22+ public function __construct (ConnectionResolverInterface $ connections )
23+ {
24+ $ this ->connections = $ connections ;
25+ }
26+
27+ /**
28+ * Establish a queue connection.
29+ *
30+ * @param array $config
31+ * @return \Illuminate\Contracts\Queue\Queue
32+ */
33+ public function connect (array $ config )
34+ {
35+ return new MongoQueue (
36+ $ this ->connections ->connection (Arr::get ($ config , 'connection ' )),
37+ $ config ['table ' ],
38+ $ config ['queue ' ],
39+ Arr::get ($ config , 'expire ' , 60 )
40+ );
41+ }
42+ }
Original file line number Diff line number Diff line change 1+ <?php namespace Jenssegers \Mongodb \Queue ;
2+
3+ use Carbon \Carbon ;
4+ use Illuminate \Queue \DatabaseQueue ;
5+
6+ class MongoQueue extends DatabaseQueue
7+ {
8+ /**
9+ * Get the next available job for the queue.
10+ *
11+ * @param string|null $queue
12+ * @return \StdClass|null
13+ */
14+ protected function getNextAvailableJob ($ queue )
15+ {
16+ $ job = $ this ->database ->table ($ this ->table )
17+ ->lockForUpdate ()
18+ ->where ('queue ' , $ this ->getQueue ($ queue ))
19+ ->where ('reserved ' , 0 )
20+ ->where ('available_at ' , '<= ' , $ this ->getTime ())
21+ ->orderBy ('id ' , 'asc ' )
22+ ->first ();
23+
24+ if ($ job ) {
25+ $ job = (object ) $ job ;
26+ $ job ->id = $ job ->_id ;
27+ }
28+
29+ return $ job ?: null ;
30+ }
31+
32+ /**
33+ * Release the jobs that have been reserved for too long.
34+ *
35+ * @param string $queue
36+ * @return void
37+ */
38+ protected function releaseJobsThatHaveBeenReservedTooLong ($ queue )
39+ {
40+ $ expired = Carbon::now ()->subSeconds ($ this ->expire )->getTimestamp ();
41+
42+ $ reserved = $ this ->database ->collection ($ this ->table )
43+ ->where ('queue ' , $ this ->getQueue ($ queue ))
44+ ->where ('reserved ' , 1 )
45+ ->where ('reserved_at ' , '<= ' , $ expired )->get ();
46+
47+ foreach ($ reserved as $ job ) {
48+ $ attempts = $ job ['attempts ' ] + 1 ;
49+ $ this ->releaseJob ($ job ['_id ' ], $ attempts );
50+ }
51+ }
52+ }
Original file line number Diff line number Diff line change 1+ <?php
2+
3+ class QueueTest extends TestCase
4+ {
5+ public function testQueue ()
6+ {
7+ $ id = Queue::push ('test ' , ['foo ' => 'bar ' ], 'test ' );
8+ $ this ->assertNotNull ($ id );
9+
10+ $ job = Queue::pop ('test ' );
11+ $ this ->assertInstanceOf ('Illuminate\Queue\Jobs\DatabaseJob ' , $ job );
12+ }
13+ }
Original file line number Diff line number Diff line change @@ -29,12 +29,22 @@ protected function getEnvironmentSetUp($app)
2929
3030 $ config = require 'config/database.php ' ;
3131
32+ $ app ['config ' ]->set ('app.key ' , 'ZsZewWyUJ5FsKp9lMwv4tYbNlegQilM7 ' );
33+
3234 $ app ['config ' ]->set ('database.default ' , 'mongodb ' );
3335 $ app ['config ' ]->set ('database.connections.mysql ' , $ config ['connections ' ]['mysql ' ]);
3436 $ app ['config ' ]->set ('database.connections.mongodb ' , $ config ['connections ' ]['mongodb ' ]);
3537
3638 $ app ['config ' ]->set ('auth.model ' , 'User ' );
3739 $ app ['config ' ]->set ('auth.providers.users.model ' , 'User ' );
3840 $ app ['config ' ]->set ('cache.driver ' , 'array ' );
41+
42+ $ app ['config ' ]->set ('queue.default ' , 'mongodb ' );
43+ $ app ['config ' ]->set ('queue.connections.mongodb ' , [
44+ 'driver ' => 'mongodb ' ,
45+ 'table ' => 'jobs ' ,
46+ 'queue ' => 'default ' ,
47+ 'expire ' => 60 ,
48+ ]);
3949 }
4050}
You can’t perform that action at this time.
0 commit comments