88import time
99from contextlib import contextmanager
1010import copy
11+ import random
1112
1213import attr
1314import grpc
2627from .scheduler import TagSet , schedule
2728from .generated import labgrid_coordinator_pb2
2829from .generated import labgrid_coordinator_pb2_grpc
29- from ..util import atomic_replace , labgrid_version , yaml
30+ from ..util import atomic_replace , labgrid_version , yaml , Timeout
3031
3132
3233@contextmanager
@@ -220,7 +221,7 @@ def __init__(self) -> None:
220221 self .load ()
221222
222223 self .loop = asyncio .get_running_loop ()
223- for name in ["save" , "reacquire " , "schedule" ]:
224+ for name in ["save" , "sync_resources " , "schedule" ]:
224225 step_func = getattr (self , f"_poll_step_{ name } " )
225226 task = self .loop .create_task (self .poll (step_func ), name = f"coordinator-poll-{ name } " )
226227 self .poll_tasks .append (task )
@@ -231,11 +232,11 @@ async def _poll_step_save(self):
231232 with warn_if_slow ("save changes" , level = logging .DEBUG ):
232233 await self .save ()
233234
234- async def _poll_step_reacquire (self ):
235- # try to re-acquire orphaned resources
235+ async def _poll_step_sync_resources (self ):
236+ # try to synchronize resources
236237 async with self .lock :
237- with warn_if_slow ("reacquire orphaned resources" , limit = 3.0 ):
238- await self ._reacquire_orphaned_resources ()
238+ with warn_if_slow ("synchronize resources" , limit = 3.0 ):
239+ await self ._synchronize_resources ()
239240
240241 async def _poll_step_schedule (self ):
241242 # update reservations
@@ -638,6 +639,14 @@ async def _acquire_resources(self, place, resources):
638639 if resource .acquired :
639640 return False
640641
642+ for otherplace in self .places .values ():
643+ for oldres in otherplace .acquired_resources :
644+ if resource .path == oldres .path :
645+ logging .info (
646+ "Conflicting orphaned resource %s for acquire request for place %s" , oldres , place .name
647+ )
648+ return False
649+
641650 # acquire resources
642651 acquired = []
643652 try :
@@ -692,47 +701,124 @@ async def _release_resources(self, place, resources, callback=True):
692701 except :
693702 logging .exception ("failed to publish released resource %s" , resource )
694703
695- async def _reacquire_orphaned_resources (self ):
704+ async def _synchronize_resources (self ):
696705 assert self .lock .locked ()
697706
698- for place in self .places .values ():
699- changed = False
707+ # fix:
708+ # - a resource is acquired for a place that is not acquired
709+ # * perhaps caused by a resource acquire timeout (during normal lock)
710+ # -> release()
711+ # - a resource is acquired for a place that still has it as orphaned
712+ # * perhaps caused by a resource acquire timeout (during reacquire)
713+ # -> replace orphaned resource
714+ # - a resource is released, but a place still has it as orphaned
715+ # * perhaps caused by a exporter restart
716+ # -> acquire() and replace orphaned resource
717+
718+ acquired_resources = {}
719+ used_resources = {}
720+ orphaned_resources = {}
721+
722+ # find acquired resources
723+ for exporter in self .exporters .values ():
724+ for group in exporter .groups .values ():
725+ for resource in group .values ():
726+ if resource .acquired :
727+ acquired_resources [resource .path ] = resource
700728
701- for idx , resource in enumerate (place .acquired_resources ):
729+ # find resources used by places
730+ for place in self .places .values ():
731+ for resource in place .acquired_resources :
702732 if not resource .orphaned :
703- continue
733+ used_resources [resource .path ] = resource
734+ else :
735+ orphaned_resources [resource .path ] = resource
736+
737+ timeout = Timeout (5.0 )
738+
739+ # find resources to be released
740+ to_release = list (acquired_resources .keys () - used_resources .keys () - orphaned_resources .keys ())
741+ if to_release :
742+ logging .info ("synchronize resources: %s acquired resource(s) should be released" , len (to_release ))
743+ random .shuffle (to_release ) # don't get stuck on a problematic resource
744+ for resource_path in to_release :
745+ if timeout .expired :
746+ continue # release the coordinator lock
747+
748+ resource = acquired_resources [resource_path ]
749+ if resource .acquired == "<broken>" :
750+ continue
751+ place = self .places .get (resource .acquired )
752+ print (f"should release { resource } for { place } ?" )
704753
705- # is the exporter connected again?
706- exporter = self .get_exporter_by_name (resource .path [0 ])
707- if not exporter :
708- continue
754+ if place is None :
755+ logging .warning ("resource %s claims to be acquired by unknown place" , resource )
756+ elif not place .acquired :
757+ logging .warning ("resource %s claims to be acquired by unacquired place" , resource )
758+ else :
759+ continue
760+ try :
761+ await self ._release_resources (place , [resource ])
762+ del acquired_resources [resource_path ]
763+ except Exception :
764+ logging .exception ("failed to release unused resource %s" , resource )
765+ break
709766
710- # does the resource exist again?
711- try :
712- new_resource = exporter .groups [resource .path [1 ]][resource .path [3 ]]
713- except KeyError :
714- continue
767+ # find orphaned resources to be acquired
768+ to_acquire = list (orphaned_resources .keys () - acquired_resources .keys ())
769+ if to_acquire :
770+ logging .info ("synchronize resources: %s orphaned resource(s) should be acquired" , len (to_acquire ))
771+ random .shuffle (to_acquire ) # don't get stuck on a problematic resource
772+ for resource_path in to_acquire :
773+ if timeout .expired :
774+ continue # release the coordinator lock
775+
776+ resource = orphaned_resources [resource_path ]
777+ if resource .acquired == "<broken>" :
778+ continue
779+ place = self .places .get (resource .acquired )
780+ assert place is not None
781+ assert place .acquired
782+ print (f"should acquire { resource } for { place } ?" )
783+
784+ # is the exporter connected again?
785+ exporter = self .get_exporter_by_name (resource .path [0 ])
786+ if not exporter :
787+ continue
715788
716- if new_resource .acquired :
717- # this should only happen when resources become broken
718- logging .debug ("ignoring acquired/broken resource %s for place %s" , new_resource , place .name )
719- continue
789+ # does the resource exist again?
790+ try :
791+ new_resource = exporter .groups [resource .path [1 ]][resource .path [3 ]]
792+ except KeyError :
793+ continue
720794
721- try :
722- await self ._acquire_resource (place , new_resource )
723- place .acquired_resources [idx ] = new_resource
724- except Exception :
725- logging .exception (
726- "failed to reacquire orphaned resource %s for place %s" , new_resource , place .name
727- )
728- break
729-
730- logging .info ("reacquired orphaned resource %s for place %s" , new_resource , place .name )
731- changed = True
732-
733- if changed :
734- self ._publish_place (place )
735- self .save_later ()
795+ if new_resource .acquired :
796+ # this should only happen when resources become broken
797+ logging .warning ("ignoring acquired/broken resource %s for place %s" , new_resource , place .name )
798+ continue
799+
800+ try :
801+ await self ._acquire_resource (place , new_resource )
802+ acquired_resources [new_resource .path ] = new_resource
803+ except Exception :
804+ logging .exception ("failed to reacquire orphaned resource %s for place %s" , new_resource , place .name )
805+ break
806+
807+ # find orphaned resources to be replaced in the places
808+ to_replace = set (orphaned_resources .keys () & acquired_resources .keys ())
809+ if to_replace :
810+ logging .info ("synchronize resources: %s orphaned resource(s) should be replaced" , len (to_replace ))
811+ for resource_path in set (orphaned_resources .keys () & acquired_resources .keys ()):
812+ oldresource = orphaned_resources [resource_path ]
813+ newresource = acquired_resources [resource_path ]
814+ assert oldresource .acquired == newresource .acquired
815+
816+ place = self .places .get (newresource .acquired )
817+ assert place is not None
818+ assert place .acquired
819+
820+ idx = place .acquired_resources .index (oldresource )
821+ place .acquired_resources [idx ] = newresource
736822
737823 @locked
738824 async def AcquirePlace (self , request , context ):
@@ -755,9 +841,6 @@ async def AcquirePlace(self, request, context):
755841 if not res .owner == username :
756842 await context .abort (grpc .StatusCode .PERMISSION_DENIED , f"Place { name } was not reserved for { username } " )
757843
758- # First try to reacquire orphaned resources to avoid conflicts.
759- await self ._reacquire_orphaned_resources ()
760-
761844 # FIXME use the session object instead? or something else which
762845 # survives disconnecting clients?
763846 place .acquired = username
0 commit comments