Skip to content

Commit bf3d67e

Browse files
committed
reliably send msg from client to server when use HTTP
1 parent 8d4fe46 commit bf3d67e

File tree

3 files changed

+175
-55
lines changed

3 files changed

+175
-55
lines changed

pywebio/platform/adaptor/http.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import threading
1515
import time
1616
from contextlib import contextmanager
17-
from typing import Dict, Optional
17+
from typing import Dict, Optional, List
1818
from collections import deque
1919

2020
from ..page import make_applications, render_page
@@ -59,16 +59,6 @@ def request_body(self) -> bytes:
5959
"""
6060
return b''
6161

62-
def request_json(self) -> Optional[Dict]:
63-
"""返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None
64-
Return the data (json deserialization) of the currently requested, if the data is not in json format, return None"""
65-
try:
66-
if self.request_headers().get('content-type') == 'application/octet-stream':
67-
return deserialize_binary_event(self.request_body())
68-
return json.loads(self.request_body())
69-
except Exception:
70-
return None
71-
7262
def set_header(self, name, value):
7363
"""为当前响应设置header
7464
Set a header for the current response"""
@@ -109,7 +99,7 @@ def __init__(self, session: Session, message_window: int = 4):
10999
self.messages = deque()
110100
self.window_size = message_window
111101
self.min_msg_id = 0 # the id of the first message in the window
112-
self.next_event_id = 0
102+
self.finished_event_id = -1 # the id of the last finished event
113103

114104
@staticmethod
115105
def close_message(ack):
@@ -118,6 +108,20 @@ def close_message(ack):
118108
seq=ack + 1
119109
)
120110

111+
def push_event(self, events: List[Dict], seq: int) -> int:
112+
"""Send client events to the session and return the success message count"""
113+
if not events:
114+
return 0
115+
116+
submit_cnt = 0
117+
for eid, event in enumerate(events, start=seq):
118+
if eid > self.finished_event_id:
119+
self.finished_event_id = eid # todo: use lock for check and set operation
120+
self.session.send_client_event(event)
121+
submit_cnt += 1
122+
123+
return submit_cnt
124+
121125
def get_response(self, ack=0):
122126
"""
123127
ack num is the number of messages that the client has received.
@@ -134,7 +138,8 @@ def get_response(self, ack=0):
134138

135139
return dict(
136140
commands=list(self.messages),
137-
seq=self.min_msg_id
141+
seq=self.min_msg_id,
142+
ack=self.finished_event_id
138143
)
139144

140145

@@ -179,6 +184,7 @@ def _remove_expired_sessions(cls, session_expire_seconds):
179184
if session:
180185
session.close(nonblock=True)
181186
del cls._webio_sessions[sid]
187+
del cls._webio_transports[sid]
182188

183189
@classmethod
184190
def _remove_webio_session(cls, sid):
@@ -234,6 +240,14 @@ def get_cdn(self, context):
234240
return False
235241
return self.cdn
236242

243+
def read_event_data(self, context: HttpContext) -> List[Dict]:
244+
try:
245+
if context.request_headers().get('content-type') == 'application/octet-stream':
246+
return [deserialize_binary_event(context.request_body())]
247+
return json.loads(context.request_body())
248+
except Exception:
249+
return []
250+
237251
@contextmanager
238252
def handle_request_context(self, context: HttpContext):
239253
"""called when every http request"""
@@ -304,8 +318,10 @@ def handle_request_context(self, context: HttpContext):
304318
webio_session = cls._webio_sessions[webio_session_id]
305319

306320
if context.request_method() == 'POST': # client push event
307-
if context.request_json() is not None:
308-
webio_session.send_client_event(context.request_json())
321+
seq = int(context.request_url_parameter('seq', 0))
322+
event_data = self.read_event_data(context)
323+
submit_cnt = cls._webio_transports[webio_session_id].push_event(event_data, seq)
324+
if submit_cnt > 0:
309325
yield type(self).WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <---
310326
elif context.request_method() == 'GET': # client pull messages
311327
pass

webiojs/src/session.ts

Lines changed: 66 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import {error_alert, randomid} from "./utils";
1+
import {error_alert, randomid, ReliableSender} from "./utils";
22
import {state} from "./state";
33
import {t} from "./i18n";
44

@@ -181,6 +181,7 @@ export class HttpSession implements Session {
181181
webio_session_id: string = '';
182182
debug = false;
183183

184+
private sender: ReliableSender = null;
184185
private _executed_command_msg_id = -1;
185186
private _closed = false;
186187
private _session_create_callbacks: (() => void)[] = [];
@@ -193,6 +194,7 @@ export class HttpSession implements Session {
193194
let url = new URL(api_url, window.location.href);
194195
url.search = "?app=" + app_name;
195196
this.api_url = url.href;
197+
this.sender = new ReliableSender(this._send.bind(this));
196198
}
197199

198200
on_session_create(callback: () => void): void {
@@ -224,21 +226,21 @@ export class HttpSession implements Session {
224226
contentType: "application/json; charset=utf-8",
225227
dataType: "json",
226228
headers: {"webio-session-id": this.webio_session_id},
227-
success: function (data: { commands: Command[][], seq: number, event: number },
229+
success: function (data: { commands: Command[][], seq: number, event: number, ack: number },
228230
textStatus: string, jqXHR: JQuery.jqXHR) {
229231
safe_poprun_callbacks(that._session_create_callbacks, 'session_create_callback');
230232
that._on_request_success(data, textStatus, jqXHR);
231-
if(that.webio_session_id.startsWith("NEW-")){
233+
if (that.webio_session_id.startsWith("NEW-")) {
232234
that.webio_session_id = that.webio_session_id.substring(4);
233235
}
234-
},
235-
error: function () {
236-
console.error('Http pulling failed');
237236
}
238237
})
239238
}
240239

241-
private _on_request_success(data: { commands: Command[][], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) {
240+
private _on_request_success(data: { commands: Command[][], seq: number, ack: number },
241+
textStatus: string, jqXHR: JQuery.jqXHR) {
242+
this.sender.ack(data.ack);
243+
242244
let msg_start_idx = this._executed_command_msg_id - data.seq + 1;
243245
if (data.commands.length <= msg_start_idx)
244246
return;
@@ -258,54 +260,78 @@ export class HttpSession implements Session {
258260

259261
send_message(msg: ClientEvent, onprogress?: (loaded: number, total: number) => void): void {
260262
if (this.debug) console.info('<<<', msg);
261-
this._send({
262-
data: JSON.stringify(msg),
263-
contentType: "application/json; charset=utf-8",
264-
}, onprogress);
263+
this.sender.add_send_task({
264+
data: msg,
265+
json: true,
266+
onprogress: onprogress,
267+
})
265268
}
266269

267270
send_buffer(data: Blob, onprogress?: (loaded: number, total: number) => void): void {
268271
if (this.debug) console.info('<<< Blob data...');
269-
this._send({
272+
this.sender.add_send_task({
270273
data: data,
271-
cache: false,
272-
processData: false,
273-
contentType: 'application/octet-stream',
274-
}, onprogress);
274+
json: false,
275+
onprogress: onprogress,
276+
}, false)
275277
}
276278

277-
_send(options: { [key: string]: any; }, onprogress?: (loaded: number, total: number) => void): void {
278-
if (this.closed())
279-
return error_alert(t("disconnected_with_server"));
280-
281-
$.ajax({
282-
...options,
283-
type: "POST",
284-
url: `${this.api_url}&ack=${this._executed_command_msg_id}`,
285-
dataType: "json",
286-
headers: {"webio-session-id": this.webio_session_id},
287-
success: this._on_request_success.bind(this),
288-
xhr: function () {
289-
let xhr = new window.XMLHttpRequest();
290-
// Upload progress
291-
xhr.upload.addEventListener("progress", function (evt) {
292-
if (evt.lengthComputable && onprogress) {
293-
onprogress(evt.loaded, evt.total);
294-
}
295-
}, false);
296-
return xhr;
297-
},
298-
error: function () {
299-
console.error('Http push blob data failed');
300-
error_alert(t("connect_fail"));
279+
_send(params: { [key: string]: any; }[], seq: number): Promise<void> {
280+
if (this.closed()) {
281+
this.sender.stop();
282+
error_alert(t("disconnected_with_server"));
283+
return Promise.reject();
284+
}
285+
let data: any, ajax_options: any;
286+
let json = params.some(p => p.json);
287+
if (json) {
288+
data = JSON.stringify(params.map(p => p.data));
289+
ajax_options = {
290+
contentType: "application/json; charset=utf-8",
301291
}
292+
} else {
293+
data = params[0].data;
294+
ajax_options = {
295+
cache: false,
296+
processData: false,
297+
contentType: 'application/octet-stream',
298+
}
299+
}
300+
return new Promise((resolve, reject) => {
301+
$.ajax({
302+
data: data,
303+
...ajax_options,
304+
type: "POST",
305+
url: `${this.api_url}&ack=${this._executed_command_msg_id}&seq=${seq}`,
306+
dataType: "json",
307+
headers: {"webio-session-id": this.webio_session_id},
308+
success: this._on_request_success.bind(this),
309+
xhr: function () {
310+
let xhr = new window.XMLHttpRequest();
311+
// Upload progress
312+
xhr.upload.addEventListener("progress", function (evt) {
313+
if (evt.lengthComputable) {
314+
params.forEach(p => {
315+
if (p.onprogress) // only the first one
316+
p.onprogress(evt.loaded, evt.total);
317+
p.onprogress = null;
318+
});
319+
}
320+
}, false);
321+
return xhr;
322+
},
323+
error: function () {
324+
console.error('Http push event failed, will retry');
325+
}
326+
}).always(() => resolve());
302327
});
303328
}
304329

305330
close_session(): void {
306331
this._closed = true;
307332
safe_poprun_callbacks(this._session_close_callbacks, 'session_close_callback');
308333
clearInterval(this.interval_pull_id);
334+
this.sender.stop();
309335
}
310336

311337
closed(): boolean {

webiojs/src/utils.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,4 +183,82 @@ export function is_mobile() {
183183
if (navigator.userAgentData) return navigator.userAgentData.mobile;
184184
const ipadOS = (navigator.platform === 'MacIntel' && navigator.maxTouchPoints > 1); /* iPad OS 13 */
185185
return /android|webos|iphone|ipad|ipod|blackberry|iemobile|opera mini/i.test(navigator.userAgent.toLowerCase()) || ipadOS;
186+
}
187+
188+
// put send task to a queue and run it one by one
189+
export class ReliableSender {
190+
private seq = 0;
191+
private queue: { enable_batch: boolean, param: any }[] = [];
192+
private send_running = false
193+
private _stop = false;
194+
195+
constructor(
196+
private readonly sender: (params: any[], seq: number) => Promise<void>,
197+
private window_size: number = 8,
198+
init_seq = 0, private timeout = 2000
199+
) {
200+
this.sender = sender;
201+
this.window_size = window_size;
202+
this.timeout = timeout;
203+
this.seq = init_seq;
204+
this.queue = [];
205+
}
206+
207+
/*
208+
* for continuous batch_send tasks in queue, they will be sent in one sender, the sending will retry when it finished or timeout.
209+
* for non-batch task, each will be sent in a single sender, the sending will retry when it finished.
210+
* */
211+
add_send_task(param: any, allow_batch_send = true) {
212+
if (this._stop) return;
213+
this.queue.push({
214+
enable_batch: allow_batch_send,
215+
param: param
216+
});
217+
if (!this.send_running)
218+
this.start_send();
219+
}
220+
221+
private start_send() {
222+
if (this._stop || this.queue.length === 0) {
223+
this.send_running = false;
224+
return;
225+
}
226+
this.send_running = true;
227+
let params: any[] = [];
228+
for (let item of this.queue) {
229+
if (!item.enable_batch)
230+
break;
231+
params.push(item.param);
232+
}
233+
let batch_send = true;
234+
if (params.length === 0 && !this.queue[0].enable_batch) {
235+
batch_send = false;
236+
params.push(this.queue[0].param);
237+
}
238+
if (params.length === 0) {
239+
this.send_running = false;
240+
return;
241+
}
242+
243+
let promises = [this.sender(params, this.seq)];
244+
if (batch_send)
245+
promises.push(new Promise((resolve) => setTimeout(resolve, this.timeout)));
246+
247+
Promise.race(promises).then(() => {
248+
this.start_send();
249+
});
250+
}
251+
252+
// seq for each ack call must be larger than the previous one, otherwise the ack will be ignored
253+
ack(seq: number) {
254+
if (seq < this.seq)
255+
return;
256+
let pop_count = seq - this.seq + 1;
257+
this.queue = this.queue.slice(pop_count);
258+
this.seq = seq + 1;
259+
}
260+
261+
stop() {
262+
this._stop = true;
263+
}
186264
}

0 commit comments

Comments
 (0)