File tree Expand file tree Collapse file tree 5 files changed +119
-0
lines changed Expand file tree Collapse file tree 5 files changed +119
-0
lines changed Original file line number Diff line number Diff line change 11<?php namespace Jenssegers \Mongodb ;
22
33use Illuminate \Support \ServiceProvider ;
4+ use Jenssegers \Mongodb \Queue \MongoConnector ;
45
56class MongodbServiceProvider extends ServiceProvider
67{
@@ -19,10 +20,18 @@ public function boot()
1920 */
2021 public function register ()
2122 {
23+ // Add database driver.
2224 $ this ->app ->resolving ('db ' , function ($ db ) {
2325 $ db ->extend ('mongodb ' , function ($ config ) {
2426 return new Connection ($ config );
2527 });
2628 });
29+
30+ // Add connector for queue support.
31+ $ this ->app ->resolving ('queue ' , function ($ queue ) {
32+ $ queue ->addConnector ('mongodb ' , function () {
33+ return new MongoConnector ($ this ->app ['db ' ]);
34+ });
35+ });
2736 }
2837}
Original file line number Diff line number Diff line change 1+ <?php namespace Jenssegers \Mongodb \Queue ;
2+
3+ use Illuminate \Database \ConnectionResolverInterface ;
4+ use Illuminate \Queue \Connectors \ConnectorInterface ;
5+ use Illuminate \Support \Arr ;
6+
7+ class MongoConnector implements ConnectorInterface
8+ {
9+ /**
10+ * Database connections.
11+ *
12+ * @var \Illuminate\Database\ConnectionResolverInterface
13+ */
14+ protected $ connections ;
15+
16+ /**
17+ * Create a new connector instance.
18+ *
19+ * @param \Illuminate\Database\ConnectionResolverInterface $connections
20+ * @return void
21+ */
22+ public function __construct (ConnectionResolverInterface $ connections )
23+ {
24+ $ this ->connections = $ connections ;
25+ }
26+
27+ /**
28+ * Establish a queue connection.
29+ *
30+ * @param array $config
31+ * @return \Illuminate\Contracts\Queue\Queue
32+ */
33+ public function connect (array $ config )
34+ {
35+ return new MongoQueue (
36+ $ this ->connections ->connection (Arr::get ($ config , 'connection ' )),
37+ $ config ['table ' ],
38+ $ config ['queue ' ],
39+ Arr::get ($ config , 'expire ' , 60 )
40+ );
41+ }
42+ }
Original file line number Diff line number Diff line change 1+ <?php namespace Jenssegers \Mongodb \Queue ;
2+
3+ use Carbon \Carbon ;
4+ use Illuminate \Queue \DatabaseQueue ;
5+
6+ class MongoQueue extends DatabaseQueue
7+ {
8+ /**
9+ * Get the next available job for the queue.
10+ *
11+ * @param string|null $queue
12+ * @return \StdClass|null
13+ */
14+ protected function getNextAvailableJob ($ queue )
15+ {
16+ $ job = parent ::getNextAvailableJob ($ queue );
17+
18+ if ($ job ) {
19+ $ job ->id = $ job ->_id ;
20+ }
21+
22+ return $ job ;
23+ }
24+
25+ /**
26+ * Release the jobs that have been reserved for too long.
27+ *
28+ * @param string $queue
29+ * @return void
30+ */
31+ protected function releaseJobsThatHaveBeenReservedTooLong ($ queue )
32+ {
33+ $ expired = Carbon::now ()->subSeconds ($ this ->expire )->getTimestamp ();
34+
35+ $ reserved = $ this ->database ->collection ($ this ->table )
36+ ->where ('queue ' , $ this ->getQueue ($ queue ))
37+ ->where ('reserved ' , 1 )
38+ ->where ('reserved_at ' , '<= ' , $ expired )->get ();
39+
40+ foreach ($ reserved as $ job ) {
41+ $ attempts = $ job ['attempts ' ] + 1 ;
42+ $ this ->releaseJob ($ job ['_id ' ], $ attempts );
43+ }
44+ }
45+ }
Original file line number Diff line number Diff line change 1+ <?php
2+
3+ class QueueTest extends TestCase
4+ {
5+ public function testQueue ()
6+ {
7+ $ id = Queue::push ('test ' , ['foo ' => 'bar ' ], 'test ' );
8+ $ this ->assertNotNull ($ id );
9+
10+ $ job = Queue::pop ('test ' );
11+ $ this ->assertInstanceOf ('Illuminate\Queue\Jobs\DatabaseJob ' , $ job );
12+ }
13+ }
Original file line number Diff line number Diff line change @@ -29,11 +29,21 @@ protected function getEnvironmentSetUp($app)
2929
3030 $ config = require 'config/database.php ' ;
3131
32+ $ app ['config ' ]->set ('app.key ' , 'ZsZewWyUJ5FsKp9lMwv4tYbNlegQilM7 ' );
33+
3234 $ app ['config ' ]->set ('database.default ' , 'mongodb ' );
3335 $ app ['config ' ]->set ('database.connections.mysql ' , $ config ['connections ' ]['mysql ' ]);
3436 $ app ['config ' ]->set ('database.connections.mongodb ' , $ config ['connections ' ]['mongodb ' ]);
3537
3638 $ app ['config ' ]->set ('auth.model ' , 'User ' );
3739 $ app ['config ' ]->set ('cache.driver ' , 'array ' );
40+
41+ $ app ['config ' ]->set ('queue.default ' , 'mongodb ' );
42+ $ app ['config ' ]->set ('queue.connections.mongodb ' , [
43+ 'driver ' => 'mongodb ' ,
44+ 'table ' => 'jobs ' ,
45+ 'queue ' => 'default ' ,
46+ 'expire ' => 60 ,
47+ ]);
3848 }
3949}
You can’t perform that action at this time.
0 commit comments