Commit 5f9e1a96 authored by Yihua Huang's avatar Yihua Huang

Merge pull request #233 from x1ny/master

修正FileCacheQueueScheduler导致程序不能正常结束和未关闭流
parents 7d7eb033 90e14b31
...@@ -374,6 +374,7 @@ public class Spider implements Runnable, Task { ...@@ -374,6 +374,7 @@ public class Spider implements Runnable, Task {
public void close() { public void close() {
destroyEach(downloader); destroyEach(downloader);
destroyEach(pageProcessor); destroyEach(pageProcessor);
destroyEach(scheduler);
for (Pipeline pipeline : pipelines) { for (Pipeline pipeline : pipelines) {
destroyEach(pipeline); destroyEach(pipeline);
} }
......
...@@ -12,17 +12,19 @@ import java.util.Set; ...@@ -12,17 +12,19 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Store urls and cursor in files so that a Spider can resume the status when shutdown.<br> * Store urls and cursor in files so that a Spider can resume the status when shutdown.<br>
* *
* @author code4crafter@gmail.com <br> * @author code4crafter@gmail.com <br>
* @since 0.2.0 * @since 0.2.0
*/ */
public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler { public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable {
private String filePath = System.getProperty("java.io.tmpdir"); private String filePath = System.getProperty("java.io.tmpdir");
...@@ -44,6 +46,8 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement ...@@ -44,6 +46,8 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement
private Set<String> urls; private Set<String> urls;
private ScheduledExecutorService flushThreadPool;
public FileCacheQueueScheduler(String filePath) { public FileCacheQueueScheduler(String filePath) {
if (!filePath.endsWith("/") && !filePath.endsWith("\\")) { if (!filePath.endsWith("/") && !filePath.endsWith("\\")) {
filePath += "/"; filePath += "/";
...@@ -94,7 +98,8 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement ...@@ -94,7 +98,8 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement
} }
private void initFlushThread() { private void initFlushThread() {
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() { flushThreadPool = Executors.newScheduledThreadPool(1);
flushThreadPool.scheduleAtFixedRate(new Runnable() {
@Override @Override
public void run() { public void run() {
flush(); flush();
...@@ -162,6 +167,12 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement ...@@ -162,6 +167,12 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement
} }
} }
public void close() throws IOException {
flushThreadPool.shutdown();
fileUrlWriter.close();
fileCursorWriter.close();
}
private String getFileName(String filename) { private String getFileName(String filename) {
return filePath + task.getUUID() + filename; return filePath + task.getUUID() + filename;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment