Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ class YDocExtension(ExtensionApp):
saving changes from the front-end.""",
)

file_stop_poll_on_errors_after = Float(
24 * 60 * 60,
allow_none=True,
config=True,
help="""The duration in seconds to stop polling a file after consecutive errors.
Defaults to 24 hours, if None then polling will not stop on errors.""",
)

document_cleanup_delay = Float(
60,
allow_none=True,
Expand Down Expand Up @@ -121,7 +129,10 @@ def initialize_handlers(self):
# the global app settings in which the file id manager will register
# itself maybe at a later time.
self.file_loaders = FileLoaderMapping(
self.serverapp.web_app.settings, self.log, self.file_poll_interval
self.serverapp.web_app.settings,
self.log,
self.file_poll_interval,
file_stop_poll_on_errors_after=self.file_stop_poll_on_errors_after,
)

self.handlers.extend(
Expand Down
42 changes: 38 additions & 4 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

import asyncio
from logging import Logger, getLogger
from time import time
from typing import Any, Callable, Coroutine
from tornado.web import HTTPError
from http import HTTPStatus

from jupyter_server.services.contents.manager import (
AsyncContentsManager,
Expand All @@ -29,12 +32,16 @@ def __init__(
contents_manager: AsyncContentsManager | ContentsManager,
log: Logger | None = None,
poll_interval: float | None = None,
max_consecutive_logs: int = 3,
stop_poll_on_errors_after: float | None = None,
) -> None:
self._file_id: str = file_id

self._lock = asyncio.Lock()
self._poll_interval = poll_interval
self._stop_poll_on_errors_after = stop_poll_on_errors_after
self._file_id_manager = file_id_manager
self._max_consecutive_logs = max_consecutive_logs
self._contents_manager = contents_manager

self._log = log or getLogger(__name__)
Expand Down Expand Up @@ -204,8 +211,8 @@ async def _watch_file(self) -> None:
return

consecutive_error_logs = 0
max_consecutive_logs = 3
suppression_logged = False
consecutive_errors_started = None

while True:
try:
Expand All @@ -214,13 +221,37 @@ async def _watch_file(self) -> None:
await self.maybe_notify()
consecutive_error_logs = 0
suppression_logged = False
consecutive_errors_started = None
except Exception as e:
if consecutive_error_logs < max_consecutive_logs:
self._log.error(f"Error watching file: {self.path}\n{e!r}", exc_info=e)
# We do not want to terminate the watcher if the content manager request
# fails due to timeout, server error or similar temporary issue; we only
# terminate if the file is not found or we get unauthorized error for
# an extended period of time.
if isinstance(e, HTTPError) and e.status_code in {
HTTPStatus.NOT_FOUND,
HTTPStatus.UNAUTHORIZED,
}:
if (
consecutive_errors_started
and self._stop_poll_on_errors_after is not None
):
errors_duration = time() - consecutive_errors_started
if errors_duration > self._stop_poll_on_errors_after:
self._log.warning(
"Stopping watching file due to consecutive errors over %s seconds: %s",
self._stop_poll_on_errors_after,
self.path,
)
break
else:
consecutive_errors_started = time()
# Otherwise we just log the error
if consecutive_error_logs < self._max_consecutive_logs:
self._log.error("Error watching file %s: %s", self.path, e, exc_info=e)
consecutive_error_logs += 1
elif not suppression_logged:
self._log.warning(
"Too many errors while watching %s suppressing further logs.",
"Too many errors while watching %s - suppressing further logs.",
self.path,
)
suppression_logged = True
Expand Down Expand Up @@ -268,6 +299,7 @@ def __init__(
settings: dict,
log: Logger | None = None,
file_poll_interval: float | None = None,
file_stop_poll_on_errors_after: float | None = None,
) -> None:
"""
Args:
Expand All @@ -279,6 +311,7 @@ def __init__(
self.__dict: dict[str, FileLoader] = {}
self.log = log or getLogger(__name__)
self.file_poll_interval = file_poll_interval
self._stop_poll_on_errors_after = file_stop_poll_on_errors_after

@property
def contents_manager(self) -> AsyncContentsManager | ContentsManager:
Expand Down Expand Up @@ -309,6 +342,7 @@ def __getitem__(self, file_id: str) -> FileLoader:
self.contents_manager,
self.log,
self.file_poll_interval,
stop_poll_on_errors_after=self._stop_poll_on_errors_after,
)
self.__dict[file_id] = file

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from datetime import datetime
from typing import Any
from tornado.web import HTTPError

from jupyter_server import _tz as tz

Expand Down Expand Up @@ -40,6 +41,8 @@ def __init__(self, model: dict):
def get(
self, path: str, content: bool = True, format: str | None = None, type: str | None = None
) -> dict:
if not self.model:
raise HTTPError(404, f"File not found: {path}")
self.actions.append("get")
return self.model

Expand Down
39 changes: 39 additions & 0 deletions tests/test_loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,45 @@ async def trigger():
await loader.clean()


async def test_FileLoader_with_watcher_errors(caplog):
    """Watcher error handling: log up to ``max_consecutive_logs`` errors,
    then suppress further logging, and stop polling entirely once consecutive
    errors persist for longer than ``stop_poll_on_errors_after`` seconds.
    """
    # NOTE: renamed from ``id`` to avoid shadowing the builtin.
    file_id = "file-4567"
    path = "myfile.txt"
    paths = {file_id: path}

    cm = FakeContentsManager({"last_modified": datetime.now(timezone.utc)})

    loader = FileLoader(
        file_id,
        FakeFileIDManager(paths),
        cm,
        poll_interval=0.1,
        max_consecutive_logs=2,
        stop_poll_on_errors_after=1,
    )
    await loader.load_content("text", "file")

    try:
        # An empty model makes FakeContentsManager.get raise HTTP 404 on
        # every poll, driving the watcher's error path.
        cm.model = {}
        await asyncio.sleep(0.5)
        logs = [record.getMessage() for record in caplog.records]
        assert logs == [
            "Error watching file myfile.txt: HTTP 404: Not Found (File not found: myfile.txt)",
            "Error watching file myfile.txt: HTTP 404: Not Found (File not found: myfile.txt)",
            "Too many errors while watching myfile.txt - suppressing further logs.",
        ]

        # After the 1-second error window elapses the watcher emits one
        # final warning and terminates instead of polling forever.
        await asyncio.sleep(1)
        logs = [record.getMessage() for record in caplog.records]
        assert len(logs) == 4
        assert (
            logs[-1]
            == "Stopping watching file due to consecutive errors over 1 seconds: myfile.txt"
        )
    finally:
        await loader.clean()


async def test_FileLoader_without_watcher():
id = "file-4567"
path = "myfile.txt"
Expand Down
Loading