11package us .codecraft .webmagic .scheduler ;
22
3- import java .io .BufferedReader ;
4- import java .io .Closeable ;
5- import java .io .File ;
6- import java .io .FileNotFoundException ;
7- import java .io .FileReader ;
8- import java .io .FileWriter ;
9- import java .io .IOException ;
10- import java .io .PrintWriter ;
11- import java .util .LinkedHashSet ;
12- import java .util .Set ;
13- import java .util .concurrent .BlockingQueue ;
14- import java .util .concurrent .Executors ;
15- import java .util .concurrent .LinkedBlockingQueue ;
16- import java .util .concurrent .ScheduledExecutorService ;
17- import java .util .concurrent .TimeUnit ;
18- import java .util .concurrent .atomic .AtomicBoolean ;
19- import java .util .concurrent .atomic .AtomicInteger ;
20-
21- import org .apache .commons .io .IOUtils ;
223import org .apache .commons .lang3 .math .NumberUtils ;
23-
244import us .codecraft .webmagic .Request ;
255import us .codecraft .webmagic .Task ;
26- import us .codecraft .webmagic .scheduler .component .DuplicateRemover ;
6+
7+ import java .io .*;
8+ import java .util .concurrent .*;
9+ import java .util .concurrent .atomic .AtomicBoolean ;
10+ import java .util .concurrent .atomic .AtomicInteger ;
2711
2812
2913/**
3216 * @author code4crafter@gmail.com <br>
3317 * @since 0.2.0
3418 */
35- public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler ,Closeable {
19+ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler , Closeable {
3620
3721 private String filePath = System .getProperty ("java.io.tmpdir" );
3822
@@ -52,8 +36,6 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement
5236
5337 private BlockingQueue <Request > queue ;
5438
55- private Set <String > urls ;
56-
5739 private ScheduledExecutorService flushThreadPool ;
5840
5941 public FileCacheQueueScheduler (String filePath ) {
@@ -83,36 +65,13 @@ private void init(Task task) {
8365 }
8466
8567 private void initDuplicateRemover () {
86- setDuplicateRemover (
87- new DuplicateRemover () {
88- @ Override
89- public boolean isDuplicate (Request request , Task task ) {
90- if (!inited .get ()) {
91- init (task );
92- }
93- return !urls .add (request .getUrl ());
94- }
95-
96- @ Override
97- public void resetDuplicateCheck (Task task ) {
98- urls .clear ();
99- }
100-
101- @ Override
102- public int getTotalRequestsCount (Task task ) {
103- return urls .size ();
104- }
105- });
68+ BloomFilterDuplicateRemover bloomFilterDuplicateRemover = new BloomFilterDuplicateRemover (this .filePath .hashCode ());
69+ setDuplicateRemover (bloomFilterDuplicateRemover );
10670 }
10771
10872 private void initFlushThread () {
109- flushThreadPool = Executors .newScheduledThreadPool (1 );
110- flushThreadPool .scheduleAtFixedRate (new Runnable () {
111- @ Override
112- public void run () {
113- flush ();
114- }
115- }, 10 , 10 , TimeUnit .SECONDS );
73+ flushThreadPool = Executors .newScheduledThreadPool (1 );
74+ flushThreadPool .scheduleAtFixedRate (this ::flush , 10 , 10 , TimeUnit .SECONDS );
11675 }
11776
11877 private void initWriter () {
@@ -127,7 +86,6 @@ private void initWriter() {
12786 private void readFile () {
12887 try {
12988 queue = new LinkedBlockingQueue <Request >();
130- urls = new LinkedHashSet <String >();
13189 readCursorFile ();
13290 readUrlFile ();
13391 // initDuplicateRemover();
@@ -140,46 +98,43 @@ private void readFile() {
14098 }
14199
142100 private void readUrlFile () throws IOException {
143- String line ;
144- BufferedReader fileUrlReader = null ;
145- try {
146- fileUrlReader = new BufferedReader (new FileReader (getFileName (fileUrlAllName )));
101+ try (BufferedReader fileUrlReader = new BufferedReader (new FileReader (getFileName (fileUrlAllName )))) {
102+ String line ;
147103 int lineReaded = 0 ;
148104 while ((line = fileUrlReader .readLine ()) != null ) {
149- urls .add (line .trim ());
105+ Request request = deserializeRequest (line );
106+ this .getDuplicateRemover ().isDuplicate (request , null );
150107 lineReaded ++;
151108 if (lineReaded > cursor .get ()) {
152- queue .add (deserializeRequest ( line ) );
109+ queue .add (request );
153110 }
154111 }
155- } finally {
156- if (fileUrlReader != null ) {
157- IOUtils .closeQuietly (fileUrlReader );
158- }
159112 }
160113 }
161114
162115 private void readCursorFile () throws IOException {
163- BufferedReader fileCursorReader = null ;
164- try {
165- fileCursorReader = new BufferedReader (new FileReader (getFileName (fileCursor )));
116+ String fileName = getFileName (fileCursor );
117+ try (BufferedReader fileCursorReader = new BufferedReader (new FileReader (fileName ))) {
166118 String line ;
119+ String lastLine = null ;
167120 //read the last number
168121 while ((line = fileCursorReader .readLine ()) != null ) {
169- cursor = new AtomicInteger (NumberUtils .toInt (line ));
122+ line = line .trim ();
123+ if (!line .isEmpty ()) {
124+ lastLine = line ;
125+ }
170126 }
171- } finally {
172- if (fileCursorReader != null ) {
173- IOUtils .closeQuietly (fileCursorReader );
127+ if (lastLine != null ) {
128+ cursor .set (NumberUtils .toInt (line ));
174129 }
175130 }
176131 }
177-
132+
178133 public void close () throws IOException {
179- flushThreadPool .shutdown ();
180- fileUrlWriter .close ();
181- fileCursorWriter .close ();
182- }
134+ flushThreadPool .shutdown ();
135+ fileUrlWriter .close ();
136+ fileCursorWriter .close ();
137+ }
183138
184139 private String getFileName (String filename ) {
185140 return filePath + task .getUUID () + filename ;
0 commit comments