Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/handlers/src/admin/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ where
"/users/{id}/unlock",
post_with(self::users::unlock, self::users::unlock_doc),
)
.api_route(
"/users/{id}/kill-sessions",
post_with(self::users::kill_sessions, self::users::kill_sessions_doc),
)
.api_route(
"/user-emails",
get_with(self::user_emails::list, self::user_emails::list_doc)
Expand Down
229 changes: 229 additions & 0 deletions crates/handlers/src/admin/v1/users/kill_sessions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.

use aide::{NoApi, OperationIo, transform::TransformOperation};
use axum::{Json, response::IntoResponse};
use hyper::StatusCode;
use mas_axum_utils::record_error;
use mas_data_model::BoxRng;
use mas_storage::{
compat::CompatSessionFilter,
oauth2::OAuth2SessionFilter,
queue::{QueueJobRepositoryExt as _, SyncDevicesJob},
user::BrowserSessionFilter,
};
use tracing::{error, info};
use ulid::Ulid;

use crate::{
admin::{
call_context::CallContext,
model::{Resource, User},
params::UlidPathParam,
response::{ErrorResponse, SingleResponse},
},
impl_from_error_for_route,
};

#[derive(Debug, thiserror::Error, OperationIo)]
#[aide(output_with = "Json<ErrorResponse>")]
pub enum RouteError {
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),

#[error("User ID {0} not found")]
NotFound(Ulid),
}

impl_from_error_for_route!(mas_storage::RepositoryError);

impl IntoResponse for RouteError {
fn into_response(self) -> axum::response::Response {
let error = ErrorResponse::from_error(&self);
let sentry_event_id = record_error!(self, Self::Internal(_));
let status = match self {
Self::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::NotFound(_) => StatusCode::NOT_FOUND,
};
(status, sentry_event_id, Json(error)).into_response()
}
}

pub fn doc(operation: TransformOperation) -> TransformOperation {
operation
.id("KillSessions")
.summary("Kill all sessions (compatibility, oauth2, user sessions)")
.description(
"Calling this endpoint will end all the compatibility, oauth2 and user sessions, preventing any further use. A job will be scheduled to sync the user's devices with the homeserver.",
)
.tag("user")
.response_with::<200, Json<SingleResponse<User>>, _>(|t| {
// In the samples, the second user is the one which can request admin
let [_alice, bob, ..] = User::samples();
let id = bob.id();
let response = SingleResponse::new(bob, format!("/api/admin/v1/users/{id}/kill-sessions"));
t.description("All sessions were killed").example(response)
})
.response_with::<404, RouteError, _>(|t| {
let response = ErrorResponse::from_error(&RouteError::NotFound(Ulid::nil()));
t.description("User was not found")
.example(response)
})
}

#[tracing::instrument(name = "handler.admin.v1.users.kill_sessions", skip_all)]
pub async fn handler(
CallContext {
mut repo, clock, ..
}: CallContext,
NoApi(mut rng): NoApi<BoxRng>,
id: UlidPathParam,
) -> Result<Json<SingleResponse<User>>, RouteError> {
let id = *id;
let user = repo
.user()
.lookup(id)
.await?
.ok_or(RouteError::NotFound(id))?;

let filter = CompatSessionFilter::new().for_user(&user).active_only();
let compat_session_affected = repo.compat_session().finish_bulk(&clock, filter).await?;

let filter = OAuth2SessionFilter::new().for_user(&user).active_only();
let oauth2_session_affected = repo.oauth2_session().finish_bulk(&clock, filter).await?;

let filter = BrowserSessionFilter::new().for_user(&user).active_only();
let browser_session_affected = repo.browser_session().finish_bulk(&clock, filter).await?;
// Schedule a job to sync the devices of the user with the homeserver
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;

repo.save().await?;

info!("Ended {compat_session_affected} active compatibility sessions");
info!("Ended {oauth2_session_affected} active OAuth 2.0 sessions");
info!("Ended {browser_session_affected} active browser sessions");

Ok(Json(SingleResponse::new(
User::from(user),
format!("/api/admin/v1/users/{id}/kill-sessions"),
)))
}

#[cfg(test)]
mod tests {
use chrono::Duration;
use hyper::{Request, StatusCode};
use mas_data_model::{Clock as _, Device};
use sqlx::PgPool;

use crate::test_utils::{RequestBuilderExt, ResponseExt, TestState, setup};

#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_kill_sessions(pool: PgPool) {
setup();
let mut state = TestState::from_pool(pool).await.unwrap();
let token = state.token_with_scope("urn:mas:admin").await;
let mut rng = state.rng();

// Provision a user and a compat session
let mut repo = state.repository().await.unwrap();
let user = repo
.user()
.add(&mut rng, &state.clock, "alice".to_owned())
.await
.unwrap();
let device = Device::generate(&mut rng);
let session = repo
.compat_session()
.add(&mut rng, &state.clock, &user, device, None, false, None)
.await
.unwrap();
repo.save().await.unwrap();

let request = Request::post(format!("/api/admin/v1/users/{}/kill-sessions", &user.id))
.bearer(&token)
.empty();
let response = state.request(request).await;
response.assert_status(StatusCode::OK);
let body: serde_json::Value = response.json();

assert_eq!(body["data"]["id"], format!("{}", &user.id));
// The finished_at timestamp should be the same as the current time
let mut repo = state.repository().await.unwrap();
let expected = repo
.compat_session()
.lookup(session.id)
.await
.unwrap()
.unwrap();
assert_eq!(expected.finished_at().unwrap(), state.clock.now());
}

#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_kill_already_finished_session(pool: PgPool) {
setup();
let mut state = TestState::from_pool(pool).await.unwrap();
let token = state.token_with_scope("urn:mas:admin").await;
let mut rng = state.rng();

// Provision a user and a compat session
let mut repo = state.repository().await.unwrap();
let user = repo
.user()
.add(&mut rng, &state.clock, "alice".to_owned())
.await
.unwrap();
let device = Device::generate(&mut rng);
let session = repo
.compat_session()
.add(&mut rng, &state.clock, &user, device, None, false, None)
.await
.unwrap();

// Finish the session first
let session = repo
.compat_session()
.finish(&state.clock, session)
.await
.unwrap();

repo.save().await.unwrap();

// Move the clock forward
state.clock.advance(Duration::try_minutes(1).unwrap());

let request = Request::post(format!("/api/admin/v1/users/{}/kill-sessions", &user.id))
.bearer(&token)
.empty();
let response = state.request(request).await;
response.assert_status(StatusCode::OK);
let body: serde_json::Value = response.json();

assert_eq!(body["data"]["id"], format!("{}", &user.id));
let mut repo = state.repository().await.unwrap();
let expected = repo
.compat_session()
.lookup(session.id)
.await
.unwrap()
.unwrap();
assert_ne!(expected.finished_at().unwrap(), state.clock.now());
}

#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_kill_sessions_on_unknown_users(pool: PgPool) {
setup();
let mut state = TestState::from_pool(pool).await.unwrap();
let token = state.token_with_scope("urn:mas:admin").await;

let request = Request::post("/api/admin/v1/users/01040G2081040G2081040G2081/kill-sessions")
.bearer(&token)
.empty();
let response = state.request(request).await;
response.assert_status(StatusCode::NOT_FOUND);
}
}
2 changes: 2 additions & 0 deletions crates/handlers/src/admin/v1/users/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod add;
mod by_username;
mod deactivate;
mod get;
mod kill_sessions;
mod list;
mod lock;
mod reactivate;
Expand All @@ -20,6 +21,7 @@ pub use self::{
by_username::{doc as by_username_doc, handler as by_username},
deactivate::{doc as deactivate_doc, handler as deactivate},
get::{doc as get_doc, handler as get},
kill_sessions::{doc as kill_sessions_doc, handler as kill_sessions},
list::{doc as list_doc, handler as list},
lock::{doc as lock_doc, handler as lock},
reactivate::{doc as reactivate_doc, handler as reactivate},
Expand Down
71 changes: 71 additions & 0 deletions docs/api/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -2726,6 +2726,77 @@
}
}
},
"/api/admin/v1/users/{id}/kill-sessions": {
"post": {
"tags": [
"user"
],
"summary": "Kill all sessions (compatibility, oauth2, user sessions)",
"description": "Calling this endpoint will end all the compatibility, oauth2 and user sessions, preventing any further use. A job will be scheduled to sync the user's devices with the homeserver.",
"operationId": "KillSessions",
"parameters": [
{
"in": "path",
"name": "id",
"required": true,
"schema": {
"title": "The ID of the resource",
"$ref": "#/components/schemas/ULID"
},
"style": "simple"
}
],
"responses": {
"200": {
"description": "All sessions were killed",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/SingleResponse_for_User"
},
"example": {
"data": {
"type": "user",
"id": "02081040G2081040G2081040G2",
"attributes": {
"username": "bob",
"created_at": "1970-01-01T00:00:00Z",
"locked_at": null,
"deactivated_at": null,
"admin": true,
"legacy_guest": false
},
"links": {
"self": "/api/admin/v1/users/02081040G2081040G2081040G2"
}
},
"links": {
"self": "/api/admin/v1/users/02081040G2081040G2081040G2/kill-sessions"
}
}
}
}
},
"404": {
"description": "User was not found",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
},
"example": {
"errors": [
{
"title": "User ID 00000000000000000000000000 not found"
}
]
}
}
}
}
}
}
},
"/api/admin/v1/user-emails": {
"get": {
"tags": [
Expand Down
Loading