Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 70 additions & 16 deletions examples/LargeResponse/LargeResponse.ino
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,64 @@ private:
size_t _sent = 0;
};

// Code to reproduce issues:
// - https://github.com/ESP32Async/ESPAsyncWebServer/issues/242
// - https://github.com/ESP32Async/ESPAsyncWebServer/issues/315
//
// https://github.com/ESP32Async/ESPAsyncWebServer/pull/317#issuecomment-3421141039
//
// I cracked it.
// So this is how it works:
// That space that _tcp is writing to identified by CONFIG_TCP_SND_BUF_DEFAULT (and is value-matching with default TCP windows size which is very confusing itself).
// The space returned by client()->write() and client->space() somehow might not be atomically/thread synced (had not dived that deep yet). So if first call to _fillBuffer is done via user-code thread and ended up with some small amount of data consumed and second one is done by _poll or _ack? returns full size again! This is where old code fails.
// If you change your class this way it will fail 100%.
class CustomResponseMRE : public AsyncAbstractResponse {
public:
explicit CustomResponseMRE() {
_code = 200;
_contentType = "text/plain";
_sendContentLength = false;
// add some useless headers
addHeader("Clear-Site-Data", "Clears browsing data (e.g., cookies, storage, cache) associated with the requesting website.");
addHeader(
"No-Vary-Search", "Specifies a set of rules that define how a URL's query parameters will affect cache matching. These rules dictate whether the same "
"URL with different URL parameters should be saved as separate browser cache entries"
);
}

bool _sourceValid() const override {
return true;
}

size_t _fillBuffer(uint8_t *buf, size_t buflen) override {
if (fillChar == NULL) {
fillChar = 'A';
return RESPONSE_TRY_AGAIN;
}
if (_sent == RESPONSE_TRY_AGAIN) {
Serial.println("Simulating temporary unavailability of data...");
_sent = 0;
return RESPONSE_TRY_AGAIN;
}
size_t remaining = totalResponseSize - _sent;
if (remaining == 0) {
return 0;
}
if (buflen > remaining) {
buflen = remaining;
}
Serial.printf("Filling '%c' @ sent: %u, buflen: %u\n", fillChar, _sent, buflen);
std::fill_n(buf, buflen, static_cast<uint8_t>(fillChar));
_sent += buflen;
fillChar = (fillChar == 'Z') ? 'A' : fillChar + 1;
return buflen;
}

private:
char fillChar = NULL;
size_t _sent = 0;
};

void setup() {
Serial.begin(115200);

Expand All @@ -77,14 +135,7 @@ void setup() {
//
// curl -v http://192.168.4.1/1 | grep -o '.' | sort | uniq -c
//
// Should output 16000 and the counts for each character from A to D
//
// Console:
//
// Filling 'A' @ index: 0, maxLen: 5652, toSend: 5652
// Filling 'B' @ index: 5652, maxLen: 4308, toSend: 4308
// Filling 'C' @ index: 9960, maxLen: 2888, toSend: 2888
// Filling 'D' @ index: 12848, maxLen: 3152, toSend: 3152
// Should output 16000 and a distribution of letters which is the same in ESP32 logs and console
//
server.on("/1", HTTP_GET, [](AsyncWebServerRequest *request) {
fillChar = 'A';
Expand All @@ -103,19 +154,22 @@ void setup() {
//
// curl -v http://192.168.4.1/2 | grep -o '.' | sort | uniq -c
//
// Should output 16000
//
// Console:
//
// Filling 'A' @ sent: 0, buflen: 5675
// Filling 'B' @ sent: 5675, buflen: 4308
// Filling 'C' @ sent: 9983, buflen: 5760
// Filling 'D' @ sent: 15743, buflen: 257
// Should output 16000 and a distribution of letters which is the same in ESP32 logs and console
//
server.on("/2", HTTP_GET, [](AsyncWebServerRequest *request) {
request->send(new CustomResponse());
});

// Example to use a AsyncAbstractResponse
//
// curl -v http://192.168.4.1/3 | grep -o '.' | sort | uniq -c
//
// Should output 16000 and a distribution of letters which is the same in ESP32 logs and console
//
server.on("/3", HTTP_GET, [](AsyncWebServerRequest *request) {
request->send(new CustomResponseMRE());
});

server.begin();
}

Expand Down
5 changes: 5 additions & 0 deletions examples/PerfTests/PerfTests.ino
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ void setup() {
//
// time curl -N -v -G -d 'd=2000' -d 'l=10000' http://192.168.4.1/slow.html --output -
//
// THIS CODE WILL CRASH BECAUSE OF THE WATCHDOG.
// IF YOU REALLY NEED TO DO THIS, YOU MUST DISABLE THE TWDT
//
// CORRECT WAY IS TO USE SSE OR WEBSOCKETS TO DO THE COSTLY PROCESSING ASYNC.
//
server.on("/slow.html", HTTP_GET, [](AsyncWebServerRequest *request) {
requests = requests + 1;
uint32_t d = request->getParam("d")->value().toInt();
Expand Down
4 changes: 2 additions & 2 deletions examples/ServerSentEvents/ServerSentEvents.ino
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ void setup() {
});

events.onConnect([](AsyncEventSourceClient *client) {
Serial.printf("SSE Client connected! ID: %" PRIu32 "\n", client->lastId());
Serial.printf("SSE Client connected!");
client->send("hello!", NULL, millis(), 1000);
});

events.onDisconnect([](AsyncEventSourceClient *client) {
Serial.printf("SSE Client disconnected! ID: %" PRIu32 "\n", client->lastId());
Serial.printf("SSE Client disconnected!");
});

server.addHandler(&events);
Expand Down
5 changes: 5 additions & 0 deletions examples/SlowChunkResponse/SlowChunkResponse.ino
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ void setup() {
//
// time curl -N -v -G -d 'd=2000' -d 'l=10000' http://192.168.4.1/slow.html --output -
//
// THIS CODE WILL CRASH BECAUSE OF THE WATCHDOG.
// IF YOU REALLY NEED TO DO THIS, YOU MUST DISABLE THE TWDT
//
// CORRECT WAY IS TO USE SSE OR WEBSOCKETS TO DO THE COSTLY PROCESSING ASYNC.
//
server.on("/slow.html", HTTP_GET, [](AsyncWebServerRequest *request) {
uint32_t d = request->getParam("d")->value().toInt();
uint32_t l = request->getParam("l")->value().toInt();
Expand Down
32 changes: 21 additions & 11 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ size_t AsyncEventSourceMessage::send(AsyncClient *client) {

// Client

AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server) : _client(request->client()), _server(server) {
AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server) : _client(request->clientRelease()), _server(server) {

if (request->hasHeader(T_Last_Event_ID)) {
_lastId = atoi(request->getHeader(T_Last_Event_ID)->value().c_str());
Expand Down Expand Up @@ -181,9 +181,9 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
);

_server->_addClient(this);
delete request;

_client->setNoDelay(true);
// delete AsyncWebServerRequest object (and bound response) since we have the ownership on client connection now
delete request;
}

AsyncEventSourceClient::~AsyncEventSourceClient() {
Expand Down Expand Up @@ -470,8 +470,7 @@ void AsyncEventSource::_adjust_inflight_window() {

/* Response */

AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server) {
_server = server;
AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server) : _server(server) {
_code = 200;
_contentType = T_text_event_stream;
_sendContentLength = false;
Expand All @@ -482,13 +481,24 @@ AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server) {
void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request) {
String out;
_assembleHead(out, request->version());
// unbind client's onAck callback from AsyncWebServerRequest's, we will destroy it on next callback and steal the client,
// can't do it now 'cause now we are in AsyncWebServerRequest::_onAck 's stack actually
// here we are loosing time on one RTT delay, but with current design we can't get rid of Req/Resp objects other way
_request = request;
request->client()->onAck(
[](void *r, AsyncClient *c, size_t len, uint32_t time) {
if (len) {
static_cast<AsyncEventSourceResponse *>(r)->_switchClient();
}
},
this
);
request->client()->write(out.c_str(), _headLength);
_state = RESPONSE_WAIT_ACK;
}

size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time __attribute__((unused))) {
if (len) {
new AsyncEventSourceClient(request, _server);
}
return 0;
}
void AsyncEventSourceResponse::_switchClient() {
// AsyncEventSourceClient c-tor will take the ownership of AsyncTCP's client connection
new AsyncEventSourceClient(_request, _server);
// AsyncEventSourceClient c-tor would also delete _request and *this
};
14 changes: 13 additions & 1 deletion src/AsyncEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ class AsyncEventSourceClient {
void _runQueue();

public:
/**
* @brief Construct a new Async Event Source Client object
* @note constructor would take the ownership of of AsyncTCP's client pointer from `request` parameter and call delete on it!
*
* @param request
* @param server
*/
AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server);
~AsyncEventSourceClient();

Expand Down Expand Up @@ -312,11 +319,16 @@ class AsyncEventSource : public AsyncWebHandler {
class AsyncEventSourceResponse : public AsyncWebServerResponse {
private:
AsyncEventSource *_server;
AsyncWebServerRequest *_request;
// this call back will switch AsyncTCP client to SSE
void _switchClient();

public:
AsyncEventSourceResponse(AsyncEventSource *server);
void _respond(AsyncWebServerRequest *request);
size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time);
size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) override {
return 0;
};
bool _sourceValid() const {
return true;
}
Expand Down
43 changes: 24 additions & 19 deletions src/AsyncWebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,10 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) {
const char *AWSC_PING_PAYLOAD = "ESPAsyncWebServer-PING";
const size_t AWSC_PING_PAYLOAD_LEN = 22;

AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, AsyncWebSocket *server) : _tempObject(NULL) {
_client = request->client();
_server = server;
_clientId = _server->_getNextId();
_status = WS_CONNECTED;
_pstate = 0;
_lastMessageTime = millis();
_keepAlivePeriod = 0;
AsyncWebSocketClient::AsyncWebSocketClient(AsyncClient *client, AsyncWebSocket *server)
: _client(client), _server(server), _clientId(_server->_getNextId()), _status(WS_CONNECTED), _pstate(0), _lastMessageTime(millis()), _keepAlivePeriod(0),
_tempObject(NULL) {

_client->setRxTimeout(0);
_client->onError(
[](void *r, AsyncClient *c, int8_t error) {
Expand Down Expand Up @@ -272,7 +268,6 @@ AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, Async
},
this
);
delete request;
memset(&_pinfo, 0, sizeof(_pinfo));
}

Expand Down Expand Up @@ -806,7 +801,10 @@ void AsyncWebSocket::_handleEvent(AsyncWebSocketClient *client, AwsEventType typ

AsyncWebSocketClient *AsyncWebSocket::_newClient(AsyncWebServerRequest *request) {
_clients.emplace_back(request, this);
// we've just detached AsyncTCP client from AsyncWebServerRequest
_handleEvent(&_clients.back(), WS_EVT_CONNECT, request, NULL, 0);
// after user code completed CONNECT event callback we can delete req/response objects
delete request;
return &_clients.back();
}

Expand Down Expand Up @@ -1243,8 +1241,7 @@ AsyncWebSocketMessageBuffer *AsyncWebSocket::makeBuffer(const uint8_t *data, siz
* Authentication code from https://github.com/Links2004/arduinoWebSockets/blob/master/src/WebSockets.cpp#L480
*/

AsyncWebSocketResponse::AsyncWebSocketResponse(const String &key, AsyncWebSocket *server) {
_server = server;
AsyncWebSocketResponse::AsyncWebSocketResponse(const String &key, AsyncWebSocket *server) : _server(server) {
_code = 101;
_sendContentLength = false;

Expand Down Expand Up @@ -1290,18 +1287,26 @@ void AsyncWebSocketResponse::_respond(AsyncWebServerRequest *request) {
request->client()->close();
return;
}
// unbind client's onAck callback from AsyncWebServerRequest's, we will destroy it on next callback and steal the client,
// can't do it now 'cause now we are in AsyncWebServerRequest::_onAck 's stack actually
// here we are loosing time on one RTT delay, but with current design we can't get rid of Req/Resp objects other way
_request = request;
request->client()->onAck(
[](void *r, AsyncClient *c, size_t len, uint32_t time) {
if (len) {
static_cast<AsyncWebSocketResponse *>(r)->_switchClient();
}
},
this
);
String out;
_assembleHead(out, request->version());
request->client()->write(out.c_str(), _headLength);
_state = RESPONSE_WAIT_ACK;
}

size_t AsyncWebSocketResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time) {
(void)time;

if (len) {
_server->_newClient(request);
}

return 0;
void AsyncWebSocketResponse::_switchClient() {
// detach client from request
_server->_newClient(_request);
// _newClient() would also destruct _request and *this
}
24 changes: 18 additions & 6 deletions src/AsyncWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,18 @@ class AsyncWebSocketClient {
AsyncWebSocket *_server;
uint32_t _clientId;
AwsClientStatus _status;
uint8_t _pstate;
uint32_t _lastMessageTime;
uint32_t _keepAlivePeriod;
#ifdef ESP32
mutable std::recursive_mutex _lock;
#endif
std::deque<AsyncWebSocketControl> _controlQueue;
std::deque<AsyncWebSocketMessage> _messageQueue;
bool closeWhenFull = true;

uint8_t _pstate;
AwsFrameInfo _pinfo;

uint32_t _lastMessageTime;
uint32_t _keepAlivePeriod;

bool _queueControl(uint8_t opcode, const uint8_t *data = NULL, size_t len = 0, bool mask = false);
bool _queueMessage(AsyncWebSocketSharedBuffer buffer, uint8_t opcode = WS_TEXT, bool mask = false);
void _runQueue();
Expand All @@ -232,7 +231,15 @@ class AsyncWebSocketClient {
public:
void *_tempObject;

AsyncWebSocketClient(AsyncWebServerRequest *request, AsyncWebSocket *server);
AsyncWebSocketClient(AsyncClient *client, AsyncWebSocket *server);

/**
* @brief Construct a new Async Web Socket Client object
* @note constructor would take the ownership of of AsyncTCP's client pointer from `request` parameter and call delete on it!
* @param request
* @param server
*/
AsyncWebSocketClient(AsyncWebServerRequest *request, AsyncWebSocket *server) : AsyncWebSocketClient(request->clientRelease(), server){};
~AsyncWebSocketClient();

// client id increments for the given server
Expand Down Expand Up @@ -464,11 +471,16 @@ class AsyncWebSocketResponse : public AsyncWebServerResponse {
private:
String _content;
AsyncWebSocket *_server;
AsyncWebServerRequest *_request;
// this call back will switch AsyncTCP client to WebSocket
void _switchClient();

public:
AsyncWebSocketResponse(const String &key, AsyncWebSocket *server);
void _respond(AsyncWebServerRequest *request);
size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time);
size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) override {
return 0;
};
bool _sourceValid() const {
return true;
}
Expand Down
Loading