Commit 05eb7831 authored by yihua.huang's avatar yihua.huang

refactor and comments #110

parent 375e64e8
...@@ -13,7 +13,7 @@ import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline; ...@@ -13,7 +13,7 @@ import us.codecraft.webmagic.pipeline.ResultItemsCollectorPipeline;
import us.codecraft.webmagic.processor.PageProcessor; import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.scheduler.QueueScheduler; import us.codecraft.webmagic.scheduler.QueueScheduler;
import us.codecraft.webmagic.scheduler.Scheduler; import us.codecraft.webmagic.scheduler.Scheduler;
import us.codecraft.webmagic.selector.thread.ThreadPool; import us.codecraft.webmagic.selector.thread.CountableThreadPool;
import us.codecraft.webmagic.utils.UrlUtils; import us.codecraft.webmagic.utils.UrlUtils;
import java.io.Closeable; import java.io.Closeable;
...@@ -74,7 +74,9 @@ public class Spider implements Runnable, Task { ...@@ -74,7 +74,9 @@ public class Spider implements Runnable, Task {
protected Logger logger = LoggerFactory.getLogger(getClass()); protected Logger logger = LoggerFactory.getLogger(getClass());
protected ThreadPool threadPool; protected CountableThreadPool threadPool;
protected ExecutorService executorService;
protected int threadNum = 1; protected int threadNum = 1;
...@@ -279,7 +281,11 @@ public class Spider implements Runnable, Task { ...@@ -279,7 +281,11 @@ public class Spider implements Runnable, Task {
} }
downloader.setThread(threadNum); downloader.setThread(threadNum);
if (threadPool == null || threadPool.isShutdown()) { if (threadPool == null || threadPool.isShutdown()) {
threadPool = new ThreadPool(threadNum); if (executorService != null && !executorService.isShutdown()) {
threadPool = new CountableThreadPool(threadNum, executorService);
} else {
threadPool = new CountableThreadPool(threadNum);
}
} }
if (startRequests != null) { if (startRequests != null) {
for (Request request : startRequests) { for (Request request : startRequests) {
...@@ -330,7 +336,7 @@ public class Spider implements Runnable, Task { ...@@ -330,7 +336,7 @@ public class Spider implements Runnable, Task {
} }
protected void onError(Request request) { protected void onError(Request request) {
if (CollectionUtils.isNotEmpty(spiderListeners)){ if (CollectionUtils.isNotEmpty(spiderListeners)) {
for (SpiderListener spiderListener : spiderListeners) { for (SpiderListener spiderListener : spiderListeners) {
spiderListener.onError(request); spiderListener.onError(request);
} }
...@@ -338,7 +344,7 @@ public class Spider implements Runnable, Task { ...@@ -338,7 +344,7 @@ public class Spider implements Runnable, Task {
} }
protected void onSuccess(Request request) { protected void onSuccess(Request request) {
if (CollectionUtils.isNotEmpty(spiderListeners)){ if (CollectionUtils.isNotEmpty(spiderListeners)) {
for (SpiderListener spiderListener : spiderListeners) { for (SpiderListener spiderListener : spiderListeners) {
spiderListener.onSuccess(request); spiderListener.onSuccess(request);
} }
...@@ -521,8 +527,7 @@ public class Spider implements Runnable, Task { ...@@ -521,8 +527,7 @@ public class Spider implements Runnable, Task {
newUrlCondition.await(); newUrlCondition.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.warn("waitNewUrl - interrupted, error {}", e); logger.warn("waitNewUrl - interrupted, error {}", e);
} } finally {
finally {
newUrlLock.unlock(); newUrlLock.unlock();
} }
} }
...@@ -563,6 +568,21 @@ public class Spider implements Runnable, Task { ...@@ -563,6 +568,21 @@ public class Spider implements Runnable, Task {
return this; return this;
} }
/**
 * Sets the number of worker threads together with the executor service to run them on.
 *
 * @param executorService executor service stored on this spider; it is consulted when the
 *                        thread pool is created
 * @param threadNum       number of worker threads; must be greater than 0
 * @return this
 * @throws IllegalArgumentException if {@code threadNum} is not positive
 */
public Spider thread(ExecutorService executorService, int threadNum) {
    checkIfRunning();
    // Validate before mutating so a rejected call leaves the configuration untouched.
    if (threadNum <= 0) {
        throw new IllegalArgumentException("threadNum should be more than one!");
    }
    this.threadNum = threadNum;
    // Previously the argument was silently dropped, so the supplied executor was never used.
    this.executorService = executorService;
    return this;
}
public boolean isExitWhenComplete() { public boolean isExitWhenComplete() {
return exitWhenComplete; return exitWhenComplete;
} }
...@@ -637,6 +657,9 @@ public class Spider implements Runnable, Task { ...@@ -637,6 +657,9 @@ public class Spider implements Runnable, Task {
* @since 0.4.1 * @since 0.4.1
*/ */
public int getThreadAlive() { public int getThreadAlive() {
if (threadPool == null) {
return 0;
}
return threadPool.getThreadAlive(); return threadPool.getThreadAlive();
} }
...@@ -667,7 +690,8 @@ public class Spider implements Runnable, Task { ...@@ -667,7 +690,8 @@ public class Spider implements Runnable, Task {
} }
public Spider setExecutorService(ExecutorService executorService) { public Spider setExecutorService(ExecutorService executorService) {
this.threadPool.setExecutorService(executorService); checkIfRunning();
this.executorService = executorService;
return this; return this;
} }
......
...@@ -7,10 +7,16 @@ import java.util.concurrent.locks.Condition; ...@@ -7,10 +7,16 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/** /**
* Thread pool for workers.<br>
* Uses {@link java.util.concurrent.ExecutorService} as the inner implementation.<br>
* New features:<br>
* 1. Blocks when the thread pool is full, to avoid polling many urls without processing them.<br>
* 2. Counts the threads alive, for monitoring.
*
* @author code4crafer@gmail.com * @author code4crafer@gmail.com
* @since 0.5.0 * @since 0.5.0
*/ */
public class ThreadPool { public class CountableThreadPool {
private int threadNum; private int threadNum;
...@@ -20,12 +26,12 @@ public class ThreadPool { ...@@ -20,12 +26,12 @@ public class ThreadPool {
private Condition condition = reentrantLock.newCondition(); private Condition condition = reentrantLock.newCondition();
public ThreadPool(int threadNum) { public CountableThreadPool(int threadNum) {
this.threadNum = threadNum; this.threadNum = threadNum;
this.executorService = Executors.newFixedThreadPool(threadNum); this.executorService = Executors.newFixedThreadPool(threadNum);
} }
public ThreadPool(int threadNum, ExecutorService executorService) { public CountableThreadPool(int threadNum, ExecutorService executorService) {
this.threadNum = threadNum; this.threadNum = threadNum;
this.executorService = executorService; this.executorService = executorService;
} }
......
...@@ -13,6 +13,12 @@ import java.util.concurrent.TimeUnit; ...@@ -13,6 +13,12 @@ import java.util.concurrent.TimeUnit;
*/ */
public class ThreadUtils { public class ThreadUtils {
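/**
* @param threadSize number of threads; must be greater than 0
* @return
* @see us.codecraft.webmagic.selector.thread.CountableThreadPool
* @deprecated use {@link us.codecraft.webmagic.selector.thread.CountableThreadPool} instead
*/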
public static ExecutorService newFixedThreadPool(int threadSize) { public static ExecutorService newFixedThreadPool(int threadSize) {
if (threadSize <= 0) { if (threadSize <= 0) {
throw new IllegalArgumentException("ThreadSize must be greater than 0!"); throw new IllegalArgumentException("ThreadSize must be greater than 0!");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment