@@ -455,28 +455,29 @@ async def forward_request(self, gateway: DbGateway, method: str, params: Optiona
455455 except Exception as e :
456456 raise GatewayConnectionError (f"Failed to forward request to { gateway .name } : { str (e )} " )
457457
458- async def check_gateway_health (self , gateway : DbGateway ) -> bool :
459- """Check if a gateway is healthy.
458+ async def check_health_of_gateways (self , gateways : List [ DbGateway ] ) -> bool :
459+ """Health check for gateways
460460
461461 Args:
462- gateway: Gateway to check
462+ gateways: Gateways to check
463463
464464 Returns:
465465 True if gateway is healthy
466466 """
467- if not gateway .is_active :
468- return False
467+ for gateway in gateways :
468+ if not gateway .is_active :
469+ return False
469470
470- try :
471- # Try to initialize connection
472- await self ._initialize_gateway (gateway .url , gateway .auth_value )
471+ try :
472+ # Try to initialize connection
473+ await self ._initialize_gateway (gateway .url , gateway .auth_value )
473474
474- # Update last seen
475- gateway .last_seen = datetime .utcnow ()
476- return True
475+ # Update last seen
476+ gateway .last_seen = datetime .utcnow ()
477+ return True
477478
478- except Exception :
479- return False
479+ except Exception :
480+ return False
480481
481482 async def aggregate_capabilities (self , db : Session ) -> Dict [str , Any ]:
482483 """Aggregate capabilities from all gateways.
@@ -576,29 +577,24 @@ async def connect_to_sse_server(server_url: str, authentication: Optional[Dict[s
576577 except Exception as e :
577578 raise GatewayConnectionError (f"Failed to initialize gateway at { url } : { str (e )} " )
578579
580+ def _get_active_gateways (self ) -> list [DbGateway ]:
581+ """Sync function for database operations (runs in thread)."""
582+ with Session () as db :
583+ return db .execute (select (DbGateway ).where (DbGateway .is_active )).scalars ().all ()
584+
579585 async def _run_health_checks (self ) -> None :
580- """Run periodic health checks on all gateways ."""
586+ """Run health checks with sync Session in async code ."""
581587 while True :
582588 try :
583- async with Session () as db :
584- # Get active gateways
585- gateways = db .execute (select (DbGateway ).where (DbGateway .is_active )).scalars ().all ()
589+ # Run sync database code in a thread
590+ gateways = await asyncio .to_thread (self ._get_active_gateways )
586591
587- # Check each gateway
588- for gateway in gateways :
589- try :
590- is_healthy = await self .check_gateway_health (gateway )
591- if not is_healthy :
592- logger .warning (f"Gateway { gateway .name } is unhealthy" )
593- except Exception as e :
594- logger .error (f"Health check failed for { gateway .name } : { str (e )} " )
595-
596- db .commit ()
592+ # Async health checks (non-blocking)
593+ await self .check_health_of_gateways (gateways )
597594
598595 except Exception as e :
599596 logger .error (f"Health check run failed: { str (e )} " )
600-
601- # Wait for next check
597+
602598 await asyncio .sleep (self ._health_check_interval )
603599
604600 def _get_auth_headers (self ) -> Dict [str , str ]:
0 commit comments