Commit d1140b9e authored by yihua.huang

add bloom filter for scheduler #118

parent 64293cba
...@@ -4,6 +4,7 @@ import org.slf4j.Logger; ...@@ -4,6 +4,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task; import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
/** /**
* Remove duplicate urls and only push urls which are not duplicate.<br></br> * Remove duplicate urls and only push urls which are not duplicate.<br></br>
...@@ -11,30 +12,29 @@ import us.codecraft.webmagic.Task; ...@@ -11,30 +12,29 @@ import us.codecraft.webmagic.Task;
* @author code4crafer@gmail.com * @author code4crafer@gmail.com
* @since 0.5.0 * @since 0.5.0
*/ */
public abstract class DuplicatedRemoveScheduler implements Scheduler { public abstract class DuplicateRemovedScheduler implements Scheduler {
protected Logger logger = LoggerFactory.getLogger(getClass()); protected Logger logger = LoggerFactory.getLogger(getClass());
private DuplicateRemover duplicatedRemover;
public DuplicateRemover getDuplicateRemover() {
return duplicatedRemover;
}
public void setDuplicateRemover(DuplicateRemover duplicatedRemover) {
this.duplicatedRemover = duplicatedRemover;
}
@Override @Override
public void push(Request request, Task task) { public void push(Request request, Task task) {
logger.trace("get a candidate url {}", request.getUrl()); logger.trace("get a candidate url {}", request.getUrl());
if (!isDuplicate(request, task) || shouldReserved(request)) { if (!duplicatedRemover.isDuplicate(request, task) || shouldReserved(request)) {
logger.debug("push to queue {}", request.getUrl()); logger.debug("push to queue {}", request.getUrl());
pushWhenNoDuplicate(request, task); pushWhenNoDuplicate(request, task);
} }
} }
/**
* Reset duplicate check.
*/
public abstract void resetDuplicateCheck(Task task);
/**
* @param request
* @return
*/
protected abstract boolean isDuplicate(Request request, Task task);
protected boolean shouldReserved(Request request) { protected boolean shouldReserved(Request request) {
return request.getExtra(Request.CYCLE_TRIED_TIMES) != null; return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;
} }
......
...@@ -17,7 +17,7 @@ import java.util.concurrent.PriorityBlockingQueue; ...@@ -17,7 +17,7 @@ import java.util.concurrent.PriorityBlockingQueue;
* @since 0.2.1 * @since 0.2.1
*/ */
@ThreadSafe @ThreadSafe
public class PriorityScheduler extends LocalDuplicatedRemoveScheduler { public class PriorityScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {
public static final int INITIAL_CAPACITY = 5; public static final int INITIAL_CAPACITY = 5;
...@@ -65,4 +65,9 @@ public class PriorityScheduler extends LocalDuplicatedRemoveScheduler { ...@@ -65,4 +65,9 @@ public class PriorityScheduler extends LocalDuplicatedRemoveScheduler {
public int getLeftRequestsCount(Task task) { public int getLeftRequestsCount(Task task) {
return noPriorityQueue.size(); return noPriorityQueue.size();
} }
@Override
public int getTotalRequestsCount(Task task) {
return getDuplicateRemover().getTotalRequestsCount(task);
}
} }
...@@ -16,7 +16,7 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -16,7 +16,7 @@ import java.util.concurrent.LinkedBlockingQueue;
* @since 0.1.0 * @since 0.1.0
*/ */
@ThreadSafe @ThreadSafe
public class QueueScheduler extends LocalDuplicatedRemoveScheduler { public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {
private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>(); private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();
...@@ -34,4 +34,9 @@ public class QueueScheduler extends LocalDuplicatedRemoveScheduler { ...@@ -34,4 +34,9 @@ public class QueueScheduler extends LocalDuplicatedRemoveScheduler {
public int getLeftRequestsCount(Task task) { public int getLeftRequestsCount(Task task) {
return queue.size(); return queue.size();
} }
@Override
public int getTotalRequestsCount(Task task) {
return getDuplicateRemover().getTotalRequestsCount(task);
}
} }
package us.codecraft.webmagic.scheduler.component;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * {@link DuplicateRemover} backed by a Guava {@link BloomFilter}, intended for a huge number of urls.
 * <p>
 * A bloom filter is probabilistic: a url that has never been seen may occasionally be reported
 * as a duplicate (with a probability of roughly {@code fpp}), but a url that has been recorded
 * is always reported as a duplicate until {@link #resetDuplicateCheck(Task)} is called.
 *
 * @author code4crafter@gmail.com
 * @since 0.5.1
 */
public class BloomFilterDuplicateRemover implements DuplicateRemover {

    /** Number of distinct urls the filter is sized for. */
    private final int expectedInsertions;

    /** Desired false positive probability of the filter. */
    private final double fpp;

    /** Number of urls accepted as "not duplicate" since the last (re)build. */
    private AtomicInteger counter;

    /** Current filter; replaced as a whole by {@link #resetDuplicateCheck(Task)}. */
    private BloomFilter<CharSequence> bloomFilter;

    /**
     * Creates a remover with a false positive probability of 0.03.
     *
     * @param expectedInsertions number of distinct urls the filter is sized for
     */
    public BloomFilterDuplicateRemover(int expectedInsertions) {
        this(expectedInsertions, 0.03);
    }

    /**
     * @param expectedInsertions number of distinct urls the filter is sized for
     * @param fpp                desired false positive probability
     */
    public BloomFilterDuplicateRemover(int expectedInsertions, double fpp) {
        this.expectedInsertions = expectedInsertions;
        this.fpp = fpp;
        this.bloomFilter = rebuildBloomFilter();
    }

    /**
     * Resets the accepted-url counter and creates a fresh, empty filter.
     * <p>
     * The new filter is only returned, not installed; callers are responsible for assigning it.
     *
     * @return a new empty filter built from {@code expectedInsertions} and {@code fpp}
     */
    protected BloomFilter<CharSequence> rebuildBloomFilter() {
        counter = new AtomicInteger(0);
        return BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), expectedInsertions, fpp);
    }

    /**
     * Checks whether the request's url has (probably) been seen before and, if not, records it.
     *
     * @param request request whose url is checked
     * @param task    not used by this implementation
     * @return {@code true} if the filter reports the url as already present
     */
    @Override
    public boolean isDuplicate(Request request, Task task) {
        String url = request.getUrl();
        boolean isDuplicate = bloomFilter.mightContain(url);
        if (!isDuplicate) {
            // put() is what actually inserts the url; BloomFilter.apply() is merely the
            // Predicate alias of mightContain() and would leave the filter empty forever.
            bloomFilter.put(url);
            counter.incrementAndGet();
        }
        return isDuplicate;
    }

    /**
     * Forgets every recorded url and resets the counter to zero.
     *
     * @param task not used by this implementation
     */
    @Override
    public void resetDuplicateCheck(Task task) {
        // The rebuilt filter must replace the current one, otherwise old urls stay "seen".
        bloomFilter = rebuildBloomFilter();
    }

    /**
     * @param task not used by this implementation
     * @return number of urls accepted as "not duplicate" since the last (re)build
     */
    @Override
    public int getTotalRequestsCount(Task task) {
        return counter.get();
    }
}
package us.codecraft.webmagic.scheduler.component;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
/**
 * Strategy for detecting requests that have already been scheduled.
 *
 * @author code4crafer@gmail.com
 * @since 0.5.1
 */
public interface DuplicateRemover {

    /**
     * Tells whether the given request is a duplicate.
     *
     * @param request the request to check
     * @param task    the task passed along with the request
     * @return {@code true} if the request is a duplicate, {@code false} otherwise
     */
    boolean isDuplicate(Request request, Task task);

    /**
     * Resets the duplicate check.
     *
     * @param task the task passed along with the reset
     */
    void resetDuplicateCheck(Task task);

    /**
     * Returns the total requests count, used for monitoring.
     *
     * @param task the task the count is requested for
     * @return the total requests count
     */
    int getTotalRequestsCount(Task task);
}
package us.codecraft.webmagic.scheduler; package us.codecraft.webmagic.scheduler.component;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Request;
...@@ -8,23 +8,20 @@ import java.util.Set; ...@@ -8,23 +8,20 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
* Base Scheduler with duplicated urls removed by hash set.<br></br> * @author code4crafer@gmail.com
*
* @author code4crafter@gmail.com
* @since 0.5.0
*/ */
public abstract class LocalDuplicatedRemoveScheduler extends DuplicatedRemoveScheduler implements MonitorableScheduler { public class HashSetDuplicateRemover implements DuplicateRemover {
private Set<String> urls = Sets.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); private Set<String> urls = Sets.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
@Override @Override
public void resetDuplicateCheck(Task task) { public boolean isDuplicate(Request request, Task task) {
urls.clear(); return !urls.add(request.getUrl());
} }
@Override @Override
protected boolean isDuplicate(Request request, Task task) { public void resetDuplicateCheck(Task task) {
return !urls.add(request.getUrl()); urls.clear();
} }
@Override @Override
......
package us.codecraft.webmagic.scheduler;
import org.junit.Test;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.scheduler.component.BloomFilterDuplicateRemover;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Tests for {@link BloomFilterDuplicateRemover}.
 *
 * @author code4crafter@gmail.com
 */
public class BloomFilterDuplicateRemoverTest {

    /**
     * A url must be reported as new on first sight and as duplicate on second sight.
     */
    @Test
    public void testRemove() throws Exception {
        BloomFilterDuplicateRemover bloomFilterDuplicateRemover = new BloomFilterDuplicateRemover(10);

        boolean isDuplicate = bloomFilterDuplicateRemover.isDuplicate(new Request("a"), null);
        assertThat(isDuplicate).isFalse();
        isDuplicate = bloomFilterDuplicateRemover.isDuplicate(new Request("a"), null);
        // A bare assertThat(...) verifies nothing; the expectation has to be stated explicitly.
        assertThat(isDuplicate).isTrue();

        isDuplicate = bloomFilterDuplicateRemover.isDuplicate(new Request("b"), null);
        assertThat(isDuplicate).isFalse();
        isDuplicate = bloomFilterDuplicateRemover.isDuplicate(new Request("b"), null);
        assertThat(isDuplicate).isTrue();
    }
}
...@@ -2,8 +2,6 @@ package us.codecraft.webmagic.scheduler; ...@@ -2,8 +2,6 @@ package us.codecraft.webmagic.scheduler;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task; import us.codecraft.webmagic.Task;
...@@ -23,9 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -23,9 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author code4crafter@gmail.com <br> * @author code4crafter@gmail.com <br>
* @since 0.2.0 * @since 0.2.0
*/ */
public class FileCacheQueueScheduler extends LocalDuplicatedRemoveScheduler { public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {
private Logger logger = LoggerFactory.getLogger(getClass());
private String filePath = System.getProperty("java.io.tmpdir"); private String filePath = System.getProperty("java.io.tmpdir");
...@@ -166,4 +162,9 @@ public class FileCacheQueueScheduler extends LocalDuplicatedRemoveScheduler { ...@@ -166,4 +162,9 @@ public class FileCacheQueueScheduler extends LocalDuplicatedRemoveScheduler {
public int getLeftRequestsCount(Task task) { public int getLeftRequestsCount(Task task) {
return queue.size(); return queue.size();
} }
@Override
public int getTotalRequestsCount(Task task) {
return getDuplicateRemover().getTotalRequestsCount(task);
}
} }
...@@ -7,6 +7,7 @@ import redis.clients.jedis.JedisPool; ...@@ -7,6 +7,7 @@ import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.JedisPoolConfig;
import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task; import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;
/** /**
* Use Redis as url scheduler for distributed crawlers.<br> * Use Redis as url scheduler for distributed crawlers.<br>
...@@ -14,7 +15,7 @@ import us.codecraft.webmagic.Task; ...@@ -14,7 +15,7 @@ import us.codecraft.webmagic.Task;
* @author code4crafter@gmail.com <br> * @author code4crafter@gmail.com <br>
* @since 0.2.0 * @since 0.2.0
*/ */
public class RedisScheduler extends DuplicatedRemoveScheduler implements MonitorableScheduler { public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover {
private JedisPool pool; private JedisPool pool;
...@@ -43,7 +44,7 @@ public class RedisScheduler extends DuplicatedRemoveScheduler implements Monitor ...@@ -43,7 +44,7 @@ public class RedisScheduler extends DuplicatedRemoveScheduler implements Monitor
} }
@Override @Override
protected boolean isDuplicate(Request request, Task task) { public boolean isDuplicate(Request request, Task task) {
Jedis jedis = pool.getResource(); Jedis jedis = pool.getResource();
try { try {
boolean isDuplicate = jedis.sismember(getSetKey(task), request.getUrl()); boolean isDuplicate = jedis.sismember(getSetKey(task), request.getUrl());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment