1717#include " database/src/common/query_spec.h"
1818#include " database/src/desktop/connection/persistent_connection.h"
1919#include " database/src/desktop/core/listen_provider.h"
20+ #include " database/src/desktop/view/view.h"
2021
2122namespace firebase {
2223namespace database {
@@ -25,8 +26,66 @@ namespace internal {
2526using connection::PersistentConnection;
2627using connection::ResponsePtr;
2728
28- void WebSocketListenProvider::StartListening (const QuerySpec& query_spec) {
29- connection_->Listen (query_spec, PersistentConnection::Tag (), ResponsePtr ());
29+ class WebSocketListenResponse : public connection ::Response {
30+ public:
31+ WebSocketListenResponse (const Response::ResponseCallback& callback,
32+ const Repo::ThisRef& repo_ref, SyncTree* sync_tree,
33+ const QuerySpec& query_spec, const View* view)
34+ : connection::Response(callback),
35+ repo_ref_ (repo_ref),
36+ sync_tree_(sync_tree),
37+ query_spec_(query_spec),
38+ view_(view) {}
39+
40+ Repo::ThisRef& repo_ref () { return repo_ref_; }
41+ SyncTree* sync_tree () { return sync_tree_; }
42+ const QuerySpec& query_spec () { return query_spec_; }
43+ const View* view () { return view_; }
44+
45+ private:
46+ Repo::ThisRef repo_ref_;
47+ SyncTree* sync_tree_;
48+ QuerySpec query_spec_;
49+ const View* view_;
50+ };
51+
52+ void WebSocketListenProvider::StartListening (const QuerySpec& query_spec,
53+ const View* view) {
54+ connection_->Listen (
55+ query_spec, PersistentConnection::Tag (),
56+ MakeShared<WebSocketListenResponse>(
57+ [](const SharedPtr<connection::Response>& connection_response) {
58+ WebSocketListenResponse* response =
59+ static_cast <WebSocketListenResponse*>(
60+ connection_response.get ());
61+
62+ Repo::ThisRefLock lock (&response->repo_ref ());
63+ Repo* repo = lock.GetReference ();
64+ if (repo == nullptr ) {
65+ // Repo was deleted, do not proceed.
66+ return ;
67+ }
68+
69+ std::vector<Event> events;
70+ if (!response->HasError ()) {
71+ const QuerySpec& query_spec = response->view ()->query_spec ();
72+ events =
73+ response->sync_tree ()->ApplyListenComplete (query_spec.path );
74+ } else {
75+ LogWarning (" Listen at %s failed: %s" ,
76+ response->query_spec ().path .c_str (),
77+ response->GetErrorMessage ().c_str ());
78+
79+ // If a listen failed, kill all of the listeners here, not just
80+ // the one that triggered the error. Note that this may need to be
81+ // scoped to just this listener if we change permissions on
82+ // filtered children
83+ events = response->sync_tree ()->RemoveAllEventRegistrations (
84+ response->query_spec (), response->GetErrorCode ());
85+ }
86+ repo->PostEvents (events);
87+ },
88+ repo_->this_ref (), sync_tree_, query_spec, view));
3089}
3190
3291void WebSocketListenProvider::StopListening (const QuerySpec& query_spec) {
0 commit comments