diff --git a/changelog.d/19229.misc b/changelog.d/19229.misc
new file mode 100644
index 00000000000..8caebead72a
--- /dev/null
+++ b/changelog.d/19229.misc
@@ -0,0 +1 @@
+Move towards using a dedicated `Duration` type.
diff --git a/rust/src/duration.rs b/rust/src/duration.rs
new file mode 100644
index 00000000000..a3dbe919b28
--- /dev/null
+++ b/rust/src/duration.rs
@@ -0,0 +1,56 @@
+/*
+ * This file is licensed under the Affero General Public License (AGPL) version 3.
+ *
+ * Copyright (C) 2025 Element Creations, Ltd
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * See the GNU Affero General Public License for more details:
+ * <https://www.gnu.org/licenses/agpl-3.0.html>.
+ */
+
+use once_cell::sync::OnceCell;
+use pyo3::{
+    types::{IntoPyDict, PyAnyMethods},
+    Bound, BoundObject, IntoPyObject, Py, PyAny, PyErr, PyResult, Python,
+};
+
+/// A reference to the `synapse.util.duration` module.
+static DURATION: OnceCell<Py<PyAny>> = OnceCell::new();
+
+/// Access to the `synapse.util.duration` module.
+fn duration_module(py: Python<'_>) -> PyResult<&Bound<'_, PyAny>> {
+    Ok(DURATION
+        .get_or_try_init(|| py.import("synapse.util.duration").map(Into::into))?
+        .bind(py))
+}
+
+/// Mirrors the `synapse.util.duration.Duration` Python class.
+pub struct SynapseDuration {
+    microseconds: u64,
+}
+
+impl SynapseDuration {
+    /// For now we only need to create durations from milliseconds.
+    pub fn from_milliseconds(milliseconds: u64) -> Self {
+        Self {
+            microseconds: milliseconds * 1_000,
+        }
+    }
+}
+
+impl<'py> IntoPyObject<'py> for &SynapseDuration {
+    type Target = PyAny;
+    type Output = Bound<'py, Self::Target>;
+    type Error = PyErr;
+
+    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
+        let duration_module = duration_module(py)?;
+        let kwargs = [("microseconds", self.microseconds)].into_py_dict(py)?;
+        let duration_instance = duration_module.call_method("Duration", (), Some(&kwargs))?;
+        Ok(duration_instance.into_bound())
+    }
+}
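The Python side of this mirror is not part of the diff. As a reading aid only, here is a minimal sketch of the interface the call sites below appear to assume (keyword-only unit arguments, `as_millis()`/`as_secs()` accessors, and addition plus integer multiplication); the real `synapse/util/duration.py` may well differ:

```python
# Hypothetical sketch of synapse.util.duration.Duration, inferred from usage
# in this diff; not the actual implementation.
class Duration:
    """A span of time with explicit units, stored internally as microseconds."""

    def __init__(
        self,
        *,
        days: float = 0,
        hours: float = 0,
        minutes: float = 0,
        seconds: float = 0,
        milliseconds: float = 0,
        microseconds: float = 0,
    ) -> None:
        self._us = int(
            microseconds
            + milliseconds * 1_000
            + seconds * 1_000_000
            + minutes * 60 * 1_000_000
            + hours * 3_600 * 1_000_000
            + days * 86_400 * 1_000_000
        )

    def as_millis(self) -> int:
        """The duration in whole milliseconds (used at ms-timestamp boundaries)."""
        return self._us // 1_000

    def as_secs(self) -> float:
        """The duration in seconds (used for logging and Twisted delayed calls)."""
        return self._us / 1_000_000

    def __add__(self, other: "Duration") -> "Duration":
        return Duration(microseconds=self._us + other._us)

    def __mul__(self, factor: int) -> "Duration":
        return Duration(microseconds=self._us * factor)
```

Note that the Rust mirror above only ever passes the `microseconds` keyword, which is why `from_milliseconds` scales by 1,000 before crossing the FFI boundary.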
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 6522148fa15..fe880af2eae 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -5,6 +5,7 @@ use pyo3::prelude::*;
 use pyo3_log::ResetHandle;
 
 pub mod acl;
+pub mod duration;
 pub mod errors;
 pub mod events;
 pub mod http;
diff --git a/rust/src/rendezvous/mod.rs b/rust/src/rendezvous/mod.rs
index 848b5035bbc..9a6da9fcc32 100644
--- a/rust/src/rendezvous/mod.rs
+++ b/rust/src/rendezvous/mod.rs
@@ -35,6 +35,7 @@ use ulid::Ulid;
 
 use self::session::Session;
 use crate::{
+    duration::SynapseDuration,
     errors::{NotFoundError, SynapseError},
     http::{http_request_from_twisted, http_response_to_twisted, HeaderMapPyExt},
     UnwrapInfallible,
@@ -132,6 +133,8 @@ impl RendezvousHandler {
             .unwrap_infallible()
             .unbind();
 
+        let eviction_duration = SynapseDuration::from_milliseconds(eviction_interval);
+
         // Construct a Python object so that we can get a reference to the
         // evict method and schedule it to run.
         let self_ = Py::new(
@@ -149,7 +152,7 @@ impl RendezvousHandler {
         let evict = self_.getattr(py, "_evict")?;
         homeserver.call_method0("get_clock")?.call_method(
             "looping_call",
-            (evict, eviction_interval),
+            (evict, &eviction_duration),
             None,
         )?;
diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py
index df884d47d75..d6cc3d26b5f 100644
--- a/synapse/api/ratelimiting.py
+++ b/synapse/api/ratelimiting.py
@@ -27,6 +27,7 @@
 from synapse.storage.databases.main import DataStore
 from synapse.types import Requester
 from synapse.util.clock import Clock
+from synapse.util.duration import Duration
 from synapse.util.wheel_timer import WheelTimer
 
 if TYPE_CHECKING:
@@ -100,7 +101,7 @@ def __init__(
         # and doesn't affect correctness.
         self._timer: WheelTimer[Hashable] = WheelTimer()
 
-        self.clock.looping_call(self._prune_message_counts, 15 * 1000)
+        self.clock.looping_call(self._prune_message_counts, Duration(seconds=15))
 
     def _get_key(self, requester: Requester | None, key: Hashable | None) -> Hashable:
         """Use the requester's MXID as a fallback key if no key is provided."""
diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py
index d278e30850b..7b4bf25c280 100644
--- a/synapse/app/phone_stats_home.py
+++ b/synapse/app/phone_stats_home.py
@@ -218,13 +218,13 @@ def performance_stats_init() -> None:
     # table will decrease
     clock.looping_call(
         hs.get_datastores().main.generate_user_daily_visits,
-        Duration(minutes=5).as_millis(),
+        Duration(minutes=5),
     )
 
     # monthly active user limiting functionality
     clock.looping_call(
         hs.get_datastores().main.reap_monthly_active_users,
-        Duration(hours=1).as_millis(),
+        Duration(hours=1),
     )
     hs.get_datastores().main.reap_monthly_active_users()
@@ -263,14 +263,14 @@ async def _generate_monthly_active_users() -> None:
     if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
         generate_monthly_active_users()
-        clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+        clock.looping_call(generate_monthly_active_users, Duration(minutes=5))
     # End of monthly active user settings
 
     if hs.config.metrics.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
         clock.looping_call(
             phone_stats_home,
-            PHONE_HOME_INTERVAL.as_millis(),
+            PHONE_HOME_INTERVAL,
             hs,
             stats,
         )
@@ -278,14 +278,14 @@
     # We need to defer this init for the cases that we daemonize
     # otherwise the process ID we get is that of the non-daemon process
     clock.call_later(
-        0,
+        Duration(seconds=0),
         performance_stats_init,
     )
 
     # We wait 5 minutes to send the first set of stats as the server can
     # be quite busy the first few minutes
     clock.call_later(
-        INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME.as_secs(),
+        INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME,
         phone_stats_home,
         hs,
         stats,
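A pattern worth calling out before the long tail of call sites below: the old `Clock` API took bare numbers whose unit depended on the method (`looping_call` wanted milliseconds, `call_later` wanted seconds, as the removed `* 1000` and `.as_millis()`/`.as_secs()` conversions show). With `Duration` the unit travels with the value. Illustrative only; `prune_counts` and `warm_caches` are made-up callbacks:

```python
# Before: implicit, method-dependent units.
clock.looping_call(prune_counts, 15 * 1000)  # milliseconds
clock.call_later(30, warm_caches)            # seconds

# After: the unit is explicit and uniform at every call site.
clock.looping_call(prune_counts, Duration(seconds=15))
clock.call_later(Duration(seconds=30), warm_caches)
```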
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 250f84d6445..befb4ae44b0 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -77,6 +77,7 @@
 from synapse.storage.databases.main import DataStore
 from synapse.types import DeviceListUpdates, JsonMapping
 from synapse.util.clock import Clock, DelayedCallWrapper
+from synapse.util.duration import Duration
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -504,7 +505,7 @@ def __init__(
         self.scheduled_recovery: DelayedCallWrapper | None = None
 
     def recover(self) -> None:
-        delay = 2**self.backoff_counter
-        logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
+        delay = Duration(seconds=2**self.backoff_counter)
+        logger.info("Scheduling retries on %s in %fs", self.service.id, delay.as_secs())
         self.scheduled_recovery = self.clock.call_later(
             delay,
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 4110a90ed6d..ba738ad65e7 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -75,6 +75,7 @@
 from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.duration import Duration
 from synapse.util.retryutils import NotRetryingDestination
 
 if TYPE_CHECKING:
@@ -132,7 +133,7 @@ def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.pdu_destination_tried: dict[str, dict[str, int]] = {}
-        self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
+        self._clock.looping_call(self._clear_tried_cache, Duration(minutes=1))
         self.state = hs.get_state_handler()
         self.transport_layer = hs.get_federation_transport_client()
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 34abac1cec9..b909f1e5956 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -89,6 +89,7 @@
 from synapse.util import unwrapFirstError
 from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.duration import Duration
 from synapse.util.stringutils import parse_server_name
 
 if TYPE_CHECKING:
@@ -226,7 +227,7 @@ async def _handle_old_staged_events(self) -> None:
             )
 
             # We pause a bit so that we don't start handling all rooms at once.
-            await self._clock.sleep(random.uniform(0, 0.1))
+            await self._clock.sleep(Duration(seconds=random.uniform(0, 0.1)))
 
     async def on_backfill_request(
         self, origin: str, room_id: str, versions: list[str], limit: int
@@ -301,7 +302,9 @@ async def on_incoming_transaction(
             # Start a periodic check for old staged events. This is to handle
             # the case where locks time out, e.g. if another process gets killed
             # without dropping its locks.
-            self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
+            self._clock.looping_call(
+                self._handle_old_staged_events, Duration(minutes=1)
+            )
 
         # keep this as early as possible to make the calculated origin ts as
         # accurate as possible.
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index cf70e10a58f..4a6d155217c 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -53,6 +53,7 @@
 from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
 from synapse.replication.tcp.streams.federation import FederationStream
 from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
+from synapse.util.duration import Duration
 from synapse.util.metrics import Measure
 
 from .units import Edu
@@ -137,7 +138,7 @@ def register(queue_name: QueueNames, queue: Sized) -> None:
             assert isinstance(queue, Sized)
             register(queue_name, queue=queue)
 
-        self.clock.looping_call(self._clear_queue, 30 * 1000)
+        self.clock.looping_call(self._clear_queue, Duration(seconds=30))
 
     def shutdown(self) -> None:
         """Stops this federation sender instance from sending further transactions."""
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 0bd97c25dfb..f7240c2f7f9 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -174,6 +174,7 @@
     get_domain_from_id,
 )
 from synapse.util.clock import Clock
+from synapse.util.duration import Duration
 from synapse.util.metrics import Measure
 from synapse.util.retryutils import filter_destinations_by_retry_limiter
 
@@ -218,12 +219,12 @@
 # Please note that rate limiting still applies, so while the loop is
 # executed every X seconds the destinations may not be woken up because
 # they are being rate limited following previous attempt failures.
-WAKEUP_RETRY_PERIOD_SEC = 60
+WAKEUP_RETRY_PERIOD = Duration(minutes=1)
 
-# Time (in s) to wait in between waking up each destination, i.e. one destination
-# will be woken up every <x> seconds until we have woken every destination
-# has outstanding catch-up.
-WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5
+# Time to wait in between waking up each destination, i.e. one destination
+# will be woken up every <x> seconds until we have woken every destination
+# that has outstanding catch-up.
+WAKEUP_INTERVAL_BETWEEN_DESTINATIONS = Duration(seconds=5)
 
 
 class AbstractFederationSender(metaclass=abc.ABCMeta):
@@ -379,7 +380,7 @@ async def _handle(self) -> None:
 
                 queue.attempt_new_transaction()
 
-                await self.clock.sleep(current_sleep_seconds)
+                await self.clock.sleep(Duration(seconds=current_sleep_seconds))
 
                 if not self.queue:
                     break
@@ -468,7 +469,7 @@ def __init__(self, hs: "HomeServer"):
         # Regularly wake up destinations that have outstanding PDUs to be caught up
        self.clock.looping_call_now(
             self.hs.run_as_background_process,
-            WAKEUP_RETRY_PERIOD_SEC * 1000.0,
+            WAKEUP_RETRY_PERIOD,
             "wake_destinations_needing_catchup",
             self._wake_destinations_needing_catchup,
         )
@@ -1161,4 +1162,4 @@ async def _wake_destinations_needing_catchup(self) -> None:
                 last_processed,
             )
             self.wake_destination(destination)
-            await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
+            await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS)
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index bc50efa1a7b..ba40d5763ef 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -28,6 +28,7 @@
 from synapse.types import UserID
 from synapse.util import stringutils
 from synapse.util.async_helpers import delay_cancellation
+from synapse.util.duration import Duration
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -73,7 +74,7 @@ def __init__(self, hs: "HomeServer"):
         # Check the renewal emails to send and send them every 30min.
         if hs.config.worker.run_background_tasks:
-            self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000)
+            self.clock.looping_call(self._send_renewal_emails, Duration(minutes=30))
 
     async def is_user_expired(self, user_id: str) -> bool:
         """Checks if a user has expired against third-party modules.
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d9355d33da9..b5c0cbdba23 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -74,6 +74,7 @@
 from synapse.types import JsonDict, Requester, StrCollection, UserID
 from synapse.util import stringutils as stringutils
 from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
+from synapse.util.duration import Duration
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.stringutils import base62_encode
 from synapse.util.threepids import canonicalise_email
@@ -242,7 +243,7 @@ def __init__(self, hs: "HomeServer"):
         if hs.config.worker.run_background_tasks:
             self._clock.looping_call(
                 run_as_background_process,
-                5 * 60 * 1000,
+                Duration(minutes=5),
                 "expire_old_sessions",
                 self.server_name,
                 self._expire_old_sessions,
diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py
index de21e3abbb7..a051200cf10 100644
--- a/synapse/handlers/delayed_events.py
+++ b/synapse/handlers/delayed_events.py
@@ -42,6 +42,7 @@
     UserID,
     create_requester,
 )
+from synapse.util.duration import Duration
 from synapse.util.events import generate_fake_event_id
 from synapse.util.metrics import Measure
 from synapse.util.sentinel import Sentinel
@@ -92,7 +93,7 @@ async def _schedule_db_events() -> None:
             # Kick off again (without blocking) to catch any missed notifications
             # that may have fired before the callback was added.
             self._clock.call_later(
-                0,
+                Duration(seconds=0),
                 self.notify_new_event,
             )
@@ -501,17 +502,17 @@ def _schedule_next_at_or_none(self, next_send_ts: Timestamp | None) -> None:
 
     def _schedule_next_at(self, next_send_ts: Timestamp) -> None:
         delay = next_send_ts - self._get_current_ts()
-        delay_sec = delay / 1000 if delay > 0 else 0
+        delay_duration = Duration(milliseconds=max(delay, 0))
 
         if self._next_delayed_event_call is None:
             self._next_delayed_event_call = self._clock.call_later(
-                delay_sec,
+                delay_duration,
                 self.hs.run_as_background_process,
                 "_send_on_timeout",
                 self._send_on_timeout,
             )
         else:
-            self._next_delayed_event_call.reset(delay_sec)
+            self._next_delayed_event_call.reset(delay_duration.as_secs())
 
     async def get_all_for_user(self, requester: Requester) -> list[JsonDict]:
         """Return all pending delayed events requested by the given user."""
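The `_schedule_next_at` rewrite above folds the old "trim negative delays to zero" branch into a single clamp; the same idiom replaces an explicit `if delay < 0` check in `synapse/handlers/message.py` further down. In sketch form (variable and callback names are illustrative, not from the codebase):

```python
# A deadline already in the past must not yield a negative Duration, so clamp
# the millisecond delta before wrapping it.
delay_ms = next_send_ts - now_ms  # may be negative if we are overdue
delay = Duration(milliseconds=max(delay_ms, 0))
clock.call_later(delay, fire)     # a zero delay fires as soon as possible
```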
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 3f1a5fe6d6e..1b7de57ab93 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -71,6 +71,7 @@
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.cancellation import cancellable
+from synapse.util.duration import Duration
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import (
     NotRetryingDestination,
@@ -85,7 +86,7 @@
 DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages"
 MAX_DEVICE_DISPLAY_NAME_LEN = 100
-DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000
+DELETE_STALE_DEVICES_INTERVAL = Duration(days=1)
 
 
 def _check_device_name_length(name: str | None) -> None:
@@ -186,7 +187,7 @@ def __init__(self, hs: "HomeServer"):
         ):
             self.clock.looping_call(
                 self.hs.run_as_background_process,
-                DELETE_STALE_DEVICES_INTERVAL_MS,
+                DELETE_STALE_DEVICES_INTERVAL,
                 desc="delete_stale_devices",
                 func=self._delete_stale_devices,
             )
@@ -915,7 +916,7 @@ async def handle_new_device_update(self) -> None:
             )
 
     DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
-    DEVICE_MSGS_DELETE_SLEEP_MS = 100
+    DEVICE_MSGS_DELETE_SLEEP = Duration(milliseconds=100)
 
     async def _delete_device_messages(
         self,
@@ -941,9 +942,7 @@
             if from_stream_id is None:
                 return TaskStatus.COMPLETE, None, None
 
-            await self.clock.sleep(
-                DeviceWriterHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0
-            )
+            await self.clock.sleep(DeviceWriterHandler.DEVICE_MSGS_DELETE_SLEEP)
 
 
 class DeviceWriterHandler(DeviceHandler):
@@ -1469,7 +1468,7 @@ def __init__(self, hs: "HomeServer", device_handler: DeviceWriterHandler):
         self._resync_retry_lock = Lock()
         self.clock.looping_call(
             self.hs.run_as_background_process,
-            30 * 1000,
+            Duration(seconds=30),
             func=self._maybe_retry_device_resync,
             desc="_maybe_retry_device_resync",
         )
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 41d27d47da1..64f705a3dad 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -46,6 +46,7 @@
 )
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.cancellation import cancellable
+from synapse.util.duration import Duration
 from synapse.util.json import json_decoder
 from synapse.util.retryutils import (
     NotRetryingDestination,
@@ -1634,7 +1635,7 @@ async def _delete_old_one_time_keys_task(
             # matrix.org has about 15M users in the e2e_one_time_keys_json table
             # (comprising 20M devices). We want this to take about a week, so we need
             # to do about one batch of 100 users every 4 seconds.
-            await self.clock.sleep(4)
+            await self.clock.sleep(Duration(seconds=4))
 
 
 def _check_cross_signing_key(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1bba3fc758e..7808f8928b0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -72,6 +72,7 @@
 from synapse.types import JsonDict, StrCollection, get_domain_from_id
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer
+from synapse.util.duration import Duration
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.visibility import filter_events_for_server
 
@@ -1972,7 +1973,9 @@ async def _sync_partial_state_room(
                         logger.warning(
                             "%s; waiting for %d ms...", e, e.retry_after_ms
                         )
-                        await self.clock.sleep(e.retry_after_ms / 1000)
+                        await self.clock.sleep(
+                            Duration(milliseconds=e.retry_after_ms)
+                        )
 
                 # Success, no need to try the rest of the destinations.
                 break
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 01e98f60ada..e314180e122 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -91,6 +91,7 @@
 )
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer, concurrently_execute
+from synapse.util.duration import Duration
 from synapse.util.iterutils import batch_iter, partition, sorted_topologically
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr
@@ -1802,7 +1803,7 @@ async def prep(event: EventBase) -> None:
             # the reactor. For large rooms let's yield to the reactor
             # occasionally to ensure we don't block other work.
             if (i + 1) % 1000 == 0:
-                await self._clock.sleep(0)
+                await self._clock.sleep(Duration(seconds=0))
 
         # Also persist the new event in batches for similar reasons as above.
         for batch in batch_iter(events_and_contexts_to_persist, 1000):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7679303a36d..bac4bd9361b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -83,6 +83,7 @@
 from synapse.util import log_failure, unwrapFirstError
 from synapse.util.async_helpers import Linearizer, gather_results
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.duration import Duration
 from synapse.util.json import json_decoder, json_encoder
 from synapse.util.metrics import measure_func
 from synapse.visibility import get_effective_room_visibility_from_state
@@ -433,14 +434,11 @@ def _schedule_expiry_for_event(self, event_id: str, expiry_ts: int) -> None:
 
         # Figure out how many seconds we need to wait before expiring the event.
         now_ms = self.clock.time_msec()
-        delay = (expiry_ts - now_ms) / 1000
+        delay = Duration(milliseconds=max(expiry_ts - now_ms, 0))
 
-        # callLater doesn't support negative delays, so trim the delay to 0 if we're
-        # in that case.
-        if delay < 0:
-            delay = 0
-
-        logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay)
+        logger.info(
+            "Scheduling expiry for event %s in %.3fs", event_id, delay.as_secs()
+        )
 
         self._scheduled_expiry = self.clock.call_later(
             delay,
@@ -551,7 +549,7 @@ def __init__(self, hs: "HomeServer"):
                 "send_dummy_events_to_fill_extremities",
                 self._send_dummy_events_to_fill_extremities,
             ),
-            5 * 60 * 1000,
+            Duration(minutes=5),
         )
 
         self._message_handler = hs.get_message_handler()
@@ -1012,7 +1010,7 @@ async def create_and_send_nonmember_event(
 
         if not ignore_shadow_ban and requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
+            await self.clock.sleep(Duration(seconds=random.randint(1, 10)))
             raise ShadowBanError()
 
         room_version = None
@@ -1515,7 +1513,7 @@ async def handle_new_client_event(
             and requester.shadow_banned
         ):
             # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
+            await self.clock.sleep(Duration(seconds=random.randint(1, 10)))
             raise ShadowBanError()
 
         if event.is_state():
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index a90ed3193cd..f869a41c5ed 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -42,6 +42,7 @@
 from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import ReadWriteLock
+from synapse.util.duration import Duration
 from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
@@ -116,7 +117,7 @@ def __init__(self, hs: "HomeServer"):
 
                 self.clock.looping_call(
                     self.hs.run_as_background_process,
-                    job.interval,
+                    Duration(milliseconds=job.interval),
                     "purge_history_for_rooms_in_range",
                     self.purge_history_for_rooms_in_range,
                     job.shortest_max_lifetime,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index ca5002cab35..4c3adca46e3 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -121,6 +121,7 @@
     get_domain_from_id,
 )
 from synapse.util.async_helpers import Linearizer
+from synapse.util.duration import Duration
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 
@@ -203,7 +204,7 @@
 
 # Delay before a worker tells the presence handler that a user has stopped
 # syncing.
-UPDATE_SYNCING_USERS_MS = 10 * 1000
+UPDATE_SYNCING_USERS = Duration(seconds=10)
 
 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
@@ -528,7 +529,7 @@ def __init__(self, hs: "HomeServer"):
         self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
         self._set_state_client = ReplicationPresenceSetState.make_client(hs)
 
-        self.clock.looping_call(self.send_stop_syncing, UPDATE_SYNCING_USERS_MS)
+        self.clock.looping_call(self.send_stop_syncing, UPDATE_SYNCING_USERS)
 
         hs.register_async_shutdown_handler(
             phase="before",
@@ -581,7 +582,7 @@ def send_stop_syncing(self) -> None:
         for (user_id, device_id), last_sync_ms in list(
             self._user_devices_going_offline.items()
         ):
-            if now - last_sync_ms > UPDATE_SYNCING_USERS_MS:
+            if now - last_sync_ms > UPDATE_SYNCING_USERS.as_millis():
                 self._user_devices_going_offline.pop((user_id, device_id), None)
                 self.send_user_sync(user_id, device_id, False, last_sync_ms)
 
@@ -861,20 +862,20 @@ def __init__(self, hs: "HomeServer"):
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
         self.clock.call_later(
-            30,
+            Duration(seconds=30),
             self.clock.looping_call,
             self._handle_timeouts,
-            5000,
+            Duration(seconds=5),
         )
 
         # Presence information is persisted, whether or not it is being tracked
         # internally.
         if self._presence_enabled:
             self.clock.call_later(
-                60,
+                Duration(minutes=1),
                 self.clock.looping_call,
                 self._persist_unpersisted_changes,
-                60 * 1000,
+                Duration(minutes=1),
             )
 
         presence_wheel_timer_size_gauge.register_hook(
@@ -2430,7 +2431,7 @@ class PresenceFederationQueue:
     _KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000
 
     # How often to check if we can expire entries from the queue.
-    _CLEAR_ITEMS_EVERY_MS = 60 * 1000
+    _CLEAR_ITEMS_EVERY_MS = Duration(minutes=1)
 
     def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
         self._clock = hs.get_clock()
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 59904cd995a..8f16ae6dece 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -34,6 +34,7 @@
 from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia
 from synapse.types import JsonDict, JsonValue, Requester, UserID, create_requester
 from synapse.util.caches.descriptors import cached
+from synapse.util.duration import Duration
 from synapse.util.stringutils import parse_and_validate_mxc_uri
 
 if TYPE_CHECKING:
@@ -583,7 +584,7 @@ async def _update_join_states(
         # Do not actually update the room state for shadow-banned users.
         if requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
+            await self.clock.sleep(Duration(seconds=random.randint(1, 10)))
             return
 
         room_ids = await self.store.get_rooms_for_user(target_user.to_string())
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d62ad5393f1..1026bfd8766 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -92,6 +92,7 @@
 from synapse.util import stringutils
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.duration import Duration
 from synapse.util.iterutils import batch_iter
 from synapse.util.stringutils import parse_and_validate_server_name
 from synapse.visibility import filter_events_for_client
@@ -1179,7 +1180,7 @@ async def create_room(
 
         if (invite_list or invite_3pid_list) and requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
+            await self.clock.sleep(Duration(seconds=random.randint(1, 10)))
 
             # Allow the request to go through, but remove any associated invites.
             invite_3pid_list = []
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index d5f72c1732d..6f8481de9ae 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -66,6 +66,7 @@
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_left_room
+from synapse.util.duration import Duration
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -642,7 +643,7 @@ async def update_membership(
 
         if action == Membership.INVITE and requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
+            await self.clock.sleep(Duration(seconds=random.randint(1, 10)))
             raise ShadowBanError()
 
         key = (room_id,)
@@ -1647,7 +1648,7 @@ async def do_3pid_invite(
 
         if requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
+            await self.clock.sleep(Duration(seconds=random.randint(1, 10)))
             raise ShadowBanError()
 
         # We need to rate limit *before* we send out any 3PID invites, so we
@@ -2190,7 +2191,7 @@ def __init__(self, hs: "HomeServer"):
 
         # We kick this off to pick up outstanding work from before the last restart.
         self._clock.call_later(
-            0,
+            Duration(seconds=0),
             self.notify_new_event,
         )
@@ -2232,7 +2233,7 @@ async def _unsafe_process(self) -> None:
             #
             # We wait for a short time so that we don't "tight" loop just
             # keeping the table up to date.
            -            await self._clock.sleep(0.5)
+            await self._clock.sleep(Duration(milliseconds=500))
 
             self.pos = self._store.get_room_max_stream_ordering()
             await self._store.update_room_forgetter_stream_pos(self.pos)
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 6d661453ac8..c87b5f854a5 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -32,6 +32,7 @@
 from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
 from synapse.storage.databases.main.state_deltas import StateDelta
 from synapse.types import JsonDict
+from synapse.util.duration import Duration
 from synapse.util.events import get_plain_text_topic_from_event_content
 
 if TYPE_CHECKING:
@@ -72,7 +73,7 @@ def __init__(self, hs: "HomeServer"):
         # We kick this off so that we don't have to wait for a change before
         # we start populating stats
         self.clock.call_later(
-            0,
+            Duration(seconds=0),
             self.notify_new_event,
         )
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 8b577d5d585..e66396fecc2 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -41,6 +41,7 @@
     UserID,
 )
 from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.duration import Duration
 from synapse.util.metrics import Measure
 from synapse.util.retryutils import filter_destinations_by_retry_limiter
 from synapse.util.wheel_timer import WheelTimer
@@ -60,15 +61,15 @@ class RoomMember:
 
 
 # How often we expect remote servers to resend us presence.
-FEDERATION_TIMEOUT = 60 * 1000
+FEDERATION_TIMEOUT = Duration(minutes=1)
 
 # How often to resend typing across federation.
-FEDERATION_PING_INTERVAL = 40 * 1000
+FEDERATION_PING_INTERVAL = Duration(seconds=40)
 
 # How long to remember a typing notification happened in a room before
 # forgetting about it.
-FORGET_TIMEOUT = 10 * 60 * 1000
+FORGET_TIMEOUT = Duration(minutes=10)
 
 
 class FollowerTypingHandler:
@@ -106,7 +107,7 @@ def __init__(self, hs: "HomeServer"):
 
         self._rooms_updated: set[str] = set()
 
-        self.clock.looping_call(self._handle_timeouts, 5000)
+        self.clock.looping_call(self._handle_timeouts, Duration(seconds=5))
         self.clock.looping_call(self._prune_old_typing, FORGET_TIMEOUT)
 
     def _reset(self) -> None:
@@ -141,7 +142,10 @@ def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
         # user.
         if self.federation and self.is_mine_id(member.user_id):
             last_fed_poke = self._member_last_federation_poke.get(member, None)
-            if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
+            if (
+                not last_fed_poke
+                or last_fed_poke + FEDERATION_PING_INTERVAL.as_millis() <= now
+            ):
                 self.hs.run_as_background_process(
                     "typing._push_remote",
                     self._push_remote,
@@ -165,7 +169,7 @@ async def _push_remote(self, member: RoomMember, typing: bool) -> None:
             now = self.clock.time_msec()
             self.wheel_timer.insert(
-                now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
+                now=now, obj=member, then=now + FEDERATION_PING_INTERVAL.as_millis()
             )
 
             hosts: StrCollection = (
@@ -315,7 +319,7 @@ async def started_typing(
 
         if requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
+            await self.clock.sleep(Duration(seconds=random.randint(1, 10)))
             raise ShadowBanError()
 
         await self.auth.check_user_in_room(room_id, requester)
@@ -350,7 +354,7 @@ async def stopped_typing(
 
         if requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
+            await self.clock.sleep(Duration(seconds=random.randint(1, 10)))
             raise ShadowBanError()
 
         await self.auth.check_user_in_room(room_id, requester)
@@ -428,8 +432,10 @@ async def _recv_edu(self, origin: str, content: JsonDict) -> None:
         if user.domain in domains:
             logger.info("Got typing update from %s: %r", user_id, content)
             now = self.clock.time_msec()
-            self._member_typing_until[member] = now + FEDERATION_TIMEOUT
-            self.wheel_timer.insert(now=now, obj=member, then=now + FEDERATION_TIMEOUT)
+            self._member_typing_until[member] = now + FEDERATION_TIMEOUT.as_millis()
+            self.wheel_timer.insert(
+                now=now, obj=member, then=now + FEDERATION_TIMEOUT.as_millis()
+            )
             self._push_update_local(member=member, typing=content["typing"])
 
     def _push_update_local(self, member: RoomMember, typing: bool) -> None:
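Note the boundary discipline in this file: `WheelTimer` and `_member_typing_until` still traffic in integer millisecond timestamps, so the new `Duration` constants are converted with `.as_millis()` exactly where they meet a timestamp, and nowhere else. Schematically (names as in the hunks above):

```python
# Millisecond timestamps remain plain ints; Duration converts at the edge.
now = clock.time_msec()                          # int: ms since the epoch
deadline = now + FEDERATION_TIMEOUT.as_millis()  # int arithmetic
wheel_timer.insert(now=now, obj=member, then=deadline)
```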
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index e5210a3e978..36b037e8e1f 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -40,6 +40,7 @@
 from synapse.storage.databases.main.user_directory import SearchResult
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import UserID
+from synapse.util.duration import Duration
 from synapse.util.metrics import Measure
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import non_null_str_or_none
@@ -52,7 +53,7 @@
 # Don't refresh a stale user directory entry, using a Federation /profile request,
 # for 60 seconds. This gives time for other state events to arrive (which will
 # then be coalesced such that only one /profile request is made).
-USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000
+USER_DIRECTORY_STALE_REFRESH_TIME = Duration(minutes=1)
 
 # Maximum number of remote servers that we will attempt to refresh profiles for
 # in one go.
@@ -60,7 +61,7 @@
 
 # As long as we have servers to refresh (without backoff), keep adding more
 # every 15 seconds.
-INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15
+INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = Duration(seconds=15)
 
 
 def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int:
@@ -137,13 +138,13 @@ def __init__(self, hs: "HomeServer"):
         # We kick this off so that we don't have to wait for a change before
         # we start populating the user directory
         self.clock.call_later(
-            0,
+            Duration(seconds=0),
            self.notify_new_event,
         )
 
         # Kick off the profile refresh process on startup
         self._refresh_remote_profiles_call_later = self.clock.call_later(
-            10,
+            Duration(seconds=10),
             self.kick_off_remote_profile_refresh_process,
         )
@@ -550,7 +551,7 @@ async def _handle_possible_remote_profile_change(
         now_ts = self.clock.time_msec()
         await self.store.set_remote_user_profile_in_user_dir_stale(
             user_id,
-            next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS,
+            next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME.as_millis(),
             retry_counter=0,
         )
         # Schedule a wake-up to refresh the user directory for this server.
        # other servers ahead of it in the queue to get in the way of updating
        # the profile if the server only just sent us an event.
         self.clock.call_later(
-            USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+            USER_DIRECTORY_STALE_REFRESH_TIME + Duration(seconds=1),
             self.kick_off_remote_profile_refresh_process_for_remote_server,
             UserID.from_string(user_id).domain,
         )
         # Schedule a wake-up to handle any backoffs that may occur in the future.
         self.clock.call_later(
-            2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+            USER_DIRECTORY_STALE_REFRESH_TIME * 2 + Duration(seconds=1),
             self.kick_off_remote_profile_refresh_process,
         )
         return
@@ -656,7 +657,9 @@ async def _unsafe_refresh_remote_profiles(self) -> None:
             if not users:
                 return
             _, _, next_try_at_ts = users[0]
-            delay = ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2
+            delay = Duration(
+                milliseconds=next_try_at_ts - self.clock.time_msec()
+            ) + Duration(seconds=2)
             self._refresh_remote_profiles_call_later = self.clock.call_later(
                 delay,
                 self.kick_off_remote_profile_refresh_process,
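The wake-up scheduling above now uses `Duration` arithmetic in place of integer unit juggling: the old `2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1` becomes `USER_DIRECTORY_STALE_REFRESH_TIME * 2 + Duration(seconds=1)`, which reads in the same units it is scheduled in. Assuming the operator semantics sketched earlier, the result is what you would compute by hand:

```python
refresh = Duration(minutes=1)
wake_up_after = refresh * 2 + Duration(seconds=1)
assert wake_up_after.as_millis() == 121_000  # 2 * 60s + 1s
```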
Retrying", uri_str, e) # Sleep briefly in the hopes that they come back up - await self._clock.sleep(0.5) + await self._clock.sleep(Duration(milliseconds=500)) def _cache_period_from_headers( diff --git a/synapse/http/server.py b/synapse/http/server.py index 5f4e7484fd7..226cb008317 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -76,6 +76,7 @@ from synapse.util.caches import intern_dict from synapse.util.cancellation import is_function_cancellable from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.iterutils import chunk_seq from synapse.util.json import json_encoder @@ -334,7 +335,7 @@ async def _async_render_wrapper(self, request: "SynapseRequest") -> None: callback_return = await self._async_render(request) except LimitExceededError as e: if e.pause: - await self._clock.sleep(e.pause) + await self._clock.sleep(Duration(seconds=e.pause)) raise if callback_return is not None: diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 7b4408b2bce..29c5e66ec49 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -70,6 +70,7 @@ from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia from synapse.types import UserID from synapse.util.async_helpers import Linearizer +from synapse.util.duration import Duration from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import random_string @@ -80,10 +81,10 @@ # How often to run the background job to update the "recently accessed" # attribute of local and remote media. -UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000 # 1 minute +UPDATE_RECENTLY_ACCESSED_TS = Duration(minutes=1) # How often to run the background job to check for local and remote media # that should be purged according to the configured media retention settings. -MEDIA_RETENTION_CHECK_PERIOD_MS = 60 * 60 * 1000 # 1 hour +MEDIA_RETENTION_CHECK_PERIOD = Duration(hours=1) class MediaRepository: @@ -166,7 +167,7 @@ def __init__(self, hs: "HomeServer"): # with the duration between runs dictated by the homeserver config. self.clock.looping_call( self._start_apply_media_retention_rules, - MEDIA_RETENTION_CHECK_PERIOD_MS, + MEDIA_RETENTION_CHECK_PERIOD, ) if hs.config.media.url_preview_enabled: @@ -485,7 +486,7 @@ async def get_local_media_info( if now >= wait_until: break - await self.clock.sleep(0.5) + await self.clock.sleep(Duration(milliseconds=500)) logger.info("Media %s has not yet been uploaded", media_id) self.respond_not_yet_uploaded(request) diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index bc12212c46a..e83869bf4d6 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -51,6 +51,7 @@ from synapse.logging.opentracing import start_active_span, trace, trace_with_opname from synapse.media._base import ThreadedFileSender from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.file_consumer import BackgroundFileConsumer from ..types import JsonDict @@ -457,7 +458,7 @@ async def write_chunks_to(self, callback: Callable[[bytes], object]) -> None: callback(chunk) # We yield to the reactor by sleeping for 0 seconds. 
diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py
index 7b4408b2bce..29c5e66ec49 100644
--- a/synapse/media/media_repository.py
+++ b/synapse/media/media_repository.py
@@ -70,6 +70,7 @@
 from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia
 from synapse.types import UserID
 from synapse.util.async_helpers import Linearizer
+from synapse.util.duration import Duration
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import random_string
 
@@ -80,10 +81,10 @@
 
 # How often to run the background job to update the "recently accessed"
 # attribute of local and remote media.
-UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000  # 1 minute
+UPDATE_RECENTLY_ACCESSED_TS = Duration(minutes=1)
 
 # How often to run the background job to check for local and remote media
 # that should be purged according to the configured media retention settings.
-MEDIA_RETENTION_CHECK_PERIOD_MS = 60 * 60 * 1000  # 1 hour
+MEDIA_RETENTION_CHECK_PERIOD = Duration(hours=1)
 
 
 class MediaRepository:
@@ -166,7 +167,7 @@ def __init__(self, hs: "HomeServer"):
         # with the duration between runs dictated by the homeserver config.
         self.clock.looping_call(
             self._start_apply_media_retention_rules,
-            MEDIA_RETENTION_CHECK_PERIOD_MS,
+            MEDIA_RETENTION_CHECK_PERIOD,
         )
 
         if hs.config.media.url_preview_enabled:
@@ -485,7 +486,7 @@ async def get_local_media_info(
             if now >= wait_until:
                 break
 
-            await self.clock.sleep(0.5)
+            await self.clock.sleep(Duration(milliseconds=500))
 
         logger.info("Media %s has not yet been uploaded", media_id)
         self.respond_not_yet_uploaded(request)
diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py
index bc12212c46a..e83869bf4d6 100644
--- a/synapse/media/media_storage.py
+++ b/synapse/media/media_storage.py
@@ -51,6 +51,7 @@
 from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
 from synapse.media._base import ThreadedFileSender
 from synapse.util.clock import Clock
+from synapse.util.duration import Duration
 from synapse.util.file_consumer import BackgroundFileConsumer
 
 from ..types import JsonDict
@@ -457,7 +458,7 @@ async def write_chunks_to(self, callback: Callable[[bytes], object]) -> None:
             callback(chunk)
 
             # We yield to the reactor by sleeping for 0 seconds.
-            await self.clock.sleep(0)
+            await self.clock.sleep(Duration(seconds=0))
 
 
 @implementer(interfaces.IConsumer)
@@ -652,7 +653,7 @@ async def _resumeProducingRepeatedly(self) -> None:
         self.paused = False
         while not self.paused:
             producer.resumeProducing()
-            await self.clock.sleep(0)
+            await self.clock.sleep(Duration(seconds=0))
 
 
 class Header:
diff --git a/synapse/media/url_previewer.py b/synapse/media/url_previewer.py
index bbd8017b130..2c5e518918b 100644
--- a/synapse/media/url_previewer.py
+++ b/synapse/media/url_previewer.py
@@ -47,6 +47,7 @@
 from synapse.types import JsonDict, UserID
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.duration import Duration
 from synapse.util.json import json_encoder
 from synapse.util.stringutils import random_string
 
@@ -208,7 +209,9 @@ def __init__(
         )
 
         if self._worker_run_media_background_jobs:
-            self.clock.looping_call(self._start_expire_url_cache_data, 10 * 1000)
+            self.clock.looping_call(
+                self._start_expire_url_cache_data, Duration(seconds=10)
+            )
 
     async def preview(self, url: str, user: UserID, ts: int) -> bytes:
         # the in-memory cache:
""" - UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 + UNUSED_STREAM_EXPIRY = Duration(minutes=10) def __init__(self, hs: "HomeServer"): self.user_to_user_stream: dict[str, _NotifierUserStream] = {} @@ -269,9 +270,7 @@ def __init__(self, hs: "HomeServer"): self.state_handler = hs.get_state_handler() - self.clock.looping_call( - self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS - ) + self.clock.looping_call(self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY) # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at @@ -861,7 +860,7 @@ async def wait_for_stream_token(self, stream_token: StreamToken) -> bool: logged = True # TODO: be better - await self.clock.sleep(0.5) + await self.clock.sleep(Duration(milliseconds=500)) async def _get_room_ids( self, user: UserID, explicit_room_id: str | None @@ -889,7 +888,7 @@ async def _is_world_readable(self, room_id: str) -> bool: def remove_expired_streams(self) -> None: time_now_ms = self.clock.time_msec() expired_streams = [] - expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS + expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY.as_millis() for stream in self.user_to_user_stream.values(): if stream.count_listeners(): continue diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 36dc9bf6fcc..ce4a2102e4d 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -29,6 +29,7 @@ from synapse.push.mailer import Mailer from synapse.push.push_types import EmailReason from synapse.storage.databases.main.event_push_actions import EmailPushAction +from synapse.util.duration import Duration from synapse.util.threepids import validate_email if TYPE_CHECKING: @@ -229,7 +230,7 @@ async def _unsafe_process(self) -> None: if soonest_due_at is not None: delay = self.seconds_until(soonest_due_at) self.timed_call = self.hs.get_clock().call_later( - delay, + Duration(seconds=delay), self.on_timer, ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index edcabf0c295..1e7e742dddd 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -40,6 +40,7 @@ if TYPE_CHECKING: from synapse.server import HomeServer +from synapse.util.duration import Duration logger = logging.getLogger(__name__) @@ -336,7 +337,7 @@ async def _unsafe_process(self) -> None: else: logger.info("Push failed: delaying for %ds", self.backoff_delay) self.timed_call = self.hs.get_clock().call_later( - self.backoff_delay, + Duration(seconds=self.backoff_delay), self.on_timer, ) self.backoff_delay = min( @@ -371,7 +372,7 @@ async def _process_one(self, push_action: HttpPushAction) -> bool: delay_ms = random.randint(1, self.push_jitter_delay_ms) diff_ms = event.origin_server_ts + delay_ms - self.clock.time_msec() if diff_ms > 0: - await self.clock.sleep(diff_ms / 1000) + await self.clock.sleep(Duration(milliseconds=diff_ms)) rejected = await self.dispatch_push_event(event, tweaks, badge) if rejected is False: diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index d76b40cf39b..2bab9c2d713 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -42,6 +42,7 @@ from synapse.types import JsonDict from synapse.util.caches.response_cache import ResponseCache from synapse.util.cancellation import is_function_cancellable +from synapse.util.duration import Duration from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -317,7 +318,7 @@ async def 
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 36dc9bf6fcc..ce4a2102e4d 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -29,6 +29,7 @@
 from synapse.push.mailer import Mailer
 from synapse.push.push_types import EmailReason
 from synapse.storage.databases.main.event_push_actions import EmailPushAction
+from synapse.util.duration import Duration
 from synapse.util.threepids import validate_email
 
 if TYPE_CHECKING:
@@ -229,7 +230,7 @@ async def _unsafe_process(self) -> None:
         if soonest_due_at is not None:
             delay = self.seconds_until(soonest_due_at)
             self.timed_call = self.hs.get_clock().call_later(
-                delay,
+                Duration(seconds=delay),
                 self.on_timer,
             )
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index edcabf0c295..1e7e742dddd 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -40,6 +40,7 @@
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
+from synapse.util.duration import Duration
 
 logger = logging.getLogger(__name__)
 
@@ -336,7 +337,7 @@ async def _unsafe_process(self) -> None:
                 else:
                     logger.info("Push failed: delaying for %ds", self.backoff_delay)
                     self.timed_call = self.hs.get_clock().call_later(
-                        self.backoff_delay,
+                        Duration(seconds=self.backoff_delay),
                         self.on_timer,
                     )
                     self.backoff_delay = min(
@@ -371,7 +372,7 @@ async def _process_one(self, push_action: HttpPushAction) -> bool:
             delay_ms = random.randint(1, self.push_jitter_delay_ms)
             diff_ms = event.origin_server_ts + delay_ms - self.clock.time_msec()
             if diff_ms > 0:
-                await self.clock.sleep(diff_ms / 1000)
+                await self.clock.sleep(Duration(milliseconds=diff_ms))
 
             rejected = await self.dispatch_push_event(event, tweaks, badge)
             if rejected is False:
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index d76b40cf39b..2bab9c2d713 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -42,6 +42,7 @@
 from synapse.types import JsonDict
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.cancellation import is_function_cancellable
+from synapse.util.duration import Duration
 from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
@@ -317,7 +318,7 @@ async def send_request(
                     # If we timed out we probably don't need to worry about backing
                     # off too much, but lets just wait a little anyway.
-                    await clock.sleep(1)
+                    await clock.sleep(Duration(seconds=1))
                 except (ConnectError, DNSLookupError) as e:
                     if not cls.RETRY_ON_CONNECT_ERROR:
                         raise
@@ -332,7 +333,7 @@ async def send_request(
                         e,
                     )
 
-                    await clock.sleep(delay)
+                    await clock.sleep(Duration(seconds=delay))
                     attempts += 1
             except HttpResponseException as e:
                 # We convert to SynapseError as we know that it was a SynapseError
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 297feb00497..fdda932ead2 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -55,6 +55,7 @@
 )
 from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
 from synapse.util.async_helpers import Linearizer, timeout_deferred
+from synapse.util.duration import Duration
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
@@ -173,7 +174,7 @@ async def on_rdata(
                 )
 
                 # Yield to reactor so that we don't block.
-                await self._clock.sleep(0)
+                await self._clock.sleep(Duration(seconds=0))
         elif stream_name == PushersStream.NAME:
             for row in rows:
                 if row.deleted:
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 3068e60af0a..489a2c76a62 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -55,6 +55,7 @@
     parse_command_from_line,
 )
 from synapse.util.clock import Clock
+from synapse.util.duration import Duration
 from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
@@ -193,7 +194,9 @@ def connectionMade(self) -> None:
         self._send_pending_commands()
 
         # Starts sending pings
-        self._send_ping_loop = self.clock.looping_call(self.send_ping, 5000)
+        self._send_ping_loop = self.clock.looping_call(
+            self.send_ping, Duration(seconds=5)
+        )
 
         # Always send the initial PING so that the other side knows that they
         # can time us out.
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 27d43e6fba8..93ba48b4065 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -53,6 +53,7 @@
     tcp_inbound_commands_counter,
     tcp_outbound_commands_counter,
 )
+from synapse.util.duration import Duration
 
 if TYPE_CHECKING:
     from synapse.replication.tcp.handler import ReplicationCommandHandler
@@ -317,7 +318,7 @@ def __init__(
         self.hs = hs  # nb must be called this for @wrap_as_background_process
         self.server_name = hs.hostname
 
-        hs.get_clock().looping_call(self._send_ping, 30 * 1000)
+        hs.get_clock().looping_call(self._send_ping, Duration(seconds=30))
 
     @wrap_as_background_process("redis_ping")
     async def _send_ping(self) -> None:
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 134d8d921f9..36dd39ed672 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -34,6 +34,7 @@
 from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
 from synapse.replication.tcp.streams import EventsStream
 from synapse.replication.tcp.streams._base import CachesStream, StreamRow, Token
+from synapse.util.duration import Duration
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -116,7 +117,7 @@ def __init__(self, hs: "HomeServer"):
         #
         # Note that if the position hasn't advanced then we won't send anything.
         if any(EventsStream.NAME == s.NAME for s in self.streams):
-            self.clock.looping_call(self.on_notifier_poke, 1000)
+            self.clock.looping_call(self.on_notifier_poke, Duration(seconds=1))
 
     def on_notifier_poke(self) -> None:
         """Checks if there is actually any new data and sends it to the
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index b052052be05..3cb1e09f44c 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -58,6 +58,7 @@
     EmailRequestTokenBody,
     MsisdnRequestTokenBody,
 )
+from synapse.util.duration import Duration
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.stringutils import assert_valid_client_secret, random_string
 from synapse.util.threepids import check_3pid_allowed, validate_email
@@ -125,7 +126,9 @@ async def on_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]:
                 # comments for request_token_inhibit_3pid_errors.
                 # Also wait for some random amount of time between 100ms and 1s to make it
                 # look like we did something.
-                await self.hs.get_clock().sleep(random.randint(1, 10) / 10)
+                await self.hs.get_clock().sleep(
+                    Duration(milliseconds=random.randint(100, 1000))
+                )
                 return 200, {"sid": random_string(16)}
 
             raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND)
@@ -383,7 +386,9 @@ async def on_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]:
                 # comments for request_token_inhibit_3pid_errors.
                 # Also wait for some random amount of time between 100ms and 1s to make it
                 # look like we did something.
-                await self.hs.get_clock().sleep(random.randint(1, 10) / 10)
+                await self.hs.get_clock().sleep(
+                    Duration(milliseconds=random.randint(100, 1000))
+                )
                 return 200, {"sid": random_string(16)}
 
             raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE)
@@ -449,7 +454,9 @@ async def on_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]:
                 # comments for request_token_inhibit_3pid_errors.
                 # Also wait for some random amount of time between 100ms and 1s to make it
                 # look like we did something.
-                await self.hs.get_clock().sleep(random.randint(1, 10) / 10)
+                await self.hs.get_clock().sleep(
+                    Duration(milliseconds=random.randint(100, 1000))
+                )
                 return 200, {"sid": random_string(16)}
 
             logger.info("MSISDN %s is already in use by %s", msisdn, existing_user_id)
diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index 9503446b92f..fdd2f1985a3 100644
--- a/synapse/rest/client/register.py
+++ b/synapse/rest/client/register.py
@@ -59,6 +59,7 @@
 from synapse.metrics import SERVER_NAME_LABEL, threepid_send_requests
 from synapse.push.mailer import Mailer
 from synapse.types import JsonDict
+from synapse.util.duration import Duration
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.stringutils import assert_valid_client_secret, random_string
@@ -150,7 +151,9 @@ async def on_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]:
                 # Also wait for some random amount of time between 100ms and 1s to make it
                 # look like we did something.
                 await self.already_in_use_mailer.send_already_in_use_mail(email)
-                await self.hs.get_clock().sleep(random.randint(1, 10) / 10)
+                await self.hs.get_clock().sleep(
+                    Duration(milliseconds=random.randint(100, 1000))
+                )
                 return 200, {"sid": random_string(16)}
 
             raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE)
@@ -219,7 +222,9 @@ async def on_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]:
             # comments for request_token_inhibit_3pid_errors.
             # Also wait for some random amount of time between 100ms and 1s to make it
            # look like we did something.
-            await self.hs.get_clock().sleep(random.randint(1, 10) / 10)
+            await self.hs.get_clock().sleep(
+                Duration(milliseconds=random.randint(100, 1000))
+            )
             return 200, {"sid": random_string(16)}
 
         raise SynapseError(
diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py
index 0c1ac1f11b4..43c7b6f9938 100644
--- a/synapse/rest/client/transactions.py
+++ b/synapse/rest/client/transactions.py
@@ -57,7 +57,7 @@ def __init__(self, hs: "HomeServer"):
         ] = {}
         # Try to clean entries every 30 mins. This means entries will exist
         # for at *LEAST* 30 mins, and at *MOST* 60 mins.
-        self.clock.looping_call(self._cleanup, CLEANUP_PERIOD.as_millis())
+        self.clock.looping_call(self._cleanup, CLEANUP_PERIOD)
 
     def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hashable:
         """A helper function which returns a transaction key that can be used
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 9fc49be4b13..a92233c863e 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -54,6 +54,7 @@
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.duration import Duration
 from synapse.util.metrics import Measure, measure_func
 from synapse.util.stringutils import shortstr
 
@@ -663,7 +664,7 @@ def __init__(self, hs: "HomeServer"):
             _StateResMetrics
         )
 
-        self.clock.looping_call(self._report_metrics, 120 * 1000)
+        self.clock.looping_call(self._report_metrics, Duration(minutes=2))
 
     async def resolve_state_groups(
         self,
if idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) event_to_pl = {} for idx, event_id in enumerate(graph, start=1): @@ -651,7 +652,7 @@ async def _reverse_topological_power_sort( # We await occasionally when we're working with large data sets to # ensure that we don't block the reactor loop for too long. if idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) def _get_power_order(event_id: str) -> tuple[int, int, str]: ev = event_map[event_id] @@ -745,7 +746,7 @@ async def _iterative_auth_checks( # We await occasionally when we're working with large data sets to # ensure that we don't block the reactor loop for too long. if idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) return resolved_state @@ -796,7 +797,7 @@ async def _mainline_sort( # We await occasionally when we're working with large data sets to # ensure that we don't block the reactor loop for too long. if idx != 0 and idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) idx += 1 @@ -814,7 +815,7 @@ async def _mainline_sort( # We await occasionally when we're working with large data sets to # ensure that we don't block the reactor loop for too long. if idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) event_ids.sort(key=lambda ev_id: order_map[ev_id]) @@ -865,7 +866,7 @@ async def _get_mainline_depth_for_event( idx += 1 if idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) # Didn't find a power level auth event, so we just return 0 return 0 diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index c71bcdb7fb1..311534c5e7b 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -40,6 +40,7 @@ from synapse.storage.types import Connection, Cursor from synapse.types import JsonDict, StrCollection from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.json import json_encoder from . 
import engines @@ -162,7 +163,7 @@ def __init__( async def __aenter__(self) -> int: if self._sleep: - await self._clock.sleep(self._sleep_duration_ms / 1000) + await self._clock.sleep(Duration(milliseconds=self._sleep_duration_ms)) return self._update_duration_ms diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 4ca3f8f4e1a..8a2053d25af 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -32,6 +32,7 @@ from synapse.storage.database import LoggingTransaction from synapse.storage.databases import Databases from synapse.types.storage import _BackgroundUpdates +from synapse.util.duration import Duration from synapse.util.stringutils import shortstr if TYPE_CHECKING: @@ -50,7 +51,7 @@ def __init__(self, hs: "HomeServer", stores: Databases): if hs.config.worker.run_background_tasks: self._delete_state_loop_call = hs.get_clock().looping_call( - self._delete_state_groups_loop, 60 * 1000 + self._delete_state_groups_loop, Duration(minutes=1) ) self.stores.state.db_pool.updates.register_background_update_handler( diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 18f0eac5852..2d5e1d3c482 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -62,6 +62,7 @@ from synapse.storage.types import Connection, Cursor, SQLQueryParameters from synapse.types import StrCollection from synapse.util.async_helpers import delay_cancellation +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -631,7 +632,7 @@ def __init__( # Check ASAP (and then later, every 1s) to see if we have finished # background updates of tables that aren't safe to update. self._clock.call_later( - 0.0, + Duration(seconds=0), self.hs.run_as_background_process, "upsert_safety_check", self._check_safe_to_upsert, @@ -679,7 +680,7 @@ async def _check_safe_to_upsert(self) -> None: # If there's any updates still running, reschedule to run. if background_update_names: self._clock.call_later( - 15.0, + Duration(seconds=15), self.hs.run_as_background_process, "upsert_safety_check", self._check_safe_to_upsert, @@ -706,7 +707,7 @@ def loop() -> None: "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters ) - self._clock.looping_call(loop, 10000) + self._clock.looping_call(loop, Duration(seconds=10)) def new_transaction( self, diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index b7b9b424618..a4530796f20 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -45,6 +45,7 @@ from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.caches.descriptors import CachedFunction +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -71,11 +72,11 @@ # How long between cache invalidation table cleanups, once we have caught up # with the backlog. -REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h") +REGULAR_CLEANUP_INTERVAL = Duration(hours=1) # How long between cache invalidation table cleanups, before we have caught # up with the backlog. -CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m") +CATCH_UP_CLEANUP_INTERVAL = Duration(minutes=1) # Maximum number of cache invalidation rows to delete at once. 
CLEAN_UP_MAX_BATCH_SIZE = 20_000 @@ -139,7 +140,7 @@ def __init__( self.database_engine, PostgresEngine ): self.hs.get_clock().call_later( - CATCH_UP_CLEANUP_INTERVAL_MS / 1000, + CATCH_UP_CLEANUP_INTERVAL, self._clean_up_cache_invalidation_wrapper, ) @@ -825,12 +826,12 @@ async def _clean_up_cache_invalidation_wrapper(self) -> None: # Vary how long we wait before calling again depending on whether we # are still sifting through backlog or we have caught up. if in_backlog: - next_interval = CATCH_UP_CLEANUP_INTERVAL_MS + next_interval = CATCH_UP_CLEANUP_INTERVAL else: - next_interval = REGULAR_CLEANUP_INTERVAL_MS + next_interval = REGULAR_CLEANUP_INTERVAL self.hs.get_clock().call_later( - next_interval / 1000, + next_interval, self._clean_up_cache_invalidation_wrapper, ) diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 5d667a53453..a5ae4bf506a 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -32,6 +32,7 @@ ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.util.duration import Duration from synapse.util.json import json_encoder if TYPE_CHECKING: @@ -54,7 +55,7 @@ def __init__( hs.config.worker.run_background_tasks and self.hs.config.server.redaction_retention_period is not None ): - hs.get_clock().looping_call(self._censor_redactions, 5 * 60 * 1000) + hs.get_clock().looping_call(self._censor_redactions, Duration(minutes=5)) @wrap_as_background_process("_censor_redactions") async def _censor_redactions(self) -> None: diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 4948d0c286b..7cd3667a2b0 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -42,6 +42,7 @@ ) from synapse.types import JsonDict, UserID from synapse.util.caches.lrucache import LruCache +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -437,7 +438,7 @@ def __init__( ) if hs.config.worker.run_background_tasks and self.user_ips_max_age: - self.clock.looping_call(self._prune_old_user_ips, 5 * 1000) + self.clock.looping_call(self._prune_old_user_ips, Duration(seconds=5)) if self._update_on_this_worker: # This is the designated worker that can write to the client IP @@ -448,7 +449,7 @@ def __init__( tuple[str, str, str], tuple[str, str | None, int] ] = {} - self.clock.looping_call(self._update_client_ips_batch, 5 * 1000) + self.clock.looping_call(self._update_client_ips_batch, Duration(seconds=5)) hs.register_async_shutdown_handler( phase="before", eventType="shutdown", diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 28e706d5c34..fc61f46c1c5 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -152,7 +152,7 @@ def __init__( if hs.config.worker.run_background_tasks: self.clock.looping_call( run_as_background_process, - DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL.as_millis(), + DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL, "_delete_old_federation_inbox_rows", self.server_name, self._delete_old_federation_inbox_rows, @@ -1029,7 +1029,7 @@ def _delete_old_federation_inbox_rows_txn(txn: LoggingTransaction) -> bool: # We sleep a bit so that we don't hammer the database in a tight # loop first time we 
run this. - await self.clock.sleep(1) + await self.clock.sleep(Duration(seconds=1)) async def get_devices_with_messages( self, user_id: str, device_ids: StrCollection diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index caae2a0648d..cbad40faf75 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -62,6 +62,7 @@ from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter from synapse.util.json import json_decoder, json_encoder from synapse.util.stringutils import shortstr @@ -191,7 +192,7 @@ def __init__( if hs.config.worker.run_background_tasks: self.clock.looping_call( - self._prune_old_outbound_device_pokes, 60 * 60 * 1000 + self._prune_old_outbound_device_pokes, Duration(hours=1) ) def process_replication_rows( diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index b2f0aeaf581..cc7083b605d 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -56,6 +56,7 @@ from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter from synapse.util.json import json_encoder @@ -155,7 +156,7 @@ def __init__( if hs.config.worker.run_background_tasks: hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + self._delete_old_forward_extrem_cache, Duration(hours=1) ) # Cache of event ID to list of auth event IDs and their depths. @@ -171,7 +172,9 @@ def __init__( # index. 
self.tests_allow_no_chain_cover_index = True - self.clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000) + self.clock.looping_call( + self._get_stats_for_federation_staging, Duration(seconds=30) + ) if isinstance(self.database_engine, PostgresEngine): self.db_pool.updates.register_background_validate_constraint_and_delete_rows( diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 2e99d7314e8..a66caa672cd 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -105,6 +105,7 @@ from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.types import JsonDict, StrCollection from synapse.util.caches.descriptors import cached +from synapse.util.duration import Duration from synapse.util.json import json_encoder if TYPE_CHECKING: @@ -270,15 +271,17 @@ def __init__( self._find_stream_orderings_for_times_txn(cur) cur.close() - self.clock.looping_call(self._find_stream_orderings_for_times, 10 * 60 * 1000) + self.clock.looping_call( + self._find_stream_orderings_for_times, Duration(minutes=10) + ) self._rotate_count = 10000 self._doing_notif_rotation = False if hs.config.worker.run_background_tasks: - self.clock.looping_call(self._rotate_notifs, 30 * 1000) + self.clock.looping_call(self._rotate_notifs, Duration(seconds=30)) self.clock.looping_call( - self._clear_old_push_actions_staging, 30 * 60 * 1000 + self._clear_old_push_actions_staging, Duration(minutes=30) ) self.db_pool.updates.register_background_index_update( @@ -1817,7 +1820,7 @@ def _clear_old_push_actions_staging_txn(txn: LoggingTransaction) -> bool: return # We sleep to ensure that we don't overwhelm the DB. - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) async def get_push_actions_for_user( self, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 4cf708442d4..ae6ee50dc24 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -92,6 +92,7 @@ from synapse.util.caches.lrucache import AsyncLruCache from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -278,7 +279,7 @@ def __init__( # We periodically clean out old transaction ID mappings self.clock.looping_call( self._cleanup_old_transaction_ids, - 5 * 60 * 1000, + Duration(minutes=5), ) self._get_event_cache: AsyncLruCache[tuple[str], EventCacheEntry] = ( diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 51f04acbcb1..dd49f98366e 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -38,6 +38,7 @@ ) from synapse.types import ISynapseReactor from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -49,11 +50,13 @@ # How often to renew an acquired lock by updating the `last_renewed_ts` time in # the lock table. -_RENEWAL_INTERVAL_MS = 30 * 1000 +_RENEWAL_INTERVAL = Duration(seconds=30) # How long before an acquired lock times out. 
_LOCK_TIMEOUT_MS = 2 * 60 * 1000 +_LOCK_REAP_INTERVAL = Duration(milliseconds=_LOCK_TIMEOUT_MS / 10.0) + class LockStore(SQLBaseStore): """Provides a best effort distributed lock between worker instances. @@ -106,9 +109,7 @@ def __init__( self._acquiring_locks: set[tuple[str, str]] = set() - self.clock.looping_call( - self._reap_stale_read_write_locks, _LOCK_TIMEOUT_MS / 10.0 - ) + self.clock.looping_call(self._reap_stale_read_write_locks, _LOCK_REAP_INTERVAL) @wrap_as_background_process("LockStore._on_shutdown") async def _on_shutdown(self) -> None: @@ -410,7 +411,7 @@ def __init__( def _setup_looping_call(self) -> None: self._looping_call = self._clock.looping_call( self._renew, - _RENEWAL_INTERVAL_MS, + _RENEWAL_INTERVAL, self._server_name, self._store, self._hs, diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index dc8e2c16165..b2b45612478 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -34,6 +34,7 @@ from synapse.storage.databases.main.event_push_actions import ( EventPushActionsWorkerStore, ) +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -78,7 +79,7 @@ def __init__( # Read the extrems every 60 minutes if hs.config.worker.run_background_tasks: - self.clock.looping_call(self._read_forward_extremities, 60 * 60 * 1000) + self.clock.looping_call(self._read_forward_extremities, Duration(hours=1)) # Used in _generate_user_daily_visits to keep track of progress self._last_user_visit_update = self._get_start_of_day() diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 545b0f11c42..9a9c0fffc7e 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -49,13 +49,12 @@ from synapse.storage.util.sequence import build_sequence_generator from synapse.types import JsonDict, StrCollection, UserID, UserInfo from synapse.util.caches.descriptors import cached +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter if TYPE_CHECKING: from synapse.server import HomeServer -THIRTY_MINUTES_IN_MS = 30 * 60 * 1000 - logger = logging.getLogger(__name__) @@ -213,7 +212,7 @@ def __init__( if hs.config.worker.run_background_tasks: self.clock.call_later( - 0.0, + Duration(seconds=0), self._set_expiration_date_when_missing, ) @@ -227,7 +226,7 @@ def __init__( # Create a background job for culling expired 3PID validity tokens if hs.config.worker.run_background_tasks: self.clock.looping_call( - self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS + self.cull_expired_threepid_validation_tokens, Duration(minutes=30) ) async def register_user( @@ -2739,7 +2738,7 @@ def __init__( # Create a background job for removing expired login tokens if hs.config.worker.run_background_tasks: self.clock.looping_call( - self._delete_expired_login_tokens, THIRTY_MINUTES_IN_MS + self._delete_expired_login_tokens, Duration(minutes=30) ) async def add_access_token_to_user( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 4fb7779d38d..9b06ab69fed 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -63,6 +63,7 @@ get_domain_from_id, ) from synapse.util.caches.descriptors import _CacheContext, cached, cachedList +from synapse.util.duration import Duration from synapse.util.iterutils import 
batch_iter from synapse.util.metrics import Measure @@ -110,10 +111,10 @@ def __init__( self._known_servers_count = 1 self.hs.get_clock().looping_call( self._count_known_servers, - 60 * 1000, + Duration(minutes=1), ) self.hs.get_clock().call_later( - 1, + Duration(seconds=1), self._count_known_servers, ) federation_known_servers_gauge.register_hook( diff --git a/synapse/storage/databases/main/session.py b/synapse/storage/databases/main/session.py index 1154bb2d599..f088a8d88cf 100644 --- a/synapse/storage/databases/main/session.py +++ b/synapse/storage/databases/main/session.py @@ -30,6 +30,7 @@ LoggingTransaction, ) from synapse.types import JsonDict +from synapse.util.duration import Duration from synapse.util.json import json_encoder if TYPE_CHECKING: @@ -55,7 +56,7 @@ def __init__( # Create a background job for culling expired sessions. if hs.config.worker.run_background_tasks: - self.clock.looping_call(self._delete_expired_sessions, 30 * 60 * 1000) + self.clock.looping_call(self._delete_expired_sessions, Duration(minutes=30)) async def create_session( self, session_type: str, value: JsonDict, expiry_ms: int diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 6f87308cde6..828eed3a73c 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -96,7 +96,7 @@ def __init__( if self.hs.config.worker.run_background_tasks: self.clock.looping_call( self.delete_old_sliding_sync_connections, - CONNECTION_EXPIRY_FREQUENCY.as_millis(), + CONNECTION_EXPIRY_FREQUENCY, ) async def get_latest_bump_stamp_for_room( diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 70c5b928fdc..2fdd27d3da1 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -37,6 +37,7 @@ from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.types import JsonDict, StrCollection from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -81,7 +82,7 @@ def __init__( super().__init__(database, db_conn, hs) if hs.config.worker.run_background_tasks: - self.clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) + self.clock.looping_call(self._cleanup_transactions, Duration(minutes=30)) @wrap_as_background_process("cleanup_transactions") async def _cleanup_transactions(self) -> None: diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 6f9bbcac678..818f8b1a69b 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -58,6 +58,7 @@ run_in_background, ) from synapse.util.clock import Clock +from synapse.util.duration import Duration logger = logging.getLogger(__name__) @@ -640,7 +641,7 @@ async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry: # This needs to happen while we hold the lock. We could put it on the # exit path, but that would slow down the uncontended case. try: - await self._clock.sleep(0) + await self._clock.sleep(Duration(seconds=0)) except CancelledError: self._release_lock(key, entry) raise @@ -818,7 +819,9 @@ def time_it_out() -> None: # We don't track these calls since they are short. 
delayed_call = clock.call_later( - timeout, time_it_out, call_later_cancel_on_shutdown=cancel_on_shutdown + Duration(seconds=timeout), + time_it_out, + call_later_cancel_on_shutdown=cancel_on_shutdown, ) def convert_cancelled(value: Failure) -> Failure: diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py index 514abcbec16..43eefcb7f18 100644 --- a/synapse/util/batching_queue.py +++ b/synapse/util/batching_queue.py @@ -36,6 +36,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.metrics import SERVER_NAME_LABEL from synapse.util.clock import Clock +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -175,7 +176,7 @@ async def _process_queue(self, key: Hashable) -> None: # pattern is to call `add_to_queue` multiple times at once, and # deferring to the next reactor tick allows us to batch all of # those up. - await self._clock.sleep(0) + await self._clock.sleep(Duration(seconds=0)) next_values = self._next_values.pop(key, []) if not next_values: diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 528e4bb852c..87870f42237 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -38,6 +38,7 @@ from synapse.config import cache as cache_config from synapse.util.caches import EvictionReason, register_cache from synapse.util.clock import Clock +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -112,7 +113,7 @@ def __init__( def f() -> "defer.Deferred[None]": return hs.run_as_background_process("prune_cache", self._prune_cache) - self._clock.looping_call(f, self._expiry_ms / 2) + self._clock.looping_call(f, Duration(milliseconds=self._expiry_ms / 2)) def __setitem__(self, key: KT, value: VT) -> None: now = self._clock.time_msec() diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index d304e804e94..a3e7bd4d03e 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -50,6 +50,7 @@ iterate_tree_cache_items, ) from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.linked_list import ListNode if TYPE_CHECKING: @@ -202,9 +203,9 @@ async def _internal_expire_old_entries( if (i + 1) % 10000 == 0: logger.debug("Waiting during drop") if node.last_access_ts_secs > now - expiry_seconds: - await clock.sleep(0.5) + await clock.sleep(Duration(milliseconds=500)) else: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) logger.debug("Waking during drop") node = next_node @@ -248,7 +249,7 @@ def setup_expire_lru_cache_entries(hs: "HomeServer") -> None: clock = hs.get_clock() clock.looping_call( _expire_old_entries, - 30 * 1000, + Duration(seconds=30), server_name, hs, clock, diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index b1cdc81dda2..0289e13f6a0 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -42,6 +42,7 @@ from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred from synapse.util.caches import EvictionReason, register_cache from synapse.util.clock import Clock +from synapse.util.duration import Duration logger = logging.getLogger(__name__) @@ -120,7 +121,7 @@ def __init__( self._result_cache: dict[KV, ResponseCacheEntry] = {} self.clock = clock - self.timeout_sec = timeout_ms / 1000.0 + self.timeout = 
Duration(milliseconds=timeout_ms) self._name = name self._metrics = register_cache( @@ -195,9 +196,9 @@ def on_complete(r: RV) -> RV: # if this cache has a non-zero timeout, and the callback has not cleared # the should_cache bit, we leave it in the cache for now and schedule # its removal later. - if self.timeout_sec and context.should_cache: + if self.timeout and context.should_cache: self.clock.call_later( - self.timeout_sec, + self.timeout, self._entry_timeout, key, # We don't need to track these calls since they don't hold any strong diff --git a/synapse/util/clock.py b/synapse/util/clock.py index 65f71648967..6fd31864b75 100644 --- a/synapse/util/clock.py +++ b/synapse/util/clock.py @@ -31,6 +31,7 @@ from synapse.logging import context from synapse.types import ISynapseThreadlessReactor from synapse.util import log_failure +from synapse.util.duration import Duration from synapse.util.stringutils import random_string_insecure_fast P = ParamSpec("P") @@ -84,14 +85,14 @@ def shutdown(self) -> None: self.cancel_all_looping_calls() self.cancel_all_delayed_calls() - async def sleep(self, seconds: float) -> None: + async def sleep(self, duration: Duration) -> None: d: defer.Deferred[float] = defer.Deferred() # Start task in the `sentinel` logcontext, to avoid leaking the current context # into the reactor once it finishes. with context.PreserveLoggingContext(): # We can ignore the lint here since this class is the one location callLater should # be called. - self._reactor.callLater(seconds, d.callback, seconds) # type: ignore[call-later-not-tracked] + self._reactor.callLater(duration.as_secs(), d.callback, duration.as_secs()) # type: ignore[call-later-not-tracked] await d def time(self) -> float: @@ -105,13 +106,13 @@ def time_msec(self) -> int: def looping_call( self, f: Callable[P, object], - msec: float, + duration: Duration, *args: P.args, **kwargs: P.kwargs, ) -> LoopingCall: """Call a function repeatedly. - Waits `msec` initially before calling `f` for the first time. + Waits `duration` initially before calling `f` for the first time. If the function given to `looping_call` returns an awaitable/deferred, the next call isn't scheduled until after the returned awaitable has finished. We get @@ -124,16 +125,16 @@ def looping_call( Args: f: The function to call repeatedly. - msec: How long to wait between calls in milliseconds. + duration: How long to wait between calls. *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - return self._looping_call_common(f, msec, False, *args, **kwargs) + return self._looping_call_common(f, duration, False, *args, **kwargs) def looping_call_now( self, f: Callable[P, object], - msec: float, + duration: Duration, *args: P.args, **kwargs: P.kwargs, ) -> LoopingCall: @@ -148,16 +149,16 @@ def looping_call_now( Args: f: The function to call repeatedly. - msec: How long to wait between calls in milliseconds. + duration: How long to wait between calls. *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. 
""" - return self._looping_call_common(f, msec, True, *args, **kwargs) + return self._looping_call_common(f, duration, True, *args, **kwargs) def _looping_call_common( self, f: Callable[P, object], - msec: float, + duration: Duration, now: bool, *args: P.args, **kwargs: P.kwargs, @@ -217,7 +218,7 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: # We want to start the task in the `sentinel` logcontext, to avoid leaking the # current context into the reactor after the function finishes. with context.PreserveLoggingContext(): - d = call.start(msec / 1000.0, now=now) + d = call.start(duration.as_secs(), now=now) d.addErrback(log_failure, "Looping call died", consumeErrors=False) self._looping_calls.append(call) @@ -225,7 +226,7 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: "%s(%s): Scheduled looping call every %sms later", looping_call_context_string, instance_id, - msec, + duration.as_millis(), # Find out who is scheduling the call which makes it easy to follow in the # logs. stack_info=True, @@ -251,7 +252,7 @@ def cancel_all_looping_calls(self, consumeErrors: bool = True) -> None: def call_later( self, - delay: float, + delay: Duration, callback: Callable, *args: Any, call_later_cancel_on_shutdown: bool = True, @@ -264,7 +265,7 @@ def call_later( `run_as_background_process` to give it more specific label and track metrics. Args: - delay: How long to wait in seconds. + delay: How long to wait. callback: Function to call *args: Postional arguments to pass to function. call_later_cancel_on_shutdown: Whether this call should be tracked for cleanup during @@ -322,7 +323,9 @@ def wrapped_callback(*args: Any, **kwargs: Any) -> None: # We can ignore the lint here since this class is the one location callLater should # be called. - call = self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) # type: ignore[call-later-not-tracked] + call = self._reactor.callLater( + delay.as_secs(), wrapped_callback, *args, **kwargs + ) # type: ignore[call-later-not-tracked] logger.debug( "call_later(%s): Scheduled call for %ss later (tracked for shutdown: %s)", diff --git a/synapse/util/duration.py b/synapse/util/duration.py index 3419f6dda60..135b9808520 100644 --- a/synapse/util/duration.py +++ b/synapse/util/duration.py @@ -13,6 +13,7 @@ # from datetime import timedelta +from typing import overload # Constant so we don't keep creating new timedelta objects when calling # `.as_millis()`. 
@@ -35,6 +36,82 @@ def as_millis(self) -> int: """Returns the duration in milliseconds.""" return int(self / _ONE_MILLISECOND) - def as_secs(self) -> int: + def as_secs(self) -> float: """Returns the duration in seconds.""" - return int(self.total_seconds()) + return self.total_seconds() + + # Override arithmetic operations to return Duration instances + + def __add__(self, other: timedelta) -> "Duration": + """Add two durations together, returning a Duration.""" + result = super().__add__(other) + return Duration(seconds=result.total_seconds()) + + def __radd__(self, other: timedelta) -> "Duration": + """Add two durations together (reversed), returning a Duration.""" + result = super().__radd__(other) + return Duration(seconds=result.total_seconds()) + + def __sub__(self, other: timedelta) -> "Duration": + """Subtract two durations, returning a Duration.""" + result = super().__sub__(other) + return Duration(seconds=result.total_seconds()) + + def __rsub__(self, other: timedelta) -> "Duration": + """Subtract two durations (reversed), returning a Duration.""" + result = super().__rsub__(other) + return Duration(seconds=result.total_seconds()) + + def __mul__(self, other: float) -> "Duration": + """Multiply a duration by a scalar, returning a Duration.""" + result = super().__mul__(other) + return Duration(seconds=result.total_seconds()) + + def __rmul__(self, other: float) -> "Duration": + """Multiply a duration by a scalar (reversed), returning a Duration.""" + result = super().__rmul__(other) + return Duration(seconds=result.total_seconds()) + + @overload + def __truediv__(self, other: timedelta) -> float: ... + + @overload + def __truediv__(self, other: float) -> "Duration": ... + + def __truediv__(self, other: float | timedelta) -> "Duration | float": + """Divide a duration by a scalar or another duration. + + If dividing by a scalar, returns a Duration. + If dividing by a timedelta, returns a float ratio. + """ + result = super().__truediv__(other) + if isinstance(other, timedelta): + # Dividing by a timedelta gives a float ratio + assert isinstance(result, float) + return result + else: + # Dividing by a scalar gives a Duration + assert isinstance(result, timedelta) + return Duration(seconds=result.total_seconds()) + + @overload + def __floordiv__(self, other: timedelta) -> int: ... + + @overload + def __floordiv__(self, other: int) -> "Duration": ... + + def __floordiv__(self, other: int | timedelta) -> "Duration | int": + """Floor divide a duration by a scalar or another duration. + + If dividing by a scalar, returns a Duration. + If dividing by a timedelta, returns an int ratio. 
+ """ + result = super().__floordiv__(other) + if isinstance(other, timedelta): + # Dividing by a timedelta gives an int ratio + assert isinstance(result, int) + return result + else: + # Dividing by a scalar gives a Duration + assert isinstance(result, timedelta) + return Duration(seconds=result.total_seconds()) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 024706d9cfd..d1053d227be 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -48,6 +48,7 @@ from synapse.logging.opentracing import start_active_span from synapse.metrics import SERVER_NAME_LABEL, Histogram, LaterGauge from synapse.util.clock import Clock +from synapse.util.duration import Duration if typing.TYPE_CHECKING: from contextlib import _GeneratorContextManager @@ -353,7 +354,9 @@ def queue_request() -> "defer.Deferred[None]": rate_limiter_name=self.metrics_name, **{SERVER_NAME_LABEL: self.our_server_name}, ).inc() - ret_defer = run_in_background(self.clock.sleep, self.sleep_sec) + ret_defer = run_in_background( + self.clock.sleep, Duration(seconds=self.sleep_sec) + ) self.sleeping_requests.add(request_id) @@ -414,6 +417,6 @@ def start_next_request() -> None: pass self.clock.call_later( - 0.0, + Duration(seconds=0), start_next_request, ) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 3b4423a1ff5..353ddb70bc3 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -35,6 +35,7 @@ wrap_as_background_process, ) from synapse.types import JsonMapping, ScheduledTask, TaskStatus +from synapse.util.duration import Duration from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -92,8 +93,8 @@ class TaskScheduler: """ # Precision of the scheduler, evaluation of tasks to run will only happen - # every `SCHEDULE_INTERVAL_MS` ms - SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn + # every `SCHEDULE_INTERVAL` + SCHEDULE_INTERVAL = Duration(minutes=1) # How often to clean up old tasks. CLEANUP_INTERVAL_MS = 30 * 60 * 1000 # Time before a complete or failed task is deleted from the DB @@ -103,7 +104,7 @@ class TaskScheduler: # Time from the last task update after which we will log a warning LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs # Report a running task's status and usage every so often. - OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000 # 5 minutes + OCCASIONAL_REPORT_INTERVAL = Duration(minutes=5) def __init__(self, hs: "HomeServer"): self.hs = hs # nb must be called this for @wrap_as_background_process @@ -127,11 +128,11 @@ def __init__(self, hs: "HomeServer"): if self._run_background_tasks: self._clock.looping_call( self._launch_scheduled_tasks, - TaskScheduler.SCHEDULE_INTERVAL_MS, + TaskScheduler.SCHEDULE_INTERVAL, ) self._clock.looping_call( self._clean_scheduled_tasks, - TaskScheduler.SCHEDULE_INTERVAL_MS, + TaskScheduler.SCHEDULE_INTERVAL, ) running_tasks_gauge.register_hook( @@ -433,7 +434,7 @@ async def wrapper() -> None: start_time = self._clock.time() occasional_status_call = self._clock.looping_call( _occasional_report, - TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS, + TaskScheduler.OCCASIONAL_REPORT_INTERVAL, log_context, start_time, ) @@ -468,7 +469,7 @@ async def wrapper() -> None: # Try launch a new task since we've finished with this one. 
self._clock.call_later( - 0.1, + Duration(milliseconds=100), self._launch_scheduled_tasks, ) diff --git a/synmark/suites/logging.py b/synmark/suites/logging.py index d89f487d3dc..243f9dbca09 100644 --- a/synmark/suites/logging.py +++ b/synmark/suites/logging.py @@ -37,6 +37,7 @@ from synapse.synapse_rust import reset_logging_config from synapse.types import ISynapseReactor from synapse.util.clock import Clock +from synapse.util.duration import Duration class LineCounter(LineOnlyReceiver): @@ -141,7 +142,7 @@ class _logging: if len(handler._buffer) == handler.maximum_buffer: while len(handler._buffer) > handler.maximum_buffer / 2: - await clock.sleep(0.01) + await clock.sleep(Duration(milliseconds=10)) await logger_factory.on_done diff --git a/tests/federation/transport/server/test__base.py b/tests/federation/transport/server/test__base.py index 3c553e6e402..00a9c2064c6 100644 --- a/tests/federation/transport/server/test__base.py +++ b/tests/federation/transport/server/test__base.py @@ -30,6 +30,7 @@ from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from synapse.util.ratelimitutils import FederationRateLimiter from tests import unittest @@ -53,13 +54,13 @@ def __init__( async def on_GET( self, origin: str, content: None, query: dict[bytes, list[bytes]] ) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} async def on_POST( self, origin: str, content: JsonDict, query: dict[bytes, list[bytes]] ) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index acd37a1c711..d71156d67e5 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -250,7 +250,7 @@ def test_delete_device_and_big_device_inbox(self) -> None: self.assertEqual(10, len(res)) # wait for the task scheduler to do a second delete pass - self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL.as_secs()) # remaining messages should now be deleted res = self.get_success( diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 70557a4a5fa..623eef0ecb6 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -544,7 +544,7 @@ def test_prune_typing_replication(self) -> None: ) self.assertEqual(rows, [(2, [ROOM_ID, []])]) - self.reactor.advance(FORGET_TIMEOUT) + self.reactor.advance(FORGET_TIMEOUT.as_secs()) rows, _, _ = self.get_success( self.handler.get_all_typing_updates( diff --git a/tests/http/test_servlet.py b/tests/http/test_servlet.py index 5bf8305d05e..2f1c8f03c6c 100644 --- a/tests/http/test_servlet.py +++ b/tests/http/test_servlet.py @@ -34,6 +34,7 @@ from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from tests import unittest from tests.http.server._base import test_disconnect @@ -108,11 +109,11 @@ def __init__(self, hs: HomeServer): @cancellable async def on_GET(self, request: SynapseRequest) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} async def on_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]: - await 
self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} diff --git a/tests/logging/test_opentracing.py b/tests/logging/test_opentracing.py index 3aaa7432654..d5e643585d5 100644 --- a/tests/logging/test_opentracing.py +++ b/tests/logging/test_opentracing.py @@ -37,6 +37,7 @@ ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests.server import get_clock @@ -184,7 +185,7 @@ async def task(i: int) -> None: scopes.append(scope) self.assertEqual(self._tracer.active_span, scope.span) - await clock.sleep(4) + await clock.sleep(Duration(seconds=4)) self.assertEqual(self._tracer.active_span, scope.span) scope.close() @@ -194,7 +195,7 @@ async def root() -> None: scopes.append(root_scope) d1 = run_in_background(task, 1) - await clock.sleep(2) + await clock.sleep(Duration(seconds=2)) d2 = run_in_background(task, 2) # because we did run_in_background, the active span should still be the @@ -351,7 +352,7 @@ async def bg_task() -> None: # Now wait for the background process to finish while not callback_finished: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self.assertTrue( callback_finished, @@ -418,7 +419,7 @@ async def bg_task() -> None: # Now wait for the background process to finish while not callback_finished: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self.assertTrue( callback_finished, diff --git a/tests/replication/http/test__base.py b/tests/replication/http/test__base.py index b757c6428a7..1c7e7e997bb 100644 --- a/tests/replication/http/test__base.py +++ b/tests/replication/http/test__base.py @@ -30,6 +30,7 @@ from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from tests import unittest from tests.http.server._base import test_disconnect @@ -52,7 +53,7 @@ async def _serialize_payload(**kwargs: ReplicationEndpoint) -> JsonDict: async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict ) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} @@ -73,7 +74,7 @@ async def _serialize_payload(**kwargs: ReplicationEndpoint) -> JsonDict: async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict ) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py index 25112baaa2b..a4a3112e20e 100644 --- a/tests/rest/admin/test_background_updates.py +++ b/tests/rest/admin/test_background_updates.py @@ -31,6 +31,7 @@ from synapse.storage.background_updates import BackgroundUpdater from synapse.types import JsonDict from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest @@ -105,7 +106,7 @@ def _register_bg_update(self) -> None: "Adds a bg update but doesn't start it" async def _fake_update(progress: JsonDict, batch_size: int) -> int: - await self.clock.sleep(0.2) + await self.clock.sleep(Duration(milliseconds=200)) return batch_size self.store.db_pool.updates.register_background_update_handler( diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 7daf13ad220..1c340efa0cd 100644 
--- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -44,6 +44,7 @@ ) from synapse.types import UserID from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.task_scheduler import TaskScheduler from tests import unittest @@ -1161,7 +1162,7 @@ def test_delete_same_room_twice(self) -> None: # Mock PaginationHandler.purge_room to sleep for 100s, so we have time to do a second call # before the purge is over. Note that it doesn't purge anymore, but we don't care. async def purge_room(room_id: str, force: bool) -> None: - await self.hs.get_clock().sleep(100) + await self.hs.get_clock().sleep(Duration(seconds=100)) self.pagination_handler.purge_room = AsyncMock(side_effect=purge_room) # type: ignore[method-assign] @@ -1464,7 +1465,7 @@ def test_scheduled_purge_room(self) -> None: self._is_purged(room_id) # Wait for next scheduler run - self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS) + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL.as_secs()) self._is_purged(room_id) @@ -1501,7 +1502,7 @@ def test_schedule_shutdown_room(self) -> None: self._is_purged(room_id) # Wait for next scheduler run - self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS) + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL.as_secs()) # Test that all users has been kicked (room is shutdown) self._has_no_members(room_id) diff --git a/tests/rest/client/test_transactions.py b/tests/rest/client/test_transactions.py index 0407bb53477..31586a451ff 100644 --- a/tests/rest/client/test_transactions.py +++ b/tests/rest/client/test_transactions.py @@ -29,6 +29,7 @@ from synapse.rest.client.transactions import CLEANUP_PERIOD, HttpTransactionCache from synapse.types import ISynapseReactor, JsonDict from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.server import get_clock @@ -93,7 +94,7 @@ def cb() -> Generator["defer.Deferred[object]", object, tuple[int, JsonDict]]: # Ignore `multiple-internal-clocks` linter error here since we are creating a `Clock` # for testing purposes. yield defer.ensureDeferred( - Clock(reactor, server_name="test_server").sleep(0) # type: ignore[multiple-internal-clocks] + Clock(reactor, server_name="test_server").sleep(Duration(seconds=0)) # type: ignore[multiple-internal-clocks] ) return 1, {} diff --git a/tests/server_notices/__init__.py b/tests/server_notices/__init__.py index eca52930db2..19bda218e30 100644 --- a/tests/server_notices/__init__.py +++ b/tests/server_notices/__init__.py @@ -20,6 +20,7 @@ from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.unittest import override_config @@ -131,7 +132,7 @@ def _check_user_received_server_notice( break # Sleep and try again. - self.get_success(self.clock.sleep(0.1)) + self.get_success(self.clock.sleep(Duration(milliseconds=100))) else: self.fail( f"Failed to join the server notices room. 
No 'join' field in sync_body['rooms']: {sync_body['rooms']}" diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index 7db710846d3..85ce5bede25 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -42,6 +42,7 @@ ) from synapse.storage.databases.main.event_federation import StateDifference from synapse.types import EventID, StateMap +from synapse.util.duration import Duration from tests import unittest @@ -61,7 +62,7 @@ class FakeClock: - async def sleep(self, msec: float) -> None: + async def sleep(self, duration: Duration) -> None: return None diff --git a/tests/state/test_v21.py b/tests/state/test_v21.py index b17773fb562..58d800f921d 100644 --- a/tests/state/test_v21.py +++ b/tests/state/test_v21.py @@ -39,6 +39,7 @@ ) from synapse.types import StateMap from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.state.test_v2 import TestStateResolutionStore @@ -66,7 +67,7 @@ def monotonic_timestamp() -> int: class FakeClock: - async def sleep(self, duration_ms: float) -> None: + async def sleep(self, duration: Duration) -> None: defer.succeed(None) diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py index 3743a4a386f..622eb96ded0 100644 --- a/tests/storage/databases/main/test_lock.py +++ b/tests/storage/databases/main/test_lock.py @@ -26,7 +26,7 @@ from twisted.internet.testing import MemoryReactor from synapse.server import HomeServer -from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL_MS +from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL from synapse.util.clock import Clock from tests import unittest @@ -377,7 +377,7 @@ def test_maintain_lock(self) -> None: # Wait for ages with the lock, we should not be able to get the lock. 
for _ in range(10): - self.reactor.advance((_RENEWAL_INTERVAL_MS / 1000)) + self.reactor.advance((_RENEWAL_INTERVAL.as_secs())) lock2 = self.get_success( self.store.try_acquire_read_write_lock("name", "key", write=True) diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 3505423691f..e3f79d76707 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -38,6 +38,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.unittest import override_config @@ -59,7 +60,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: async def update(self, progress: JsonDict, count: int) -> int: duration_ms = 10 - await self.clock.sleep((count * duration_ms) / 1000) + await self.clock.sleep(Duration(milliseconds=count * duration_ms)) progress = {"my_key": progress["my_key"] + 1} await self.store.db_pool.runInteraction( "update_progress", @@ -309,7 +310,7 @@ def test_background_update_min_batch_set_in_config(self) -> None: # Run the update with the long-running update item async def update_long(progress: JsonDict, count: int) -> int: - await self.clock.sleep((count * duration_ms) / 1000) + await self.clock.sleep(Duration(milliseconds=count * duration_ms)) progress = {"my_key": progress["my_key"] + 1} await self.store.db_pool.runInteraction( "update_progress", diff --git a/tests/test_server.py b/tests/test_server.py index 2df6bdfa44a..ec31b6cc5f6 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -38,6 +38,7 @@ from synapse.types import JsonDict from synapse.util.cancellation import cancellable from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.http.server._base import test_disconnect @@ -406,11 +407,11 @@ def __init__(self, clock: Clock): @cancellable async def _async_render_GET(self, request: SynapseRequest) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} async def _async_render_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} @@ -423,11 +424,11 @@ def __init__(self, clock: Clock): @cancellable async def _async_render_GET(self, request: SynapseRequest) -> tuple[int, bytes]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, b"ok" async def _async_render_POST(self, request: SynapseRequest) -> tuple[int, bytes]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, b"ok" diff --git a/tests/util/caches/test_response_cache.py b/tests/util/caches/test_response_cache.py index 30cd6ef0e48..def5c817db2 100644 --- a/tests/util/caches/test_response_cache.py +++ b/tests/util/caches/test_response_cache.py @@ -26,6 +26,7 @@ from twisted.internet import defer from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext +from synapse.util.duration import Duration from tests.server import get_clock from tests.unittest import TestCase @@ -55,7 +56,7 @@ async def instant_return(o: str) -> str: return o async def delayed_return(self, o: str) -> str: - await self.clock.sleep(1) + await self.clock.sleep(Duration(seconds=1)) return o def 
test_cache_hit(self) -> None: @@ -182,7 +183,7 @@ def test_cache_context_nocache(self, should_cache: bool) -> None: async def non_caching(o: str, cache_context: ResponseCacheContext[int]) -> str: nonlocal call_count call_count += 1 - await self.clock.sleep(1) + await self.clock.sleep(Duration(seconds=1)) cache_context.should_cache = should_cache return o diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index ca805bb20a0..a4114cdfcc5 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -37,6 +37,7 @@ ) from synapse.types import ISynapseReactor from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.unittest import logcontext_clean @@ -82,7 +83,7 @@ async def competing_callback() -> None: self._check_test_key("sentinel") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("sentinel") @@ -94,9 +95,9 @@ async def competing_callback() -> None: reactor.callLater(0, lambda: defer.ensureDeferred(competing_callback())) # type: ignore[call-later-not-tracked] with LoggingContext(name="foo", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( @@ -128,7 +129,7 @@ async def competing_callback() -> None: self._check_test_key("looping_call") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("looping_call") @@ -139,12 +140,12 @@ async def competing_callback() -> None: with LoggingContext(name="foo", server_name="test_server"): lc = clock.looping_call( - lambda: defer.ensureDeferred(competing_callback()), 0 + lambda: defer.ensureDeferred(competing_callback()), Duration(seconds=0) ) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( @@ -179,7 +180,7 @@ async def competing_callback() -> None: self._check_test_key("looping_call") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("looping_call") @@ -190,10 +191,10 @@ async def competing_callback() -> None: with LoggingContext(name="foo", server_name="test_server"): lc = clock.looping_call_now( - lambda: defer.ensureDeferred(competing_callback()), 0 + lambda: defer.ensureDeferred(competing_callback()), Duration(seconds=0) ) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( @@ -228,7 +229,7 @@ async def competing_callback() -> None: self._check_test_key("call_later") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("call_later") @@ -238,11 +239,13 @@ async def competing_callback() -> None: callback_finished = True with LoggingContext(name="foo", server_name="test_server"): - clock.call_later(0, lambda: defer.ensureDeferred(competing_callback())) + clock.call_later( + 
Duration(seconds=0), lambda: defer.ensureDeferred(competing_callback()) + ) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( @@ -280,7 +283,7 @@ async def competing_callback() -> None: self._check_test_key("foo") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("foo") @@ -303,7 +306,7 @@ async def competing_callback() -> None: await d self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self.assertTrue( callback_finished, @@ -338,7 +341,7 @@ async def competing_callback() -> None: self._check_test_key("sentinel") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("sentinel") @@ -364,7 +367,7 @@ async def competing_callback() -> None: d.callback(None) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self.assertTrue( callback_finished, @@ -400,7 +403,7 @@ async def competing_callback() -> None: self._check_test_key("foo") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("foo") @@ -446,7 +449,7 @@ async def competing_callback() -> None: run_in_background(lambda: (d.callback(None), d)[1]) # type: ignore[call-overload, func-returns-value] self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self.assertTrue( callback_finished, @@ -486,7 +489,7 @@ def callback(result: object) -> object: # Now wait for the function under test to have run, and check that # the logcontext is left in a sane state. while not callback_finished: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( @@ -501,7 +504,7 @@ def callback(result: object) -> object: async def test_run_in_background_with_blocking_fn(self) -> None: async def blocking_function() -> None: # Ignore linter error since we are creating a `Clock` for testing purposes. - await Clock(reactor, server_name="test_server").sleep(0) # type: ignore[multiple-internal-clocks] + await Clock(reactor, server_name="test_server").sleep(Duration(seconds=0)) # type: ignore[multiple-internal-clocks] await self._test_run_in_background(blocking_function) @@ -535,7 +538,9 @@ async def test_run_in_background_with_coroutine(self) -> None: async def testfunc() -> None: self._check_test_key("foo") # Ignore linter error since we are creating a `Clock` for testing purposes. 
- d = defer.ensureDeferred(Clock(reactor, server_name="test_server").sleep(0)) # type: ignore[multiple-internal-clocks] + d = defer.ensureDeferred( + Clock(reactor, server_name="test_server").sleep(Duration(seconds=0)) # type: ignore[multiple-internal-clocks] + ) self.assertIs(current_context(), SENTINEL_CONTEXT) await d self._check_test_key("foo") @@ -579,7 +584,7 @@ async def competing_callback() -> None: self._check_test_key("foo") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("foo") @@ -591,7 +596,7 @@ async def competing_callback() -> None: with LoggingContext(name="foo", server_name="test_server"): run_coroutine_in_background(competing_callback()) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py index e33ded8a7f3..2c8e21b3398 100644 --- a/tests/util/test_task_scheduler.py +++ b/tests/util/test_task_scheduler.py @@ -26,6 +26,7 @@ from synapse.server import HomeServer from synapse.types import JsonMapping, ScheduledTask, TaskStatus from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.task_scheduler import TaskScheduler from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -68,7 +69,7 @@ def test_schedule_task(self) -> None: # The timestamp being 30s after now the task should been executed # after the first scheduling loop is run - self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL.as_secs()) task = self.get_success(self.task_scheduler.get_task(task_id)) assert task is not None @@ -87,7 +88,7 @@ async def _sleeping_task( self, task: ScheduledTask ) -> tuple[TaskStatus, JsonMapping | None, str | None]: # Sleep for a second - await self.hs.get_clock().sleep(1) + await self.hs.get_clock().sleep(Duration(seconds=1)) return TaskStatus.COMPLETE, None, None def test_schedule_lot_of_tasks(self) -> None: @@ -187,7 +188,7 @@ def test_schedule_resumable_task(self) -> None: # Simulate a synapse restart by emptying the list of running tasks self.task_scheduler._running_tasks = set() - self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL.as_secs())) task = self.get_success(self.task_scheduler.get_task(task_id)) assert task is not None
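Taken together, the arithmetic overrides added to `synapse/util/duration.py` keep `Duration` closed under scalar arithmetic while letting duration-by-duration division degrade to a plain ratio, mirroring `timedelta` semantics. A short sketch of the intended behaviour, assuming the `Duration` class exactly as defined in this diff (the values in the comments follow from the implementation shown above):

```python
from synapse.util.duration import Duration

interval = Duration(minutes=2)

# Scalar multiplication and division stay Durations, so derived
# intervals can be passed straight back into the Clock APIs.
half = interval / 2        # Duration equal to one minute
double = interval * 2      # Duration equal to four minutes

# Dividing by another duration yields a plain number, as with timedelta.
ratio = interval / Duration(seconds=30)    # 4.0 (float)
steps = interval // Duration(seconds=45)   # 2 (int)

# Conversions used only at the reactor boundary.
assert interval.as_millis() == 120_000
assert interval.as_secs() == 120.0
```

Because every override re-wraps its result as `Duration(seconds=result.total_seconds())`, derived values remain `Duration` instances rather than bare `timedelta`s, and so stay valid arguments to the `Duration`-typed `Clock` methods.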