Commit 1b04a7f2 authored by yihua.huang's avatar yihua.huang

#527 move logic check from downloaderto spider

parent 6ead04a7
...@@ -4,6 +4,7 @@ import org.apache.commons.lang3.StringUtils; ...@@ -4,6 +4,7 @@ import org.apache.commons.lang3.StringUtils;
import us.codecraft.webmagic.selector.Html; import us.codecraft.webmagic.selector.Html;
import us.codecraft.webmagic.selector.Json; import us.codecraft.webmagic.selector.Json;
import us.codecraft.webmagic.selector.Selectable; import us.codecraft.webmagic.selector.Selectable;
import us.codecraft.webmagic.utils.HttpConstant;
import us.codecraft.webmagic.utils.UrlUtils; import us.codecraft.webmagic.utils.UrlUtils;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -41,15 +42,21 @@ public class Page { ...@@ -41,15 +42,21 @@ public class Page {
private Map<String,List<String>> headers; private Map<String,List<String>> headers;
private int statusCode; private int statusCode = HttpConstant.StatusCode.CODE_200;
private boolean needCycleRetry; private boolean downloadSuccess = true;
private List<Request> targetRequests = new ArrayList<Request>(); private List<Request> targetRequests = new ArrayList<Request>();
public Page() { public Page() {
} }
public static Page fail(){
Page page = new Page();
page.setDownloadSuccess(false);
return page;
}
public Page setSkip(boolean skip) { public Page setSkip(boolean skip) {
resultItems.setSkip(skip); resultItems.setSkip(skip);
return this; return this;
...@@ -179,14 +186,6 @@ public class Page { ...@@ -179,14 +186,6 @@ public class Page {
return request; return request;
} }
public boolean isNeedCycleRetry() {
return needCycleRetry;
}
public void setNeedCycleRetry(boolean needCycleRetry) {
this.needCycleRetry = needCycleRetry;
}
public void setRequest(Request request) { public void setRequest(Request request) {
this.request = request; this.request = request;
this.resultItems.setRequest(request); this.resultItems.setRequest(request);
...@@ -221,22 +220,27 @@ public class Page { ...@@ -221,22 +220,27 @@ public class Page {
this.headers = headers; this.headers = headers;
} }
public boolean isDownloadSuccess() {
return downloadSuccess;
}
public void setDownloadSuccess(boolean downloadSuccess) {
this.downloadSuccess = downloadSuccess;
}
@Override @Override
public String toString() { public String toString() {
return "Page{" + return "Page{" +
"request=" + request + "request=" + request +
", resultItems=" + resultItems + ", resultItems=" + resultItems +
", html=" + html +
", json=" + json +
", rawText='" + rawText + '\'' + ", rawText='" + rawText + '\'' +
", url=" + url + ", url=" + url +
", headers=" + headers + ", headers=" + headers +
", statusCode=" + statusCode + ", statusCode=" + statusCode +
", needCycleRetry=" + needCycleRetry + ", success=" + downloadSuccess +
", targetRequests=" + targetRequests + ", targetRequests=" + targetRequests +
", headers=" + headers+
'}'; '}';
} }
} }
package us.codecraft.webmagic; package us.codecraft.webmagic;
import us.codecraft.webmagic.utils.HttpConstant;
import java.util.*; import java.util.*;
/** /**
...@@ -40,7 +42,7 @@ public class Site { ...@@ -40,7 +42,7 @@ public class Site {
private boolean useGzip = true; private boolean useGzip = true;
static { static {
DEFAULT_STATUS_CODE_SET.add(200); DEFAULT_STATUS_CODE_SET.add(HttpConstant.StatusCode.CODE_200);
} }
/** /**
......
package us.codecraft.webmagic; package us.codecraft.webmagic;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.downloader.Downloader; import us.codecraft.webmagic.downloader.Downloader;
...@@ -398,19 +399,18 @@ public class Spider implements Runnable, Task { ...@@ -398,19 +399,18 @@ public class Spider implements Runnable, Task {
} }
} }
protected void processRequest(Request request) { private void processRequest(Request request) {
Page page = downloader.download(request, this); Page page = downloader.download(request, this);
if (page == null) { if (page.isDownloadSuccess()){
sleep(site.getSleepTime()); onDownloadSuccess(request, page);
onError(request); } else {
return; onDownloaderFail(request);
} }
// for cycle retry
if (page.isNeedCycleRetry()) {
extractAndAddRequests(page, true);
sleep(site.getRetrySleepTime());
return;
} }
private void onDownloadSuccess(Request request, Page page) {
onSuccess(request);
if (site.getAcceptStatCode().contains(page.getStatusCode())){
pageProcessor.process(page); pageProcessor.process(page);
extractAndAddRequests(page, spawnUrl); extractAndAddRequests(page, spawnUrl);
if (!page.getResultItems().isSkip()) { if (!page.getResultItems().isSkip()) {
...@@ -418,14 +418,40 @@ public class Spider implements Runnable, Task { ...@@ -418,14 +418,40 @@ public class Spider implements Runnable, Task {
pipeline.process(page.getResultItems(), this); pipeline.process(page.getResultItems(), this);
} }
} }
}
sleep(site.getSleepTime()); sleep(site.getSleepTime());
return;
}
private void onDownloaderFail(Request request) {
if (site.getCycleRetryTimes() == 0) {
sleep(site.getSleepTime());
} else {
// for cycle retry
doCycleRetry(request);
}
onError(request);
}
private void doCycleRetry(Request request) {
Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES);
if (cycleTriedTimesObject == null) {
addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, 1));
} else {
int cycleTriedTimes = (Integer) cycleTriedTimesObject;
cycleTriedTimes++;
if (cycleTriedTimes < site.getCycleRetryTimes()) {
addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, cycleTriedTimes));
}
}
sleep(site.getRetrySleepTime());
} }
protected void sleep(int time) { protected void sleep(int time) {
try { try {
Thread.sleep(time); Thread.sleep(time);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); logger.error("Thread interrupted when sleep",e);
} }
} }
......
...@@ -41,20 +41,4 @@ public abstract class AbstractDownloader implements Downloader { ...@@ -41,20 +41,4 @@ public abstract class AbstractDownloader implements Downloader {
protected void onError(Request request) { protected void onError(Request request) {
} }
protected Page addToCycleRetry(Request request, Site site) {
Page page = new Page();
Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES);
if (cycleTriedTimesObject == null) {
page.addTargetRequest(request.setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, 1));
} else {
int cycleTriedTimes = (Integer) cycleTriedTimesObject;
cycleTriedTimes++;
if (cycleTriedTimes >= site.getCycleRetryTimes()) {
return null;
}
page.addTargetRequest(request.setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, cycleTriedTimes));
}
page.setNeedCycleRetry(true);
return page;
}
} }
...@@ -77,27 +77,18 @@ public class HttpClientDownloader extends AbstractDownloader { ...@@ -77,27 +77,18 @@ public class HttpClientDownloader extends AbstractDownloader {
} }
logger.debug("downloading page {}", request.getUrl()); logger.debug("downloading page {}", request.getUrl());
CloseableHttpResponse httpResponse = null; CloseableHttpResponse httpResponse = null;
Site site = task.getSite(); CloseableHttpClient httpClient = getHttpClient(task.getSite());
CloseableHttpClient httpClient = getHttpClient(site); HttpClientRequestContext requestContext = httpUriRequestConverter.convert(request, task.getSite(), proxyProvider != null ? proxyProvider.getProxy(task) : null);
HttpClientRequestContext requestContext = httpUriRequestConverter.convert(request, site, proxyProvider != null ? proxyProvider.getProxy(task) : null);
try { try {
httpResponse = httpClient.execute(requestContext.getHttpUriRequest(), requestContext.getHttpClientContext()); httpResponse = httpClient.execute(requestContext.getHttpUriRequest(), requestContext.getHttpClientContext());
int statusCode = httpResponse.getStatusLine().getStatusCode(); Page page = handleResponse(request, task.getSite().getCharset(), httpResponse, task);
if (site.getAcceptStatCode().contains(statusCode)) {
Page page = handleResponse(request, site.getCharset(), httpResponse, task);
onSuccess(request); onSuccess(request);
logger.debug("downloading page success {}", page);
return page; return page;
} else {
logger.warn("get page {} error, status code {} ",request.getUrl(),statusCode);
return null;
}
} catch (IOException e) { } catch (IOException e) {
logger.warn("download page {} error", request.getUrl(), e); logger.warn("download page {} error", request.getUrl(), e);
if (site != null && site.getCycleRetryTimes() > 0) {
return addToCycleRetry(request, site);
}
onError(request); onError(request);
return null; return Page.fail();
} finally { } finally {
if (httpResponse != null) { if (httpResponse != null) {
//ensure the connection is released back to pool //ensure the connection is released back to pool
...@@ -118,6 +109,7 @@ public class HttpClientDownloader extends AbstractDownloader { ...@@ -118,6 +109,7 @@ public class HttpClientDownloader extends AbstractDownloader {
page.setUrl(new PlainText(request.getUrl())); page.setUrl(new PlainText(request.getUrl()));
page.setRequest(request); page.setRequest(request);
page.setStatusCode(httpResponse.getStatusLine().getStatusCode()); page.setStatusCode(httpResponse.getStatusLine().getStatusCode());
page.setDownloadSuccess(true);
if (responseHeader) { if (responseHeader) {
page.setHeaders(HttpClientUtils.convertHeaders(httpResponse.getAllHeaders())); page.setHeaders(HttpClientUtils.convertHeaders(httpResponse.getAllHeaders()));
} }
......
package us.codecraft.webmagic.proxy; package us.codecraft.webmagic.proxy;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Task; import us.codecraft.webmagic.Task;
/** /**
* Created by edwardsbean on 15-2-28. * Proxy provider. <br>
*
* @since 0.7.0
*/ */
public interface ProxyProvider { public interface ProxyProvider {
void returnProxy(Proxy proxy, boolean banned, Task task); /**
*
* @param proxy
* @param page
* @param task
*/
void returnProxy(Proxy proxy, Page page, Task task);
/**
* Get a proxy for task by some strategy.
* @param task task
* @return
*/
Proxy getProxy(Task task); Proxy getProxy(Task task);
} }
package us.codecraft.webmagic.proxy;
import org.apache.http.HttpResponse;
/**
* @author code4crafter@gmail.com
* Date: 17/3/20
* Time: 下午10:52
*/
public interface ResponseChecker {
boolean isBanned(HttpResponse httpResponse);
}
package us.codecraft.webmagic.proxy;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* >>>> Proxy lifecycle
+----------+ +-----+
| last use | | new |
+-----+----+ +---+-+
| +------+ |
+->| init |<--+
+--+---+
|
v
+--------+
+--->| borrow |
| +---+----+
| |+------------------+
| v
| +--------+
| | in use | Respone Time
| +---+----+
| |+------------------+
| v
| +--------+
| | return |
| +---+----+
| |+-------------------+
| v
| +-------+ reuse interval
| | delay | (delay time)
| +---+---+
| |+-------------------+
| v
| +------+
| | idle | idle time
| +---+--+
| |+-------------------+
+--------+
*/
/**
* Object has these status of lifecycle above.<br>
*
* @author yxssfxwzy@sina.com <br>
* @since 0.5.1
* @see TimerReuseProxyPool
*/
public class TimerReuseProxy extends Proxy implements Delayed, Serializable {
private static final long serialVersionUID = 228939737383625551L;
public static final int ERROR_403 = 403;
public static final int ERROR_404 = 404;
public static final int ERROR_BANNED = 10000;// banned by website
public static final int ERROR_Proxy = 10001;// the proxy itself failed
public static final int SUCCESS = 200;
private int reuseTimeInterval = 1500;// ms
private Long canReuseTime = 0L;
private Long lastBorrowTime = System.currentTimeMillis();
private Long responseTime = 0L;
private int failedNum = 0;
private int successNum = 0;
private int borrowNum = 0;
private List<Integer> failedErrorType = new ArrayList<Integer>();
public TimerReuseProxy(String host, int port, String username, String password) {
super(host, port, username, password);
}
public int getSuccessNum() {
return successNum;
}
public void successNumIncrement(int increment) {
this.successNum += increment;
}
public Long getLastUseTime() {
return lastBorrowTime;
}
public void setLastBorrowTime(Long lastBorrowTime) {
this.lastBorrowTime = lastBorrowTime;
}
public void recordResponse() {
this.responseTime = (System.currentTimeMillis() - lastBorrowTime + responseTime) / 2;
this.lastBorrowTime = System.currentTimeMillis();
}
public List<Integer> getFailedErrorType() {
return failedErrorType;
}
public void setFailedErrorType(List<Integer> failedErrorType) {
this.failedErrorType = failedErrorType;
}
public void fail(int failedErrorType) {
this.failedNum++;
this.failedErrorType.add(failedErrorType);
}
public void setFailedNum(int failedNum) {
this.failedNum = failedNum;
}
public int getFailedNum() {
return failedNum;
}
public String getFailedType() {
String re = "";
for (Integer i : this.failedErrorType) {
re += i + " . ";
}
return re;
}
public int getReuseTimeInterval() {
return reuseTimeInterval;
}
public void setReuseTimeInterval(int reuseTimeInterval) {
this.reuseTimeInterval = reuseTimeInterval;
this.canReuseTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(reuseTimeInterval, TimeUnit.MILLISECONDS);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(canReuseTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
TimerReuseProxy that = (TimerReuseProxy) o;
return canReuseTime > that.canReuseTime ? 1 : (canReuseTime < that.canReuseTime ? -1 : 0);
}
public void borrowNumIncrement(int increment) {
this.borrowNum += increment;
}
public int getBorrowNum() {
return borrowNum;
}
}
package us.codecraft.webmagic.proxy;
import us.codecraft.webmagic.Task;
/**
* Pooled Proxy Object
*
* @author yxssfxwzy@sina.com <br>
* @see Proxy
* @since 0.5.1
*/
public class TimerReuseProxyPool implements ProxyProvider {
@Override
public void returnProxy(Proxy proxy, boolean banned, Task task) {
}
@Override
public Proxy getProxy(Task task) {
return null;
}
// private Logger logger = LoggerFactory.getLogger(getClass());
//
// private BlockingQueue<TimerReuseProxy> proxyQueue = new DelayQueue<TimerReuseProxy>();
// private Map<String, TimerReuseProxy> allProxy = new ConcurrentHashMap<String, TimerReuseProxy>();
//
// private int reuseInterval = 1500;// ms
// private int reviveTime = 2 * 60 * 60 * 1000;// ms
// private int saveProxyInterval = 10 * 60 * 1000;// ms
//
// private boolean isEnable = false;
// private boolean validateWhenInit = false;
// // private boolean isUseLastProxy = true;
//
// public TimerReuseProxyPool(List<String[]> httpProxyList) {
// this(httpProxyList, true);
// }
//
// private void addProxy(Map<String, Proxy> httpProxyMap) {
// isEnable = true;
// for (Entry<String, Proxy> entry : httpProxyMap.entrySet()) {
// try {
// if (allProxy.containsKey(entry.getKey())) {
// continue;
// }
// if (!validateWhenInit || ProxyUtils.validateProxy(entry.getValue().getHttpHost())) {
// entry.getValue().setFailedNum(0);
// entry.getValue().setReuseTimeInterval(reuseInterval);
// proxyQueue.add(entry.getValue());
// allProxy.put(entry.getKey(), entry.getValue());
// }
// } catch (NumberFormatException e) {
// logger.error("HttpHost init error:", e);
// }
// }
// logger.info("proxy pool size>>>>" + allProxy.size());
// }
//
// public void addProxy(Proxy... httpProxyList) {
// isEnable = true;
// for (Proxy proxy : httpProxyList) {
// if (!validateWhenInit || ProxyUtils.validateProxy(proxy.getProxyHost())) {
// TimerReuseProxy p = new TimerReuseProxy(proxy.getProxyHost(), proxy.getUsername(), proxy.getPassword(), reuseInterval);
// proxyQueue.add(p);
// allProxy.put(p.getProxyHost().getHost(), p);
// }
// }
// logger.info("proxy pool size>>>>" + allProxy.size());
// }
//
// public TimerReuseProxy getProxy() {
// TimerReuseProxy proxy = null;
// try {
// Long time = System.currentTimeMillis();
// proxy = proxyQueue.take();
// double costTime = (System.currentTimeMillis() - time) / 1000.0;
// if (costTime > reuseInterval) {
// logger.info("get proxy time >>>> " + costTime);
// }
// TimerReuseProxy p = allProxy.get(proxy.getProxyHost().getHost());
// p.setLastBorrowTime(System.currentTimeMillis());
// p.borrowNumIncrement(1);
// } catch (InterruptedException e) {
// logger.error("get proxy error", e);
// }
// if (proxy == null) {
// throw new NoSuchElementException();
// }
// return proxy;
// }
//
// public void returnProxy(Proxy proxy, int statusCode) {
// TimerReuseProxy p = allProxy.get(proxy.getProxyHost());
// if (p == null) {
// return;
// }
// switch (statusCode) {
// case TimerReuseProxy.SUCCESS:
// p.setReuseTimeInterval(reuseInterval);
// p.setFailedNum(0);
// p.setFailedErrorType(new ArrayList<Integer>());
// p.recordResponse();
// p.successNumIncrement(1);
// break;
// case TimerReuseProxy.ERROR_403:
// // banned,try longer interval
// p.fail(TimerReuseProxy.ERROR_403);
// p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
// logger.info(proxy + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
// break;
// case TimerReuseProxy.ERROR_BANNED:
// p.fail(TimerReuseProxy.ERROR_BANNED);
// p.setReuseTimeInterval(10 * 60 * 1000 * p.getFailedNum());
// logger.info(proxy + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
// break;
// case TimerReuseProxy.ERROR_404:
// // p.fail(Proxy.ERROR_404);
// // p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
// break;
// default:
// p.fail(statusCode);
// break;
// }
// if (p.getFailedNum() > 20) {
// p.setReuseTimeInterval(reviveTime);
// logger.error("remove proxy >>>> " + proxy + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
// return;
// }
// if (p.getFailedNum() > 0 && p.getFailedNum() % 5 == 0) {
// if (!ProxyUtils.validateProxy(proxy)) {
// p.setReuseTimeInterval(reviveTime);
// logger.error("remove proxy >>>> " + proxy + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
// return;
// }
// }
// try {
// proxyQueue.put(p);
// } catch (InterruptedException e) {
// logger.warn("proxyQueue return proxy error", e);
// }
// }
//
// public String allProxyStatus() {
// String re = "all proxy info >>>> \n";
// for (Entry<String, Proxy> entry : allProxy.entrySet()) {
// re += entry.getValue().toString() + "\n";
// }
// return re;
// }
//
// public int getIdleNum() {
// return proxyQueue.size();
// }
//
// public int getReuseInterval() {
// return reuseInterval;
// }
//
// public void setReuseInterval(int reuseInterval) {
// this.reuseInterval = reuseInterval;
// }
//
// public void enable(boolean isEnable) {
// this.isEnable = isEnable;
// }
//
// public boolean isEnable() {
// return isEnable;
// }
//
// public int getReviveTime() {
// return reviveTime;
// }
//
// public void setReviveTime(int reviveTime) {
// this.reviveTime = reviveTime;
// }
//
// public boolean isValidateWhenInit() {
// return validateWhenInit;
// }
//
// public void validateWhenInit(boolean validateWhenInit) {
// this.validateWhenInit = validateWhenInit;
// }
//
// public int getSaveProxyInterval() {
// return saveProxyInterval;
// }
//
// public void setSaveProxyInterval(int saveProxyInterval) {
// this.saveProxyInterval = saveProxyInterval;
// }
//
// public String getProxyFilePath() {
// return proxyFilePath;
// }
//
// public void setProxyFilePath(String proxyFilePath) {
// this.proxyFilePath = proxyFilePath;
// }
}
...@@ -25,6 +25,12 @@ public abstract class HttpConstant { ...@@ -25,6 +25,12 @@ public abstract class HttpConstant {
} }
public static abstract class StatusCode {
public static final int CODE_200 = 200;
}
public static abstract class Header { public static abstract class Header {
public static final String REFERER = "Referer"; public static final String REFERER = "Referer";
......
...@@ -50,15 +50,12 @@ public class HttpClientDownloaderTest { ...@@ -50,15 +50,12 @@ public class HttpClientDownloaderTest {
} }
@Test @Test
public void testCycleTriedTimes() { public void test_download_fail() {
HttpClientDownloader httpClientDownloader = new HttpClientDownloader(); HttpClientDownloader httpClientDownloader = new HttpClientDownloader();
Task task = Site.me().setDomain("localhost").setCycleRetryTimes(5).toTask(); Task task = Site.me().setDomain("localhost").setCycleRetryTimes(5).toTask();
Request request = new Request(PAGE_ALWAYS_NOT_EXISTS); Request request = new Request(PAGE_ALWAYS_NOT_EXISTS);
Page page = httpClientDownloader.download(request, task); Page page = httpClientDownloader.download(request, task);
assertThat(page.getTargetRequests().size() > 0); assertThat(page.isDownloadSuccess()).isFalse();
assertThat((Integer) page.getTargetRequests().get(0).getExtra(Request.CYCLE_TRIED_TIMES)).isEqualTo(1);
page = httpClientDownloader.download(page.getTargetRequests().get(0), task);
assertThat((Integer) page.getTargetRequests().get(0).getExtra(Request.CYCLE_TRIED_TIMES)).isEqualTo(2);
} }
@Test @Test
......
...@@ -3,7 +3,6 @@ package us.codecraft.webmagic.downloader; ...@@ -3,7 +3,6 @@ package us.codecraft.webmagic.downloader;
import us.codecraft.webmagic.Page; import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task; import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.selector.Html;
import us.codecraft.webmagic.selector.PlainText; import us.codecraft.webmagic.selector.PlainText;
/** /**
...@@ -937,7 +936,8 @@ public class MockGithubDownloader implements Downloader{ ...@@ -937,7 +936,8 @@ public class MockGithubDownloader implements Downloader{
@Override @Override
public Page download(Request request, Task task) { public Page download(Request request, Task task) {
Page page = new Page(); Page page = new Page();
page.setHtml(new Html(html)); page.setRawText(html);
page.setStatusCode(200);
page.setRequest(new Request("https://github.com/code4craft/webmagic")); page.setRequest(new Request("https://github.com/code4craft/webmagic"));
page.setUrl(new PlainText("https://github.com/code4craft/webmagic")); page.setUrl(new PlainText("https://github.com/code4craft/webmagic"));
return page; return page;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment