Commits (23)
80aef8f dedup work: (ikreymer, Aug 30, 2025)
bf53ec2 args: add separate --dedupIndexUrl to support separate redis for dedup (ikreymer, Sep 17, 2025)
8cb7676 add indexer entrypoint: (ikreymer, Sep 18, 2025)
5b22fb1 keep skipping dupe URLs as before (ikreymer, Sep 18, 2025)
09421aa warc writing: (ikreymer, Sep 18, 2025)
d90ed28 rename --dedupStoreUrl -> redisDedupUrl (ikreymer, Sep 18, 2025)
f9c5c19 update to latest warcio (2.4.7) to fix issues when returning payload o… (ikreymer, Sep 18, 2025)
cbf3b2f bump to 2.4.7 (ikreymer, Sep 18, 2025)
e932c1c tests: add dedup-basic.test for simple dedup, ensure number of revisi… (ikreymer, Sep 18, 2025)
f685a97 deps update (ikreymer, Sep 20, 2025)
7d1e436 dedup indexing: strip hash prefix from digest, as cdx does not have it (ikreymer, Sep 23, 2025)
1a8ada7 use dedup redis to queue up wacz files that need to be updated (ikreymer, Sep 23, 2025)
d29bf31 dedup post requests and non-404s as well! (ikreymer, Sep 25, 2025)
b274456 - track source index for each hash, so entry becomes '<source index> … (ikreymer, Oct 18, 2025)
a1767dc update to new data model: (ikreymer, Oct 24, 2025)
11bc180 cleanup, keep compatibility with redis 6 still (ikreymer, Oct 24, 2025)
bcc6882 always return wacz, store wacz depends only for current wacz (ikreymer, Oct 24, 2025)
7b02f65 rename 'dedup' -> 'dedupe' for consistency (ikreymer, Oct 25, 2025)
1db1010 indexer optimize: commit only if added (ikreymer, Oct 25, 2025)
64e4a95 add removing option to also remove unused crawls if doing a full sync… (ikreymer, Oct 25, 2025)
6c1f4d9 rebase fix (ikreymer, Nov 28, 2025)
4bc95fc generate wacz filename if deduping (ikreymer, Nov 28, 2025)
7f3f5a3 cleanup pass: (ikreymer, Nov 28, 2025)
Dockerfile (5 changes: 3 additions & 2 deletions)
@@ -44,11 +44,12 @@ ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/ui.js /app/html/rw
 ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/sw.js /app/html/rwp/
 ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/adblock/adblock.gz /app/html/rwp/adblock.gz
 
-RUN chmod a+x /app/dist/main.js /app/dist/create-login-profile.js && chmod a+r /app/html/rwp/*
+RUN chmod a+x /app/dist/main.js /app/dist/create-login-profile.js /app/dist/indexer.js && chmod a+r /app/html/rwp/*
 
 RUN ln -s /app/dist/main.js /usr/bin/crawl; \
     ln -s /app/dist/main.js /usr/bin/qa; \
-    ln -s /app/dist/create-login-profile.js /usr/bin/create-login-profile
+    ln -s /app/dist/create-login-profile.js /usr/bin/create-login-profile; \
+    ln -s /app/dist/indexer.js /usr/bin/indexer;
 
 RUN mkdir -p /app/behaviors
 
src/crawler.ts (98 changes: 62 additions & 36 deletions)
@@ -31,7 +31,7 @@ import {
} from "./util/storage.js";
import { ScreenCaster, WSTransport } from "./util/screencaster.js";
import { Screenshots } from "./util/screenshots.js";
import { initRedis } from "./util/redis.js";
import { initRedisWaitForSuccess } from "./util/redis.js";
import { logger, formatErr, LogDetails, LogContext } from "./util/logger.js";
import { WorkerState, closeWorkers, runWorkers } from "./util/worker.js";
import { sleep, timedRun, secondsElapsed } from "./util/timing.js";
@@ -203,6 +203,7 @@ export class Crawler {
   | null = null;
 
   recording: boolean;
+  deduping = false;
 
   constructor() {
     const args = this.parseArgs();
@@ -342,32 +343,30 @@ export class Crawler {

   async initCrawlState() {
     const redisUrl = this.params.redisStoreUrl || "redis://localhost:6379/0";
+    const dedupeRedisUrl = this.params.redisDedupeUrl || redisUrl;
+
+    this.deduping = dedupeRedisUrl !== redisUrl;
+
     if (!redisUrl.startsWith("redis://")) {
       logger.fatal(
         "stateStoreUrl must start with redis:// -- Only redis-based store currently supported",
       );
     }
 
-    let redis;
-
-    while (true) {
-      try {
-        redis = await initRedis(redisUrl);
-        break;
-      } catch (e) {
-        //logger.fatal("Unable to connect to state store Redis: " + redisUrl);
-        logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
-        await sleep(1);
-      }
-    }
+    const redis = await initRedisWaitForSuccess(redisUrl);
 
     logger.debug(
       `Storing state via Redis ${redisUrl} @ key prefix "${this.crawlId}"`,
       {},
       "state",
     );
 
+    let dedupeRedis = redis;
+
+    if (redisUrl !== dedupeRedisUrl) {
+      dedupeRedis = await initRedisWaitForSuccess(dedupeRedisUrl);
+    }
+
     logger.debug(`Max Page Time: ${this.maxPageTime} seconds`, {}, "state");
 
     this.crawlState = new RedisCrawlState(
@@ -376,6 +375,7 @@
       this.maxPageTime,
       os.hostname(),
       this.params.maxPageRetries,
+      dedupeRedis,
     );
 
     if (this.params.logErrorsToRedis) {
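
The retry loop removed in this hunk moves behind the new initRedisWaitForSuccess helper. util/redis.js itself is not part of this diff, so the following is only a plausible sketch of the helper, assuming it wraps the same retry logic around the existing initRedis, logger, and sleep utilities:

    // hypothetical sketch of initRedisWaitForSuccess; the real implementation
    // is not shown in this diff. Assumes it is defined in util/redis.ts next
    // to the existing initRedis().
    import { Redis } from "ioredis";
    import { logger } from "./logger.js";
    import { sleep } from "./timing.js";

    export async function initRedisWaitForSuccess(url: string): Promise<Redis> {
      // retry until a connection succeeds, matching the inline loop it replaces
      while (true) {
        try {
          return await initRedis(url);
        } catch (e) {
          logger.warn(`Waiting for redis at ${url}`, {}, "state");
          await sleep(1);
        }
      }
    }

With both connections going through one helper, the crawler blocks until the state store and, when redisDedupeUrl points at a separate instance, the dedupe store are both reachable.
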
@@ -1095,7 +1095,7 @@ self.__bx_behaviors.selectMainBehavior();
     const { page, cdp, data, workerid, callbacks, recorder } = opts;
     data.callbacks = callbacks;
 
-    const { url, seedId } = data;
+    const { url, seedId, depth } = data;
 
     const auth = this.seeds[seedId].authHeader();
 
@@ -1168,6 +1168,7 @@ self.__bx_behaviors.selectMainBehavior();

     if (recorder) {
       recorder.pageSeed = seed;
+      recorder.pageSeedDepth = depth;
     }
 
     // run custom driver here, if any
@@ -1346,6 +1347,7 @@ self.__bx_behaviors.selectMainBehavior();
     } else {
       if (pageSkipped) {
         await this.crawlState.markExcluded(url);
+        this.limitHit = false;
       } else {
         const retry = await this.crawlState.markFailed(url);
 
@@ -1711,8 +1713,11 @@ self.__bx_behaviors.selectMainBehavior();
       this.storage = initStorage();
     }
 
-    if (this.params.generateWACZ && this.storage) {
-      await this.crawlState.setWACZFilename();
+    if (this.params.generateWACZ && (this.storage || this.deduping)) {
+      const filename = await this.crawlState.setWACZFilename();
+      if (this.deduping) {
+        await this.crawlState.addSourceWACZForDedupe(filename);
+      }
     }
 
     if (POST_CRAWL_STATES.includes(initState)) {
@@ -1911,21 +1916,37 @@ self.__bx_behaviors.selectMainBehavior();
     }
 
     if (this.params.generateWACZ && generateFiles) {
-      const uploaded = await this.generateWACZ();
+      const wacz = await this.generateWACZ();
 
-      if (uploaded && this.uploadAndDeleteLocal) {
-        await this.crawlState.setArchiveSize(0);
-        logger.info(
-          `Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
-        );
-        try {
-          fs.rmSync(this.collDir, { recursive: true, force: true });
-        } catch (e) {
-          logger.warn(`Unable to clear ${this.collDir} before exit`, e);
+      if (wacz) {
+        await this.crawlState.clearWACZFilename();
+
+        if (this.deduping) {
+          await this.crawlState.updateDedupeSourceWACZ(wacz);
+        }
+
+        if (this.storage && this.uploadAndDeleteLocal) {
+          await this.crawlState.setArchiveSize(0);
+
+          logger.info(
+            `Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
+          );
+          try {
+            fs.rmSync(this.collDir, { recursive: true, force: true });
+          } catch (e) {
+            logger.warn(`Unable to clear ${this.collDir} before exit`, e);
+          }
         }
       }
     }
 
+    if (this.deduping) {
+      //await this.crawlState.clearDupeCrawlRef();
+
+      // commit crawl data to main index
+      await this.crawlState.commitDedupeDone();
+    }
+
     if (this.finalExit && generateFiles && this.params.saveProfile) {
       const resource = await this.browser.saveProfile(
         this.params.saveProfile,
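
Taken together with the setWACZFilename hunk earlier, the dedupe bookkeeping now brackets the whole crawl. A condensed sketch of the ordering, assembled only from calls visible in this diff (control flow simplified, not a literal excerpt):

    // at crawl start (from the storage-init hunk above)
    const filename = await this.crawlState.setWACZFilename(); // now returns the filename
    if (this.deduping) {
      await this.crawlState.addSourceWACZForDedupe(filename); // register planned WACZ
    }

    // ... crawl runs ...

    // at crawl finish (this hunk)
    const wacz = await this.generateWACZ(); // now returns WACZ | null
    if (wacz) {
      await this.crawlState.clearWACZFilename();
      if (this.deduping) {
        await this.crawlState.updateDedupeSourceWACZ(wacz); // point dedupe entries at the final WACZ
      }
    }
    if (this.deduping) {
      await this.crawlState.commitDedupeDone(); // commit crawl data to the main index
    }
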
@@ -1963,7 +1984,7 @@ self.__bx_behaviors.selectMainBehavior();
     await streamFinish(logFH);
   }
 
-  async generateWACZ() {
+  async generateWACZ(): Promise<WACZ | null> {
     logger.info("Generating WACZ");
     await this.crawlState.setStatus("generate-wacz");
 
@@ -1977,11 +1998,11 @@ self.__bx_behaviors.selectMainBehavior();
     if (!warcFileList.length) {
       // if finished, just return
       if (isFinished || (await this.crawlState.isCrawlCanceled())) {
-        return;
+        return null;
       }
       // possibly restarted after committing, so assume done here!
       if ((await this.crawlState.numDone()) > 0) {
-        return;
+        return null;
       }
       // fail crawl otherwise
       logger.fatal("No WARC Files, assuming crawl failed");
@@ -2001,6 +2022,8 @@ self.__bx_behaviors.selectMainBehavior();

     await this.closeLog();
 
+    const requires = await this.crawlState.getDupeDependentCrawls();
+
     const waczOpts: WACZInitOpts = {
       input: warcFileList.map((x) => path.join(this.archivesDir, x)),
       output: waczPath,
@@ -2009,6 +2032,7 @@
       warcCdxDir: this.warcCdxDir,
       indexesDir: this.indexesDir,
       softwareString: this.infoString,
+      requires,
     };
 
     if (process.env.WACZ_SIGN_URL) {
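
The new requires field is how a deduplicated crawl records which other WACZs presumably hold the original payloads for the records it deduplicated. getDupeDependentCrawls() is not shown in this diff, so its return shape is an assumption here; a sketch of the wiring using only the fields visible in the two hunks above:

    // assumed: a list of identifiers for the WACZs this archive depends on
    const requires: string[] = await this.crawlState.getDupeDependentCrawls();

    const waczOpts: WACZInitOpts = {
      input: warcFileList.map((x) => path.join(this.archivesDir, x)),
      output: waczPath,
      // (options elided between the two hunks are omitted here)
      warcCdxDir: this.warcCdxDir,
      indexesDir: this.indexesDir,
      softwareString: this.infoString,
      requires, // dependent sources recorded in the generated WACZ
    };
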
@@ -2038,13 +2062,8 @@ self.__bx_behaviors.selectMainBehavior();
         const targetFilename = await this.crawlState.getWACZFilename();
 
         await this.storage.uploadCollWACZ(wacz, targetFilename, isFinished);
-
-        await this.crawlState.clearWACZFilename();
-
-        return true;
       }
-
-      return false;
+      return wacz;
     } catch (e) {
       logger.error("Error creating WACZ", e);
       if (!streaming) {
@@ -2053,6 +2072,8 @@ self.__bx_behaviors.selectMainBehavior();
         await this.setStatusAndExit(ExitCodes.UploadFailed, "interrupted");
       }
     }
+
+    return null;
   }
 
   logMemory() {
@@ -2218,7 +2239,12 @@ self.__bx_behaviors.selectMainBehavior();
     // excluded in recorder
     if (msg.startsWith("net::ERR_BLOCKED_BY_RESPONSE")) {
       data.pageSkipped = true;
-      logger.warn("Page Load Blocked, skipping", { msg, loadState });
+      logger.warn(
+        "Page Load Blocked, skipping",
+        { msg, loadState },
+        "pageStatus",
+      );
+      throw new Error("logged");
     } else {
       return this.pageFailed("Page Load Failed", retry, {
         msg,