Skip to content

Commit e00abda

Browse files
committed
use receiver window in http session
1 parent 75b4773 commit e00abda

File tree

2 files changed

+55
-25
lines changed

2 files changed

+55
-25
lines changed

pywebio/platform/adaptor/http.py

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
import time
1616
from contextlib import contextmanager
1717
from typing import Dict, Optional
18+
from collections import deque
1819

1920
from ..page import make_applications, render_page
2021
from ..utils import deserialize_binary_event
2122
from ...session import CoroutineBasedSession, ThreadBasedSession, register_session_implement_for_target
22-
from ...session.base import get_session_info_from_headers
23+
from ...session.base import get_session_info_from_headers, Session
2324
from ...utils import random_str, LRUDict, isgeneratorfunction, iscoroutinefunction, check_webio_js
2425

2526

@@ -102,6 +103,41 @@ def get_client_ip(self):
102103
_event_loop = None
103104

104105

106+
class ReliableTransport:
107+
def __init__(self, session: Session, message_window: int = 4):
108+
self.session = session
109+
self.messages = deque()
110+
self.window_size = message_window
111+
self.min_msg_id = 0 # the id of the first message in the window
112+
self.next_event_id = 0
113+
114+
@staticmethod
115+
def close_message(ack):
116+
return dict(
117+
commands=[[dict(command='close_session')]],
118+
seq=ack+1
119+
)
120+
121+
def get_response(self, ack=0):
122+
"""
123+
ack num is the number of messages that the client has received.
124+
response is a list of messages that the client should receive, along with their min id `seq`.
125+
"""
126+
while ack >= self.min_msg_id and self.messages:
127+
self.messages.popleft()
128+
self.min_msg_id += 1
129+
130+
if len(self.messages) < self.window_size:
131+
msgs = self.session.get_task_commands()
132+
if msgs:
133+
self.messages.append(msgs)
134+
135+
return dict(
136+
commands=list(self.messages),
137+
seq=self.min_msg_id
138+
)
139+
140+
105141
# todo: use lock to avoid thread race condition
106142
class HttpHandler:
107143
"""基于HTTP的后端Handler实现
@@ -112,7 +148,7 @@ class HttpHandler:
112148
113149
"""
114150
_webio_sessions = {} # WebIOSessionID -> WebIOSession()
115-
_webio_last_commands = {} # WebIOSessionID -> (last commands, commands sequence id)
151+
_webio_transports = {} # WebIOSessionID -> ReliableTransport(), type: Dict[str, ReliableTransport]
116152
_webio_expire = LRUDict() # WebIOSessionID -> last active timestamp. In increasing order of last active time
117153
_webio_expire_lock = threading.Lock()
118154

@@ -149,17 +185,6 @@ def _remove_webio_session(cls, sid):
149185
cls._webio_sessions.pop(sid, None)
150186
cls._webio_expire.pop(sid, None)
151187

152-
@classmethod
153-
def get_response(cls, sid, ack=0):
154-
commands, seq = cls._webio_last_commands.get(sid, ([], 0))
155-
if ack == seq:
156-
webio_session = cls._webio_sessions[sid]
157-
commands = webio_session.get_task_commands()
158-
seq += 1
159-
cls._webio_last_commands[sid] = (commands, seq)
160-
161-
return {'commands': commands, 'seq': seq}
162-
163188
def _process_cors(self, context: HttpContext):
164189
"""Handling cross-domain requests: check the source of the request and set headers"""
165190
origin = context.request_headers().get('Origin', '')
@@ -240,8 +265,8 @@ def handle_request_context(self, context: HttpContext):
240265
context.set_content(html)
241266
return context.get_response()
242267

268+
ack = int(context.request_url_parameter('ack', 0))
243269
webio_session_id = None
244-
245270
# 初始请求,创建新 Session
246271
if not request_headers['webio-session-id'] or request_headers['webio-session-id'] == 'NEW':
247272
if context.request_method() == 'POST': # 不能在POST请求中创建Session,防止CSRF攻击
@@ -264,9 +289,11 @@ def handle_request_context(self, context: HttpContext):
264289
session_cls = ThreadBasedSession
265290
webio_session = session_cls(application, session_info=session_info)
266291
cls._webio_sessions[webio_session_id] = webio_session
292+
cls._webio_transports[webio_session_id] = ReliableTransport(webio_session)
267293
yield type(self).WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <---
268294
elif request_headers['webio-session-id'] not in cls._webio_sessions: # WebIOSession deleted
269-
context.set_content([dict(command='close_session')], json_type=True)
295+
close_msg = ReliableTransport.close_message(ack)
296+
context.set_content(close_msg, json_type=True)
270297
return context.get_response()
271298
else:
272299
webio_session_id = request_headers['webio-session-id']
@@ -283,8 +310,8 @@ def handle_request_context(self, context: HttpContext):
283310

284311
self.interval_cleaning()
285312

286-
ack = int(context.request_url_parameter('ack', 0))
287-
context.set_content(type(self).get_response(webio_session_id, ack=ack), json_type=True)
313+
resp = cls._webio_transports[webio_session_id].get_response(ack)
314+
context.set_content(resp, json_type=True)
288315

289316
if webio_session.closed():
290317
self._remove_webio_session(webio_session_id)

webiojs/src/session.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ export class HttpSession implements Session {
181181
webio_session_id: string = 'NEW';
182182
debug = false;
183183

184-
private _executed_command_msg_id = 0;
184+
private _executed_command_msg_id = -1;
185185
private _closed = false;
186186
private _session_create_callbacks: (() => void)[] = [];
187187
private _session_close_callbacks: (() => void)[] = [];
@@ -223,7 +223,7 @@ export class HttpSession implements Session {
223223
contentType: "application/json; charset=utf-8",
224224
dataType: "json",
225225
headers: {"webio-session-id": this.webio_session_id},
226-
success: function (data: { commands: Command[], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) {
226+
success: function (data: { commands: Command[][], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) {
227227
safe_poprun_callbacks(that._session_create_callbacks, 'session_create_callback');
228228
that._on_request_success(data, textStatus, jqXHR);
229229
},
@@ -233,18 +233,21 @@ export class HttpSession implements Session {
233233
})
234234
}
235235

236-
private _on_request_success(data: { commands: Command[], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) {
237-
if (data.seq == this._executed_command_msg_id)
236+
private _on_request_success(data: { commands: Command[][], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) {
237+
let msg_start_idx = this._executed_command_msg_id - data.seq + 1;
238+
if (data.commands.length <= msg_start_idx)
238239
return;
239-
this._executed_command_msg_id = data.seq;
240+
this._executed_command_msg_id = data.seq + data.commands.length - 1;
240241

241242
let sid = jqXHR.getResponseHeader('webio-session-id');
242243
if (sid)
243244
this.webio_session_id = sid;
244245

245-
for (let msg of data.commands) {
246-
if (this.debug) console.info('>>>', msg);
247-
this._on_server_message(msg);
246+
for (let msgs of data.commands.slice(msg_start_idx)) {
247+
for (let msg of msgs) {
248+
if (this.debug) console.info('>>>', msg);
249+
this._on_server_message(msg);
250+
}
248251
}
249252
};
250253

0 commit comments

Comments
 (0)