Commit 0f2c5b57 authored by yihua.huang

update redisscheduler

parent 787b9529
...@@ -148,4 +148,15 @@ public class Page { ...@@ -148,4 +148,15 @@ public class Page {
public ResultItems getResultItems() { public ResultItems getResultItems() {
return resultItems; return resultItems;
} }
/**
 * Renders this page as a single-line debug string listing the request,
 * result items, html, url and target requests, in that order.
 *
 * @return text of the form {@code Page{request=..., ..., targetRequests=...}}
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("Page{");
    text.append("request=").append(request);
    text.append(", resultItems=").append(resultItems);
    text.append(", html=").append(html);
    text.append(", url=").append(url);
    text.append(", targetRequests=").append(targetRequests);
    text.append('}');
    return text.toString();
}
} }
...@@ -113,4 +113,13 @@ public class Request implements Serializable { ...@@ -113,4 +113,13 @@ public class Request implements Serializable {
public void setUrl(String url) { public void setUrl(String url) {
this.url = url; this.url = url;
} }
/**
 * Renders this request as a single-line debug string; the url is wrapped
 * in single quotes, followed by the extras and the priority.
 *
 * @return text of the form {@code Request{url='...', extras=..., priority=...}}
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("Request{");
    text.append("url='").append(url).append('\'');
    text.append(", extras=").append(extras);
    text.append(", priority=").append(priority);
    text.append('}');
    return text.toString();
}
} }
...@@ -40,33 +40,33 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -40,33 +40,33 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class Spider implements Runnable, Task { public class Spider implements Runnable, Task {
private Downloader downloader; protected Downloader downloader;
private List<Pipeline> pipelines = new ArrayList<Pipeline>(); protected List<Pipeline> pipelines = new ArrayList<Pipeline>();
private PageProcessor pageProcessor; protected PageProcessor pageProcessor;
private List<String> startUrls; protected List<String> startUrls;
private Site site; protected Site site;
private String uuid; protected String uuid;
private Scheduler scheduler = new QueueScheduler(); protected Scheduler scheduler = new QueueScheduler();
private Logger logger = Logger.getLogger(getClass()); protected Logger logger = Logger.getLogger(getClass());
private ExecutorService executorService; protected ExecutorService executorService;
private int threadNum = 1; protected int threadNum = 1;
private AtomicInteger stat = new AtomicInteger(STAT_INIT); protected AtomicInteger stat = new AtomicInteger(STAT_INIT);
private final static int STAT_INIT = 0; protected final static int STAT_INIT = 0;
private final static int STAT_RUNNING = 1; protected final static int STAT_RUNNING = 1;
private final static int STAT_STOPPED = 2; protected final static int STAT_STOPPED = 2;
/** /**
* 使用已定义的抽取规则新建一个Spider。 * 使用已定义的抽取规则新建一个Spider。
...@@ -206,7 +206,7 @@ public class Spider implements Runnable, Task { ...@@ -206,7 +206,7 @@ public class Spider implements Runnable, Task {
destroy(); destroy();
} }
private void destroy() { protected void destroy() {
destroyEach(downloader); destroyEach(downloader);
destroyEach(pageProcessor); destroyEach(pageProcessor);
for (Pipeline pipeline : pipelines) { for (Pipeline pipeline : pipelines) {
...@@ -233,7 +233,7 @@ public class Spider implements Runnable, Task { ...@@ -233,7 +233,7 @@ public class Spider implements Runnable, Task {
} }
} }
private void processRequest(Request request) { protected void processRequest(Request request) {
Page page = downloader.download(request, this); Page page = downloader.download(request, this);
if (page == null) { if (page == null) {
sleep(site.getSleepTime()); sleep(site.getSleepTime());
...@@ -249,7 +249,7 @@ public class Spider implements Runnable, Task { ...@@ -249,7 +249,7 @@ public class Spider implements Runnable, Task {
sleep(site.getSleepTime()); sleep(site.getSleepTime());
} }
private void sleep(int time) { protected void sleep(int time) {
try { try {
Thread.sleep(time); Thread.sleep(time);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -257,7 +257,7 @@ public class Spider implements Runnable, Task { ...@@ -257,7 +257,7 @@ public class Spider implements Runnable, Task {
} }
} }
private void addRequest(Page page) { protected void addRequest(Page page) {
if (CollectionUtils.isNotEmpty(page.getTargetRequests())) { if (CollectionUtils.isNotEmpty(page.getTargetRequests())) {
for (Request request : page.getTargetRequests()) { for (Request request : page.getTargetRequests()) {
scheduler.push(request, this); scheduler.push(request, this);
...@@ -265,7 +265,7 @@ public class Spider implements Runnable, Task { ...@@ -265,7 +265,7 @@ public class Spider implements Runnable, Task {
} }
} }
private void checkIfNotRunning() { protected void checkIfNotRunning() {
if (!stat.compareAndSet(STAT_INIT, STAT_INIT)) { if (!stat.compareAndSet(STAT_INIT, STAT_INIT)) {
throw new IllegalStateException("Spider is already running!"); throw new IllegalStateException("Spider is already running!");
} }
......
...@@ -66,13 +66,7 @@ public class HttpClientDownloader implements Downloader { ...@@ -66,13 +66,7 @@ public class HttpClientDownloader implements Downloader {
} }
// //
handleGzip(httpResponse); handleGzip(httpResponse);
String content = IOUtils.toString(httpResponse.getEntity().getContent(), return handleResponse(request, charset, httpResponse,task);
charset);
Page page = new Page();
page.setHtml(new Html(UrlUtils.fixAllRelativeHrefs(content, request.getUrl())));
page.setUrl(new PlainText(request.getUrl()));
page.setRequest(request);
return page;
} else { } else {
logger.warn("code error " + statusCode + "\t" + request.getUrl()); logger.warn("code error " + statusCode + "\t" + request.getUrl());
} }
...@@ -82,6 +76,16 @@ public class HttpClientDownloader implements Downloader { ...@@ -82,6 +76,16 @@ public class HttpClientDownloader implements Downloader {
return null; return null;
} }
/**
 * Turns an HTTP response into a {@link Page}.
 * <p>
 * The response entity is read completely as text using {@code charset},
 * relative hrefs in it are fixed against the request url, and the result is
 * stored on a new page together with the url and the originating request.
 *
 * @param request      the request that produced the response
 * @param charset      charset name used to decode the response body
 * @param httpResponse the response whose entity is read
 * @param task         not used by this implementation
 * @return a new page carrying the html, the url and the request
 * @throws IOException if the response body cannot be read
 */
protected Page handleResponse(Request request, String charset, HttpResponse httpResponse,Task task) throws IOException {
    // Read the body first, exactly as before, so an I/O failure surfaces before anything else is built.
    final String rawBody = IOUtils.toString(httpResponse.getEntity().getContent(), charset);
    final String requestUrl = request.getUrl();

    final Page result = new Page();
    final Html fixedHtml = new Html(UrlUtils.fixAllRelativeHrefs(rawBody, requestUrl));
    result.setHtml(fixedHtml);
    result.setUrl(new PlainText(requestUrl));
    result.setRequest(request);
    return result;
}
@Override @Override
public void setThread(int thread) { public void setThread(int thread) {
poolSize=thread; poolSize=thread;
......
...@@ -4,8 +4,8 @@ import org.apache.commons.codec.digest.DigestUtils; ...@@ -4,8 +4,8 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import us.codecraft.webmagic.ResultItems; import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task; import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.utils.FilePersistentBase;
import java.io.File;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
...@@ -18,9 +18,7 @@ import java.util.Map; ...@@ -18,9 +18,7 @@ import java.util.Map;
* Date: 13-4-21 * Date: 13-4-21
* Time: 下午6:28 * Time: 下午6:28
*/ */
public class FilePipeline implements Pipeline { public class FilePipeline extends FilePersistentBase implements Pipeline {
private String path = "/data/webmagic/";
private Logger logger = Logger.getLogger(getClass()); private Logger logger = Logger.getLogger(getClass());
...@@ -28,7 +26,7 @@ public class FilePipeline implements Pipeline { ...@@ -28,7 +26,7 @@ public class FilePipeline implements Pipeline {
* 新建一个FilePipeline,使用默认保存路径"/data/webmagic/" * 新建一个FilePipeline,使用默认保存路径"/data/webmagic/"
*/ */
public FilePipeline() { public FilePipeline() {
setPath("/data/webmagic/");
} }
/** /**
...@@ -37,21 +35,14 @@ public class FilePipeline implements Pipeline { ...@@ -37,21 +35,14 @@ public class FilePipeline implements Pipeline {
* @param path 文件保存路径 * @param path 文件保存路径
*/ */
public FilePipeline(String path) { public FilePipeline(String path) {
if (!path.endsWith("/")&&!path.endsWith("\\")){ setPath(path);
path+="/";
}
this.path = path;
} }
@Override @Override
public void process(ResultItems resultItems, Task task) { public void process(ResultItems resultItems, Task task) {
String path = this.path + "/" + task.getUUID() + "/"; String path = this.path + PATH_SEPERATOR + task.getUUID() + PATH_SEPERATOR;
File file = new File(path);
if (!file.exists()) {
file.mkdirs();
}
try { try {
PrintWriter printWriter = new PrintWriter(new FileWriter(path + DigestUtils.md5Hex(resultItems.getRequest().getUrl()) + ".html")); PrintWriter printWriter = new PrintWriter(new FileWriter(getFile(path + DigestUtils.md5Hex(resultItems.getRequest().getUrl()) + ".html")));
printWriter.println("url:\t" + resultItems.getRequest().getUrl()); printWriter.println("url:\t" + resultItems.getRequest().getUrl());
for (Map.Entry<String, Object> entry : resultItems.getAll().entrySet()) { for (Map.Entry<String, Object> entry : resultItems.getAll().entrySet()) {
if (entry.getValue() instanceof Iterable) { if (entry.getValue() instanceof Iterable) {
......
package us.codecraft.webmagic.utils;
import java.io.File;
/**
 * Base class for components that persist data to files.<br>
 * Holds a base directory path and offers helpers that create missing parent
 * directories before a file is written.
 *
 * @author code4crafter@gmail.com <br>
 * Date: 13-8-11 <br>
 * Time: 4:21 PM <br>
 */
public class FilePersistentBase {

    /** Base directory; after {@link #setPath(String)} it always ends with {@link #PATH_SEPERATOR}. */
    protected String path;

    /** Separator used when building paths; "/" unless the {@code file.separator} system property is set. */
    public static String PATH_SEPERATOR = "/";

    static {
        String property = System.getProperties().getProperty("file.separator");
        if (property != null) {
            PATH_SEPERATOR = property;
        }
    }

    /**
     * Sets the base directory, appending {@link #PATH_SEPERATOR} when the
     * given path does not already end with it.
     *
     * @param path base directory for persisted files; must not be null
     */
    public void setPath(String path) {
        // Normalise first and assign afterwards: assigning before the append
        // only changed the local parameter and silently dropped the separator.
        if (!path.endsWith(PATH_SEPERATOR)) {
            path += PATH_SEPERATOR;
        }
        this.path = path;
    }

    /**
     * Returns a {@link File} for the given name after making sure its parent
     * directory exists.
     *
     * @param fullName full path of the file
     * @return a file object for {@code fullName}; the file itself is not created
     */
    public File getFile(String fullName) {
        checkAndMakeParentDirecotry(fullName);
        return new File(fullName);
    }

    /**
     * Creates the parent directory of the given file name (including missing
     * ancestors) when it does not exist yet.
     *
     * @param fullName full path of a file whose parent directory is required
     */
    public void checkAndMakeParentDirecotry(String fullName) {
        // Accept "/" as well as the platform separator: callers may build
        // paths with a literal "/" even where the platform separator differs.
        int index = Math.max(fullName.lastIndexOf(PATH_SEPERATOR), fullName.lastIndexOf('/'));
        if (index > 0) {
            File parent = new File(fullName.substring(0, index));
            if (!parent.exists()) {
                parent.mkdirs();
            }
        }
    }

    /**
     * @return the base directory as stored by {@link #setPath(String)}
     */
    public String getPath() {
        return path;
    }
}
...@@ -2,6 +2,7 @@ package us.codecraft.webmagic.model; ...@@ -2,6 +2,7 @@ package us.codecraft.webmagic.model;
import us.codecraft.webmagic.Site; import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Spider; import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.processor.PageProcessor;
/** /**
* 基于Model的Spider,封装后的入口类。<br> * 基于Model的Spider,封装后的入口类。<br>
...@@ -20,6 +21,10 @@ public class OOSpider extends Spider { ...@@ -20,6 +21,10 @@ public class OOSpider extends Spider {
this.modelPageProcessor = modelPageProcessor; this.modelPageProcessor = modelPageProcessor;
} }
public OOSpider(PageProcessor pageProcessor) {
super(pageProcessor);
}
/** /**
* 创建一个爬虫。<br> * 创建一个爬虫。<br>
* @param site * @param site
......
...@@ -7,8 +7,8 @@ import org.apache.log4j.Logger; ...@@ -7,8 +7,8 @@ import org.apache.log4j.Logger;
import us.codecraft.webmagic.Task; import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.model.HasKey; import us.codecraft.webmagic.model.HasKey;
import us.codecraft.webmagic.model.PageModelPipeline; import us.codecraft.webmagic.model.PageModelPipeline;
import us.codecraft.webmagic.utils.FilePersistentBase;
import java.io.File;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
...@@ -21,38 +21,29 @@ import java.io.PrintWriter; ...@@ -21,38 +21,29 @@ import java.io.PrintWriter;
* Date: 13-4-21 * Date: 13-4-21
* Time: 下午6:28 * Time: 下午6:28
*/ */
public class JsonFilePageModelPipeline implements PageModelPipeline { public class JsonFilePageModelPipeline extends FilePersistentBase implements PageModelPipeline {
private String path = "/data/webmagic/";
private Logger logger = Logger.getLogger(getClass()); private Logger logger = Logger.getLogger(getClass());
/** /**
* 新建一个FilePipeline,使用默认保存路径"/data/webmagic/" * 新建一个JsonFilePageModelPipeline,使用默认保存路径"/data/webmagic/"
*/ */
public JsonFilePageModelPipeline() { public JsonFilePageModelPipeline() {
setPath("/data/webmagic/");
} }
/** /**
* 新建一个FilePipeline * 新建一个JsonFilePageModelPipeline
* *
* @param path 文件保存路径 * @param path 文件保存路径
*/ */
public JsonFilePageModelPipeline(String path) { public JsonFilePageModelPipeline(String path) {
if (!path.endsWith("/") && !path.endsWith("\\")) { setPath(path);
path += "/";
}
this.path = path;
} }
@Override @Override
public void process(Object o, Task task) { public void process(Object o, Task task) {
String path = this.path + "/" + task.getUUID() + "/"; String path = this.path + "/" + task.getUUID() + "/";
File file = new File(path);
if (!file.exists()) {
file.mkdirs();
}
try { try {
String filename; String filename;
if (o instanceof HasKey) { if (o instanceof HasKey) {
...@@ -60,7 +51,7 @@ public class JsonFilePageModelPipeline implements PageModelPipeline { ...@@ -60,7 +51,7 @@ public class JsonFilePageModelPipeline implements PageModelPipeline {
} else { } else {
filename = path + DigestUtils.md5Hex(ToStringBuilder.reflectionToString(o)) + ".json"; filename = path + DigestUtils.md5Hex(ToStringBuilder.reflectionToString(o)) + ".json";
} }
PrintWriter printWriter = new PrintWriter(new FileWriter(filename)); PrintWriter printWriter = new PrintWriter(new FileWriter(getFile(filename)));
printWriter.write(JSON.toJSONString(o)); printWriter.write(JSON.toJSONString(o));
printWriter.close(); printWriter.close();
} catch (IOException e) { } catch (IOException e) {
......
...@@ -5,6 +5,7 @@ import org.apache.commons.codec.digest.DigestUtils; ...@@ -5,6 +5,7 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import us.codecraft.webmagic.ResultItems; import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task; import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.utils.FilePersistentBase;
import java.io.File; import java.io.File;
import java.io.FileWriter; import java.io.FileWriter;
...@@ -18,40 +19,31 @@ import java.io.PrintWriter; ...@@ -18,40 +19,31 @@ import java.io.PrintWriter;
* Date: 13-4-21 * Date: 13-4-21
* Time: 下午6:28 * Time: 下午6:28
*/ */
public class JsonFilePipeline implements Pipeline { public class JsonFilePipeline extends FilePersistentBase implements Pipeline {
private String path = "/data/webmagic/";
private Logger logger = Logger.getLogger(getClass()); private Logger logger = Logger.getLogger(getClass());
/** /**
* 新建一个FilePipeline,使用默认保存路径"/data/webmagic/" * 新建一个JsonFilePipeline,使用默认保存路径"/data/webmagic/"
*/ */
public JsonFilePipeline() { public JsonFilePipeline() {
setPath("/data/webmagic");
} }
/** /**
* 新建一个FilePipeline * 新建一个JsonFilePipeline
* *
* @param path 文件保存路径 * @param path 文件保存路径
*/ */
public JsonFilePipeline(String path) { public JsonFilePipeline(String path) {
if (!path.endsWith("/")&&!path.endsWith("\\")){ setPath(path);
path+="/";
}
this.path = path;
} }
@Override @Override
public void process(ResultItems resultItems, Task task) { public void process(ResultItems resultItems, Task task) {
String path = this.path + "/" + task.getUUID() + "/"; String path = this.path + "/" + task.getUUID() + "/";
File file = new File(path);
if (!file.exists()) {
file.mkdirs();
}
try { try {
PrintWriter printWriter = new PrintWriter(new FileWriter(path + DigestUtils.md5Hex(resultItems.getRequest().getUrl()) + ".json")); PrintWriter printWriter = new PrintWriter(new FileWriter(new File(path + DigestUtils.md5Hex(resultItems.getRequest().getUrl()) + ".json")));
printWriter.write(JSON.toJSONString(resultItems.getAll())); printWriter.write(JSON.toJSONString(resultItems.getAll()));
printWriter.close(); printWriter.close();
} catch (IOException e) { } catch (IOException e) {
......
...@@ -32,34 +32,42 @@ public class RedisScheduler implements Scheduler { ...@@ -32,34 +32,42 @@ public class RedisScheduler implements Scheduler {
@Override @Override
public synchronized void push(Request request, Task task) { public synchronized void push(Request request, Task task) {
Jedis jedis = pool.getResource(); Jedis jedis = pool.getResource();
//使用SortedSet进行url去重 try {
if (jedis.zrank(SET_PREFIX + task.getUUID(), request.getUrl()) == null) { //使用Set进行url去重
if (!jedis.sismember(SET_PREFIX + task.getUUID(), request.getUrl())) {
//使用List保存队列 //使用List保存队列
jedis.rpush(QUEUE_PREFIX + task.getUUID(), request.getUrl()); jedis.rpush(QUEUE_PREFIX + task.getUUID(), request.getUrl());
jedis.zadd(SET_PREFIX + task.getUUID(), request.getPriority(), request.getUrl()); jedis.sadd(SET_PREFIX + task.getUUID(), request.getUrl());
if (request.getExtras() != null) { if (request.getExtras() != null) {
String key = ITEM_PREFIX + DigestUtils.shaHex(request.getUrl()); String field = DigestUtils.shaHex(request.getUrl());
byte[] bytes = JSON.toJSONString(request).getBytes(); String value = JSON.toJSONString(request);
jedis.set(key.getBytes(), bytes); jedis.hset((ITEM_PREFIX + task.getUUID()), field, value);
} }
} }
} finally {
pool.returnResource(jedis); pool.returnResource(jedis);
} }
}
@Override @Override
public synchronized Request poll(Task task) { public synchronized Request poll(Task task) {
Jedis jedis = pool.getResource(); Jedis jedis = pool.getResource();
try {
String url = jedis.lpop(QUEUE_PREFIX + task.getUUID()); String url = jedis.lpop(QUEUE_PREFIX + task.getUUID());
if (url == null) { if (url == null) {
return null; return null;
} }
String key = ITEM_PREFIX + DigestUtils.shaHex(url); String key = ITEM_PREFIX + task.getUUID();
byte[] bytes = jedis.get(key.getBytes()); String field = DigestUtils.shaHex(url);
byte[] bytes = jedis.hget(key.getBytes(),field.getBytes());
if (bytes != null) { if (bytes != null) {
Request o = JSON.parseObject(new String(bytes),Request.class); Request o = JSON.parseObject(new String(bytes), Request.class);
return o; return o;
} }
Request request = new Request(url);
return request;
} finally {
pool.returnResource(jedis); pool.returnResource(jedis);
return new Request(url); }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment