Commit 7c37672

add removing option to also remove unused crawls if doing a full sync, disabled by default

1 parent 0d414f7 commit 7c37672

File tree: 2 files changed (+76, -10): src/indexer.ts, src/util/state.ts

src/indexer.ts

Lines changed: 27 additions & 5 deletions
@@ -9,6 +9,7 @@ import { initRedisWaitForSuccess } from "./util/redis.js";
 import { AsyncIterReader } from "warcio";
 import { RedisDedupeIndex } from "./util/state.js";
 import { basename } from "node:path";
+import { sleep } from "./util/timing.js";
 
 export type DedupeIndexEntry = {
   name: string;
@@ -42,6 +43,13 @@ export class CrawlIndexer {
         type: "string",
         required: false,
       },
+
+      removing: {
+        describe: "If set, also remove unused crawls/hashes from index",
+        type: "boolean",
+        required: false,
+        default: false,
+      },
     })
     .parseSync();
   }
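For reference, a minimal sketch of how the new boolean option behaves under yargs-style parsing (assumed from the `.parseSync()` call above; this snippet is illustrative and not part of the commit): the flag defaults to false and flips to true only when `--removing` is passed.

```ts
import yargs from "yargs";
import { hideBin } from "yargs/helpers";

// Hypothetical standalone parser mirroring the option added above.
const params = yargs(hideBin(process.argv))
  .options({
    removing: {
      describe: "If set, also remove unused crawls/hashes from index",
      type: "boolean" as const,
      required: false,
      default: false,
    },
  })
  .parseSync();

// Prints false by default, true when invoked with --removing.
console.log(params.removing);
```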
@@ -62,16 +70,24 @@ export class CrawlIndexer {
 
     for await (const entry of this.iterWACZ({
       url: params.sourceUrl,
-      name: params.sourceCrawlId || params.sourceUrl,
+      name: basename(params.sourceUrl),
+      crawlId: params.sourceCrawlId,
     })) {
       await dedupeIndex.queueImportSource(entry.name, JSON.stringify(entry));
+      if (params.removing && entry.crawlId) {
+        await dedupeIndex.markNotRemoved(entry.crawlId);
+      }
     }
 
     let count = 0;
+    let total = 0;
     let res;
 
     while ((res = await dedupeIndex.nextQueuedImportSource())) {
-      const { name, entry, total } = res;
+      const { name, entry, remaining } = res;
+      if (!total) {
+        total = remaining;
+      }
       const { url, crawlId, size, hash } = JSON.parse(
         entry,
       ) as DedupeIndexEntry;
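The loop above is what makes full-sync removal possible: every crawl ID queued for import is recorded via markNotRemoved, so any crawl already in the index that is not touched during the sync can later be treated as unused. A conceptual sketch of that bookkeeping with in-memory sets (hypothetical names and IDs, not part of the commit):

```ts
// Conceptual model of the "noremove" bookkeeping using plain Sets.
const crawlsInIndex = new Set<string>(["crawl-a", "crawl-b", "crawl-c"]); // hypothetical IDs
const seenThisSync = new Set<string>();

// Analogue of dedupeIndex.markNotRemoved(crawlId).
function markNotRemoved(crawlId: string): void {
  seenThisSync.add(crawlId);
}

// Analogue of dedupeIndex.getRemoveSet(): crawls indexed but not seen this sync.
function getRemoveSet(): Set<string> {
  return new Set([...crawlsInIndex].filter((id) => !seenThisSync.has(id)));
}

markNotRemoved("crawl-a");
markNotRemoved("crawl-b");
console.log(getRemoveSet()); // Set(1) { "crawl-c" } -> candidate for removal
```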
@@ -107,7 +123,15 @@ export class CrawlIndexer {
       await dedupeIndex.markImportSourceDone(name, crawlIdReal);
     }
 
+    if (params.removing) {
+      const removeset = await dedupeIndex.getRemoveSet();
+      if (removeset.size > 0) {
+        await dedupeIndex.removeCrawlIds(removeset);
+      }
+    }
+
     logger.info("Done!");
+    await sleep(30);
     await dedupeIndex.markImportFinishedTS();
     process.exit(ExitCodes.Success);
   }
@@ -180,7 +204,6 @@ export class CrawlIndexer {
   }
 
   async *iterWACZ(entry: DedupeIndexEntry): AsyncIterable<DedupeIndexEntry> {
-    const { name } = entry;
     let { url } = entry;
     let path = url;
 
@@ -191,8 +214,7 @@ export class CrawlIndexer {
     }
 
     if (path.endsWith(".wacz")) {
-      console.log({ ...entry, name: basename(name || url) });
-      yield { ...entry, name: basename(name || url) };
+      yield entry;
     } else if (path.endsWith(".json")) {
       if (!url.startsWith("http://") && !url.startsWith("https://")) {
         const blob = await openAsBlob(url);

src/util/state.ts

Lines changed: 49 additions & 5 deletions
@@ -263,12 +263,12 @@ export class RedisDedupeIndex {
     for await (const hashes of this.dedupeRedis.hscanStream(
       `h:${this.crawlId}`,
     )) {
-      let value = false;
+      let isValue = false;
       for (const hash of hashes) {
-        if (!value) {
+        if (!isValue) {
           await this.dedupeRedis.hsetnx(DUPE_ALL_HASH_KEY, hash, this.crawlId);
         }
-        value = !value;
+        isValue = !isValue;
       }
     }
 
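Context for the toggle above: ioredis's hscanStream yields chunks that are flat arrays of alternating field/value entries, which is why the code flips an isValue flag on every item. A self-contained sketch of the same pairing pattern (illustrative only; the function name and key are placeholders):

```ts
import Redis from "ioredis";

// Collect all field/value pairs from a Redis hash by pairing up the
// alternating entries that hscanStream emits.
async function collectHash(redis: Redis, hashKey: string): Promise<Map<string, string>> {
  const result = new Map<string, string>();
  for await (const chunk of redis.hscanStream(hashKey)) {
    let isValue = false;
    let field = "";
    for (const item of chunk as string[]) {
      if (!isValue) {
        field = item; // even positions: hash field names
      } else {
        result.set(field, item); // odd positions: the matching values
      }
      isValue = !isValue;
    }
  }
  return result;
}

// e.g. await collectHash(new Redis(), "h:example-crawl");
```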
@@ -371,14 +371,58 @@ export class RedisDedupeIndex {
 
     await this.dedupeRedis.lrem(this.pendingQ, 1, res);
     const { name } = JSON.parse(res);
-    const total = (await this.dedupeRedis.llen(this.sourceQ)) + 1;
+    const remaining = (await this.dedupeRedis.llen(this.sourceQ)) + 1;
     await this.dedupeRedis.setex(this.pendingPrefix + name, "1", 300);
-    return { name, entry: res, total };
+    return { name, entry: res, remaining };
   }
 
   async markImportFinishedTS() {
     await this.dedupeRedis.set("last_update_ts", new Date().toISOString());
   }
+
+  // REMOVE ON IMPORT
+
+  async markNotRemoved(crawlId: string) {
+    await this.dedupeRedis.sadd("noremove", crawlId);
+  }
+
+  async getRemoveSet() {
+    const removeSet = await this.dedupeRedis.sdiff(DUPE_ALL_CRAWLS, "noremove");
+    await this.dedupeRedis.del("noremove");
+    return new Set<string>(removeSet);
+  }
+
+  async removeCrawlIds(toRemove: Set<string>) {
+    for await (const hashes of this.dedupeRedis.hscanStream(
+      DUPE_ALL_HASH_KEY,
+    )) {
+      let isValue = false;
+      let key = "";
+      for (const hash of hashes) {
+        if (!isValue) {
+          key = hash;
+        }
+        if (key && isValue && toRemove.has(hash)) {
+          await this.dedupeRedis.hdel(DUPE_ALL_HASH_KEY, key);
+        }
+        isValue = !isValue;
+      }
+    }
+
+    for (const crawlId of toRemove) {
+      const allWACZ = await this.dedupeRedis.lrange(`c:${crawlId}:wacz`, 0, -1);
+      for (const waczdata of allWACZ) {
+        try {
+          const { filename } = JSON.parse(waczdata);
+          await this.dedupeRedis.srem(this.sourceDone, filename);
+        } catch (e) {
+          // ignore
+        }
+      }
+      await this.dedupeRedis.del(`h:${crawlId}`, `c:${crawlId}:wacz`);
+      await this.dedupeRedis.srem(DUPE_ALL_CRAWLS, crawlId);
+    }
+  }
 }
 
 // ============================================================================
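The removal path above boils down to a set difference plus per-crawl key cleanup in Redis. A standalone sketch of those commands (the real values of DUPE_ALL_CRAWLS and DUPE_ALL_HASH_KEY are defined elsewhere in state.ts, so the key names below are placeholder assumptions; the per-hash and sourceDone cleanup from removeCrawlIds is omitted for brevity):

```ts
import Redis from "ioredis";

// Placeholder key names -- the real constants live in src/util/state.ts.
const ALL_CRAWLS_KEY = "allcrawls"; // stand-in for DUPE_ALL_CRAWLS
const NOREMOVE_KEY = "noremove";

async function removeUnusedCrawls(redis: Redis): Promise<string[]> {
  // Crawls registered in the index but never marked during this full sync.
  const toRemove = await redis.sdiff(ALL_CRAWLS_KEY, NOREMOVE_KEY);
  await redis.del(NOREMOVE_KEY);

  for (const crawlId of toRemove) {
    // Drop the per-crawl hash table and WACZ list, then unregister the crawl.
    await redis.del(`h:${crawlId}`, `c:${crawlId}:wacz`);
    await redis.srem(ALL_CRAWLS_KEY, crawlId);
  }
  return toRemove;
}
```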
