Skip to content

Commit 483e085

Browse files
author
Dan
committed
WIP - try and fix event loop block on embedded server proxy connections. Added invalid host args when using dict
1 parent 7c13c0f commit 483e085

File tree

4 files changed

+103
-50
lines changed

4 files changed

+103
-50
lines changed

embedded_server/embedded_server.py

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,13 @@
3838
*Warning* - Note that commands, with or without a shell, are actually run on the system running this server. Destructive commands will affect the system as permissions of user running the server allow. **Use at your own risk**.
3939
"""
4040

41+
from gipc import start_process
42+
from multiprocessing import Process
4143
import sys
4244
if 'threading' in sys.modules:
4345
del sys.modules['threading']
4446
from gevent import monkey
4547
monkey.patch_all()
46-
from multiprocessing import Process
4748
import os
4849
import gevent
4950
from gevent import socket
@@ -53,9 +54,11 @@
5354
import logging
5455
import paramiko
5556
import time
57+
import gevent.subprocess
58+
import gevent.hub
59+
5660
from .stub_sftp import StubSFTPServer
5761
from .tunnel import Tunneler
58-
import gevent.subprocess
5962

6063
logger = logging.getLogger("embedded_server")
6164
paramiko_logger = logging.getLogger('paramiko.transport')
@@ -86,16 +89,16 @@ def __init__(self, host_key, fail_auth=False,
8689
ssh_exception=False,
8790
socket=None,
8891
port=0,
89-
listen_ip='127.0.0.1',
92+
host='127.0.0.1',
9093
timeout=None):
9194
if not socket:
92-
self.socket = make_socket(listen_ip, port)
95+
self.socket = make_socket(host, port)
9396
if not self.socket:
9497
msg = "Could not establish listening connection on %s:%s"
95-
logger.error(msg, listen_ip, port)
96-
raise Exception(msg, listen_ip, port)
97-
self.listen_ip = listen_ip
98-
self.listen_port = self.socket.getsockname()[1]
98+
logger.error(msg, host, port)
99+
raise Exception(msg, host, port)
100+
self.host = host
101+
self.port = self.socket.getsockname()[1]
99102
self.event = Event()
100103
self.fail_auth = fail_auth
101104
self.ssh_exception = ssh_exception
@@ -105,15 +108,20 @@ def __init__(self, host_key, fail_auth=False,
105108

106109
def start_listening(self):
107110
try:
111+
gevent.sleep(0)
108112
self.socket.listen(100)
109-
logger.info('Listening for connection on %s:%s..', self.listen_ip,
110-
self.listen_port)
113+
logger.info('Listening for connection on %s:%s..', self.host,
114+
self.port)
111115
except Exception as e:
112116
logger.error('*** Listen failed: %s' % (str(e),))
113117
traceback.print_exc()
114118
raise
119+
gevent.sleep()
115120
conn, addr = self.socket.accept()
121+
gevent.sleep(.2)
116122
logger.info('Got connection..')
123+
# import ipdb; ipdb.set_trace()
124+
gevent.sleep(.2)
117125
if self.timeout:
118126
logger.debug("SSH server sleeping for %s then raising socket.timeout",
119127
self.timeout)
@@ -123,21 +131,26 @@ def start_listening(self):
123131
self.transport.add_server_key(self.host_key)
124132
self.transport.set_subsystem_handler('sftp', paramiko.SFTPServer,
125133
StubSFTPServer)
134+
gevent.sleep()
126135
try:
127136
self.transport.start_server(server=self)
128137
except paramiko.SSHException as e:
129138
logger.exception('SSH negotiation failed')
130139
raise
140+
gevent.sleep(0)
131141

132142
def run(self):
133143
while True:
134144
try:
135145
self.start_listening()
146+
gevent.sleep(0)
136147
except Exception:
137148
logger.exception("Error occured starting server")
138149
continue
150+
gevent.sleep(0)
139151
try:
140152
self.accept_connections()
153+
gevent.sleep(0)
141154
except Exception as e:
142155
logger.error('*** Caught exception: %s: %s' % (str(e.__class__), str(e),))
143156
traceback.print_exc()
@@ -152,8 +165,10 @@ def accept_connections(self):
152165
gevent.sleep(0)
153166
channel = self.transport.accept(20)
154167
if not channel:
155-
logger.error("Could not establish channel")
156-
return
168+
logger.error("Could not establish channel on %s:%s",
169+
self.host, self.port)
170+
gevent.sleep(0)
171+
continue
157172
while self.transport.is_active():
158173
logger.debug("Transport active, waiting..")
159174
gevent.sleep(1)
@@ -190,21 +205,28 @@ def check_channel_pty_request(self, channel, term, width, height, pixelwidth,
190205
return True
191206

192207
def check_channel_direct_tcpip_request(self, chanid, origin, destination):
208+
# import ipdb; ipdb.set_trace()
193209
logger.debug("Proxy connection %s -> %s requested", origin, destination,)
194210
extra = {'username' : self.transport.get_username()}
195211
logger.debug("Starting proxy connection %s -> %s",
196212
origin, destination, extra=extra)
213+
self.event.set()
197214
try:
198-
tunnel = Tunneler(destination, self.transport, chanid)
215+
gevent.sleep(.2)
216+
tunnel = Process(target=Tunneler, args=(destination, self.transport, chanid,))
217+
tunnel.daemon = True
199218
tunnel.start()
219+
gevent.sleep(.2)
200220
except Exception as ex:
201221
logger.error("Error creating proxy connection to %s - %s",
202222
destination, ex,)
203223
return paramiko.OPEN_FAILED_CONNECT_FAILED
224+
gevent.sleep(2)
204225
return paramiko.OPEN_SUCCEEDED
205226

206227
def check_channel_forward_agent_request(self, channel):
207228
logger.debug("Forward agent key request for channel %s" % (channel,))
229+
gevent.sleep(0)
208230
return True
209231

210232
def check_channel_exec_request(self, channel, cmd,
@@ -217,6 +239,7 @@ def check_channel_exec_request(self, channel, cmd,
217239
stdin=gevent.subprocess.PIPE,
218240
shell=True, env=_env)
219241
gevent.spawn(self._read_response, channel, process)
242+
gevent.sleep(0)
220243
return True
221244

222245
def _read_response(self, channel, process):
@@ -230,6 +253,7 @@ def _read_response(self, channel, process):
230253
# Let clients consume output from channel before closing
231254
gevent.sleep(.1)
232255
channel.close()
256+
gevent.sleep(0)
233257

234258
def make_socket(listen_ip, port=0):
235259
"""Make socket on given address and available port chosen by OS"""
@@ -246,21 +270,33 @@ def make_socket(listen_ip, port=0):
246270
def start_server(listen_ip, fail_auth=False, ssh_exception=False,
247271
timeout=None,
248272
listen_port=0):
249-
server = Server(host_key, listen_ip=listen_ip, port=listen_port,
273+
# gevent.reinit()
274+
gevent.hub.reinit()
275+
# h.destroy(destroy_loop=True)
276+
# h = gevent.hub.Hub()
277+
# h.NOT_ERROR = (Exception,)
278+
# gevent.hub.set_hub(h)
279+
server = Server(host_key, host=listen_ip, port=listen_port,
250280
fail_auth=fail_auth, ssh_exception=ssh_exception,
251281
timeout=timeout)
252282
try:
253283
server.run()
254284
except KeyboardInterrupt:
255285
sys.exit(0)
286+
# listen_process = Process(target=server.run)
287+
# listen_process.start()
288+
# listen_process.join()
256289

257290
def start_server_process(listen_ip, fail_auth=False, ssh_exception=False,
258291
timeout=None, listen_port=0):
292+
gevent.reinit()
259293
server = Process(target=start_server, args=(listen_ip,),
260294
kwargs={
261295
'listen_port': listen_port,
262296
'fail_auth': fail_auth,
263297
'ssh_exception': ssh_exception,
264298
'timeout': timeout,
265299
})
300+
server.start()
301+
gevent.sleep(.2)
266302
return server

embedded_server/tunnel.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
class Tunneler(gevent.Greenlet):
3030
def __init__(self, address, transport, chanid):
3131
gevent.Greenlet.__init__(self)
32+
gevent.sleep(.2)
3233
self.socket = socket.create_connection(address)
3334
self.transport = transport
3435
self.chanid = chanid
36+
gevent.sleep(0)
3537

3638
def close(self):
3739
try:
@@ -50,13 +52,16 @@ def tunnel(self, dest_socket, source_chan):
5052
response_data = dest_socket.recv(1024)
5153
source_chan.sendall(response_data)
5254
logger.debug("Tunnel sent data..")
53-
gevent.sleep(0)
55+
gevent.sleep(.1)
5456
finally:
5557
source_chan.close()
5658
dest_socket.close()
59+
gevent.sleep(0)
5760

5861
def run(self):
62+
gevent.sleep(.2)
5963
channel = self.transport.accept(20)
64+
gevent.sleep(0)
6065
if not channel:
6166
return
6267
if not channel.get_id() == self.chanid:
@@ -69,3 +74,4 @@ def run(self):
6974
except Exception as ex:
7075
logger.exception("Got exception creating tunnel - %s", ex,)
7176
logger.debug("Finished tunneling")
77+
gevent.sleep(0)

pssh/pssh_client.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -369,13 +369,19 @@ def run_command(self, *args, **kwargs):
369369
:rtype: Dictionary with host as key as per \
370370
:mod:`pssh.pssh_client.ParallelSSHClient.get_output`
371371
372-
:raises: :mod:`pssh.exceptions.AuthenticationException` on authentication error
373-
:raises: :mod:`pssh.exceptions.UnknownHostException` on DNS resolution error
374-
:raises: :mod:`pssh.exceptions.ConnectionErrorException` on error connecting
375-
:raises: :mod:`pssh.exceptions.SSHException` on other undefined SSH errors
376-
:raises: :mod:`pssh.exceptions.HostArgumentException` on number of host \
377-
arguments not equal to number of hosts
372+
:raises: :mod:`pssh.exceptions.AuthenticationException` on \
373+
authentication error
374+
:raises: :mod:`pssh.exceptions.UnknownHostException` on DNS resolution \
375+
error
376+
:raises: :mod:`pssh.exceptions.ConnectionErrorException` on error \
377+
connecting
378+
:raises: :mod:`pssh.exceptions.SSHException` on other undefined SSH \
379+
errors
380+
:raises: :mod:`pssh.exceptions.HostArgumentException` on number of \
381+
host arguments not equal to number of hosts
378382
:raises: `TypeError` on not enough host arguments for cmd string format
383+
:raises: `KeyError` on no host argument key in arguments dict for cmd \
384+
string format
379385
380386
**Example Usage**
381387

tests/test_pssh_client.py

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,17 @@
2727
import warnings
2828
import shutil
2929
import sys
30-
from multiprocessing import Process
3130

31+
from embedded_server.embedded_server import start_server, make_socket, \
32+
logger as server_logger, paramiko_logger, start_server_process
33+
from embedded_server.fake_agent import FakeAgent
34+
from paramiko import RSAKey
3235
import gevent
3336
from pssh import ParallelSSHClient, UnknownHostException, \
3437
AuthenticationException, ConnectionErrorException, SSHException, \
3538
logger as pssh_logger
3639
from pssh.exceptions import HostArgumentException
3740
from pssh.utils import load_private_key
38-
from embedded_server.embedded_server import start_server, make_socket, \
39-
logger as server_logger, paramiko_logger, start_server_process
40-
from embedded_server.fake_agent import FakeAgent
41-
from paramiko import RSAKey
4241

4342
PKEY_FILENAME = os.path.sep.join([os.path.dirname(__file__), 'test_client_private_key'])
4443
USER_KEY = RSAKey.from_private_key_file(PKEY_FILENAME)
@@ -538,22 +537,24 @@ def test_ssh_proxy(self):
538537
proxy_server_port = self.make_random_port(proxy_host)
539538
proxy_server = start_server_process(proxy_host,
540539
listen_port=proxy_server_port)
541-
client = ParallelSSHClient([self.host], port=self.listen_port,
540+
client = ParallelSSHClient([self.host], port=39783,
542541
pkey=self.user_key,
543542
proxy_host=proxy_host,
544-
proxy_port=proxy_server_port
543+
proxy_port=proxy_server_port,
545544
)
546545
gevent.sleep(2)
547-
output = client.run_command(self.fake_cmd)
548-
gevent.sleep(.2)
549-
stdout = list(output[self.host]['stdout'])
550-
expected_stdout = [self.fake_resp]
551-
self.assertEqual(expected_stdout, stdout,
552-
msg="Got unexpected stdout - %s, expected %s" %
553-
(stdout,
554-
expected_stdout,))
555-
del client
556-
proxy_server.terminate()
546+
try:
547+
output = client.run_command(self.fake_cmd)
548+
gevent.sleep(1)
549+
stdout = list(output[self.host]['stdout'])
550+
expected_stdout = [self.fake_resp]
551+
self.assertEqual(expected_stdout, stdout,
552+
msg="Got unexpected stdout - %s, expected %s" %
553+
(stdout,
554+
expected_stdout,))
555+
finally:
556+
del client
557+
proxy_server.terminate()
557558

558559
def test_ssh_proxy_auth(self):
559560
"""Test connecting to remote destination via SSH proxy
@@ -589,18 +590,19 @@ def test_ssh_proxy_auth(self):
589590
proxy_password)
590591
self.assertTrue(client.host_clients[self.host].proxy_pkey)
591592
finally:
593+
del client
592594
proxy_server.terminate()
593595

594596
def test_ssh_proxy_auth_fail(self):
595597
"""Test failures while connecting via proxy"""
596-
listen_socket = make_socket(self.host)
597-
listen_port = listen_socket.getsockname()[1]
598-
self.server.kill()
599-
server = start_server(listen_socket,
600-
fail_auth=True)
601-
proxy_server_socket = make_socket('127.0.0.2')
602-
proxy_server_port = proxy_server_socket.getsockname()[1]
603-
proxy_server = start_server(proxy_server_socket)
598+
# listen_socket = make_socket(self.host)
599+
proxy_host = '127.0.0.2'
600+
listen_port = self.make_random_port()
601+
server = start_server_process(self.host, listen_port=listen_port,
602+
fail_auth=True)
603+
proxy_server_port = self.make_random_port(host=proxy_host)
604+
proxy_server = start_server_process(proxy_host,
605+
listen_port=proxy_server_port)
604606
proxy_user = 'proxy_user'
605607
proxy_password = 'fake'
606608
gevent.sleep(2)
@@ -611,12 +613,15 @@ def test_ssh_proxy_auth_fail(self):
611613
proxy_user=proxy_user,
612614
proxy_password='fake',
613615
proxy_pkey=self.user_key,
616+
num_retries=1,
614617
)
615618
gevent.sleep(2)
616-
self.assertRaises(AuthenticationException, client.run_command, self.fake_cmd)
617-
del client
618-
server.kill()
619-
proxy_server.kill()
619+
try:
620+
self.assertRaises(AuthenticationException, client.run_command, self.fake_cmd)
621+
finally:
622+
del client
623+
server.terminate()
624+
proxy_server.terminate()
620625

621626
def test_bash_variable_substitution(self):
622627
"""Test bash variables work correctly"""

0 commit comments

Comments
 (0)