3 changes: 2 additions & 1 deletion backend/btrixcloud/basecrawls.py
@@ -430,7 +430,7 @@ async def delete_failed_crawl_files(self, crawl_id: str, oid: UUID):
"""Delete crawl files for failed crawl"""
crawl = await self.get_base_crawl(crawl_id)
org = await self.orgs.get_org_by_id(oid)
- await self._delete_crawl_files(crawl, org)
+ deleted_file_size = await self._delete_crawl_files(crawl, org)
await self.crawls.find_one_and_update(
{"_id": crawl_id, "oid": oid},
{
@@ -441,6 +441,7 @@ async def delete_failed_crawl_files(self, crawl_id: str, oid: UUID):
}
},
)
+ await self.orgs.inc_org_bytes_stored(oid, -deleted_file_size, "crawl")

async def delete_all_crawl_qa_files(self, crawl_id: str, org: Organization):
"""Delete files for all qa runs in a crawl"""
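The basecrawls.py change threads the byte total returned by _delete_crawl_files back into the org's storage accounting, so quota usage drops as soon as a failed crawl's files are removed. A minimal, self-contained sketch of that pattern, with the storage backend and the per-org counter stubbed out for illustration (only the return-the-deleted-size and negative-delta conventions come from the diff above):

import asyncio

async def delete_crawl_files(files: list[dict], storage_used: dict[str, int], oid: str) -> int:
    """Delete a crawl's files, then decrement the org's stored-bytes counter."""
    deleted_file_size = 0
    for file_ in files:
        # a real implementation would delete the object from storage here
        deleted_file_size += file_["size"]
    # mirrors inc_org_bytes_stored(oid, -deleted_file_size, "crawl"):
    # a negative delta decrements the org's running total
    storage_used[oid] = storage_used.get(oid, 0) - deleted_file_size
    return deleted_file_size

async def demo():
    used = {"org-1": 10_000}
    freed = await delete_crawl_files([{"size": 1024}, {"size": 2048}], used, "org-1")
    print(freed, used["org-1"])  # 3072 6928

asyncio.run(demo())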
98 changes: 85 additions & 13 deletions backend/btrixcloud/crawls.py
@@ -9,6 +9,7 @@
import urllib.parse
from datetime import datetime
from uuid import UUID
import asyncio

from typing import (
Annotated,
@@ -79,6 +80,8 @@
MatchCrawlQueueResponse,
CrawlLogLine,
TagsResponse,
TYPE_AUTO_PAUSED_STATES,
UserRole,
)


@@ -93,7 +96,12 @@ class CrawlOps(BaseCrawlOps):

crawl_manager: CrawlManager

- def __init__(self, crawl_manager: CrawlManager, log_ops: CrawlLogOps, *args):
+ def __init__(
+     self,
+     crawl_manager: CrawlManager,
+     log_ops: CrawlLogOps,
+     *args,
+ ):
super().__init__(*args)
self.crawl_manager = crawl_manager
self.log_ops = log_ops
@@ -357,12 +365,12 @@ async def get_active_crawls(self, oid: UUID, limit: int) -> list[str]:
res_list = await res.to_list()
return [res["_id"] for res in res_list]

- async def get_active_crawls_size(self, oid: UUID) -> int:
-     """get size of all active (running, waiting, paused) crawls"""
+ async def get_active_crawls_pending_size(self, oid: UUID) -> int:
+     """get pending size of all active (running, waiting, paused) crawls"""
cursor = self.crawls.aggregate(
[
{"$match": {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}},
{"$group": {"_id": None, "totalSum": {"$sum": "$stats.size"}}},
{"$group": {"_id": None, "totalSum": {"$sum": "$pendingSize"}}},
]
)
results = await cursor.to_list(length=1)
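The hunk above cuts off before the result is extracted; a standalone sketch of the full helper, assuming crawls is a motor (async MongoDB) collection and using an illustrative stand-in for RUNNING_AND_WAITING_STATES. The pipeline $matches the org's active crawls, then $groups them into one document whose totalSum sums the new pendingSize values:

# sketch; `crawls` is assumed to be an AsyncIOMotorCollection
RUNNING_AND_WAITING_STATES = [
    "running", "starting", "waiting_capacity", "waiting_org_limit", "paused",
]  # illustrative subset of the real state list

async def get_active_crawls_pending_size(crawls, oid) -> int:
    cursor = crawls.aggregate(
        [
            {"$match": {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}},
            {"$group": {"_id": None, "totalSum": {"$sum": "$pendingSize"}}},
        ]
    )
    results = await cursor.to_list(length=1)
    # $group with _id None emits at most one document; none means no active crawls
    return results[0]["totalSum"] if results else 0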
@@ -647,14 +655,16 @@ async def update_crawl_state_if_allowed(
return res is not None

async def update_running_crawl_stats(
- self, crawl_id: str, is_qa: bool, stats: CrawlStats
+ self, crawl_id: str, is_qa: bool, stats: CrawlStats, pending_size: int
) -> bool:
"""update running crawl stats"""
prefix = "" if not is_qa else "qa."
query = {"_id": crawl_id, "type": "crawl", f"{prefix}state": "running"}
- res = await self.crawls.find_one_and_update(
-     query, {"$set": {f"{prefix}stats": stats.dict()}}
- )
+ update: dict[str, dict | int] = {f"{prefix}stats": stats.dict()}
+ if not is_qa:
+     update["pendingSize"] = pending_size
+
+ res = await self.crawls.find_one_and_update(query, {"$set": update})
return res is not None

async def inc_crawl_exec_time(
@@ -812,7 +822,11 @@ async def get_crawl_stats(
return crawls_data

async def pause_crawl(
- self, crawl_id: str, org: Organization, pause: bool
+ self,
+ crawl_id: str,
+ org: Organization,
+ pause: bool,
+ paused_at: Optional[datetime] = None,
) -> Dict[str, bool]:
"""pause or resume a crawl temporarily"""
crawl = await self.get_base_crawl(crawl_id, org)
@@ -821,10 +835,13 @@ async def pause_crawl(

result = None

- if pause:
+ if pause and not paused_at:
      paused_at = dt_now()
- else:
-     paused_at = None

+ if not pause:
+     # If unpausing, unset autoPausedEmailsSent so that we will send
+     # emails again if quota is reached
+     await self.set_auto_paused_emails_sent(crawl_id, org, False)

try:
result = await self.crawl_manager.pause_resume_crawl(
@@ -1195,6 +1212,57 @@ async def get_crawl_logs(
qa_run_id=qa_run_id,
)

async def notify_org_admins_of_auto_paused_crawl(
self,
paused_reason: TYPE_AUTO_PAUSED_STATES,
crawl_id: str,
cid: UUID,
org: Organization,
):
"""Send email to all org admins about automatically paused crawl"""
if await self.get_auto_paused_emails_sent(crawl_id, org):
return

users = await self.orgs.get_users_for_org(org, UserRole.OWNER)
workflow = await self.crawl_configs.get_crawl_config_out(cid, org)

await asyncio.gather(
*[
self.user_manager.email.send_crawl_auto_paused(
user.name,
user.email,
paused_reason,
workflow.lastCrawlPausedExpiry,
cid,
org,
)
for user in users
]
)

await self.set_auto_paused_emails_sent(crawl_id, org)

async def set_auto_paused_emails_sent(
self, crawl_id: str, org: Organization, emails_sent: bool = True
):
"""Set if auto-paused emails already sent"""
await self.crawls.find_one_and_update(
{"_id": crawl_id, "oid": org.id, "type": "crawl"},
{"$set": {"autoPausedEmailsSent": emails_sent}},
)

async def get_auto_paused_emails_sent(
self, crawl_id: str, org: Organization
) -> bool:
"""Return whether auto-paused emails already sent for crawl"""
res = await self.crawls.find_one(
{"_id": crawl_id, "oid": org.id, "type": "crawl"},
projection=["autoPausedEmailsSent"],
)
if res:
return res.get("autoPausedEmailsSent", False)
return False
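notify_org_admins_of_auto_paused_crawl builds one send coroutine per org admin and hands them all to asyncio.gather, so the emails go out concurrently rather than one await at a time. The same fan-out pattern in a runnable form (send_one is a stand-in for user_manager.email.send_crawl_auto_paused):

import asyncio

async def send_one(email: str, reason: str) -> None:
    await asyncio.sleep(0.1)  # simulate the network I/O of a real send
    print(f"notified {email}: {reason}")

async def notify_all(emails: list[str], reason: str) -> None:
    # gather awaits every coroutine; all sends run concurrently
    await asyncio.gather(*[send_one(e, reason) for e in emails])

asyncio.run(notify_all(["a@example.com", "b@example.com"], "paused_storage_quota_reached"))

One design note: gather with default arguments raises on the first failed send while the others keep running; passing return_exceptions=True would let every send complete and report errors in the result list.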


# ============================================================================
async def recompute_crawl_file_count_and_size(crawls, crawl_id: str):
@@ -1217,7 +1285,11 @@ async def recompute_crawl_file_count_and_size(crawls, crawl_id: str):
# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
def init_crawls_api(
- crawl_manager: CrawlManager, crawl_log_ops: CrawlLogOps, app, user_dep, *args
+ crawl_manager: CrawlManager,
+ crawl_log_ops: CrawlLogOps,
+ app,
+ user_dep,
+ *args,
):
"""API for crawl management, including crawl done callback"""
# pylint: disable=invalid-name, duplicate-code
29 changes: 29 additions & 0 deletions backend/btrixcloud/emailsender.py
@@ -20,6 +20,7 @@
Organization,
InvitePending,
Subscription,
TYPE_AUTO_PAUSED_STATES,
)
from .utils import is_bool, get_origin

@@ -250,3 +251,31 @@ async def send_subscription_trial_ending_soon(
behavior_on_trial_end=behavior_on_trial_end,
support_email=self.support_email,
)

async def send_crawl_auto_paused(
self,
user_name: str,
receiver_email: str,
paused_reason: TYPE_AUTO_PAUSED_STATES,
paused_expiry: datetime,
cid: UUID,
org: Organization,
headers=None,
):
"""Send email indicating crawl was paused due to quota or disabled crawling"""

origin = get_origin(headers)
org_url = f"{origin}/orgs/{org.slug}"
workflow_url = f"{org_url}/workflows/{cid}/latest"

await self._send_encrypted(
receiver_email,
"crawlAutoPaused",
org_name=org.name,
user_name=user_name,
paused_reason=paused_reason,
paused_expiry=paused_expiry.isoformat(),
org_url=org_url,
workflow_url=workflow_url,
support_email=self.support_email,
)
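For reference, the links embedded in the email reduce to plain string composition; a runnable sketch with placeholder values (the origin normally comes from the request headers via get_origin):

from datetime import datetime, timezone
from uuid import uuid4

origin = "https://app.browsertrix.example"  # assumed origin for illustration
org_slug = "my-org"
cid = uuid4()  # workflow id

org_url = f"{origin}/orgs/{org_slug}"
workflow_url = f"{org_url}/workflows/{cid}/latest"
paused_expiry = datetime(2025, 1, 15, tzinfo=timezone.utc)

print(workflow_url)               # link included in the email body
print(paused_expiry.isoformat())  # expiry string passed to the template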
35 changes: 29 additions & 6 deletions backend/btrixcloud/models.py
@@ -237,10 +237,30 @@ class UserOrgInfoOut(BaseModel):
]
RUNNING_STATES = get_args(TYPE_RUNNING_STATES)

- TYPE_WAITING_STATES = Literal[
-     "starting", "waiting_capacity", "waiting_org_limit", "paused"
- ]
TYPE_MANUALLY_PAUSED_STATES = Literal["paused"]

TYPE_AUTO_PAUSED_STATES = Literal[
"paused_storage_quota_reached",
"paused_time_quota_reached",
"paused_org_readonly",
]
AUTO_PAUSED_STATES = get_args(TYPE_AUTO_PAUSED_STATES)

TYPE_PAUSED_STATES = Literal[
TYPE_MANUALLY_PAUSED_STATES,
TYPE_AUTO_PAUSED_STATES,
]
PAUSED_STATES = get_args(TYPE_PAUSED_STATES)

TYPE_WAITING_NOT_PAUSED_STATES = Literal[
"starting",
"waiting_capacity",
"waiting_org_limit",
]
- WAITING_STATES = get_args(TYPE_WAITING_STATES)
+ WAITING_NOT_PAUSED_STATES = get_args(TYPE_WAITING_NOT_PAUSED_STATES)

TYPE_WAITING_STATES = Literal[TYPE_PAUSED_STATES, TYPE_WAITING_NOT_PAUSED_STATES]
WAITING_STATES = [*PAUSED_STATES, *WAITING_NOT_PAUSED_STATES]

TYPE_FAILED_STATES = Literal[
"canceled",
@@ -260,7 +280,7 @@ class UserOrgInfoOut(BaseModel):
"stopped_org_readonly",
]
SUCCESSFUL_STATES = get_args(TYPE_SUCCESSFUL_STATES)
SUCCESSFUL_AND_PAUSED_STATES = ["paused", *SUCCESSFUL_STATES]
SUCCESSFUL_AND_PAUSED_STATES = [*PAUSED_STATES, *SUCCESSFUL_STATES]

TYPE_RUNNING_AND_WAITING_STATES = Literal[TYPE_WAITING_STATES, TYPE_RUNNING_STATES]
RUNNING_AND_WAITING_STATES = [*WAITING_STATES, *RUNNING_STATES]
@@ -284,8 +304,6 @@ class CrawlStats(BaseModel):
done: int = 0
size: int = 0

- profile_update: Optional[str] = ""


# ============================================================================

@@ -887,6 +905,7 @@ class CrawlOut(BaseMongoModel):

fileSize: int = 0
fileCount: int = 0
pendingSize: int = 0

tags: Optional[List[str]] = []

@@ -1071,6 +1090,10 @@ class Crawl(BaseCrawl, CrawlConfigCore):
qa: Optional[QARun] = None
qaFinished: Optional[Dict[str, QARun]] = {}

pendingSize: int = 0

autoPausedEmailsSent: bool = False


# ============================================================================
class CrawlCompleteIn(BaseModel):
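The models.py restructuring relies on two properties of typing.Literal: nested Literal types flatten into a single union, and get_args recovers the member strings, which is how list constants like PAUSED_STATES and WAITING_STATES are built. A quick standalone check:

from typing import Literal, get_args

TYPE_MANUALLY_PAUSED_STATES = Literal["paused"]
TYPE_AUTO_PAUSED_STATES = Literal[
    "paused_storage_quota_reached",
    "paused_time_quota_reached",
    "paused_org_readonly",
]

# nested Literals flatten, so this is one union of all four strings
TYPE_PAUSED_STATES = Literal[TYPE_MANUALLY_PAUSED_STATES, TYPE_AUTO_PAUSED_STATES]

print(get_args(TYPE_PAUSED_STATES))
# ('paused', 'paused_storage_quota_reached', 'paused_time_quota_reached', 'paused_org_readonly')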