
Commit d5041ca

ikreymer and tw4l committed
Dedup Backend Initial Implementation (#2868)
Fixes #2867

The backend implementation involves:

Operator:
- A new CollIndex CRD type; btrix-crds updated to 0.2.0
- An operator that manages the new CRD type, creating a new Redis instance when the index should exist (uses the `redis_dedupe_memory` and `redis_dedupe_storage` chart values)
- `dedupe_importer_channel` can configure the crawler channel used for index imports
- The operator starts the crawler in 'indexer' mode

Workflows & Crawls:
- Workflows have a new `dedupeCollId` field for dedupe while crawling. The `dedupeCollId` must also be a collection that the crawl is auto-added to.
- There is a new waiting state, `waiting_for_dedupe_index`, entered when a crawl is starting but the index is not yet ready.
- Each crawl has bi-directional links: the crawls it requires for dedupe via `requiresCrawls`, and the crawls that require this crawl via `requiredByCrawls`.
- `autoAddCollections` is automatically updated to always include the `dedupeCollId` collection.

Collection:
- Collections have a new `hasDedupeIndex` field.
- Adding or removing collection items marks the CollIndex object for update by bumping the `collItemsUpdatedAt` timestamp, triggering a reindex.
- The CollIndex object is deleted when the collection is deleted.

Indexing depends on the version of the crawler from webrecorder/browsertrix-crawler#884 that supports indexing mode.

---------

Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
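As a rough illustration of how the new fields relate, the sketch below uses simplified stand-in classes rather than the actual Browsertrix models; only the field names `hasDedupeIndex`, `dedupeCollId`, and `autoAddCollections` come from this change:

```python
# Illustrative stand-ins only -- not the actual Browsertrix Pydantic models.
from dataclasses import dataclass, field
from typing import Optional
from uuid import UUID, uuid4


@dataclass
class CollectionSketch:
    id: UUID
    name: str
    # new in this change: marks that a dedupe (CollIndex) index exists for the collection
    hasDedupeIndex: bool = False


@dataclass
class WorkflowSketch:
    id: UUID
    # new in this change: collection whose index is used for dedupe while crawling
    dedupeCollId: Optional[UUID] = None
    autoAddCollections: list[UUID] = field(default_factory=list)

    def normalize(self) -> None:
        # invariant described above: the dedupe collection must also be
        # a collection the crawl is auto-added to
        if self.dedupeCollId and self.dedupeCollId not in self.autoAddCollections:
            self.autoAddCollections.append(self.dedupeCollId)


coll = CollectionSketch(id=uuid4(), name="dedupe-collection", hasDedupeIndex=True)
workflow = WorkflowSketch(id=uuid4(), dedupeCollId=coll.id)
workflow.normalize()
assert coll.id in workflow.autoAddCollections
```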
1 parent 97dd148 commit d5041ca

33 files changed (+864, -94 lines)

backend/btrixcloud/colls.py

Lines changed: 99 additions & 13 deletions
@@ -48,6 +48,7 @@
     UserFilePreparer,
     MIN_UPLOAD_PART_SIZE,
     PublicCollOut,
+    ResourcesOnly,
 )
 from .utils import (
     dt_now,
@@ -57,6 +58,8 @@
     case_insensitive_collation,
 )
 
+from .crawlmanager import CrawlManager
+
 if TYPE_CHECKING:
     from .orgs import OrgOps
     from .storages import StorageOps
@@ -81,8 +84,16 @@ class CollectionOps:
     event_webhook_ops: EventWebhookOps
     crawl_ops: CrawlOps
     page_ops: PageOps
+    crawl_manager: CrawlManager
 
-    def __init__(self, mdb, storage_ops, orgs, event_webhook_ops):
+    def __init__(
+        self,
+        mdb,
+        orgs: OrgOps,
+        storage_ops: StorageOps,
+        crawl_manager: CrawlManager,
+        event_webhook_ops: EventWebhookOps,
+    ):
         self.collections = mdb["collections"]
         self.crawls = mdb["crawls"]
         self.crawl_configs = mdb["crawl_configs"]
@@ -91,6 +102,7 @@ def __init__(self, mdb, storage_ops, orgs, event_webhook_ops):
 
         self.orgs = orgs
         self.storage_ops = storage_ops
+        self.crawl_manager = crawl_manager
         self.event_webhook_ops = event_webhook_ops
 
     def set_crawl_ops(self, ops):
@@ -140,11 +152,15 @@ async def add_collection(self, oid: UUID, coll_in: CollIn):
             access=coll_in.access,
             defaultThumbnailName=coll_in.defaultThumbnailName,
             allowPublicDownload=coll_in.allowPublicDownload,
+            hasDedupeIndex=coll_in.hasDedupeIndex,
         )
         try:
             await self.collections.insert_one(coll.to_dict())
             org = await self.orgs.get_org_by_id(oid)
             await self.clear_org_previous_slugs_matching_slug(slug, org)
+            # create collection index
+            if coll.hasDedupeIndex:
+                await self.crawl_manager.create_coll_index(coll)
 
             if crawl_ids:
                 await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)
@@ -193,22 +209,33 @@ async def update_collection(
             db_update["$push"] = {"previousSlugs": previous_slug}
 
         try:
-            result = await self.collections.find_one_and_update(
+            prev_result = await self.collections.find_one_and_update(
                 {"_id": coll_id, "oid": org.id},
                 db_update,
-                return_document=pymongo.ReturnDocument.AFTER,
+                return_document=pymongo.ReturnDocument.BEFORE,
             )
         except pymongo.errors.DuplicateKeyError as err:
             # pylint: disable=raise-missing-from
             field = get_duplicate_key_error_field(err)
             raise HTTPException(status_code=400, detail=f"collection_{field}_taken")
 
-        if not result:
+        if not prev_result:
             raise HTTPException(status_code=404, detail="collection_not_found")
 
         if slug_update:
             await self.clear_org_previous_slugs_matching_slug(slug_update, org)
 
+        # if dedupe index is true, but was false
+        if update.hasDedupeIndex and not prev_result.get("hasDedupeIndex"):
+            # get latest coll, create index
+            coll = await self.get_collection(coll_id, org.id)
+            await self.crawl_manager.create_coll_index(coll)
+
+        # if dedupe is false, but was true
+        if update.hasDedupeIndex is False and prev_result.get("hasDedupeIndex"):
+            # delete index -- may need extra restrictions
+            await self.crawl_manager.delete_coll_index(coll_id)
+
         return {"updated": True}
 
     async def clear_org_previous_slugs_matching_slug(
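Note on the change above: switching `find_one_and_update` from `ReturnDocument.AFTER` to `ReturnDocument.BEFORE` gives the handler the previous document, so it can create or delete the dedupe index only when `hasDedupeIndex` actually transitions. A minimal standalone sketch of that edge-trigger pattern (hypothetical collection and field names, synchronous pymongo for brevity):

```python
# Standalone sketch of the edge-trigger pattern above (hypothetical collection
# and field names): read the document as it was *before* the update, then run
# side effects only when the flag actually changes state.
import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")  # assumed local instance
docs = client["example_db"]["docs"]


def set_flag(doc_id, new_value: bool) -> None:
    prev = docs.find_one_and_update(
        {"_id": doc_id},
        {"$set": {"flag": new_value}},
        return_document=pymongo.ReturnDocument.BEFORE,
    )
    if prev is None:
        raise KeyError("doc_not_found")
    if new_value and not prev.get("flag"):
        print("flag turned on -> create external resource")
    if new_value is False and prev.get("flag"):
        print("flag turned off -> delete external resource")
```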
@@ -220,6 +247,16 @@ async def clear_org_previous_slugs_matching_slug(
             {"$pull": {"previousSlugs": slug}},
         )
 
+    async def get_coll_dedupe_index(self, coll_id: UUID) -> bool:
+        """return true/false if collection has dedupe index, or raise"""
+        result = await self.collections.find_one(
+            {"_id": coll_id}, projection=["hasDedupeIndex"]
+        )
+        if not result:
+            raise HTTPException(status_code=404, detail="collection_not_found")
+
+        return result["hasDedupeIndex"] is True
+
     async def add_crawls_to_collection(
         self,
         coll_id: UUID,
@@ -228,8 +265,6 @@ async def add_crawls_to_collection(
         headers: Optional[dict] = None,
     ) -> CollOut:
         """Add crawls to collection"""
-        await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)
-
         modified = dt_now()
         result = await self.collections.find_one_and_update(
             {"_id": coll_id},
@@ -239,8 +274,11 @@
         if not result:
             raise HTTPException(status_code=404, detail="collection_not_found")
 
+        # do this after checking if collection exists
+        await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)
+
         await self.update_collection_counts_and_tags(coll_id)
-        await self.update_collection_dates(coll_id, org.id)
+        await self.update_collection_dates(coll_id, org.id, update_index=True)
 
         asyncio.create_task(
             self.event_webhook_ops.create_added_to_collection_notification(
@@ -269,7 +307,7 @@ async def remove_crawls_from_collection(
             raise HTTPException(status_code=404, detail="collection_not_found")
 
         await self.update_collection_counts_and_tags(coll_id)
-        await self.update_collection_dates(coll_id, org.id)
+        await self.update_collection_dates(coll_id, org.id, update_index=True)
 
         asyncio.create_task(
             self.event_webhook_ops.create_removed_from_collection_notification(
@@ -293,6 +331,24 @@ async def get_collection_raw(
 
         return result
 
+    async def enable_dedupe_index(self, coll_id: UUID):
+        """enable dedupe index if it doesn't exist yet"""
+        result = await self.collections.find_one_and_update(
+            {"_id": coll_id, "hasDedupeIndex": {"$ne": True}},
+            {"$set": {"hasDedupeIndex": True}},
+            return_document=pymongo.ReturnDocument.AFTER,
+        )
+
+        # not changed, nothing to do
+        if not result:
+            return False
+
+        coll = Collection.from_dict(result)
+
+        await self.crawl_manager.create_coll_index(coll)
+
+        return True
+
     async def get_collection_raw_by_slug(
         self,
         coll_slug: str,
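The `enable_dedupe_index` method added above relies on the filter matching only collections where `hasDedupeIndex` is not yet set, so callers such as workflow creation can invoke it unconditionally and the index is only created once. A minimal standalone sketch of that idempotent conditional-update pattern (hypothetical database name, synchronous pymongo for brevity):

```python
# Sketch of the idempotent conditional update used by enable_dedupe_index():
# the filter only matches when the flag is not yet set, so repeated calls are
# harmless and only the first one reports a change.
import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")  # assumed local instance
colls = client["example_db"]["collections"]


def enable_once(coll_id) -> bool:
    result = colls.find_one_and_update(
        {"_id": coll_id, "hasDedupeIndex": {"$ne": True}},
        {"$set": {"hasDedupeIndex": True}},
        return_document=pymongo.ReturnDocument.AFTER,
    )
    # None means the flag was already set (or the document does not exist),
    # so there is nothing further to do
    return result is not None
```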
@@ -395,6 +451,16 @@ async def get_collection_out(
 
         return CollOut.from_dict(result)
 
+    async def get_internal_replay_list(self, coll_id: UUID, oid: UUID) -> ResourcesOnly:
+        """get list of internally resolved signed WACZ files"""
+        org = await self.orgs.get_org_by_id(oid)
+        resources, _, _ = await self.get_collection_crawl_resources(coll_id, org)
+
+        for file_ in resources:
+            file_.path = self.storage_ops.resolve_internal_access_path(file_.path)
+
+        return ResourcesOnly(resources=resources)
+
     async def get_public_collection_out(
         self,
         coll_id: UUID,
@@ -638,6 +704,9 @@ async def delete_collection(self, coll_id: UUID, org: Organization):
         if coll.thumbnail:
             await self.delete_thumbnail(coll_id, org)
 
+        if coll.hasDedupeIndex:
+            await self.crawl_manager.delete_coll_index(coll.id)
+
         result = await self.collections.delete_one({"_id": coll_id, "oid": org.id})
         if result.deleted_count < 1:
             raise HTTPException(status_code=404, detail="collection_not_found")
@@ -739,7 +808,9 @@ async def update_collection_counts_and_tags(self, collection_id: UUID):
             },
         )
 
-    async def update_collection_dates(self, coll_id: UUID, oid: UUID):
+    async def update_collection_dates(
+        self, coll_id: UUID, oid: UUID, update_index=False
+    ):
         """Update collection earliest and latest dates from page timestamps"""
         # pylint: disable=too-many-locals
         coll = await self.get_collection(coll_id, oid)
@@ -748,6 +819,10 @@ async def update_collection_dates(self, coll_id: UUID, oid: UUID):
         earliest_ts = None
         latest_ts = None
 
+        # update_index is set, update dedupe index if it exists
+        if update_index and coll.hasDedupeIndex:
+            await self.crawl_manager.update_coll_index(coll_id)
+
         match_query = {
             "oid": coll.oid,
             "crawl_id": {"$in": crawl_ids},
@@ -782,13 +857,16 @@
 
     async def update_crawl_collections(self, crawl_id: str, oid: UUID):
         """Update counts, dates, and modified for all collections in crawl"""
+        # accessing directly to handle both crawls and uploads
         crawl = await self.crawls.find_one({"_id": crawl_id})
-        crawl_coll_ids = crawl.get("collectionIds")
+        crawl_coll_ids = crawl.get("collectionIds") or []
         modified = dt_now()
 
         for coll_id in crawl_coll_ids:
             await self.update_collection_counts_and_tags(coll_id)
-            await self.update_collection_dates(coll_id, oid)
+            await self.update_collection_dates(
+                coll_id, oid, crawl.get("dedupeCollId") != coll_id
+            )
             await self.collections.find_one_and_update(
                 {"_id": coll_id},
                 {"$set": {"modified": modified}},
@@ -999,12 +1077,20 @@ async def calculate_thumbnail_storage(self, oid: UUID) -> int:
 # ============================================================================
 # pylint: disable=too-many-locals
 def init_collections_api(
-    app, mdb, orgs, storage_ops, event_webhook_ops, user_dep
+    app,
+    mdb,
+    orgs: OrgOps,
+    storage_ops: StorageOps,
+    crawl_manager: CrawlManager,
+    event_webhook_ops: EventWebhookOps,
+    user_dep,
 ) -> CollectionOps:
     """init collections api"""
     # pylint: disable=invalid-name, unused-argument, too-many-arguments
 
-    colls: CollectionOps = CollectionOps(mdb, storage_ops, orgs, event_webhook_ops)
+    colls: CollectionOps = CollectionOps(
+        mdb, orgs, storage_ops, crawl_manager, event_webhook_ops
+    )
 
     org_crawl_dep = orgs.org_crawl_dep
     org_viewer_dep = orgs.org_viewer_dep

backend/btrixcloud/crawlconfigs.py

Lines changed: 51 additions & 2 deletions
@@ -333,6 +333,14 @@ async def add_crawl_config(
 
         first_seed = seeds[0].url
 
+        # the dedupe collection id must also be in auto add collections
+        if config_in.dedupeCollId:
+            if config_in.autoAddCollections is None:
+                config_in.autoAddCollections = []
+
+            if config_in.dedupeCollId not in config_in.autoAddCollections:
+                config_in.autoAddCollections.append(config_in.dedupeCollId)
+
         now = dt_now()
         crawlconfig = CrawlConfig(
             id=uuid4(),
@@ -360,6 +368,7 @@
             firstSeed=first_seed,
             seedCount=seed_count,
             shareable=config_in.shareable,
+            dedupeCollId=config_in.dedupeCollId,
         )
 
         if config_in.runNow:
@@ -376,6 +385,9 @@
         storage_quota_reached = False
         exec_mins_quota_reached = False
 
+        if config_in.dedupeCollId:
+            await self.coll_ops.enable_dedupe_index(config_in.dedupeCollId)
+
         if config_in.runNow:
             try:
                 crawl_id = await self.run_now_internal(crawlconfig, org, user)
@@ -629,6 +641,26 @@ async def update_crawl_config(
             update.tags is not None
             and ",".join(orig_crawl_config.tags) != ",".join(update.tags)
         )
+
+        metadata_changed = metadata_changed or (
+            update.dedupeCollId is not None
+            and update.dedupeCollId != orig_crawl_config.dedupeCollId
+        )
+
+        if isinstance(update.dedupeCollId, UUID):
+            dedupe_coll_id = update.dedupeCollId
+        elif update.dedupeCollId == "":
+            dedupe_coll_id = None
+        else:
+            dedupe_coll_id = orig_crawl_config.dedupeCollId
+
+        if (
+            dedupe_coll_id
+            and update.autoAddCollections is not None
+            and dedupe_coll_id not in update.autoAddCollections
+        ):
+            update.autoAddCollections.append(dedupe_coll_id)
+
         metadata_changed = metadata_changed or (
             update.autoAddCollections is not None
             and sorted(orig_crawl_config.autoAddCollections)
656688
query["modifiedByName"] = user.name
657689
query["modified"] = dt_now()
658690

659-
# if empty str, just clear the profile
691+
# profile - if empty str, just clear the profile
660692
if update.profileid == "":
661693
query["profileid"] = None
662694
# else, ensure its a valid profile
@@ -672,6 +704,14 @@ async def update_crawl_config(
672704
self.assert_can_org_use_proxy(org, update.proxyId)
673705
query["proxyId"] = update.proxyId
674706

707+
# dedupe - if empty dedupeCollId, clear the coll id
708+
if update.dedupeCollId == "":
709+
query["dedupeCollId"] = None
710+
# else, enable dedupe on collection
711+
if isinstance(update.dedupeCollId, UUID):
712+
query["dedupeCollId"] = update.dedupeCollId
713+
await self.coll_ops.enable_dedupe_index(update.dedupeCollId)
714+
675715
if update.config is not None:
676716
query["config"] = update.config.dict()
677717

@@ -686,10 +726,15 @@ async def update_crawl_config(
686726
query["seedCount"] = len(update.config.seeds)
687727
query["seedFileId"] = None
688728

729+
update_query: dict[str, Any] = {"$set": query, "$inc": {"rev": 1}}
730+
# only add here if not setting autoAddCollections
731+
if dedupe_coll_id and "autoAddCollections" not in query:
732+
update_query["$addToSet"] = {"autoAddCollections": dedupe_coll_id}
733+
689734
# update in db
690735
result = await self.crawl_configs.find_one_and_update(
691736
{"_id": cid, "inactive": {"$ne": True}},
692-
{"$set": query, "$inc": {"rev": 1}},
737+
update_query,
693738
return_document=pymongo.ReturnDocument.AFTER,
694739
)
695740
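Rather than rewriting `autoAddCollections` wholesale, the update above uses `$addToSet`, which appends the dedupe collection id only if it is not already in the array. A minimal sketch of combining `$set`, `$inc`, and `$addToSet` in one update document (hypothetical collection and ids, synchronous pymongo):

```python
# Sketch of combining $set, $inc, and $addToSet in a single update document
# (hypothetical collection and ids): $addToSet appends the dedupe collection
# id only if it is not already present in the array.
import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")  # assumed local instance
configs = client["example_db"]["crawl_configs"]

dedupe_coll_id = "coll-123"  # placeholder id

update_query = {
    "$set": {"description": "updated workflow"},
    "$inc": {"rev": 1},
    "$addToSet": {"autoAddCollections": dedupe_coll_id},
}
configs.find_one_and_update({"_id": "config-1"}, update_query)
```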

@@ -1185,6 +1230,10 @@ async def remove_collection_from_all_configs(
             {"$pull": {"autoAddCollections": coll_id}},
         )
 
+        await self.crawl_configs.update_many(
+            {"oid": org.id, "dedupeCollId": coll_id}, {"$set": {"dedupeCollId": None}}
+        )
+
     async def get_crawl_config_tags(self, org):
         """get distinct tags from all crawl configs for this org"""
         return await self.crawl_configs.distinct("tags", {"oid": org.id})
