
Commit 1b2ca23

cleanup pass:
- support dedupe without requiring wacz, no crawl dependency tracking stored
- add dedupe test w/o wacz
- cleanup dedupe related naming
1 parent f090360 · commit 1b2ca23

File tree

4 files changed: +126 -60 lines changed

src/crawler.ts
src/util/recorder.ts
src/util/state.ts
tests/dedupe-basic.test.js


src/crawler.ts

Lines changed: 25 additions & 18 deletions
```diff
@@ -1694,7 +1694,10 @@ self.__bx_behaviors.selectMainBehavior();
     }
 
     if (this.params.generateWACZ && (this.storage || this.deduping)) {
-      await this.crawlState.setWACZFilename();
+      const filename = await this.crawlState.setWACZFilename();
+      if (this.deduping) {
+        await this.crawlState.addSourceWACZForDedupe(filename);
+      }
     }
 
     if (POST_CRAWL_STATES.includes(initState)) {
@@ -1896,30 +1899,34 @@ self.__bx_behaviors.selectMainBehavior();
       const wacz = await this.generateWACZ();
 
       if (wacz) {
-        if (this.deduping) {
-          await this.crawlState.setStatus("post-crawl");
-          await this.crawlState.updateDedupeSource(wacz);
+        await this.crawlState.clearWACZFilename();
 
-          await this.crawlState.clearDupeFileRef();
+        if (this.deduping) {
+          await this.crawlState.updateDedupeSourceWACZ(wacz);
         }
 
-        await this.crawlState.clearWACZFilename();
-      }
-
-      if (wacz && this.storage && this.uploadAndDeleteLocal) {
-        await this.crawlState.setArchiveSize(0);
+        if (this.storage && this.uploadAndDeleteLocal) {
+          await this.crawlState.setArchiveSize(0);
 
-        logger.info(
-          `Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
-        );
-        try {
-          fs.rmSync(this.collDir, { recursive: true, force: true });
-        } catch (e) {
-          logger.warn(`Unable to clear ${this.collDir} before exit`, e);
+          logger.info(
+            `Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
+          );
+          try {
+            fs.rmSync(this.collDir, { recursive: true, force: true });
+          } catch (e) {
+            logger.warn(`Unable to clear ${this.collDir} before exit`, e);
+          }
         }
       }
     }
 
+    if (this.deduping) {
+      //await this.crawlState.clearDupeCrawlRef();
+
+      // commit crawl data to main index
+      await this.crawlState.commitDedupeDone();
+    }
+
     if (this.finalExit && generateFiles && this.params.saveProfile) {
       const resource = await this.browser.saveProfile(
         this.params.saveProfile,
@@ -1995,7 +2002,7 @@ self.__bx_behaviors.selectMainBehavior();
 
     await this.closeLog();
 
-    const requires = await this.crawlState.getDupeDependentSources();
+    const requires = await this.crawlState.getDupeDependentCrawls();
 
     const waczOpts: WACZInitOpts = {
       input: warcFileList.map((x) => path.join(this.archivesDir, x)),
```
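
The net effect of the larger hunks above is that dedupe bookkeeping is decoupled from WACZ generation: the WACZ is registered and updated as a dedupe source only when deduping is enabled, and `commitDedupeDone()` now runs for every dedupe-enabled crawl even if no WACZ is produced. A condensed sketch of the resulting post-crawl ordering (an illustrative shape only, not the real `Crawler` class; the `crawlState` methods are the ones introduced in this commit):

```ts
// Condensed, hypothetical sketch of the post-crawl ordering after this commit.
async function postCrawlSketch(crawler: {
  deduping: boolean;
  generateWACZ(): Promise<object | null>;
  crawlState: {
    clearWACZFilename(): Promise<void>;
    updateDedupeSourceWACZ(wacz: object): Promise<void>;
    commitDedupeDone(): Promise<void>;
  };
}) {
  const wacz = await crawler.generateWACZ();

  if (wacz) {
    await crawler.crawlState.clearWACZFilename();
    if (crawler.deduping) {
      // record the generated WACZ (filename/hash/size) as this crawl's dedupe source
      await crawler.crawlState.updateDedupeSourceWACZ(wacz);
    }
  }

  // runs whether or not a WACZ was generated: dedupe no longer requires --generateWACZ
  if (crawler.deduping) {
    await crawler.crawlState.commitDedupeDone();
  }
}
```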

src/util/recorder.ts

Lines changed: 2 additions & 2 deletions
```diff
@@ -841,7 +841,7 @@ export class Recorder extends EventEmitter {
         requestId,
         errorReason,
       });
-      await this.crawlState.addDupeCrawlRef(crawlId, index);
+      await this.crawlState.addDupeCrawlDependency(crawlId, index);
       return true;
     }
   }
@@ -1701,7 +1701,7 @@ export class Recorder extends EventEmitter {
         origUrl,
         date,
       ));
-      await this.crawlState.addDupeCrawlRef(crawlId, index);
+      await this.crawlState.addDupeCrawlDependency(crawlId, index);
       isDupe = true;
     } else {
       // no dupe, continue
```

src/util/state.ts

Lines changed: 30 additions & 30 deletions
```diff
@@ -210,7 +210,7 @@ export type DedupeSourceEntry = {
 export class RedisDedupeIndex {
   dedupeRedis: Redis;
   crawlId: string;
-  dedupeKeyIndex = -1;
+  dedupeKeyIndex = 0;
   dedupeCurrFilename = "";
 
   sourceDone = "src:d";
@@ -224,37 +224,32 @@
     this.crawlId = crawlId;
   }
 
-  // DEDUPE SOURCE
+  // DEDUPE SOURCE WACZ (to track dependencies)
 
-  async addSourceForDedupe(filename: string) {
-    //const count = await this.dedupeRedis.incr(`c:${key}:count`) - 1;
+  async addSourceWACZForDedupe(filename: string) {
+    const crawlId = this.crawlId;
     const count =
       (await this.dedupeRedis.rpush(
-        `c:${this.crawlId}:wacz`,
+        `c:${crawlId}:wacz`,
         JSON.stringify({ filename }),
       )) - 1;
     this.dedupeCurrFilename = filename;
     this.dedupeKeyIndex = count;
   }
 
-  async updateDedupeSource(wacz: WACZ) {
-    if (this.dedupeKeyIndex < 0) {
-      return;
-    }
-
+  async updateDedupeSourceWACZ(wacz: WACZ) {
     const value: DedupeSourceEntry = {
       filename: wacz.getLocalFilename() || this.dedupeCurrFilename,
       hash: wacz.getHash(),
       size: wacz.getSize(),
     };
 
+    const crawlId = this.crawlId;
     await this.dedupeRedis.lset(
-      `c:${this.crawlId}:wacz`,
+      `c:${crawlId}:wacz`,
       this.dedupeKeyIndex,
       JSON.stringify(value),
     );
-
-    await this.commitDedupeDone();
   }
 
   // COMMIT DEDUPE TO SHARED INDEX
@@ -325,9 +320,12 @@
     await this.dedupeRedis.lpush(this.sourceQ, data);
   }
 
-  async addImportedSourceForDedupe(key: string, entry: DedupeSourceEntry) {
+  async addImportedSourceForDedupe(crawlId: string, entry: DedupeSourceEntry) {
     return (
-      (await this.dedupeRedis.rpush(`c:${key}:wacz`, JSON.stringify(entry))) - 1
+      (await this.dedupeRedis.rpush(
+        `c:${crawlId}:wacz`,
+        JSON.stringify(entry),
+      )) - 1
     );
   }
 
@@ -766,7 +764,6 @@ return inx;
         "state",
       );
     }
-    await this.addSourceForDedupe(this.waczFilename!);
     return this.waczFilename!;
   }
 
@@ -1396,29 +1393,32 @@ return inx;
     );
   }
 
-  // DEPENDENT CRAWLS FOR DEDUPE
-  async addDupeCrawlRef(crawlId: string, index: string) {
+  // DEPENDENT CRAWLS FOR DEDUPE (requires WACZ)
+  async addDupeCrawlDependency(crawlId: string, index: string) {
     await this.redis.sadd(`${this.uid}:duperef`, crawlId + " " + index);
     await this.redis.sadd(`${this.crawlId}:reqCrawls`, crawlId);
   }
 
-  async clearDupeFileRef() {
-    await this.redis.del(`${this.uid}:duperef`);
-  }
+  // async clearDupeCrawlDependency() {
+  //   await this.redis.del(`${this.uid}:duperef`);
+  // }
 
-  async getDupeDependentSources() {
+  // Requires crawling with WACZ to match dependencies
+  async getDupeDependentCrawls() {
     const dependRefs = await this.redis.smembers(`${this.uid}:duperef`);
     const crawlIds = [];
    for (const value of dependRefs) {
       const [crawlId, index] = value.split(" ");
-      const source = await this.dedupeRedis.lindex(
-        `c:${crawlId}:wacz`,
-        Number(index),
-      );
-      if (crawlId && crawlId !== this.crawlId && source) {
-        const entry = JSON.parse(source);
-        entry.crawlId = crawlId;
-        crawlIds.push(entry);
+      if (crawlId && crawlId !== this.crawlId) {
+        const source = await this.dedupeRedis.lindex(
+          `c:${crawlId}:wacz`,
+          Number(index),
+        );
+        if (source) {
+          const entry = JSON.parse(source);
+          entry.crawlId = crawlId;
+          crawlIds.push(entry);
+        }
       }
     }
     return crawlIds;
```
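
Read together with the recorder.ts change, the renamed methods make the two sides of the dependency tracking easier to see: the recorder records which crawl a duplicate came from, and the per-crawl WACZ source list is what those references resolve against. A rough map of the Redis keys involved, pieced together from the methods above (descriptive sketch only; key names come from the diff, while the optional fields are an assumption based on entries being pushed with just a filename and updated later):

```ts
// Descriptive sketch of the dedupe-related Redis layout (not an exported API).
type DedupeSourceEntrySketch = {
  filename: string; // pushed by addSourceWACZForDedupe() or the indexer import
  hash?: string;    // filled in by updateDedupeSourceWACZ() once the WACZ is built
  size?: number;
};

// On the dedupe Redis (this.dedupeRedis):
//   c:{crawlId}:wacz     LIST of JSON DedupeSourceEntry values, one per source WACZ;
//                        dedupeKeyIndex remembers which slot to LSET hash/size into.
//
// On the crawl-state Redis (this.redis):
//   {uid}:duperef        SET of "{crawlId} {index}" pairs written by
//                        addDupeCrawlDependency() when the recorder hits a duplicate.
//   {crawlId}:reqCrawls  SET of crawl ids this crawl depends on.
//
// getDupeDependentCrawls() resolves each duperef pair with LINDEX c:{crawlId}:wacz {index},
// so dependency info is only recoverable when the source crawls ran with --generateWACZ.
```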

tests/dedupe-basic.test.js

Lines changed: 69 additions & 10 deletions
```diff
@@ -28,10 +28,10 @@ afterAll(async () => {
   execSync("docker network rm dedupe");
 });
 
-function runCrawl(name, db="0") {
+function runCrawl(name, {db = 0, limit = 4, wacz = true} = {}) {
   fs.rmSync(`./test-crawls/collections/${name}`, { recursive: true, force: true });
 
-  const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 4 --exclude community --collection ${name} --redisDedupeUrl redis://dedupe-redis:6379/${db} --generateWACZ`);
+  const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit ${limit} --exclude community --collection ${name} --redisDedupeUrl redis://dedupe-redis:6379/${db} ${wacz ? "--generateWACZ" : ""}`);
 
   return new Promise((resolve) => {
     crawler.on("exit", (code) => {
@@ -54,6 +54,16 @@ function loadFirstWARC(name) {
   return parser;
 }
 
+function deleteFirstWARC(name) {
+  const archiveWarcLists = fs.readdirSync(
+    `test-crawls/collections/${name}/archive`,
+  );
+
+  const warcName = path.join(`test-crawls/collections/${name}/archive`, archiveWarcLists[0]);
+
+  fs.unlinkSync(warcName);
+}
+
 function loadDataPackageRelated(name) {
   execSync(
     `unzip test-crawls/collections/${name}/${name}.wacz -d test-crawls/collections/${name}/wacz`,
@@ -67,11 +77,60 @@ function loadDataPackageRelated(name) {
   return dataPackageJSON.relation;
 }
 
+test("check revisit records written on duplicate crawl, same collection, no wacz", async () => {
+
+  const collName = "dedupe-test-same-coll";
+
+  expect(await runCrawl(collName, {limit: 1, wacz: false})).toBe(0);
+
+  let statusCode = -1;
+
+  let response = 0;
+  let revisit = 0;
+
+  const parserOrig = loadFirstWARC(collName);
+
+  for await (const record of parserOrig) {
+    if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) {
+      continue;
+    }
+
+    if (record.warcType === "response") {
+      response++;
+    }
+  }
+
+  deleteFirstWARC(collName);
+
+  expect(await runCrawl(collName, {limit: 1, wacz: false})).toBe(0);
+
+  const dupeOrig = loadFirstWARC(collName);
+
+  for await (const record of dupeOrig) {
+    if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) {
+      continue;
+    }
+
+    if (record.warcType === "revisit") {
+      revisit++;
+    }
+  }
+
+  expect(response).toBeGreaterThan(0);
+
+  // revisits should match number of responses for non urn:
+  expect(response).toBe(revisit);
+
+  numResponses = response;
+});
+
+
+
 
-test("check revisit records written on duplicate crawl", async () => {
+test("check revisit records written on duplicate crawl, different collections, with wacz", async () => {
 
-  expect(await runCrawl("dedupe-test-orig")).toBe(0);
-  expect(await runCrawl("dedupe-test-dupe")).toBe(0);
+  expect(await runCrawl("dedupe-test-orig", {db: 1})).toBe(0);
+  expect(await runCrawl("dedupe-test-dupe", {db: 1})).toBe(0);
 
   let statusCode = -1;
 
@@ -111,20 +170,20 @@ test("check revisit records written on duplicate crawl", async () => {
 });
 
 
-test("import index and crawl dupe", async () => {
+test("import dupe index from wacz", async () => {
 
-  execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedupe-test-orig/dedupe-test-orig.wacz --sourceCrawlId dedupe-test-orig --redisDedupeUrl redis://dedupe-redis:6379/1`);
+  execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedupe-test-orig/dedupe-test-orig.wacz --sourceCrawlId dedupe-test-orig --redisDedupeUrl redis://dedupe-redis:6379/2`);
 
-  const redis = new Redis("redis://127.0.0.1:37379/1", { lazyConnect: true, retryStrategy: () => null });
+  const redis = new Redis("redis://127.0.0.1:37379/2", { lazyConnect: true, retryStrategy: () => null });
 
   await redis.connect({maxRetriesPerRequest: 50});
 
   expect(await redis.hlen("alldupes")).toBe(numResponses);
 });
 
 
-test("imported crawl dupe matches previous dupe count", async () => {
-  expect(await runCrawl("dedupe-test-dupe-2", 1)).toBe(0);
+test("verify crawl with imported dupe index has same dupes as dedupe against original", async () => {
+  expect(await runCrawl("dedupe-test-dupe-2", {db: 2})).toBe(0);
 
   const dupeOrig = loadFirstWARC("dedupe-test-dupe-2");
 
```
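`runCrawl` now takes an options object rather than a positional Redis db string, which is what lets the first new test run a dedupe-only crawl with WACZ generation turned off. A small, hypothetical usage sketch relying on the helpers and Jest globals defined in this file (collection name and db number are made up; per the diff the defaults are db 0, limit 4, wacz enabled):

```ts
// Hypothetical extra test exercising the new runCrawl options object.
test("dedupe-only crawl without WACZ", async () => {
  // first pass populates dedupe db 3; second pass should write revisits instead of responses
  expect(await runCrawl("dedupe-no-wacz", { db: 3, limit: 1, wacz: false })).toBe(0);
  expect(await runCrawl("dedupe-no-wacz", { db: 3, limit: 1, wacz: false })).toBe(0);
});
```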