2727 */
2828class Connection
2929{
30+ private const DEFAULT_OPTIONS = [
31+ 'stream ' => 'messages ' ,
32+ 'group ' => 'symfony ' ,
33+ 'consumer ' => 'consumer ' ,
34+ 'auto_setup ' => true ,
35+ ];
36+
3037 private $ connection ;
3138 private $ stream ;
3239 private $ group ;
3340 private $ consumer ;
41+ private $ autoSetup ;
3442 private $ couldHavePendingMessages = true ;
3543
3644 public function __construct (array $ configuration , array $ connectionCredentials = [], array $ redisOptions = [], \Redis $ redis = null )
3745 {
3846 $ this ->connection = $ redis ?: new \Redis ();
3947 $ this ->connection ->connect ($ connectionCredentials ['host ' ] ?? '127.0.0.1 ' , $ connectionCredentials ['port ' ] ?? 6379 );
4048 $ this ->connection ->setOption (\Redis::OPT_SERIALIZER , $ redisOptions ['serializer ' ] ?? \Redis::SERIALIZER_PHP );
41- $ this ->stream = $ configuration ['stream ' ] ?? '' ?: 'messages ' ;
42- $ this ->group = $ configuration ['group ' ] ?? '' ?: 'symfony ' ;
43- $ this ->consumer = $ configuration ['consumer ' ] ?? '' ?: 'consumer ' ;
49+ $ this ->stream = $ configuration ['stream ' ] ?? self ::DEFAULT_OPTIONS ['stream ' ];
50+ $ this ->group = $ configuration ['group ' ] ?? self ::DEFAULT_OPTIONS ['group ' ];
51+ $ this ->consumer = $ configuration ['consumer ' ] ?? self ::DEFAULT_OPTIONS ['consumer ' ];
52+ $ this ->autoSetup = $ configuration ['auto_setup ' ] ?? self ::DEFAULT_OPTIONS ['auto_setup ' ];
4453 }
4554
4655 public static function fromDsn (string $ dsn , array $ redisOptions = [], \Redis $ redis = null ): self
@@ -51,9 +60,9 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
5160
5261 $ pathParts = explode ('/ ' , $ parsedUrl ['path ' ] ?? '' );
5362
54- $ stream = $ pathParts [1 ] ?? '' ;
55- $ group = $ pathParts [2 ] ?? '' ;
56- $ consumer = $ pathParts [3 ] ?? '' ;
63+ $ stream = $ pathParts [1 ] ?? null ;
64+ $ group = $ pathParts [2 ] ?? null ;
65+ $ consumer = $ pathParts [3 ] ?? null ;
5766
5867 $ connectionCredentials = [
5968 'host ' => $ parsedUrl ['host ' ] ?? '127.0.0.1 ' ,
@@ -64,11 +73,21 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
6473 parse_str ($ parsedUrl ['query ' ], $ redisOptions );
6574 }
6675
67- return new self (['stream ' => $ stream , 'group ' => $ group , 'consumer ' => $ consumer ], $ connectionCredentials , $ redisOptions , $ redis );
76+ $ autoSetup = null ;
77+ if (\array_key_exists ('auto_setup ' , $ redisOptions )) {
78+ $ autoSetup = filter_var ($ redisOptions ['auto_setup ' ], FILTER_VALIDATE_BOOLEAN );
79+ unset($ redisOptions ['auto_setup ' ]);
80+ }
81+
82+ return new self (['stream ' => $ stream , 'group ' => $ group , 'consumer ' => $ consumer , 'auto_setup ' => $ autoSetup ], $ connectionCredentials , $ redisOptions , $ redis );
6883 }
6984
7085 public function get (): ?array
7186 {
87+ if ($ this ->autoSetup ) {
88+ $ this ->setup ();
89+ }
90+
7291 $ messageId = '> ' ; // will receive new messages
7392
7493 if ($ this ->couldHavePendingMessages ) {
@@ -141,6 +160,10 @@ public function reject(string $id): void
141160
142161 public function add (string $ body , array $ headers ): void
143162 {
163+ if ($ this ->autoSetup ) {
164+ $ this ->setup ();
165+ }
166+
144167 $ e = null ;
145168 try {
146169 $ added = $ this ->connection ->xadd ($ this ->stream , '* ' , ['message ' => json_encode (
@@ -161,5 +184,7 @@ public function setup(): void
161184 } catch (\RedisException $ e ) {
162185 throw new TransportException ($ e ->getMessage (), 0 , $ e );
163186 }
187+
188+ $ this ->autoSetup = false ;
164189 }
165190}
0 commit comments