Commit fba33087 authored by yihua.huang's avatar yihua.huang

fix a thread pool exception

parent 3c79d031
...@@ -21,23 +21,28 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -21,23 +21,28 @@ import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Entrance of a crawler.<br> * Entrance of a crawler.<br>
* A spider contains four modules: Downloader, Scheduler, PageProcessor and Pipeline.<br> * A spider contains four modules: Downloader, Scheduler, PageProcessor and
* Every module is a field of Spider. <br> * Pipeline.<br>
* The modules are defined in interface. <br> * Every module is a field of Spider. <br>
* You can customize a spider with various implementations of them. <br> * The modules are defined in interface. <br>
* Examples: <br> * You can customize a spider with various implementations of them. <br>
* Examples: <br>
* <br> * <br>
* A simple crawler: <br> * A simple crawler: <br>
* Spider.create(new SimplePageProcessor("http://my.oschina.net/", "http://my.oschina.net/*blog/*")).run();<br> * Spider.create(new SimplePageProcessor("http://my.oschina.net/",
* "http://my.oschina.net/*blog/*")).run();<br>
* <br> * <br>
* Store results to files by FilePipeline: <br> * Store results to files by FilePipeline: <br>
* Spider.create(new SimplePageProcessor("http://my.oschina.net/", "http://my.oschina.net/*blog/*")) <br> * Spider.create(new SimplePageProcessor("http://my.oschina.net/",
* .pipeline(new FilePipeline("/data/temp/webmagic/")).run(); <br> * "http://my.oschina.net/*blog/*")) <br>
* .pipeline(new FilePipeline("/data/temp/webmagic/")).run(); <br>
* <br> * <br>
* Use FileCacheQueueScheduler to store urls and cursor in files, so that a Spider can resume the status when shutdown. <br> * Use FileCacheQueueScheduler to store urls and cursor in files, so that a
* Spider.create(new SimplePageProcessor("http://my.oschina.net/", "http://my.oschina.net/*blog/*")) <br> * Spider can resume the status when shutdown. <br>
* .scheduler(new FileCacheQueueScheduler("/data/temp/webmagic/cache/")).run(); <br> * Spider.create(new SimplePageProcessor("http://my.oschina.net/",
* * "http://my.oschina.net/*blog/*")) <br>
* .scheduler(new FileCacheQueueScheduler("/data/temp/webmagic/cache/")).run(); <br>
*
* @author code4crafter@gmail.com <br> * @author code4crafter@gmail.com <br>
* @see Downloader * @see Downloader
* @see Scheduler * @see Scheduler
...@@ -47,373 +52,381 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -47,373 +52,381 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class Spider implements Runnable, Task { public class Spider implements Runnable, Task {
protected Downloader downloader; protected Downloader downloader;
protected List<Pipeline> pipelines = new ArrayList<Pipeline>(); protected List<Pipeline> pipelines = new ArrayList<Pipeline>();
protected PageProcessor pageProcessor; protected PageProcessor pageProcessor;
protected List<String> startUrls; protected List<String> startUrls;
protected Site site; protected Site site;
protected String uuid; protected String uuid;
protected Scheduler scheduler = new QueueScheduler(); protected Scheduler scheduler = new QueueScheduler();
protected Logger logger = Logger.getLogger(getClass()); protected Logger logger = Logger.getLogger(getClass());
protected ExecutorService executorService; protected ExecutorService executorService;
protected int threadNum = 1; protected int threadNum = 1;
protected AtomicInteger stat = new AtomicInteger(STAT_INIT); protected AtomicInteger stat = new AtomicInteger(STAT_INIT);
protected final static int STAT_INIT = 0; protected final static int STAT_INIT = 0;
protected final static int STAT_RUNNING = 1; protected final static int STAT_RUNNING = 1;
protected final static int STAT_STOPPED = 2; protected final static int STAT_STOPPED = 2;
/** /**
* create a spider with pageProcessor. * create a spider with pageProcessor.
* *
* @param pageProcessor * @param pageProcessor
* @return new spider * @return new spider
* @see PageProcessor * @see PageProcessor
*/ */
public static Spider create(PageProcessor pageProcessor) { public static Spider create(PageProcessor pageProcessor) {
return new Spider(pageProcessor); return new Spider(pageProcessor);
} }
/** /**
* create a spider with pageProcessor. * create a spider with pageProcessor.
* *
* @param pageProcessor * @param pageProcessor
*/ */
public Spider(PageProcessor pageProcessor) { public Spider(PageProcessor pageProcessor) {
this.pageProcessor = pageProcessor; this.pageProcessor = pageProcessor;
this.site = pageProcessor.getSite(); this.site = pageProcessor.getSite();
this.startUrls = pageProcessor.getSite().getStartUrls(); this.startUrls = pageProcessor.getSite().getStartUrls();
} }
/** /**
* Set startUrls of Spider.<br> * Set startUrls of Spider.<br>
* Prior to startUrls of Site. * Prior to startUrls of Site.
* *
* @param startUrls * @param startUrls
* @return this * @return this
*/ */
public Spider startUrls(List<String> startUrls) { public Spider startUrls(List<String> startUrls) {
checkIfRunning(); checkIfRunning();
this.startUrls = startUrls; this.startUrls = startUrls;
return this; return this;
} }
/** /**
* Set an uuid for spider.<br> * Set an uuid for spider.<br>
* Default uuid is domain of site.<br> * Default uuid is domain of site.<br>
* *
* @param uuid * @param uuid
* @return this * @return this
*/ */
public Spider setUUID(String uuid) { public Spider setUUID(String uuid) {
this.uuid = uuid; this.uuid = uuid;
return this; return this;
} }
/** /**
* set scheduler for Spider * set scheduler for Spider
* *
* @param scheduler * @param scheduler
* @return this * @return this
* @Deprecated * @Deprecated
* @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler) * @see #setScheduler(us.codecraft.webmagic.scheduler.Scheduler)
*/ */
public Spider scheduler(Scheduler scheduler) { public Spider scheduler(Scheduler scheduler) {
return setScheduler(scheduler); return setScheduler(scheduler);
} }
/** /**
* set scheduler for Spider * set scheduler for Spider
* *
* @param scheduler * @param scheduler
* @return this * @return this
* @see Scheduler * @see Scheduler
* @since 0.2.1 * @since 0.2.1
*/ */
public Spider setScheduler(Scheduler scheduler) { public Spider setScheduler(Scheduler scheduler) {
checkIfRunning(); checkIfRunning();
this.scheduler = scheduler; this.scheduler = scheduler;
return this; return this;
} }
/** /**
* add a pipeline for Spider * add a pipeline for Spider
* *
* @param pipeline * @param pipeline
* @return this * @return this
* @see #setPipeline(us.codecraft.webmagic.pipeline.Pipeline) * @see #setPipeline(us.codecraft.webmagic.pipeline.Pipeline)
* @deprecated * @deprecated
*/ */
public Spider pipeline(Pipeline pipeline) { public Spider pipeline(Pipeline pipeline) {
return addPipeline(pipeline); return addPipeline(pipeline);
} }
/** /**
* add a pipeline for Spider * add a pipeline for Spider
* *
* @param pipeline * @param pipeline
* @return this * @return this
* @see Pipeline * @see Pipeline
* @since 0.2.1 * @since 0.2.1
*/ */
public Spider addPipeline(Pipeline pipeline) { public Spider addPipeline(Pipeline pipeline) {
checkIfRunning(); checkIfRunning();
this.pipelines.add(pipeline); this.pipelines.add(pipeline);
return this; return this;
} }
/** /**
* clear the pipelines set * clear the pipelines set
* *
* @return this * @return this
*/ */
public Spider clearPipeline() { public Spider clearPipeline() {
pipelines = new ArrayList<Pipeline>(); pipelines = new ArrayList<Pipeline>();
return this; return this;
} }
/** /**
* set the downloader of spider * set the downloader of spider
* *
* @param downloader * @param downloader
* @return this * @return this
* @see #setDownloader(us.codecraft.webmagic.downloader.Downloader) * @see #setDownloader(us.codecraft.webmagic.downloader.Downloader)
* @deprecated * @deprecated
*/ */
public Spider downloader(Downloader downloader) { public Spider downloader(Downloader downloader) {
return setDownloader(downloader); return setDownloader(downloader);
} }
/** /**
* set the downloader of spider * set the downloader of spider
* *
* @param downloader * @param downloader
* @return this * @return this
* @see Downloader * @see Downloader
*/ */
public Spider setDownloader(Downloader downloader) { public Spider setDownloader(Downloader downloader) {
checkIfRunning(); checkIfRunning();
this.downloader = downloader; this.downloader = downloader;
return this; return this;
} }
protected void checkComponent() { protected void checkComponent() {
if (downloader == null) { if (downloader == null) {
this.downloader = new HttpClientDownloader(); this.downloader = new HttpClientDownloader();
} }
if (pipelines.isEmpty()) { if (pipelines.isEmpty()) {
pipelines.add(new ConsolePipeline()); pipelines.add(new ConsolePipeline());
} }
downloader.setThread(threadNum); downloader.setThread(threadNum);
} }
@Override @Override
public void run() { public void run() {
if (!stat.compareAndSet(STAT_INIT, STAT_RUNNING) if (!stat.compareAndSet(STAT_INIT, STAT_RUNNING) && !stat.compareAndSet(STAT_STOPPED, STAT_RUNNING)) {
&& !stat.compareAndSet(STAT_STOPPED, STAT_RUNNING)) { throw new IllegalStateException("Spider is already running!");
throw new IllegalStateException("Spider is already running!"); }
} checkComponent();
checkComponent(); if (startUrls != null) {
if (startUrls != null) { for (String startUrl : startUrls) {
for (String startUrl : startUrls) { scheduler.push(new Request(startUrl), this);
scheduler.push(new Request(startUrl), this); }
} startUrls.clear();
startUrls.clear(); }
} Request request = scheduler.poll(this);
Request request = scheduler.poll(this); logger.info("Spider " + getUUID() + " started!");
//single thread // single thread
if (threadNum <= 1) { if (threadNum <= 1) {
while (request != null && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) { while (request != null && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) {
processRequest(request); processRequest(request);
request = scheduler.poll(this); request = scheduler.poll(this);
} }
} else { } else {
synchronized (this) { synchronized (this) {
this.executorService = ThreadUtils.newFixedThreadPool(threadNum); this.executorService = ThreadUtils.newFixedThreadPool(threadNum);
} }
//multi thread // multi thread
final AtomicInteger threadAlive = new AtomicInteger(0); final AtomicInteger threadAlive = new AtomicInteger(0);
while (true && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) { while (true && stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) {
if (request == null) { if (request == null) {
//when no request found but some thread is alive, sleep a while. // when no request found but some thread is alive, sleep a
try { // while.
Thread.sleep(100); try {
} catch (InterruptedException e) { Thread.sleep(100);
} } catch (InterruptedException e) {
} else { }
final Request requestFinal = request; } else {
threadAlive.incrementAndGet(); final Request requestFinal = request;
executorService.execute(new Runnable() { threadAlive.incrementAndGet();
@Override executorService.execute(new Runnable() {
public void run() { @Override
processRequest(requestFinal); public void run() {
threadAlive.decrementAndGet(); processRequest(requestFinal);
} threadAlive.decrementAndGet();
}); }
} });
request = scheduler.poll(this); }
if (threadAlive.get() == 0) { request = scheduler.poll(this);
request = scheduler.poll(this); if (threadAlive.get() == 0) {
if (request == null) { request = scheduler.poll(this);
break; if (request == null) {
} break;
} }
} }
executorService.shutdown(); }
} executorService.shutdown();
stat.compareAndSet(STAT_RUNNING, STAT_STOPPED); }
//release some resources stat.compareAndSet(STAT_RUNNING, STAT_STOPPED);
destroy(); // release some resources
} destroy();
}
protected void destroy() {
destroyEach(downloader); protected void destroy() {
destroyEach(pageProcessor); destroyEach(downloader);
for (Pipeline pipeline : pipelines) { destroyEach(pageProcessor);
destroyEach(pipeline); for (Pipeline pipeline : pipelines) {
} destroyEach(pipeline);
} }
}
private void destroyEach(Object object) {
if (object instanceof Closeable) { private void destroyEach(Object object) {
try { if (object instanceof Closeable) {
((Closeable) object).close(); try {
} catch (IOException e) { ((Closeable) object).close();
e.printStackTrace(); } catch (IOException e) {
} e.printStackTrace();
} }
} }
}
/**
* Process specific urls without url discovering. /**
* * Process specific urls without url discovering.
* @param urls urls to process *
*/ * @param urls
public void test(String... urls) { * urls to process
checkComponent(); */
if (urls.length > 0) { public void test(String... urls) {
for (String url : urls) { checkComponent();
processRequest(new Request(url)); if (urls.length > 0) {
} for (String url : urls) {
} processRequest(new Request(url));
} }
}
protected void processRequest(Request request) { }
Page page = downloader.download(request, this);
if (page == null) { protected void processRequest(Request request) {
sleep(site.getSleepTime()); Page page = downloader.download(request, this);
return; if (page == null) {
} sleep(site.getSleepTime());
//for cycle retry return;
if (page.getHtml() == null) { }
addRequest(page); // for cycle retry
sleep(site.getSleepTime()); if (page.getHtml() == null) {
return; addRequest(page);
} sleep(site.getSleepTime());
pageProcessor.process(page); return;
addRequest(page); }
if (!page.getResultItems().isSkip()) { pageProcessor.process(page);
for (Pipeline pipeline : pipelines) { addRequest(page);
pipeline.process(page.getResultItems(), this); if (!page.getResultItems().isSkip()) {
} for (Pipeline pipeline : pipelines) {
} pipeline.process(page.getResultItems(), this);
sleep(site.getSleepTime()); }
} }
sleep(site.getSleepTime());
protected void sleep(int time) { }
try {
Thread.sleep(time); protected void sleep(int time) {
} catch (InterruptedException e) { try {
e.printStackTrace(); Thread.sleep(time);
} } catch (InterruptedException e) {
} e.printStackTrace();
}
protected void addRequest(Page page) { }
if (CollectionUtils.isNotEmpty(page.getTargetRequests())) {
for (Request request : page.getTargetRequests()) { protected void addRequest(Page page) {
scheduler.push(request, this); if (CollectionUtils.isNotEmpty(page.getTargetRequests())) {
} for (Request request : page.getTargetRequests()) {
} scheduler.push(request, this);
} }
}
protected void checkIfRunning() { }
if (!stat.compareAndSet(STAT_INIT, STAT_INIT) && !stat.compareAndSet(STAT_STOPPED, STAT_STOPPED)) {
throw new IllegalStateException("Spider is already running!"); protected void checkIfRunning() {
} if (!stat.compareAndSet(STAT_INIT, STAT_INIT) && !stat.compareAndSet(STAT_STOPPED, STAT_STOPPED)) {
} throw new IllegalStateException("Spider is already running!");
}
public void runAsync() { }
Thread thread = new Thread(this);
thread.setDaemon(false); public void runAsync() {
thread.start(); Thread thread = new Thread(this);
} thread.setDaemon(false);
thread.start();
public void start() { }
runAsync();
} public void start() {
runAsync();
public void stop() { }
stat.compareAndSet(STAT_RUNNING, STAT_STOPPED);
executorService.shutdown(); public void stop() {
} if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) {
if (executorService != null) {
public void stopAndDestroy() { executorService.shutdown();
stop(); }
destroy(); logger.info("Spider " + getUUID() + " stop success!");
} } else {
logger.info("Spider " + getUUID() + " stop fail!");
/** }
* start with more than one threads }
*
* @param threadNum public void stopAndDestroy() {
* @return this stop();
*/ destroy();
public Spider thread(int threadNum) { }
checkIfRunning();
this.threadNum = threadNum; /**
if (threadNum <= 0) { * start with more than one threads
throw new IllegalArgumentException("threadNum should be more than one!"); *
} * @param threadNum
if (threadNum == 1) { * @return this
return this; */
} public Spider thread(int threadNum) {
return this; checkIfRunning();
} this.threadNum = threadNum;
if (threadNum <= 0) {
/** throw new IllegalArgumentException("threadNum should be more than one!");
* switch off xsoup }
* if (threadNum == 1) {
* @return return this;
*/ }
public static void xsoupOff() { return this;
EnvironmentUtil.setUseXsoup(false); }
}
/**
@Override * switch off xsoup
public String getUUID() { *
if (uuid != null) { * @return
return uuid; */
} public static void xsoupOff() {
if (site != null) { EnvironmentUtil.setUseXsoup(false);
return site.getDomain(); }
}
return null; @Override
} public String getUUID() {
if (uuid != null) {
@Override return uuid;
public Site getSite() { }
return site; if (site != null) {
} return site.getDomain();
}
return null;
}
@Override
public Site getSite() {
return site;
}
} }
package us.codecraft.webmagic.utils; package us.codecraft.webmagic.utils;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -11,22 +11,11 @@ import java.util.concurrent.TimeUnit; ...@@ -11,22 +11,11 @@ import java.util.concurrent.TimeUnit;
*/ */
public class ThreadUtils { public class ThreadUtils {
public static ExecutorService newFixedThreadPool(int threadSize) { public static ExecutorService newFixedThreadPool(int threadSize) {
return new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS, if (threadSize <= 1) {
new LinkedBlockingQueue<Runnable>(1) { throw new IllegalArgumentException("ThreadSize must be greater than 1!");
}
private static final long serialVersionUID = -9028058603126367678L; return new ThreadPoolExecutor(threadSize - 1, threadSize - 1, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
@Override }
public boolean offer(Runnable e) {
try {
put(e);
return true;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
});
}
} }
...@@ -18,11 +18,12 @@ public class SpiderTest { ...@@ -18,11 +18,12 @@ public class SpiderTest {
public void process(ResultItems resultItems, Task task) { public void process(ResultItems resultItems, Task task) {
System.out.println(1); System.out.println(1);
} }
}); }).thread(2);
spider.start(); spider.start();
Thread.sleep(10000); Thread.sleep(10000);
spider.stop(); spider.stop();
// spider.run(); Thread.sleep(10000);
spider.start();
Thread.sleep(10000); Thread.sleep(10000);
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment