Merged
1 change: 1 addition & 0 deletions changelog.d/19229.misc
@@ -0,0 +1 @@
Move towards using a dedicated `Duration` type.
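The `Duration` class itself (in `synapse/util/duration.py`) is not part of this diff, so for orientation here is a minimal sketch of the interface the rest of the change appears to rely on. It is an inference only: the keyword arguments (`microseconds`, `milliseconds`, `seconds`, `minutes`, `hours`) and the `as_millis()`/`as_secs()` helpers are taken from the call sites below, while the internal representation and return types are guesses.

```python
class Duration:
    """Inferred sketch only; the real implementation may differ."""

    def __init__(
        self,
        *,
        microseconds: float = 0,
        milliseconds: float = 0,
        seconds: float = 0,
        minutes: float = 0,
        hours: float = 0,
    ) -> None:
        # Normalise everything to microseconds, matching the Rust mirror
        # (SynapseDuration) added below, which also stores microseconds.
        self._microseconds = int(
            microseconds
            + milliseconds * 1_000
            + seconds * 1_000_000
            + minutes * 60_000_000
            + hours * 3_600_000_000
        )

    def as_millis(self) -> int:
        return self._microseconds // 1_000

    def as_secs(self) -> float:
        return self._microseconds / 1_000_000
```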
56 changes: 56 additions & 0 deletions rust/src/duration.rs
@@ -0,0 +1,56 @@
/*
* This file is licensed under the Affero General Public License (AGPL) version 3.
*
* Copyright (C) 2025 Element Creations, Ltd
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* See the GNU Affero General Public License for more details:
* <https://www.gnu.org/licenses/agpl-3.0.html>.
*/

use once_cell::sync::OnceCell;
use pyo3::{
types::{IntoPyDict, PyAnyMethods},
Bound, BoundObject, IntoPyObject, Py, PyAny, PyErr, PyResult, Python,
};

/// A reference to the `synapse.util.duration` module.
static DURATION: OnceCell<Py<PyAny>> = OnceCell::new();

/// Access to the `synapse.util.duration` module.
fn duration_module(py: Python<'_>) -> PyResult<&Bound<'_, PyAny>> {
Ok(DURATION
.get_or_try_init(|| py.import("synapse.util.duration").map(Into::into))?
.bind(py))
}

/// Mirrors the `synapse.util.duration.Duration` Python class.
pub struct SynapseDuration {
microseconds: u64,
}

impl SynapseDuration {
/// For now we only need to create durations from milliseconds.
pub fn from_milliseconds(milliseconds: u64) -> Self {
Self {
microseconds: milliseconds * 1_000,
}
}
}

impl<'py> IntoPyObject<'py> for &SynapseDuration {
type Target = PyAny;
type Output = Bound<'py, Self::Target>;
type Error = PyErr;

fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
let duration_module = duration_module(py)?;
let kwargs = [("microseconds", self.microseconds)].into_py_dict(py)?;
let duration_instance = duration_module.call_method("Duration", (), Some(&kwargs))?;
Ok(duration_instance.into_bound())
}
}
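Implementing `IntoPyObject` for `&SynapseDuration` rather than for the owned value means a duration can be handed to Python more than once without cloning, which is how the rendezvous handler below passes `&eviction_duration`. On the Python side the result should be equivalent to constructing the duration directly; a hedged illustration of what `from_milliseconds` produces, using the interface sketched above:

```python
from synapse.util.duration import Duration

# What SynapseDuration::from_milliseconds(60_000) hands across the FFI
# boundary: milliseconds are widened to microseconds before the kwarg call.
eviction_duration = Duration(microseconds=60_000 * 1_000)
assert eviction_duration.as_millis() == 60_000
```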
1 change: 1 addition & 0 deletions rust/src/lib.rs
@@ -5,6 +5,7 @@ use pyo3::prelude::*;
use pyo3_log::ResetHandle;

pub mod acl;
pub mod duration;
pub mod errors;
pub mod events;
pub mod http;
5 changes: 4 additions & 1 deletion rust/src/rendezvous/mod.rs
@@ -35,6 +35,7 @@ use ulid::Ulid;

use self::session::Session;
use crate::{
duration::SynapseDuration,
errors::{NotFoundError, SynapseError},
http::{http_request_from_twisted, http_response_to_twisted, HeaderMapPyExt},
UnwrapInfallible,
@@ -132,6 +133,8 @@ impl RendezvousHandler {
.unwrap_infallible()
.unbind();

let eviction_duration = SynapseDuration::from_milliseconds(eviction_interval);

// Construct a Python object so that we can get a reference to the
// evict method and schedule it to run.
let self_ = Py::new(
@@ -149,7 +152,7 @@
let evict = self_.getattr(py, "_evict")?;
homeserver.call_method0("get_clock")?.call_method(
"looping_call",
(evict, eviction_interval),
(evict, &eviction_duration),
None,
)?;

3 changes: 2 additions & 1 deletion synapse/api/ratelimiting.py
@@ -27,6 +27,7 @@
from synapse.storage.databases.main import DataStore
from synapse.types import Requester
from synapse.util.clock import Clock
from synapse.util.duration import Duration
from synapse.util.wheel_timer import WheelTimer

if TYPE_CHECKING:
@@ -100,7 +101,7 @@ def __init__(
# and doesn't affect correctness.
self._timer: WheelTimer[Hashable] = WheelTimer()

self.clock.looping_call(self._prune_message_counts, 15 * 1000)
self.clock.looping_call(self._prune_message_counts, Duration(seconds=15))

def _get_key(self, requester: Requester | None, key: Hashable | None) -> Hashable:
"""Use the requester's MXID as a fallback key if no key is provided."""
12 changes: 6 additions & 6 deletions synapse/app/phone_stats_home.py
@@ -218,13 +218,13 @@ def performance_stats_init() -> None:
# table will decrease
clock.looping_call(
hs.get_datastores().main.generate_user_daily_visits,
Duration(minutes=5).as_millis(),
Duration(minutes=5),
)

# monthly active user limiting functionality
clock.looping_call(
hs.get_datastores().main.reap_monthly_active_users,
Duration(hours=1).as_millis(),
Duration(hours=1),
)
hs.get_datastores().main.reap_monthly_active_users()

@@ -263,29 +263,29 @@ async def _generate_monthly_active_users() -> None:

if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
generate_monthly_active_users()
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
clock.looping_call(generate_monthly_active_users, Duration(minutes=5))
# End of monthly active user settings

if hs.config.metrics.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(
phone_stats_home,
PHONE_HOME_INTERVAL.as_millis(),
PHONE_HOME_INTERVAL,
hs,
stats,
)

# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
clock.call_later(
0,
Duration(seconds=0),
performance_stats_init,
)

# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME.as_secs(),
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME,
phone_stats_home,
hs,
stats,
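Taken together, the call sites in this file touch most of the `Clock` entry points being migrated. A signature sketch of what this change appears to assume follows; the real definitions live in `synapse/util/clock.py` and are not shown in this diff, so treat the return types as placeholders.

```python
from typing import Any, Callable

from synapse.util.duration import Duration


class Clock:
    """Signature sketch only; not the actual synapse.util.clock source."""

    def looping_call(
        self, f: Callable[..., Any], interval: Duration, *args: Any, **kwargs: Any
    ) -> Any:
        """Run `f` repeatedly, once per `interval` (previously an int of milliseconds)."""

    def looping_call_now(
        self, f: Callable[..., Any], interval: Duration, *args: Any, **kwargs: Any
    ) -> Any:
        """Like `looping_call`, but also fires `f` immediately."""

    def call_later(
        self, delay: Duration, callback: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> Any:
        """Schedule a one-off call; returns a resettable handle (typed as
        `DelayedCallWrapper` in the scheduler diff below)."""

    async def sleep(self, duration: Duration) -> None:
        """Asynchronously wait for `duration`."""
```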
3 changes: 2 additions & 1 deletion synapse/appservice/scheduler.py
@@ -77,6 +77,7 @@
from synapse.storage.databases.main import DataStore
from synapse.types import DeviceListUpdates, JsonMapping
from synapse.util.clock import Clock, DelayedCallWrapper
from synapse.util.duration import Duration

if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -504,7 +505,7 @@ def __init__(
self.scheduled_recovery: DelayedCallWrapper | None = None

def recover(self) -> None:
delay = 2**self.backoff_counter
delay = Duration(seconds=2**self.backoff_counter)
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
self.scheduled_recovery = self.clock.call_later(
delay,
3 changes: 2 additions & 1 deletion synapse/federation/federation_client.py
@@ -75,6 +75,7 @@
from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.duration import Duration
from synapse.util.retryutils import NotRetryingDestination

if TYPE_CHECKING:
@@ -132,7 +133,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.pdu_destination_tried: dict[str, dict[str, int]] = {}
self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
self._clock.looping_call(self._clear_tried_cache, Duration(minutes=1))
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()

7 changes: 5 additions & 2 deletions synapse/federation/federation_server.py
@@ -89,6 +89,7 @@
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.duration import Duration
from synapse.util.stringutils import parse_server_name

if TYPE_CHECKING:
@@ -226,7 +227,7 @@ async def _handle_old_staged_events(self) -> None:
)

# We pause a bit so that we don't start handling all rooms at once.
await self._clock.sleep(random.uniform(0, 0.1))
await self._clock.sleep(Duration(seconds=random.uniform(0, 0.1)))

async def on_backfill_request(
self, origin: str, room_id: str, versions: list[str], limit: int
@@ -301,7 +302,9 @@ async def on_incoming_transaction(
# Start a periodic check for old staged events. This is to handle
# the case where locks time out, e.g. if another process gets killed
# without dropping its locks.
self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
self._clock.looping_call(
self._handle_old_staged_events, Duration(minutes=1)
)

# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
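The jitter line above now passes `random.uniform(0, 0.1)` straight into `Duration(seconds=...)`, which implies the constructor accepts fractional values; a small illustration under that assumption:

```python
import random

from synapse.util.duration import Duration

# Assumption drawn from the hunk above: fractional seconds are accepted, so
# sub-second jitter no longer needs to be hand-converted to milliseconds.
jitter = Duration(seconds=random.uniform(0, 0.1))
```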
3 changes: 2 additions & 1 deletion synapse/federation/send_queue.py
@@ -53,6 +53,7 @@
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util.duration import Duration
from synapse.util.metrics import Measure

from .units import Edu
@@ -137,7 +138,7 @@ def register(queue_name: QueueNames, queue: Sized) -> None:
assert isinstance(queue, Sized)
register(queue_name, queue=queue)

self.clock.looping_call(self._clear_queue, 30 * 1000)
self.clock.looping_call(self._clear_queue, Duration(seconds=30))

def shutdown(self) -> None:
"""Stops this federation sender instance from sending further transactions."""
13 changes: 7 additions & 6 deletions synapse/federation/sender/__init__.py
@@ -174,6 +174,7 @@
get_domain_from_id,
)
from synapse.util.clock import Clock
from synapse.util.duration import Duration
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter

@@ -218,12 +219,12 @@
# Please note that rate limiting still applies, so while the loop is
# executed every X seconds the destinations may not be woken up because
# they are being rate limited following previous attempt failures.
WAKEUP_RETRY_PERIOD_SEC = 60
WAKEUP_RETRY_PERIOD = Duration(minutes=1)

# Time (in s) to wait in between waking up each destination, i.e. one destination
# Time to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds until we have woken every destination
# that has outstanding catch-up.
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS = Duration(seconds=5)


class AbstractFederationSender(metaclass=abc.ABCMeta):
@@ -379,7 +380,7 @@ async def _handle(self) -> None:

queue.attempt_new_transaction()

await self.clock.sleep(current_sleep_seconds)
await self.clock.sleep(Duration(seconds=current_sleep_seconds))

if not self.queue:
break
@@ -468,7 +469,7 @@ def __init__(self, hs: "HomeServer"):
# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call_now(
self.hs.run_as_background_process,
WAKEUP_RETRY_PERIOD_SEC * 1000.0,
WAKEUP_RETRY_PERIOD,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)
@@ -1161,4 +1162,4 @@ async def _wake_destinations_needing_catchup(self) -> None:
last_processed,
)
self.wake_destination(destination)
await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS)
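A side effect visible in this file: once the interval carries its own unit, the `_SEC` suffix on the constant and the `* 1000.0` at the call site both disappear. The same pattern applies to any other module-level interval; the constant below is purely illustrative and not from the Synapse tree.

```python
from synapse.util.duration import Duration

# Illustrative only: the type carries the unit, so call sites pass the value
# through unchanged instead of converting it to milliseconds.
EXAMPLE_CLEANUP_PERIOD = Duration(minutes=1)
```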
3 changes: 2 additions & 1 deletion synapse/handlers/account_validity.py
@@ -28,6 +28,7 @@
from synapse.types import UserID
from synapse.util import stringutils
from synapse.util.async_helpers import delay_cancellation
from synapse.util.duration import Duration

if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -73,7 +74,7 @@ def __init__(self, hs: "HomeServer"):

# Check the renewal emails to send and send them every 30min.
if hs.config.worker.run_background_tasks:
self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000)
self.clock.looping_call(self._send_renewal_emails, Duration(minutes=30))

async def is_user_expired(self, user_id: str) -> bool:
"""Checks if a user has expired against third-party modules.
3 changes: 2 additions & 1 deletion synapse/handlers/auth.py
@@ -74,6 +74,7 @@
from synapse.types import JsonDict, Requester, StrCollection, UserID
from synapse.util import stringutils as stringutils
from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
from synapse.util.duration import Duration
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.stringutils import base62_encode
from synapse.util.threepids import canonicalise_email
@@ -242,7 +243,7 @@ def __init__(self, hs: "HomeServer"):
if hs.config.worker.run_background_tasks:
self._clock.looping_call(
run_as_background_process,
5 * 60 * 1000,
Duration(minutes=5),
"expire_old_sessions",
self.server_name,
self._expire_old_sessions,
9 changes: 5 additions & 4 deletions synapse/handlers/delayed_events.py
@@ -42,6 +42,7 @@
UserID,
create_requester,
)
from synapse.util.duration import Duration
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel
@@ -92,7 +93,7 @@ async def _schedule_db_events() -> None:
# Kick off again (without blocking) to catch any missed notifications
# that may have fired before the callback was added.
self._clock.call_later(
0,
Duration(seconds=0),
self.notify_new_event,
)

@@ -501,17 +502,17 @@ def _schedule_next_at_or_none(self, next_send_ts: Timestamp | None) -> None:

def _schedule_next_at(self, next_send_ts: Timestamp) -> None:
delay = next_send_ts - self._get_current_ts()
delay_sec = delay / 1000 if delay > 0 else 0
delay_duration = Duration(milliseconds=max(delay, 0))

if self._next_delayed_event_call is None:
self._next_delayed_event_call = self._clock.call_later(
delay_sec,
delay_duration,
self.hs.run_as_background_process,
"_send_on_timeout",
self._send_on_timeout,
)
else:
self._next_delayed_event_call.reset(delay_sec)
self._next_delayed_event_call.reset(delay_duration.as_secs())

async def get_all_for_user(self, requester: Requester) -> list[JsonDict]:
"""Return all pending delayed events requested by the given user."""
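The delayed-events handler is the one place where a raw number survives: `DelayedCallWrapper.reset()` still appears to take seconds, hence the `as_secs()` conversion. A minimal sketch of the surrounding arithmetic, assuming `Timestamp` values are milliseconds (as the removed `delay / 1000` line suggests); the helper name is illustrative.

```python
from synapse.util.duration import Duration


def delay_until(next_send_ts_ms: int, now_ms: int) -> Duration:
    # Mirrors _schedule_next_at: a target that is already due is clamped to
    # zero so the callback is scheduled to fire immediately.
    return Duration(milliseconds=max(next_send_ts_ms - now_ms, 0))
```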