Skip to content
Merged
1 change: 1 addition & 0 deletions changelog.d/19229.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move towards using a dedicated `Duration` type.
9 changes: 7 additions & 2 deletions rust/src/rendezvous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use mime::Mime;
use pyo3::{
exceptions::PyValueError,
pyclass, pymethods,
types::{PyAnyMethods, PyModule, PyModuleMethods},
types::{IntoPyDict, PyAnyMethods, PyModule, PyModuleMethods},
Bound, IntoPyObject, Py, PyAny, PyResult, Python,
};
use ulid::Ulid;
Expand Down Expand Up @@ -132,6 +132,11 @@ impl RendezvousHandler {
.unwrap_infallible()
.unbind();

let duration_module = py.import("synapse.util.duration")?;

let kwargs = [("milliseconds", eviction_interval)].into_py_dict(py)?;
let eviction_duration = duration_module.call_method("Duration", (), Some(&kwargs))?;

// Construct a Python object so that we can get a reference to the
// evict method and schedule it to run.
let self_ = Py::new(
Expand All @@ -149,7 +154,7 @@ impl RendezvousHandler {
let evict = self_.getattr(py, "_evict")?;
homeserver.call_method0("get_clock")?.call_method(
"looping_call",
(evict, eviction_interval),
(evict, eviction_duration),
None,
)?;

Expand Down
3 changes: 2 additions & 1 deletion synapse/api/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from synapse.storage.databases.main import DataStore
from synapse.types import Requester
from synapse.util.clock import Clock
from synapse.util.duration import Duration
from synapse.util.wheel_timer import WheelTimer

if TYPE_CHECKING:
Expand Down Expand Up @@ -100,7 +101,7 @@ def __init__(
# and doesn't affect correctness.
self._timer: WheelTimer[Hashable] = WheelTimer()

self.clock.looping_call(self._prune_message_counts, 15 * 1000)
self.clock.looping_call(self._prune_message_counts, Duration(seconds=15))

def _get_key(self, requester: Requester | None, key: Hashable | None) -> Hashable:
"""Use the requester's MXID as a fallback key if no key is provided."""
Expand Down
12 changes: 6 additions & 6 deletions synapse/app/phone_stats_home.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ def performance_stats_init() -> None:
# table will decrease
clock.looping_call(
hs.get_datastores().main.generate_user_daily_visits,
Duration(minutes=5).as_millis(),
Duration(minutes=5),
)

# monthly active user limiting functionality
clock.looping_call(
hs.get_datastores().main.reap_monthly_active_users,
Duration(hours=1).as_millis(),
Duration(hours=1),
)
hs.get_datastores().main.reap_monthly_active_users()

Expand Down Expand Up @@ -263,29 +263,29 @@ async def _generate_monthly_active_users() -> None:

if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
generate_monthly_active_users()
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
clock.looping_call(generate_monthly_active_users, Duration(minutes=5))
# End of monthly active user settings

if hs.config.metrics.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(
phone_stats_home,
PHONE_HOME_INTERVAL.as_millis(),
PHONE_HOME_INTERVAL,
hs,
stats,
)

# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
clock.call_later(
0,
Duration(seconds=0),
performance_stats_init,
)

# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME.as_secs(),
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME,
phone_stats_home,
hs,
stats,
Expand Down
3 changes: 2 additions & 1 deletion synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
from synapse.storage.databases.main import DataStore
from synapse.types import DeviceListUpdates, JsonMapping
from synapse.util.clock import Clock, DelayedCallWrapper
from synapse.util.duration import Duration

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -504,7 +505,7 @@ def __init__(
self.scheduled_recovery: DelayedCallWrapper | None = None

def recover(self) -> None:
delay = 2**self.backoff_counter
delay = Duration(seconds=2**self.backoff_counter)
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
self.scheduled_recovery = self.clock.call_later(
delay,
Expand Down
3 changes: 2 additions & 1 deletion synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.duration import Duration
from synapse.util.retryutils import NotRetryingDestination

if TYPE_CHECKING:
Expand Down Expand Up @@ -132,7 +133,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.pdu_destination_tried: dict[str, dict[str, int]] = {}
self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
self._clock.looping_call(self._clear_tried_cache, Duration(minutes=1))
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()

Expand Down
7 changes: 5 additions & 2 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.duration import Duration
from synapse.util.stringutils import parse_server_name

if TYPE_CHECKING:
Expand Down Expand Up @@ -226,7 +227,7 @@ async def _handle_old_staged_events(self) -> None:
)

# We pause a bit so that we don't start handling all rooms at once.
await self._clock.sleep(random.uniform(0, 0.1))
await self._clock.sleep(Duration(seconds=random.uniform(0, 0.1)))

async def on_backfill_request(
self, origin: str, room_id: str, versions: list[str], limit: int
Expand Down Expand Up @@ -301,7 +302,9 @@ async def on_incoming_transaction(
# Start a periodic check for old staged events. This is to handle
# the case where locks time out, e.g. if another process gets killed
# without dropping its locks.
self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
self._clock.looping_call(
self._handle_old_staged_events, Duration(minutes=1)
)

# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
Expand Down
3 changes: 2 additions & 1 deletion synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util.duration import Duration
from synapse.util.metrics import Measure

from .units import Edu
Expand Down Expand Up @@ -137,7 +138,7 @@ def register(queue_name: QueueNames, queue: Sized) -> None:
assert isinstance(queue, Sized)
register(queue_name, queue=queue)

self.clock.looping_call(self._clear_queue, 30 * 1000)
self.clock.looping_call(self._clear_queue, Duration(seconds=30))

def shutdown(self) -> None:
"""Stops this federation sender instance from sending further transactions."""
Expand Down
13 changes: 7 additions & 6 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
get_domain_from_id,
)
from synapse.util.clock import Clock
from synapse.util.duration import Duration
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter

Expand Down Expand Up @@ -218,12 +219,12 @@
# Please note that rate limiting still applies, so while the loop is
# executed every X seconds the destinations may not be woken up because
# they are being rate limited following previous attempt failures.
WAKEUP_RETRY_PERIOD_SEC = 60
WAKEUP_RETRY_PERIOD = Duration(minutes=1)

# Time (in s) to wait in between waking up each destination, i.e. one destination
# Time to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds until we have woken every destination
# has outstanding catch-up.
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS = Duration(seconds=5)


class AbstractFederationSender(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -379,7 +380,7 @@ async def _handle(self) -> None:

queue.attempt_new_transaction()

await self.clock.sleep(current_sleep_seconds)
await self.clock.sleep(Duration(seconds=current_sleep_seconds))

if not self.queue:
break
Expand Down Expand Up @@ -468,7 +469,7 @@ def __init__(self, hs: "HomeServer"):
# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call_now(
self.hs.run_as_background_process,
WAKEUP_RETRY_PERIOD_SEC * 1000.0,
WAKEUP_RETRY_PERIOD,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)
Expand Down Expand Up @@ -1161,4 +1162,4 @@ async def _wake_destinations_needing_catchup(self) -> None:
last_processed,
)
self.wake_destination(destination)
await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS)
3 changes: 2 additions & 1 deletion synapse/handlers/account_validity.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from synapse.types import UserID
from synapse.util import stringutils
from synapse.util.async_helpers import delay_cancellation
from synapse.util.duration import Duration

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -73,7 +74,7 @@ def __init__(self, hs: "HomeServer"):

# Check the renewal emails to send and send them every 30min.
if hs.config.worker.run_background_tasks:
self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000)
self.clock.looping_call(self._send_renewal_emails, Duration(minutes=30))

async def is_user_expired(self, user_id: str) -> bool:
"""Checks if a user has expired against third-party modules.
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from synapse.types import JsonDict, Requester, StrCollection, UserID
from synapse.util import stringutils as stringutils
from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
from synapse.util.duration import Duration
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.stringutils import base62_encode
from synapse.util.threepids import canonicalise_email
Expand Down Expand Up @@ -242,7 +243,7 @@ def __init__(self, hs: "HomeServer"):
if hs.config.worker.run_background_tasks:
self._clock.looping_call(
run_as_background_process,
5 * 60 * 1000,
Duration(minutes=5),
"expire_old_sessions",
self.server_name,
self._expire_old_sessions,
Expand Down
9 changes: 5 additions & 4 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
UserID,
create_requester,
)
from synapse.util.duration import Duration
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel
Expand Down Expand Up @@ -92,7 +93,7 @@ async def _schedule_db_events() -> None:
# Kick off again (without blocking) to catch any missed notifications
# that may have fired before the callback was added.
self._clock.call_later(
0,
Duration(seconds=0),
self.notify_new_event,
)

Expand Down Expand Up @@ -501,17 +502,17 @@ def _schedule_next_at_or_none(self, next_send_ts: Timestamp | None) -> None:

def _schedule_next_at(self, next_send_ts: Timestamp) -> None:
delay = next_send_ts - self._get_current_ts()
delay_sec = delay / 1000 if delay > 0 else 0
delay_duration = Duration(milliseconds=max(delay, 0))

if self._next_delayed_event_call is None:
self._next_delayed_event_call = self._clock.call_later(
delay_sec,
delay_duration,
self.hs.run_as_background_process,
"_send_on_timeout",
self._send_on_timeout,
)
else:
self._next_delayed_event_call.reset(delay_sec)
self._next_delayed_event_call.reset(delay_duration.as_secs())

async def get_all_for_user(self, requester: Requester) -> list[JsonDict]:
"""Return all pending delayed events requested by the given user."""
Expand Down
13 changes: 6 additions & 7 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.cancellation import cancellable
from synapse.util.duration import Duration
from synapse.util.metrics import measure_func
from synapse.util.retryutils import (
NotRetryingDestination,
Expand All @@ -85,7 +86,7 @@

DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages"
MAX_DEVICE_DISPLAY_NAME_LEN = 100
DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000
DELETE_STALE_DEVICES_INTERVAL = Duration(days=1)


def _check_device_name_length(name: str | None) -> None:
Expand Down Expand Up @@ -186,7 +187,7 @@ def __init__(self, hs: "HomeServer"):
):
self.clock.looping_call(
self.hs.run_as_background_process,
DELETE_STALE_DEVICES_INTERVAL_MS,
DELETE_STALE_DEVICES_INTERVAL,
desc="delete_stale_devices",
func=self._delete_stale_devices,
)
Expand Down Expand Up @@ -915,7 +916,7 @@ async def handle_new_device_update(self) -> None:
)

DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
DEVICE_MSGS_DELETE_SLEEP_MS = 100
DEVICE_MSGS_DELETE_SLEEP = Duration(milliseconds=100)

async def _delete_device_messages(
self,
Expand All @@ -941,9 +942,7 @@ async def _delete_device_messages(
if from_stream_id is None:
return TaskStatus.COMPLETE, None, None

await self.clock.sleep(
DeviceWriterHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0
)
await self.clock.sleep(DeviceWriterHandler.DEVICE_MSGS_DELETE_SLEEP)


class DeviceWriterHandler(DeviceHandler):
Expand Down Expand Up @@ -1469,7 +1468,7 @@ def __init__(self, hs: "HomeServer", device_handler: DeviceWriterHandler):
self._resync_retry_lock = Lock()
self.clock.looping_call(
self.hs.run_as_background_process,
30 * 1000,
Duration(seconds=30),
func=self._maybe_retry_device_resync,
desc="_maybe_retry_device_resync",
)
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.cancellation import cancellable
from synapse.util.duration import Duration
from synapse.util.json import json_decoder
from synapse.util.retryutils import (
NotRetryingDestination,
Expand Down Expand Up @@ -1634,7 +1635,7 @@ async def _delete_old_one_time_keys_task(
# matrix.org has about 15M users in the e2e_one_time_keys_json table
# (comprising 20M devices). We want this to take about a week, so we need
# to do about one batch of 100 users every 4 seconds.
await self.clock.sleep(4)
await self.clock.sleep(Duration(seconds=4))


def _check_cross_signing_key(
Expand Down
5 changes: 4 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
from synapse.types import JsonDict, StrCollection, get_domain_from_id
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.duration import Duration
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server

Expand Down Expand Up @@ -1972,7 +1973,9 @@ async def _sync_partial_state_room(
logger.warning(
"%s; waiting for %d ms...", e, e.retry_after_ms
)
await self.clock.sleep(e.retry_after_ms / 1000)
await self.clock.sleep(
Duration(milliseconds=e.retry_after_ms)
)

# Success, no need to try the rest of the destinations.
break
Expand Down
Loading
Loading