Skip to content

Commit 15ec80f

Browse files
FileCacheQueueScheduler使用BloomFilter进行去重 (#1176)
Co-authored-by: xiezc <blanexie@qq.com>
1 parent 2c135da commit 15ec80f

File tree

2 files changed

+29
-75
lines changed

2 files changed

+29
-75
lines changed

webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,6 @@ private void onDownloadSuccess(Request request, Page page) {
458458
logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
459459
}
460460
sleep(site.getSleepTime());
461-
return;
462461
}
463462

464463
private void onDownloaderFail(Request request) {

webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java

Lines changed: 29 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,13 @@
11
package us.codecraft.webmagic.scheduler;
22

3-
import java.io.BufferedReader;
4-
import java.io.Closeable;
5-
import java.io.File;
6-
import java.io.FileNotFoundException;
7-
import java.io.FileReader;
8-
import java.io.FileWriter;
9-
import java.io.IOException;
10-
import java.io.PrintWriter;
11-
import java.util.LinkedHashSet;
12-
import java.util.Set;
13-
import java.util.concurrent.BlockingQueue;
14-
import java.util.concurrent.Executors;
15-
import java.util.concurrent.LinkedBlockingQueue;
16-
import java.util.concurrent.ScheduledExecutorService;
17-
import java.util.concurrent.TimeUnit;
18-
import java.util.concurrent.atomic.AtomicBoolean;
19-
import java.util.concurrent.atomic.AtomicInteger;
20-
21-
import org.apache.commons.io.IOUtils;
223
import org.apache.commons.lang3.math.NumberUtils;
23-
244
import us.codecraft.webmagic.Request;
255
import us.codecraft.webmagic.Task;
26-
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
6+
7+
import java.io.*;
8+
import java.util.concurrent.*;
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
import java.util.concurrent.atomic.AtomicInteger;
2711

2812

2913
/**
@@ -32,7 +16,7 @@
3216
* @author code4crafter@gmail.com <br>
3317
* @since 0.2.0
3418
*/
35-
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable {
19+
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, Closeable {
3620

3721
private String filePath = System.getProperty("java.io.tmpdir");
3822

@@ -52,8 +36,6 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement
5236

5337
private BlockingQueue<Request> queue;
5438

55-
private Set<String> urls;
56-
5739
private ScheduledExecutorService flushThreadPool;
5840

5941
public FileCacheQueueScheduler(String filePath) {
@@ -83,36 +65,13 @@ private void init(Task task) {
8365
}
8466

8567
private void initDuplicateRemover() {
86-
setDuplicateRemover(
87-
new DuplicateRemover() {
88-
@Override
89-
public boolean isDuplicate(Request request, Task task) {
90-
if (!inited.get()) {
91-
init(task);
92-
}
93-
return !urls.add(request.getUrl());
94-
}
95-
96-
@Override
97-
public void resetDuplicateCheck(Task task) {
98-
urls.clear();
99-
}
100-
101-
@Override
102-
public int getTotalRequestsCount(Task task) {
103-
return urls.size();
104-
}
105-
});
68+
BloomFilterDuplicateRemover bloomFilterDuplicateRemover = new BloomFilterDuplicateRemover(this.filePath.hashCode());
69+
setDuplicateRemover(bloomFilterDuplicateRemover);
10670
}
10771

10872
private void initFlushThread() {
109-
flushThreadPool = Executors.newScheduledThreadPool(1);
110-
flushThreadPool.scheduleAtFixedRate(new Runnable() {
111-
@Override
112-
public void run() {
113-
flush();
114-
}
115-
}, 10, 10, TimeUnit.SECONDS);
73+
flushThreadPool = Executors.newScheduledThreadPool(1);
74+
flushThreadPool.scheduleAtFixedRate(this::flush, 10, 10, TimeUnit.SECONDS);
11675
}
11776

11877
private void initWriter() {
@@ -127,7 +86,6 @@ private void initWriter() {
12786
private void readFile() {
12887
try {
12988
queue = new LinkedBlockingQueue<Request>();
130-
urls = new LinkedHashSet<String>();
13189
readCursorFile();
13290
readUrlFile();
13391
// initDuplicateRemover();
@@ -140,46 +98,43 @@ private void readFile() {
14098
}
14199

142100
private void readUrlFile() throws IOException {
143-
String line;
144-
BufferedReader fileUrlReader = null;
145-
try {
146-
fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
101+
try (BufferedReader fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)))) {
102+
String line;
147103
int lineReaded = 0;
148104
while ((line = fileUrlReader.readLine()) != null) {
149-
urls.add(line.trim());
105+
Request request = deserializeRequest(line);
106+
this.getDuplicateRemover().isDuplicate(request, null);
150107
lineReaded++;
151108
if (lineReaded > cursor.get()) {
152-
queue.add(deserializeRequest(line));
109+
queue.add(request);
153110
}
154111
}
155-
} finally {
156-
if (fileUrlReader != null) {
157-
IOUtils.closeQuietly(fileUrlReader);
158-
}
159112
}
160113
}
161114

162115
private void readCursorFile() throws IOException {
163-
BufferedReader fileCursorReader = null;
164-
try {
165-
fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
116+
String fileName = getFileName(fileCursor);
117+
try (BufferedReader fileCursorReader = new BufferedReader(new FileReader(fileName))) {
166118
String line;
119+
String lastLine = null;
167120
//read the last number
168121
while ((line = fileCursorReader.readLine()) != null) {
169-
cursor = new AtomicInteger(NumberUtils.toInt(line));
122+
line = line.trim();
123+
if (!line.isEmpty()) {
124+
lastLine = line;
125+
}
170126
}
171-
} finally {
172-
if (fileCursorReader != null) {
173-
IOUtils.closeQuietly(fileCursorReader);
127+
if (lastLine != null) {
128+
cursor.set(NumberUtils.toInt(line));
174129
}
175130
}
176131
}
177-
132+
178133
public void close() throws IOException {
179-
flushThreadPool.shutdown();
180-
fileUrlWriter.close();
181-
fileCursorWriter.close();
182-
}
134+
flushThreadPool.shutdown();
135+
fileUrlWriter.close();
136+
fileCursorWriter.close();
137+
}
183138

184139
private String getFileName(String filename) {
185140
return filePath + task.getUUID() + filename;

0 commit comments

Comments
 (0)