Skip to content

Commit 51c711c

Browse files
author
Dan
committed
Cleaned up embedded server, tunnel and fixed race condition. Updated tests for embedded server changes
1 parent f6c06a4 commit 51c711c

File tree

3 files changed

+175
-214
lines changed

3 files changed

+175
-214
lines changed

embedded_server/embedded_server.py

Lines changed: 118 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -23,37 +23,53 @@
2323
Implements:
2424
* Execution of commands via exec_command
2525
* Public key and password auth
26-
* Direct TCP tunneling
26+
* Direct TCP tunneling (port forwarding)
2727
* SSH agent forwarding
2828
* Stub SFTP server from Paramiko
2929
* Forced authentication failure
30+
* Forced server timeout for connection timeout simulation
3031
31-
Does _not_ support interactive shells, our clients do not use them.
32+
Does _not_ support interactive shells - it is intended for purely API driven use.
3233
33-
Server private key is hardcoded. Server listen code inspired by demo_server.py in
34-
Paramiko repository.
34+
An embedded private key is provided as `embedded_server.host_key` and may be overriden
3535
36-
Server runs asynchronously in its own greenlet. Call `start_server` with a new `multiprocessing.Process` to run it on a new process with its own event loop.
36+
Server runs asynchronously in its own greenlet.
3737
3838
*Warning* - Note that commands, with or without a shell, are actually run on the system running this server. Destructive commands will affect the system as permissions of user running the server allow. **Use at your own risk**.
39+
40+
Example Usage
41+
===============
42+
43+
from embedded_server import start_server, start_server_from_ip, make_socket
44+
45+
Make server from existing socket
46+
----------------------------------
47+
48+
socket = make_socket('127.0.0.1')
49+
server = start_server(socket)
50+
51+
Make server from IP and optionally port
52+
-----------------------------------------
53+
54+
server, listen_port = start_server_from_ip('127.0.0.1')
55+
other_server, _ = start_server_from_ip('127.0.0.1', port=1234)
3956
"""
4057

41-
# from gipc import start_process
42-
from multiprocessing import Process
4358
import sys
4459
if 'threading' in sys.modules:
4560
del sys.modules['threading']
4661
from gevent import monkey
4762
monkey.patch_all()
63+
4864
import os
4965
import gevent
5066
from gevent import socket
5167
from gevent.event import Event
5268
import sys
5369
import traceback
5470
import logging
55-
import paramiko
5671
import time
72+
import paramiko
5773
import gevent.subprocess
5874
import gevent.hub
5975

@@ -68,9 +84,10 @@
6884

6985
class Server(paramiko.ServerInterface):
7086
"""Implements :mod:`paramiko.ServerInterface` to provide an
71-
embedded SSH server implementation.
87+
embedded SSH2 server implementation.
7288
73-
Start a `Server` with at least a host private key.
89+
Start a `Server` with at least a :mod:`paramiko.Transport` object
90+
and a host private key.
7491
7592
Any SSH2 client with public key or password authentication
7693
is allowed, only. Interactive shell requests are not accepted.
@@ -85,97 +102,17 @@ class Server(paramiko.ServerInterface):
85102
* Interactive shell requests
86103
"""
87104

88-
def __init__(self, host_key, fail_auth=False,
89-
ssh_exception=False,
90-
socket=None,
91-
port=0,
92-
host='127.0.0.1',
93-
timeout=None):
94-
if not socket:
95-
self.socket = make_socket(host, port)
96-
if not self.socket:
97-
msg = "Could not establish listening connection on %s:%s"
98-
logger.error(msg, host, port)
99-
raise Exception(msg, host, port)
100-
self.host = host
101-
self.port = self.socket.getsockname()[1]
105+
def __init__(self, transport, host_key, fail_auth=False,
106+
ssh_exception=False):
107+
paramiko.ServerInterface.__init__(self)
108+
transport.load_server_moduli()
109+
transport.add_server_key(host_key)
110+
transport.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer)
111+
self.transport = transport
102112
self.event = Event()
103113
self.fail_auth = fail_auth
104114
self.ssh_exception = ssh_exception
105115
self.host_key = host_key
106-
self.transport = None
107-
self.timeout = timeout
108-
109-
def start_listening(self):
110-
try:
111-
gevent.sleep(0)
112-
self.socket.listen(100)
113-
logger.info('Listening for connection on %s:%s..', self.host,
114-
self.port)
115-
except Exception as e:
116-
logger.error('*** Listen failed: %s' % (str(e),))
117-
traceback.print_exc()
118-
raise
119-
gevent.sleep()
120-
conn, addr = self.socket.accept()
121-
gevent.sleep(.2)
122-
logger.info('Got connection..')
123-
# import ipdb; ipdb.set_trace()
124-
gevent.sleep(.2)
125-
if self.timeout:
126-
logger.debug("SSH server sleeping for %s then raising socket.timeout",
127-
self.timeout)
128-
gevent.Timeout(self.timeout).start()
129-
self.transport = paramiko.Transport(conn)
130-
self.transport.load_server_moduli()
131-
self.transport.add_server_key(self.host_key)
132-
self.transport.set_subsystem_handler('sftp', paramiko.SFTPServer,
133-
StubSFTPServer)
134-
gevent.sleep()
135-
try:
136-
self.transport.start_server(server=self)
137-
except paramiko.SSHException as e:
138-
logger.exception('SSH negotiation failed')
139-
raise
140-
gevent.sleep(0)
141-
142-
def run(self):
143-
while True:
144-
try:
145-
self.start_listening()
146-
gevent.sleep(0)
147-
except Exception:
148-
logger.exception("Error occured starting server")
149-
continue
150-
gevent.sleep(0)
151-
try:
152-
self.accept_connections()
153-
gevent.sleep(0)
154-
except Exception as e:
155-
logger.error('*** Caught exception: %s: %s' % (str(e.__class__), str(e),))
156-
traceback.print_exc()
157-
try:
158-
self.transport.close()
159-
except Exception:
160-
pass
161-
raise
162-
163-
def accept_connections(self):
164-
while True:
165-
gevent.sleep(0)
166-
channel = self.transport.accept(20)
167-
if not channel:
168-
logger.error("Could not establish channel on %s:%s",
169-
self.host, self.port)
170-
gevent.sleep(0)
171-
continue
172-
while self.transport.is_active():
173-
logger.debug("Transport active, waiting..")
174-
gevent.sleep(1)
175-
while not channel.send_ready():
176-
gevent.sleep(.2)
177-
channel.close()
178-
gevent.sleep(0)
179116

180117
def check_channel_request(self, kind, chanid):
181118
return paramiko.OPEN_SUCCEEDED
@@ -205,28 +142,25 @@ def check_channel_pty_request(self, channel, term, width, height, pixelwidth,
205142
return True
206143

207144
def check_channel_direct_tcpip_request(self, chanid, origin, destination):
208-
# import ipdb; ipdb.set_trace()
209145
logger.debug("Proxy connection %s -> %s requested", origin, destination,)
210146
extra = {'username' : self.transport.get_username()}
211147
logger.debug("Starting proxy connection %s -> %s",
212148
origin, destination, extra=extra)
213-
self.event.set()
214149
try:
215-
gevent.sleep(.2)
216-
tunnel = Process(target=Tunneler, args=(destination, self.transport, chanid,))
217-
tunnel.daemon = True
150+
tunnel = Tunneler(destination, self.transport, chanid)
218151
tunnel.start()
219-
gevent.sleep(.2)
220152
except Exception as ex:
221153
logger.error("Error creating proxy connection to %s - %s",
222154
destination, ex,)
223155
return paramiko.OPEN_FAILED_CONNECT_FAILED
224-
gevent.sleep(2)
156+
self.event.set()
157+
gevent.sleep()
158+
logger.debug("Proxy connection started")
225159
return paramiko.OPEN_SUCCEEDED
226160

227161
def check_channel_forward_agent_request(self, channel):
228162
logger.debug("Forward agent key request for channel %s" % (channel,))
229-
gevent.sleep(0)
163+
gevent.sleep()
230164
return True
231165

232166
def check_channel_exec_request(self, channel, cmd,
@@ -261,42 +195,87 @@ def make_socket(listen_ip, port=0):
261195
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
262196
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
263197
sock.bind((listen_ip, port))
264-
except Exception as e:
265-
logger.error('Failed to bind to address - %s' % (str(e),))
198+
except Exception as ex:
199+
logger.error('Failed to bind to address - %s', ex)
266200
traceback.print_exc()
267201
return
268202
return sock
269203

270-
def start_server(listen_ip, fail_auth=False, ssh_exception=False,
271-
timeout=None,
272-
listen_port=0):
273-
# gevent.reinit()
274-
gevent.hub.reinit()
275-
# h.destroy(destroy_loop=True)
276-
# h = gevent.hub.Hub()
277-
# h.NOT_ERROR = (Exception,)
278-
# gevent.hub.set_hub(h)
279-
server = Server(host_key, host=listen_ip, port=listen_port,
280-
fail_auth=fail_auth, ssh_exception=ssh_exception,
281-
timeout=timeout)
204+
def listen(sock, fail_auth=False, ssh_exception=False,
205+
timeout=None):
206+
"""Run server and given a cmd_to_run, send given
207+
response to client connection. Returns (server, socket) tuple
208+
where server is a joinable server thread and socket is listening
209+
socket of server.
210+
"""
211+
# sock = make_socket(ip, port=port)
212+
try:
213+
sock.listen(100)
214+
except Exception as e:
215+
logger.error('*** Listen failed: %s' % (str(e),))
216+
traceback.print_exc()
217+
return
218+
host, port = sock.getsockname()
219+
logger.info('Listening for connection on %s:%s..', host, port)
220+
return handle_ssh_connection(sock, fail_auth=fail_auth,
221+
timeout=timeout, ssh_exception=ssh_exception)
222+
223+
def _handle_ssh_connection(transport, fail_auth=False,
224+
ssh_exception=False):
225+
server = Server(transport, host_key,
226+
fail_auth=fail_auth, ssh_exception=ssh_exception)
282227
try:
283-
server.run()
284-
except KeyboardInterrupt:
285-
sys.exit(0)
286-
# listen_process = Process(target=server.run)
287-
# listen_process.start()
288-
# listen_process.join()
289-
290-
def start_server_process(listen_ip, fail_auth=False, ssh_exception=False,
291-
timeout=None, listen_port=0):
292-
gevent.reinit()
293-
server = Process(target=start_server, args=(listen_ip,),
294-
kwargs={
295-
'listen_port': listen_port,
296-
'fail_auth': fail_auth,
297-
'ssh_exception': ssh_exception,
298-
'timeout': timeout,
299-
})
300-
server.start()
301-
gevent.sleep(.2)
302-
return server
228+
transport.start_server(server=server)
229+
except paramiko.SSHException as e:
230+
logger.exception('SSH negotiation failed')
231+
return
232+
except Exception:
233+
logger.exception("Error occured starting server")
234+
return
235+
# *Important* Allow other greenlets to execute before establishing connection
236+
# which may be handled by said other greenlets
237+
gevent.sleep(.5)
238+
channel = transport.accept(20)
239+
if not channel:
240+
logger.error("Could not establish channel")
241+
return
242+
while transport.is_active():
243+
logger.debug("Transport active, waiting..")
244+
gevent.sleep(1)
245+
while not channel.send_ready():
246+
gevent.sleep(.2)
247+
channel.close()
248+
249+
def handle_ssh_connection(sock,
250+
fail_auth=False, ssh_exception=False,
251+
timeout=None):
252+
conn, addr = sock.accept()
253+
logger.info('Got connection..')
254+
if timeout:
255+
logger.debug("SSH server sleeping for %s then raising socket.timeout",
256+
timeout)
257+
gevent.Timeout(timeout).start().get()
258+
try:
259+
transport = paramiko.Transport(conn)
260+
return _handle_ssh_connection(transport, fail_auth=fail_auth,
261+
ssh_exception=ssh_exception)
262+
except Exception as e:
263+
logger.error('*** Caught exception: %s: %s' % (str(e.__class__), str(e),))
264+
traceback.print_exc()
265+
try:
266+
transport.close()
267+
except Exception:
268+
pass
269+
270+
def start_server(sock, fail_auth=False, ssh_exception=False,
271+
timeout=None):
272+
return gevent.spawn(listen, sock, fail_auth=fail_auth,
273+
timeout=timeout, ssh_exception=ssh_exception)
274+
275+
def start_server_from_ip(ip, port=0,
276+
fail_auth=False, ssh_exception=False,
277+
timeout=None):
278+
server_sock = make_socket(ip, port=port)
279+
server = start_server(server_sock, fail_auth=fail_auth,
280+
ssh_exception=ssh_exception, timeout=timeout)
281+
return server, server_sock.getsockname()[1]

embedded_server/tunnel.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
from gevent import socket, select
2525
import logging
2626

27-
logger = logging.getLogger("fake_server")
27+
logger = logging.getLogger("embedded_server.tunnel")
2828

29-
class Tunneler(gevent.Greenlet):
29+
class Tunneler(gevent.Greenlet):
30+
3031
def __init__(self, address, transport, chanid):
3132
gevent.Greenlet.__init__(self)
3233
gevent.sleep(.2)
34+
logger.info("Tunneller creating connection -> %s", address)
3335
self.socket = socket.create_connection(address)
3436
self.transport = transport
3537
self.chanid = chanid
@@ -59,9 +61,8 @@ def tunnel(self, dest_socket, source_chan):
5961
gevent.sleep(0)
6062

6163
def run(self):
62-
gevent.sleep(.2)
64+
logger.info("Tunnel waiting for connection")
6365
channel = self.transport.accept(20)
64-
gevent.sleep(0)
6566
if not channel:
6667
return
6768
if not channel.get_id() == self.chanid:

0 commit comments

Comments
 (0)