33
44(c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski
55"""
6+ from __future__ import annotations
7+
68import os
79import stat
810from hashlib import md5
911from queue import Queue
12+ from typing import Callable
13+ from typing import Type
1014
1115import execnet .rsync_remote
16+ from execnet .gateway_base import Channel
17+ from execnet .multi import MultiChannel
1218
1319
1420class RSync :
@@ -21,51 +27,54 @@ class RSync:
2127 a path on remote side).
2228 """
2329
24- def __init__ (self , sourcedir , callback = None , verbose = True ):
30+ def __init__ (self , sourcedir : str | os . PathLike [ str ] , callback = None , verbose = True ):
2531 self ._sourcedir = str (sourcedir )
2632 self ._verbose = verbose
2733 assert callback is None or hasattr (callback , "__call__" )
2834 self ._callback = callback
29- self ._channels = {}
30- self ._receivequeue = Queue ()
31- self ._links = []
35+ self ._channels : dict [Channel , Callable [[], None ] | None ] = {}
36+ self ._links : list [tuple [str , str , str ]] = []
3237
3338 def filter (self , path ):
3439 return True
3540
36- def _end_of_channel (self , channel ):
41+ def _end_of_channel (self , channel , data ):
3742 if channel in self ._channels :
3843 # too early! we must have got an error
3944 channel .waitclose ()
4045 # or else we raise one
4146 raise OSError (f"connection unexpectedly closed: { channel .gateway } " )
4247
43- def _process_link (self , channel ):
48+ def _process_link (self , channel , data ):
4449 for link in self ._links :
4550 channel .send (link )
4651 # completion marker, this host is done
4752 channel .send (42 )
4853
49- def _done (self , channel ):
54+ def _done (self , channel , data ):
5055 """Call all callbacks"""
5156 finishedcallback = self ._channels .pop (channel )
5257 if finishedcallback :
5358 finishedcallback ()
5459 channel .waitclose ()
5560
56- def _list_done (self , channel ):
61+ def _ack (self , channel , data ):
62+ if self ._callback :
63+ self ._callback ("ack" , self ._paths [data ], channel )
64+
65+ def _list_done (self , channel , data ):
5766 # sum up all to send
5867 if self ._callback :
59- s = sum ([ self ._paths [i ] for i in self ._to_send [channel ] ])
68+ s = sum (self ._paths [i ] for i in self ._to_send [channel ])
6069 self ._callback ("list" , s , channel )
6170
6271 def _send_item (self , channel , data ):
6372 """Send one item"""
6473 modified_rel_path , checksum = data
6574 modifiedpath = os .path .join (self ._sourcedir , * modified_rel_path )
6675 try :
67- f = open (modifiedpath , "rb" )
68- data = f .read ()
76+ with open (modifiedpath , "rb" ) as fp :
77+ data = fp .read ()
6978 except OSError :
7079 data = None
7180
@@ -81,7 +90,6 @@ def _send_item(self, channel, data):
8190 # print "sending", modified_rel_path, data and len(data) or 0, checksum
8291
8392 if data is not None :
84- f .close ()
8593 if checksum is not None and checksum == md5 (data ).digest ():
8694 data = None # not really modified
8795 else :
@@ -92,7 +100,7 @@ def _report_send_file(self, gateway, modified_rel_path):
92100 if self ._verbose :
93101 print (f"{ gateway } <= { modified_rel_path } " )
94102
95- def send (self , raises = True ):
103+ def send (self , raises : bool = True ) -> None :
96104 """Sends a sourcedir to all added targets. Flag indicates
97105 whether to raise an error or return in case of lack of
98106 targets
@@ -110,45 +118,34 @@ def send(self, raises=True):
110118
111119 # paths and to_send are only used for doing
112120 # progress-related callbacks
113- self ._paths = {}
114- self ._to_send = {}
121+ self ._paths : dict [str , int ] = {}
122+ self ._to_send : dict [Channel , list [str ]] = {}
123+
124+ mch = MultiChannel (list (self ._channels ))
125+ rq = mch .make_receive_queue (endmarker = (None , None ))
115126
116127 # send modified file to clients
128+ commands : dict [str | None , Callable ] = {
129+ None : self ._end_of_channel ,
130+ "links" : self ._process_link ,
131+ "done" : self ._done ,
132+ "ack" : self ._ack ,
133+ "send" : self ._send_item ,
134+ "list_done" : self ._list_done ,
135+ }
136+
117137 while self ._channels :
118- channel , req = self ._receivequeue .get ()
119- if req is None :
120- self ._end_of_channel (channel )
121- else :
122- command , data = req
123- if command == "links" :
124- self ._process_link (channel )
125- elif command == "done" :
126- self ._done (channel )
127- elif command == "ack" :
128- if self ._callback :
129- self ._callback ("ack" , self ._paths [data ], channel )
130- elif command == "list_done" :
131- self ._list_done (channel )
132- elif command == "send" :
133- self ._send_item (channel , data )
134- del data
135- else :
136- assert "Unknown command %s" % command
137-
138- def add_target (self , gateway , destdir , finishedcallback = None , ** options ):
138+ channel , (command , data ) = rq .get ()
139+ assert command in commands , "Unknown command %s" % command
140+ commands [command ](channel , data )
141+
142+ def add_target (self , gateway , destdir , finishedcallback = None , delete : bool = False ):
139143 """Adds a remote target specified via a gateway
140144 and a remote destination directory.
141145 """
142- for name in options :
143- assert name in ("delete" ,)
144-
145- def itemcallback (req ):
146- self ._receivequeue .put ((channel , req ))
147-
148- channel = gateway .remote_exec (execnet .rsync_remote )
149- channel .reconfigure (py2str_as_py3str = False , py3str_as_py2str = False )
150- channel .setcallback (itemcallback , endmarker = None )
151- channel .send ((str (destdir ), options ))
146+ channel = gateway .remote_exec (
147+ execnet .rsync_remote .serve_rsync , destdir = str (destdir ), delete = delete
148+ )
152149 self ._channels [channel ] = finishedcallback
153150
154151 def _broadcast (self , msg ):
0 commit comments