1+ <?php
2+
3+ namespace React \Socket ;
4+
5+ use React \Dns \Model \Message ;
6+ use React \Dns \Resolver \ResolverInterface ;
7+ use React \EventLoop \LoopInterface ;
8+ use React \EventLoop \TimerInterface ;
9+ use React \Promise ;
10+ use React \Promise \CancellablePromiseInterface ;
11+
12+ /**
13+ * @internal
14+ */
15+ final class HappyEyeBallsConnectionBuilder
16+ {
17+ const CONNECT_INTERVAL = 0.1 ;
18+ const RESOLVE_WAIT = 0.5 ;
19+
20+ public $ loop ;
21+ public $ connector ;
22+ public $ resolver ;
23+ public $ uri ;
24+ public $ host ;
25+ public $ resolved = array (
26+ Message::TYPE_A => false ,
27+ Message::TYPE_AAAA => false ,
28+ );
29+ public $ resolverPromises = array ();
30+ public $ connectionPromises = array ();
31+ public $ connectQueue = array ();
32+ public $ timer ;
33+ public $ parts ;
34+ public $ ipsCount = 0 ;
35+ public $ failureCount = 0 ;
36+ public $ resolve ;
37+ public $ reject ;
38+
39+ public function __construct (LoopInterface $ loop , ConnectorInterface $ connector , ResolverInterface $ resolver , $ uri , $ host , $ parts )
40+ {
41+ $ this ->loop = $ loop ;
42+ $ this ->connector = $ connector ;
43+ $ this ->resolver = $ resolver ;
44+ $ this ->uri = $ uri ;
45+ $ this ->host = $ host ;
46+ $ this ->parts = $ parts ;
47+ }
48+
49+ public function connect ()
50+ {
51+ $ that = $ this ;
52+ return new Promise \Promise (function ($ resolve , $ reject ) use ($ that ) {
53+ $ lookupResolve = function ($ type ) use ($ that , $ resolve , $ reject ) {
54+ return function (array $ ips ) use ($ that , $ type , $ resolve , $ reject ) {
55+ unset($ that ->resolverPromises [$ type ]);
56+ $ that ->resolved [$ type ] = true ;
57+
58+ $ that ->mixIpsIntoConnectQueue ($ ips );
59+
60+ if ($ that ->timer instanceof TimerInterface) {
61+ return ;
62+ }
63+
64+ $ that ->check ($ resolve , $ reject );
65+ };
66+ };
67+
68+ $ ipv4Deferred = null ;
69+ $ timer = null ;
70+ $ that ->resolverPromises [Message::TYPE_AAAA ] = $ that ->resolve (Message::TYPE_AAAA , $ reject )->then ($ lookupResolve (Message::TYPE_AAAA ))->then (function () use (&$ ipv4Deferred ) {
71+ if ($ ipv4Deferred instanceof Promise \Deferred) {
72+ $ ipv4Deferred ->resolve ();
73+ }
74+ });
75+ $ that ->resolverPromises [Message::TYPE_A ] = $ that ->resolve (Message::TYPE_A , $ reject )->then (function ($ ips ) use ($ that , &$ ipv4Deferred , &$ timer ) {
76+ if ($ that ->resolved [Message::TYPE_AAAA ] === true ) {
77+ return Promise \resolve ($ ips );
78+ }
79+
80+ /**
81+ * Delay A lookup by 50ms sending out connection to IPv4 addresses when IPv6 records haven't
82+ * resolved yet as per RFC.
83+ *
84+ * @link https://tools.ietf.org/html/rfc8305#section-3
85+ */
86+ $ ipv4Deferred = new Promise \Deferred ();
87+ $ deferred = new Promise \Deferred ();
88+
89+ $ timer = $ that ->loop ->addTimer ($ that ::RESOLVE_WAIT , function () use ($ deferred , $ ips ) {
90+ $ ipv4Deferred = null ;
91+ $ deferred ->resolve ($ ips );
92+ });
93+
94+ $ ipv4Deferred ->promise ()->then (function () use ($ that , &$ timer , $ deferred , $ ips ) {
95+ $ that ->loop ->cancelTimer ($ timer );
96+ $ deferred ->resolve ($ ips );
97+ });
98+
99+ return $ deferred ->promise ();
100+ })->then ($ lookupResolve (Message::TYPE_A ));
101+ }, function ($ _ , $ reject ) use ($ that , &$ timer ) {
102+ $ that ->cleanUp ();
103+
104+ if ($ timer instanceof TimerInterface) {
105+ $ that ->loop ->cancelTimer ($ timer );
106+ }
107+
108+ $ reject (new \RuntimeException ('Connection to ' . $ that ->uri . ' cancelled during DNS lookup ' ));
109+
110+ $ _ = $ reject = null ;
111+ });
112+ }
113+
114+ /**
115+ * @internal
116+ */
117+ public function resolve ($ type , $ reject )
118+ {
119+ $ that = $ this ;
120+ return $ that ->resolver ->resolveAll ($ that ->host , $ type )->then (null , function () use ($ type , $ reject , $ that ) {
121+ unset($ that ->resolverPromises [$ type ]);
122+ $ that ->resolved [$ type ] = true ;
123+
124+ if ($ that ->hasBeenResolved () === false ) {
125+ return ;
126+ }
127+
128+ if ($ that ->ipsCount === 0 ) {
129+ $ that ->resolved = null ;
130+ $ that ->resolverPromises = null ;
131+ $ reject (new \RuntimeException ('Connection to ' . $ that ->uri . ' failed during DNS lookup: DNS error ' ));
132+ }
133+ });
134+ }
135+
136+ /**
137+ * @internal
138+ */
139+ public function check ($ resolve , $ reject )
140+ {
141+ if (\count ($ this ->connectQueue ) === 0 && $ this ->resolved [Message::TYPE_A ] === true && $ this ->resolved [Message::TYPE_AAAA ] === true && $ this ->timer instanceof TimerInterface) {
142+ $ this ->loop ->cancelTimer ($ this ->timer );
143+ $ this ->timer = null ;
144+ }
145+
146+ if (\count ($ this ->connectQueue ) === 0 ) {
147+ return ;
148+ }
149+
150+ $ ip = \array_shift ($ this ->connectQueue );
151+
152+ $ that = $ this ;
153+ $ that ->connectionPromises [$ ip ] = $ this ->attemptConnection ($ ip )->then (function ($ connection ) use ($ that , $ ip , $ resolve ) {
154+ unset($ that ->connectionPromises [$ ip ]);
155+
156+ $ that ->cleanUp ();
157+
158+ $ resolve ($ connection );
159+ }, function () use ($ that , $ ip , $ resolve , $ reject ) {
160+ unset($ that ->connectionPromises [$ ip ]);
161+
162+ $ that ->failureCount ++;
163+
164+ if ($ that ->hasBeenResolved () === false ) {
165+ return ;
166+ }
167+
168+ if ($ that ->ipsCount === $ that ->failureCount ) {
169+ $ that ->cleanUp ();
170+
171+ $ reject (new \RuntimeException ('All attempts to connect to " ' . $ that ->host . '" have failed ' ));
172+ }
173+ });
174+
175+ /**
176+ * As long as we haven't connected yet keep popping an IP address of the connect queue until one of them
177+ * succeeds or they all fail. We will wait 100ms between connection attempts as per RFC.
178+ *
179+ * @link https://tools.ietf.org/html/rfc8305#section-5
180+ */
181+ if ((\count ($ this ->connectQueue ) > 0 || ($ this ->resolved [Message::TYPE_A ] === false || $ this ->resolved [Message::TYPE_AAAA ] === false )) && $ this ->timer === null ) {
182+ $ this ->timer = $ this ->loop ->addPeriodicTimer (self ::CONNECT_INTERVAL , function () use ($ that , $ resolve , $ reject ) {
183+ $ that ->check ($ resolve , $ reject );
184+ });
185+ }
186+ }
187+
188+ /**
189+ * @internal
190+ */
191+ public function attemptConnection ($ ip )
192+ {
193+ $ promise = null ;
194+ $ that = $ this ;
195+
196+ return new Promise \Promise (
197+ function ($ resolve , $ reject ) use (&$ promise , $ that , $ ip ) {
198+ $ uri = '' ;
199+
200+ // prepend original scheme if known
201+ if (isset ($ that ->parts ['scheme ' ])) {
202+ $ uri .= $ that ->parts ['scheme ' ] . ':// ' ;
203+ }
204+
205+ if (\strpos ($ ip , ': ' ) !== false ) {
206+ // enclose IPv6 addresses in square brackets before appending port
207+ $ uri .= '[ ' . $ ip . '] ' ;
208+ } else {
209+ $ uri .= $ ip ;
210+ }
211+
212+ // append original port if known
213+ if (isset ($ that ->parts ['port ' ])) {
214+ $ uri .= ': ' . $ that ->parts ['port ' ];
215+ }
216+
217+ // append orignal path if known
218+ if (isset ($ that ->parts ['path ' ])) {
219+ $ uri .= $ that ->parts ['path ' ];
220+ }
221+
222+ // append original query if known
223+ if (isset ($ that ->parts ['query ' ])) {
224+ $ uri .= '? ' . $ that ->parts ['query ' ];
225+ }
226+
227+ // append original hostname as query if resolved via DNS and if
228+ // destination URI does not contain "hostname" query param already
229+ $ args = array ();
230+ \parse_str (isset ($ that ->parts ['query ' ]) ? $ that ->parts ['query ' ] : '' , $ args );
231+ if ($ that ->host !== $ ip && !isset ($ args ['hostname ' ])) {
232+ $ uri .= (isset ($ that ->parts ['query ' ]) ? '& ' : '? ' ) . 'hostname= ' . \rawurlencode ($ that ->host );
233+ }
234+
235+ // append original fragment if known
236+ if (isset ($ that ->parts ['fragment ' ])) {
237+ $ uri .= '# ' . $ that ->parts ['fragment ' ];
238+ }
239+
240+ $ promise = $ that ->connector ->connect ($ uri );
241+ $ promise ->then ($ resolve , $ reject );
242+ },
243+ function ($ _ , $ reject ) use (&$ promise , $ that ) {
244+ // cancellation should reject connection attempt
245+ // (try to) cancel pending connection attempt
246+ $ reject (new \RuntimeException ('Connection to ' . $ that ->uri . ' cancelled during connection attempt ' ));
247+
248+ if ($ promise instanceof CancellablePromiseInterface) {
249+ // overwrite callback arguments for PHP7+ only, so they do not show
250+ // up in the Exception trace and do not cause a possible cyclic reference.
251+ $ _ = $ reject = null ;
252+
253+ $ promise ->cancel ();
254+ $ promise = null ;
255+ }
256+ }
257+ );
258+ }
259+
260+ /**
261+ * @internal
262+ */
263+ public function cleanUp ()
264+ {
265+ /** @var CancellablePromiseInterface $promise */
266+ foreach ($ this ->connectionPromises as $ index => $ connectionPromise ) {
267+ if ($ connectionPromise instanceof CancellablePromiseInterface) {
268+ $ connectionPromise ->cancel ();
269+ }
270+ }
271+
272+ /** @var CancellablePromiseInterface $promise */
273+ foreach ($ this ->resolverPromises as $ index => $ resolverPromise ) {
274+ if ($ resolverPromise instanceof CancellablePromiseInterface) {
275+ $ resolverPromise ->cancel ();
276+ }
277+ }
278+
279+ if ($ this ->timer instanceof TimerInterface) {
280+ $ this ->loop ->cancelTimer ($ this ->timer );
281+ $ this ->timer = null ;
282+ }
283+ }
284+
285+ /**
286+ * @internal
287+ */
288+ public function hasBeenResolved ()
289+ {
290+ foreach ($ this ->resolved as $ typeHasBeenResolved ) {
291+ if ($ typeHasBeenResolved === false ) {
292+ return false ;
293+ }
294+ }
295+
296+ return true ;
297+ }
298+
299+ /**
300+ * Mixes an array of IP addresses into the connect queue in such a way they alternate when attempting to connect.
301+ * The goal behind it is first attempt to connect to IPv6, then to IPv4, then to IPv6 again until one of those
302+ * attempts succeeds.
303+ *
304+ * @link https://tools.ietf.org/html/rfc8305#section-4
305+ *
306+ * @internal
307+ */
308+ public function mixIpsIntoConnectQueue (array $ ips )
309+ {
310+ $ this ->ipsCount += \count ($ ips );
311+ $ connectQueueStash = $ this ->connectQueue ;
312+ $ this ->connectQueue = array ();
313+ while (\count ($ connectQueueStash ) > 0 || \count ($ ips ) > 0 ) {
314+ if (\count ($ ips ) > 0 ) {
315+ $ this ->connectQueue [] = \array_shift ($ ips );
316+ }
317+ if (\count ($ connectQueueStash ) > 0 ) {
318+ $ this ->connectQueue [] = \array_shift ($ connectQueueStash );
319+ }
320+ }
321+ }
322+ }
0 commit comments