Commit 57106145 authored by yihua.huang's avatar yihua.huang

#58 add CYCLE_TRIED_TIMES support to QueueScheduler and PriorityScheduler

parent a79ae6a9
package us.codecraft.webmagic.scheduler;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Base Scheduler with duplicated urls removed locally.
*
* @author code4crafter@gmail.com
* @since 0.5.0
*/
public abstract class LocalDuplicatedRemovedScheduler implements Scheduler {
protected Logger logger = LoggerFactory.getLogger(getClass());
private Set<String> urls = Sets.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
@Override
public void push(Request request, Task task) {
logger.debug("push to queue " + request.getUrl());
if (request.getExtra(Request.CYCLE_TRIED_TIMES) != null || urls.add(request.getUrl())) {
pushWhenNoDuplicate(request, task);
}
}
protected abstract void pushWhenNoDuplicate(Request request, Task task);
}
package us.codecraft.webmagic.scheduler;
import org.apache.http.annotation.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.utils.NumberUtils;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
......@@ -21,12 +17,10 @@ import java.util.concurrent.PriorityBlockingQueue;
* @since 0.2.1
*/
@ThreadSafe
public class PriorityScheduler implements Scheduler {
public class PriorityScheduler extends LocalDuplicatedRemovedScheduler {
public static final int INITIAL_CAPACITY = 5;
private Logger logger = LoggerFactory.getLogger(getClass());
private BlockingQueue<Request> noPriorityQueue = new LinkedBlockingQueue<Request>();
private PriorityBlockingQueue<Request> priorityQueuePlus = new PriorityBlockingQueue<Request>(INITIAL_CAPACITY, new Comparator<Request>() {
......@@ -43,12 +37,9 @@ public class PriorityScheduler implements Scheduler {
}
});
private Set<String> urls = new HashSet<String>();
@Override
public synchronized void push(Request request, Task task) {
logger.debug("push to queue " + request.getUrl());
if (urls.add(request.getUrl())) {
public void pushWhenNoDuplicate(Request request, Task task) {
if (request.getPriority() == 0) {
noPriorityQueue.add(request);
} else if (request.getPriority() > 0) {
......@@ -57,7 +48,6 @@ public class PriorityScheduler implements Scheduler {
priorityQueueMinus.put(request);
}
}
}
@Override
public synchronized Request poll(Task task) {
......
package us.codecraft.webmagic.scheduler;
import org.apache.http.annotation.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
......@@ -20,25 +16,15 @@ import java.util.concurrent.LinkedBlockingQueue;
* @since 0.1.0
*/
@ThreadSafe
public class QueueScheduler implements Scheduler {
private Logger logger = LoggerFactory.getLogger(getClass());
public class QueueScheduler extends LocalDuplicatedRemovedScheduler {
private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();
private Set<String> urls = new HashSet<String>();
@Override
public synchronized void push(Request request, Task task) {
if (logger.isDebugEnabled()) {
logger.debug("push to queue " + request.getUrl());
}
if (urls.add(request.getUrl())) {
public void pushWhenNoDuplicate(Request request, Task task) {
queue.add(request);
}
}
@Override
public synchronized Request poll(Task task) {
return queue.poll();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment