1313// limitations under the License.
1414
1515use std:: fmt:: Formatter ;
16+ use std:: io;
1617use std:: sync:: Arc ;
1718
1819use hyper_util:: rt:: { TokioExecutor , TokioIo } ;
@@ -21,7 +22,8 @@ use hyper_util::service::TowerToHyperService;
2122use quickwit_common:: tower:: BoxFutureInfaillible ;
2223use quickwit_config:: { disable_ingest_v1, enable_ingest_v2} ;
2324use quickwit_search:: SearchService ;
24- use tokio:: net:: TcpListener ;
25+ use tokio:: io:: { AsyncRead , AsyncWrite } ;
26+ use tokio:: net:: { TcpListener , TcpStream } ;
2527use tokio_rustls:: TlsAcceptor ;
2628use tokio_util:: either:: Either ;
2729use tower:: ServiceBuilder ;
@@ -120,6 +122,20 @@ impl Predicate for CompressionPredicate {
120122 }
121123}
122124
125+ async fn apply_tls_if_necessary (
126+ tcp_stream : TcpStream ,
127+ tls_acceptor_opt : & Option < TlsAcceptor > ,
128+ ) -> io:: Result < impl AsyncRead + AsyncWrite + Unpin + ' static > {
129+ let Some ( tls_acceptor) = & tls_acceptor_opt else {
130+ return Ok ( Either :: Right ( tcp_stream) ) ;
131+ } ;
132+ let tls_stream_res = tls_acceptor
133+ . accept ( tcp_stream)
134+ . await
135+ . inspect_err ( |err| error ! ( "failed to perform tls handshake: {err:#}" ) ) ?;
136+ Ok ( Either :: Left ( tls_stream_res) )
137+ }
138+
123139/// Starts REST services.
124140pub ( crate ) async fn start_rest_server (
125141 tcp_listener : TcpListener ,
@@ -227,37 +243,33 @@ pub(crate) async fn start_rest_server(
227243 let mut shutdown_signal = std:: pin:: pin!( shutdown_signal) ;
228244 readiness_trigger. await ;
229245
246+ let tls_acceptor_opt: Option < TlsAcceptor > =
247+ if let Some ( tls_config) = & quickwit_services. node_config . rest_config . tls {
248+ let rustls_config = tls:: make_rustls_config ( tls_config) ?;
249+ Some ( TlsAcceptor :: from ( rustls_config) )
250+ } else {
251+ None
252+ } ;
253+
230254 loop {
231255 tokio:: select! {
232- conn = tcp_listener. accept( ) => {
233- let ( stream , _remote_addr ) = match conn {
234- Ok ( conn ) => conn ,
256+ tcp_accept_res = tcp_listener. accept( ) => {
257+ let tcp_stream = match tcp_accept_res {
258+ Ok ( ( tcp_stream , _remote_addr ) ) => tcp_stream ,
235259 Err ( err) => {
236260 error!( "failed to accept connection: {err:#}" ) ;
237261 continue ;
238262 }
239263 } ;
240264
241- let either_stream =
242- if let Some ( tls_config) = & quickwit_services. node_config. rest_config. tls {
243- let rustls_config = tls:: make_rustls_config( tls_config) ?;
244- let acceptor = TlsAcceptor :: from( rustls_config) ;
245- let tls_stream = match acceptor. accept( stream) . await {
246- Ok ( tls_stream) => tls_stream,
247- Err ( err) => {
248- error!( "failed to perform tls handshake: {err:#}" ) ;
249- continue ;
250- }
251- } ;
252- Either :: Left ( tls_stream)
253- } else {
254- Either :: Right ( stream)
265+ let Ok ( tcp_or_tls_stream) = apply_tls_if_necessary( tcp_stream, & tls_acceptor_opt) . await else {
266+ continue ;
255267 } ;
256268
257- let conn = server. serve_connection_with_upgrades( TokioIo :: new( either_stream ) , service. clone( ) ) ;
258- let conn = graceful. watch( conn . into_owned( ) ) ;
269+ let serve_fut = server. serve_connection_with_upgrades( TokioIo :: new( tcp_or_tls_stream ) , service. clone( ) ) ;
270+ let serve_with_shutdown_fut = graceful. watch( serve_fut . into_owned( ) ) ;
259271 tokio:: spawn( async move {
260- if let Err ( err) = conn . await {
272+ if let Err ( err) = serve_with_shutdown_fut . await {
261273 error!( "failed to serve connection: {err:#}" ) ;
262274 }
263275 } ) ;
@@ -269,11 +281,8 @@ pub(crate) async fn start_rest_server(
269281 }
270282 }
271283
272- tokio:: select! {
273- _ = graceful. shutdown( ) => {
274- info!( "gracefully shutdown" ) ;
275- }
276- }
284+ graceful. shutdown ( ) . await ;
285+ info ! ( "gracefully shutdown" ) ;
277286
278287 Ok ( ( ) )
279288}
0 commit comments