Commit c6661899 authored by yihua.huang's avatar yihua.huang

new thread pool #110

parent 179baa7a
package us.codecraft.webmagic; package us.codecraft.webmagic;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.downloader.Downloader; import us.codecraft.webmagic.downloader.Downloader;
import us.codecraft.webmagic.downloader.HttpClientDownloader; import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.pipeline.CollectorPipeline; import us.codecraft.webmagic.pipeline.CollectorPipeline;
...@@ -15,7 +13,7 @@ import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline; ...@@ -15,7 +13,7 @@ import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline;
import us.codecraft.webmagic.processor.PageProcessor; import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.scheduler.QueueScheduler; import us.codecraft.webmagic.scheduler.QueueScheduler;
import us.codecraft.webmagic.scheduler.Scheduler; import us.codecraft.webmagic.scheduler.Scheduler;
import us.codecraft.webmagic.utils.ThreadUtils; import us.codecraft.webmagic.selector.thread.ThreadPool;
import us.codecraft.webmagic.utils.UrlUtils; import us.codecraft.webmagic.utils.UrlUtils;
import java.io.Closeable; import java.io.Closeable;
...@@ -79,7 +77,7 @@ public class Spider implements Runnable, Task { ...@@ -79,7 +77,7 @@ public class Spider implements Runnable, Task {
protected Logger logger = LoggerFactory.getLogger(getClass()); protected Logger logger = LoggerFactory.getLogger(getClass());
protected ExecutorService executorService; protected ThreadPool threadPool;
protected int threadNum = 1; protected int threadNum = 1;
...@@ -101,8 +99,6 @@ public class Spider implements Runnable, Task { ...@@ -101,8 +99,6 @@ public class Spider implements Runnable, Task {
private Condition newUrlCondition = newUrlLock.newCondition(); private Condition newUrlCondition = newUrlLock.newCondition();
private final AtomicInteger threadAlive = new AtomicInteger(0);
private List<SpiderListener> spiderListeners; private List<SpiderListener> spiderListeners;
private final AtomicLong pageCount = new AtomicLong(0); private final AtomicLong pageCount = new AtomicLong(0);
...@@ -283,8 +279,8 @@ public class Spider implements Runnable, Task { ...@@ -283,8 +279,8 @@ public class Spider implements Runnable, Task {
pipelines.add(new ConsolePipeline()); pipelines.add(new ConsolePipeline());
} }
downloader.setThread(threadNum); downloader.setThread(threadNum);
if (executorService == null || executorService.isShutdown()) { if (threadPool == null || threadPool.isShutdown()) {
executorService = ThreadUtils.newFixedThreadPool(threadNum); threadPool = new ThreadPool(threadNum);
} }
if (startRequests != null) { if (startRequests != null) {
for (Request request : startRequests) { for (Request request : startRequests) {
...@@ -292,7 +288,6 @@ public class Spider implements Runnable, Task { ...@@ -292,7 +288,6 @@ public class Spider implements Runnable, Task {
} }
startRequests.clear(); startRequests.clear();
} }
threadAlive.set(0);
} }
@Override @Override
...@@ -303,15 +298,14 @@ public class Spider implements Runnable, Task { ...@@ -303,15 +298,14 @@ public class Spider implements Runnable, Task {
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
Request request = scheduler.poll(this); Request request = scheduler.poll(this);
if (request == null) { if (request == null) {
if (threadAlive.get() == 0 && exitWhenComplete) { if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
break; break;
} }
// wait until new url added // wait until new url added
waitNewUrl(); waitNewUrl();
} else { } else {
final Request requestFinal = request; final Request requestFinal = request;
threadAlive.incrementAndGet(); threadPool.execute(new Runnable() {
executorService.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
...@@ -321,7 +315,6 @@ public class Spider implements Runnable, Task { ...@@ -321,7 +315,6 @@ public class Spider implements Runnable, Task {
onError(requestFinal); onError(requestFinal);
logger.error("process request " + requestFinal + " error", e); logger.error("process request " + requestFinal + " error", e);
} finally { } finally {
threadAlive.decrementAndGet();
pageCount.incrementAndGet(); pageCount.incrementAndGet();
signalNewUrl(); signalNewUrl();
} }
...@@ -370,7 +363,7 @@ public class Spider implements Runnable, Task { ...@@ -370,7 +363,7 @@ public class Spider implements Runnable, Task {
for (Pipeline pipeline : pipelines) { for (Pipeline pipeline : pipelines) {
destroyEach(pipeline); destroyEach(pipeline);
} }
executorService.shutdown(); threadPool.shutdown();
} }
private void destroyEach(Object object) { private void destroyEach(Object object) {
...@@ -522,7 +515,7 @@ public class Spider implements Runnable, Task { ...@@ -522,7 +515,7 @@ public class Spider implements Runnable, Task {
newUrlLock.lock(); newUrlLock.lock();
try { try {
//double check //double check
if (threadAlive.get() == 0 && exitWhenComplete) { if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
return; return;
} }
newUrlCondition.await(); newUrlCondition.await();
...@@ -644,7 +637,7 @@ public class Spider implements Runnable, Task { ...@@ -644,7 +637,7 @@ public class Spider implements Runnable, Task {
* @since 0.4.1 * @since 0.4.1
*/ */
public int getThreadAlive() { public int getThreadAlive() {
return threadAlive.get(); return threadPool.getThreadAlive();
} }
/** /**
...@@ -674,7 +667,7 @@ public class Spider implements Runnable, Task { ...@@ -674,7 +667,7 @@ public class Spider implements Runnable, Task {
} }
public Spider setExecutorService(ExecutorService executorService) { public Spider setExecutorService(ExecutorService executorService) {
this.executorService = executorService; this.threadPool.setExecutorService(executorService);
return this; return this;
} }
......
...@@ -11,11 +11,12 @@ import us.codecraft.webmagic.processor.PageProcessor; ...@@ -11,11 +11,12 @@ import us.codecraft.webmagic.processor.PageProcessor;
*/ */
public class GithubRepoPageProcessor implements PageProcessor { public class GithubRepoPageProcessor implements PageProcessor {
private Site site = Site.me().setRetryTimes(3).setSleepTime(1000); private Site site = Site.me().setRetryTimes(3).setSleepTime(0);
@Override @Override
public void process(Page page) { public void process(Page page) {
page.addTargetRequests(page.getHtml().links().regex("(https://github\\.com/\\w+/\\w+)").all()); page.addTargetRequests(page.getHtml().links().regex("(https://github\\.com/\\w+/\\w+)").all());
page.addTargetRequests(page.getHtml().links().regex("(https://github\\.com/\\w+)").all());
page.putField("author", page.getUrl().regex("https://github\\.com/(\\w+)/.*").toString()); page.putField("author", page.getUrl().regex("https://github\\.com/(\\w+)/.*").toString());
page.putField("name", page.getHtml().xpath("//h1[@class='entry-title public']/strong/a/text()").toString()); page.putField("name", page.getHtml().xpath("//h1[@class='entry-title public']/strong/a/text()").toString());
if (page.getResultItems().get("name")==null){ if (page.getResultItems().get("name")==null){
......
package us.codecraft.webmagic.selector.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Thin wrapper around an {@link ExecutorService} that counts the tasks currently
 * in flight and blocks {@link #execute(Runnable)} while {@code threadNum} tasks
 * are already running.
 * <p>
 * A task is counted as alive from the moment it is accepted by
 * {@link #execute(Runnable)} until its {@code run()} method has returned or thrown,
 * so {@link #getThreadAlive()} can be polled to find out whether work is still pending.
 *
 * @author code4crafer@gmail.com
 * @since 0.5.0
 */
public class ThreadPool {

    /** Maximum number of tasks allowed to be in flight at the same time. */
    private final int threadNum;

    /**
     * Number of tasks submitted but not yet finished. Written only while holding
     * {@link #reentrantLock}; volatile so {@link #getThreadAlive()} can read it lock-free.
     */
    private volatile int threadAlive;

    private final ReentrantLock reentrantLock = new ReentrantLock();

    /** Signalled every time a slot is released. */
    private final Condition condition = reentrantLock.newCondition();

    private ExecutorService executorService;

    /**
     * Creates a pool backed by a fixed-size executor with {@code threadNum} workers.
     *
     * @param threadNum maximum number of concurrently running tasks, must be positive
     * @throws IllegalArgumentException if {@code threadNum} is not positive
     */
    public ThreadPool(int threadNum) {
        // Executors.newFixedThreadPool itself rejects non-positive sizes.
        this(threadNum, Executors.newFixedThreadPool(threadNum));
    }

    /**
     * Creates a pool that delegates to the given executor.
     *
     * @param threadNum       maximum number of concurrently running tasks, must be positive
     * @param executorService executor that actually runs the tasks
     * @throws IllegalArgumentException if {@code threadNum} is not positive
     */
    public ThreadPool(int threadNum, ExecutorService executorService) {
        if (threadNum <= 0) {
            // With a limit of zero every execute() call would wait forever.
            throw new IllegalArgumentException("threadNum must be positive, got " + threadNum);
        }
        this.threadNum = threadNum;
        this.executorService = executorService;
    }

    /**
     * Replaces the executor used for subsequently submitted tasks.
     *
     * @param executorService the new executor
     */
    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    /**
     * @return number of tasks that have been submitted and have not finished yet
     */
    public int getThreadAlive() {
        return threadAlive;
    }

    /**
     * @return maximum number of concurrently running tasks
     */
    public int getThreadNum() {
        return threadNum;
    }

    /**
     * Submits a task, first waiting until fewer than {@code threadNum} tasks are in flight.
     * The wait is not interruptible, but the caller's interrupt status is preserved.
     *
     * @param runnable the task to run
     */
    public void execute(final Runnable runnable) {
        acquireSlot();
        CountedTask task = new CountedTask(runnable);
        boolean submitted = false;
        try {
            executorService.execute(task);
            submitted = true;
        } finally {
            if (!submitted) {
                // The executor refused the task (or it failed while running inline);
                // release() is idempotent, so the slot is given back exactly once.
                task.release();
            }
        }
    }

    public boolean isShutdown() {
        return executorService.isShutdown();
    }

    public void shutdown() {
        executorService.shutdown();
    }

    /** Blocks until a slot is free, then claims it. */
    private void acquireSlot() {
        // Lock before the try block so the finally never unlocks a lock we do not hold.
        reentrantLock.lock();
        try {
            while (threadAlive >= threadNum) {
                // Keeps waiting across interrupts but leaves the interrupt flag set on return.
                condition.awaitUninterruptibly();
            }
            threadAlive++;
        } finally {
            reentrantLock.unlock();
        }
    }

    /** Runs the user's task and frees its slot once the task has completed. */
    private final class CountedTask implements Runnable {

        private final Runnable delegate;

        /** Guarded by {@link #reentrantLock}. */
        private boolean released;

        CountedTask(Runnable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } finally {
                release();
            }
        }

        /** Returns the slot to the pool and wakes one waiting submitter; safe to call twice. */
        void release() {
            reentrantLock.lock();
            try {
                if (!released) {
                    released = true;
                    threadAlive--;
                    // Condition.signal(), not Object.notify(): the latter needs the monitor.
                    condition.signal();
                }
            } finally {
                reentrantLock.unlock();
            }
        }
    }
}
...@@ -19,7 +19,6 @@ public class ThreadUtils { ...@@ -19,7 +19,6 @@ public class ThreadUtils {
} }
if (threadSize == 1) { if (threadSize == 1) {
return MoreExecutors.sameThreadExecutor(); return MoreExecutors.sameThreadExecutor();
} }
return new ThreadPoolExecutor(threadSize - 1, threadSize - 1, 0L, TimeUnit.MILLISECONDS, return new ThreadPoolExecutor(threadSize - 1, threadSize - 1, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
......
...@@ -240,7 +240,7 @@ public class SpiderMonitor { ...@@ -240,7 +240,7 @@ public class SpiderMonitor {
//Others will be registered //Others will be registered
spiderMonitor.server().jmxStart(); spiderMonitor.server().jmxStart();
oschinaSpider.start(); oschinaSpider.start();
githubSpider.start(); githubSpider.thread(10).start();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment