Commit e310139d authored by Yihua Huang's avatar Yihua Huang

Merge pull request #128 from yxssfxwzy/proxy

多个代理的管理
parents b1650904 c146e2c7
......@@ -4,3 +4,6 @@ out/
.idea
.classpath
.project
.settings/
bin/
.myeclipse
......@@ -18,6 +18,8 @@ public class Request implements Serializable {
private static final long serialVersionUID = 2062192774891352043L;
public static final String CYCLE_TRIED_TIMES = "_cycle_tried_times";
public static final String STATUS_CODE = "statusCode";
public static final String PROXY = "proxy";
private String url;
......
......@@ -3,6 +3,8 @@ package us.codecraft.webmagic;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import org.apache.http.HttpHost;
import us.codecraft.webmagic.proxy.ProxyPool;
import us.codecraft.webmagic.utils.UrlUtils;
import java.util.*;
......@@ -47,6 +49,8 @@ public class Site {
private HttpHost httpProxy;
private ProxyPool httpProxyPool=new ProxyPool();
private boolean useGzip = true;
/**
......@@ -438,4 +442,32 @@ public class Site {
", headers=" + headers +
'}';
}
/**
* Set httpProxyPool, String[0]:ip, String[1]:port <br>
*
* @return this
*/
public Site setHttpProxyPool(List<String[]> httpProxyList) {
this.httpProxyPool=new ProxyPool(httpProxyList);
return this;
}
public ProxyPool getHttpProxyPool() {
return httpProxyPool;
}
public HttpHost getHttpProxyFromPool() {
return httpProxyPool.getProxy();
}
public void returnHttpProxyToPool(HttpHost proxy,int statusCode) {
httpProxyPool.returnProxy(proxy,statusCode);
}
public Site setProxyReuseInterval(int reuseInterval) {
this.httpProxyPool.setReuseInterval(reuseInterval);
return this;
}
}
......@@ -2,6 +2,7 @@ package us.codecraft.webmagic;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.downloader.Downloader;
......@@ -324,6 +325,10 @@ public class Spider implements Runnable, Task {
onError(requestFinal);
logger.error("process request " + requestFinal + " error", e);
} finally {
if (site.getHttpProxyPool().isEnable()) {
site.returnHttpProxyToPool((HttpHost) requestFinal.getExtra(Request.PROXY), (Integer) requestFinal
.getExtra(Request.STATUS_CODE));
}
pageCount.incrementAndGet();
signalNewUrl();
}
......
......@@ -3,6 +3,7 @@ package us.codecraft.webmagic.downloader;
import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.annotation.ThreadSafe;
......@@ -85,10 +86,12 @@ public class HttpClientDownloader extends AbstractDownloader {
}
logger.info("downloading page {}", request.getUrl());
CloseableHttpResponse httpResponse = null;
int statusCode=0;
try {
HttpUriRequest httpUriRequest = getHttpUriRequest(request, site, headers);
httpResponse = getHttpClient(site).execute(httpUriRequest);
int statusCode = httpResponse.getStatusLine().getStatusCode();
statusCode = httpResponse.getStatusLine().getStatusCode();
request.putExtra(Request.STATUS_CODE, statusCode);
if (statusAccept(acceptStatCode, statusCode)) {
Page page = handleResponse(request, charset, httpResponse, task);
onSuccess(request);
......@@ -105,6 +108,7 @@ public class HttpClientDownloader extends AbstractDownloader {
onError(request);
return null;
} finally {
request.putExtra(Request.STATUS_CODE, statusCode);
try {
if (httpResponse != null) {
//ensure the connection is released back to pool
......@@ -137,8 +141,10 @@ public class HttpClientDownloader extends AbstractDownloader {
.setSocketTimeout(site.getTimeOut())
.setConnectTimeout(site.getTimeOut())
.setCookieSpec(CookieSpecs.BEST_MATCH);
if (site != null && site.getHttpProxy() != null) {
requestConfigBuilder.setProxy(site.getHttpProxy());
if (site.getHttpProxyPool().isEnable()) {
HttpHost host = site.getHttpProxyFromPool();
requestConfigBuilder.setProxy(host);
request.putExtra(Request.PROXY, host);
}
requestBuilder.setConfig(requestConfigBuilder.build());
return requestBuilder.build();
......
package us.codecraft.webmagic.proxy;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
/**
* >>>>Proxy Status
+----------+ +-----+
| last use | | new |
+-----+----+ +---+-+
| +------+ |
+->| init |<--+
+--+---+
|
v
+--------+
+--->| borrow |
| +---+----+
| |+------------------+
| v
| +--------+
| | in use | Respone Time
| +---+----+
| |+------------------+
| v
| +--------+
| | return |
| +---+----+
| |+-------------------+
| v
| +-------+ reuse interval
| | delay | (delay time)
| +---+---+
| |+-------------------+
| v
| +------+
| | idle | idle time
| +---+--+
| |+-------------------+
+--------+
*/
public class Proxy implements Delayed, Serializable {
private static final long serialVersionUID = 228939737383625551L;
public static final int ERROR_403 = 403;
public static final int ERROR_404 = 404;
public static final int ERROR_BANNED = 10000;
public static final int ERROR_Proxy = 10001;
public static final int SUCCESS = 200;
private final HttpHost httpHost;
private int reuseTimeInterval = 1500;// ms
private Long canReuseTime = 0L;
private Long lastBorrowTime = System.currentTimeMillis();
private Long responseTime = 0L;
private Long idleTime = 0L;
private int failedNum = 0;
private int successNum = 0;
private int borrowNum = 0;
private List<Integer> failedErrorType = new ArrayList<Integer>();
Proxy(HttpHost httpHost) {
this.httpHost = httpHost;
this.canReuseTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(reuseTimeInterval, TimeUnit.MILLISECONDS);
}
Proxy(HttpHost httpHost, int reuseInterval) {
this.httpHost = httpHost;
this.canReuseTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(reuseInterval, TimeUnit.MILLISECONDS);
}
public int getSuccessNum() {
return successNum;
}
public void successNumIncrement(int increment) {
this.successNum += increment;
}
public Long getLastUseTime() {
return lastBorrowTime;
}
public void setLastBorrowTime(Long lastBorrowTime) {
this.lastBorrowTime = lastBorrowTime;
}
public void recordResponse() {
this.responseTime = (System.currentTimeMillis() - lastBorrowTime + responseTime) / 2;
this.lastBorrowTime = System.currentTimeMillis();
}
public List<Integer> getFailedErrorType() {
return failedErrorType;
}
public void setFailedErrorType(List<Integer> failedErrorType) {
this.failedErrorType = failedErrorType;
}
public void fail(int failedErrorType) {
this.failedNum++;
this.failedErrorType.add(failedErrorType);
}
public void setFailedNum(int failedNum) {
this.failedNum = failedNum;
}
public int getFailedNum() {
return failedNum;
}
public String getFailedType() {
String re = "";
for (Integer i : this.failedErrorType) {
re += i + " . ";
}
return re;
}
public HttpHost getHttpHost() {
return httpHost;
}
public int getReuseTimeInterval() {
return reuseTimeInterval;
}
public void setReuseTimeInterval(int reuseTimeInterval) {
this.reuseTimeInterval = reuseTimeInterval;
this.canReuseTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(reuseTimeInterval, TimeUnit.MILLISECONDS);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(canReuseTime - System.nanoTime(), unit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
Proxy that = (Proxy) o;
return canReuseTime > that.canReuseTime ? 1 : (canReuseTime < that.canReuseTime ? -1 : 0);
}
@Override
public String toString() {
String re = String.format("host: %15s >> %5dms >> success: %-3.2f%% >> borrow: %d", httpHost.getAddress().getHostAddress(), responseTime,
successNum * 100.0 / borrowNum, borrowNum);
return re;
}
public void borrowNumIncrement(int increment) {
this.borrowNum += increment;
}
public int getBorrowNum() {
return borrowNum;
}
}
package us.codecraft.webmagic.proxy;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ClassName:ProxyPool
*
* @see
* @Function: TODO ADD FUNCTION
* @author ch
* @version Ver 1.0
* @Date 2014-2-14 下午01:10:04
*/
public class ProxyPool {
private Logger logger = LoggerFactory.getLogger(getClass());
private BlockingQueue<Proxy> proxyQueue = new DelayQueue<Proxy>();
private Map<String, Proxy> allProxy = new ConcurrentHashMap<String, Proxy>();
private int reuseInterval = 1500;// ms
private int reviveTime = 2 * 60 * 60 * 1000;// ms
private boolean isEnable = false;
private boolean validateWhenInit = false;
private String proxyFile = "data/lastUse.proxy";
private Timer timer = new Timer(true);
private TimerTask saveProxyTask = new TimerTask() {
@Override
public void run() {
saveProxyList();
logger.info(allProxyStutus());
}
};
public ProxyPool() {
}
public ProxyPool(List<String[]> httpProxyList) {
readProxyList();
addProxy(httpProxyList.toArray(new String[httpProxyList.size()][]));
timer.schedule(saveProxyTask, 10 * 60 * 1000L, 10 * 60 * 1000);
}
private void saveProxyList() {
if (allProxy.size() == 0) {
return;
}
try {
ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(proxyFile));
os.writeObject(prepareForSaving());
os.close();
logger.info("save proxy");
} catch (FileNotFoundException e) {
logger.error("proxy file not found", e);
} catch (IOException e) {
e.printStackTrace();
}
}
private Map<String, Proxy> prepareForSaving() {
Map<String, Proxy> tmp = new HashMap<String, Proxy>();
for (Entry<String, Proxy> e : allProxy.entrySet()) {
Proxy p = e.getValue();
p.setFailedNum(0);
tmp.put(e.getKey(), p);
}
return tmp;
}
private void readProxyList() {
try {
ObjectInputStream is = new ObjectInputStream(new FileInputStream(proxyFile));
addProxy((Map<String, Proxy>) is.readObject());
is.close();
} catch (FileNotFoundException e) {
logger.error("proxy file not found", e);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
private void addProxy(Map<String, Proxy> httpProxyMap) {
isEnable = true;
for (Entry<String, Proxy> entry : httpProxyMap.entrySet()) {
try {
if (allProxy.containsKey(entry.getKey())) {
continue;
}
if (!validateWhenInit || ProxyUtil.validateProxy(entry.getValue().getHttpHost())) {
entry.getValue().setFailedNum(0);
entry.getValue().setReuseTimeInterval(reuseInterval);
proxyQueue.add(entry.getValue());
allProxy.put(entry.getKey(), entry.getValue());
}
} catch (NumberFormatException e) {
logger.error("HttpHost init error:", e);
}
}
logger.info("proxy pool size>>>>" + allProxy.size());
}
public void addProxy(String[]... httpProxyList) {
isEnable = true;
for (String[] s : httpProxyList) {
try {
if (allProxy.containsKey(s[0])) {
continue;
}
HttpHost item = new HttpHost(InetAddress.getByName(s[0]), Integer.valueOf(s[1]));
if (!validateWhenInit || ProxyUtil.validateProxy(item)) {
Proxy p = new Proxy(item, reuseInterval);
proxyQueue.add(p);
allProxy.put(s[0], p);
}
} catch (NumberFormatException e) {
logger.error("HttpHost init error:", e);
} catch (UnknownHostException e) {
logger.error("HttpHost init error:", e);
}
}
logger.info("proxy pool size>>>>" + allProxy.size());
}
public HttpHost getProxy() {
Proxy proxy = null;
try {
Long time = System.currentTimeMillis();
proxy = proxyQueue.take();
double costTime = (System.currentTimeMillis() - time) / 1000.0;
if (costTime > reuseInterval) {
logger.info("get proxy time >>>> " + costTime);
}
Proxy p = allProxy.get(proxy.getHttpHost().getAddress().getHostAddress());
p.setLastBorrowTime(System.currentTimeMillis());
p.borrowNumIncrement(1);
} catch (InterruptedException e) {
logger.error("get proxy error", e);
}
if (proxy == null) {
throw new NoSuchElementException();
}
return proxy.getHttpHost();
}
public void returnProxy(HttpHost host, int statusCode) {
Proxy p = allProxy.get(host.getAddress().getHostAddress());
if (p == null) {
return;
}
switch (statusCode) {
case Proxy.SUCCESS:
p.setReuseTimeInterval(reuseInterval);
p.setFailedNum(0);
p.setFailedErrorType(new ArrayList<Integer>());
p.recordResponse();
p.successNumIncrement(1);
break;
case Proxy.ERROR_403:
// banned,try larger interval
p.fail(Proxy.ERROR_403);
p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
break;
case Proxy.ERROR_BANNED:
p.fail(Proxy.ERROR_BANNED);
p.setReuseTimeInterval(10 * 60 * 1000 * p.getFailedNum());
logger.warn("this proxy is banned >>>> " + p.getHttpHost());
logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
break;
case Proxy.ERROR_404:
//p.fail(Proxy.ERROR_404);
// p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
break;
default:
p.fail(statusCode);
break;
}
if (p.getFailedNum() > 20) {
// allProxy.remove(host.getAddress().getHostAddress());
p.setReuseTimeInterval(reviveTime);
logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
return;
}
if (p.getFailedNum()%5==0) {
if (!ProxyUtil.validateProxy(host)) {
// allProxy.remove(host.getAddress().getHostAddress());
p.setReuseTimeInterval(reviveTime);
logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
return;
}
}
try {
proxyQueue.put(p);
} catch (InterruptedException e) {
logger.warn("proxyQueue return proxy error", e);
}
}
public String allProxyStutus() {
String re = "all proxy info >>>> \n";
for (Entry<String, Proxy> entry : allProxy.entrySet()) {
re += entry.getValue().toString() + "\n";
}
return re;
}
public int getIdleNum() {
return proxyQueue.size();
}
public int getReuseInterval() {
return reuseInterval;
}
public void setReuseInterval(int reuseInterval) {
this.reuseInterval = reuseInterval;
}
public static List<String[]> getProxyList() {
List<String[]> proxyList = new ArrayList<String[]>();
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader(new File("proxy.txt")));
String line = "";
while ((line = br.readLine()) != null) {
proxyList.add(new String[] { line.split(":")[0], line.split(":")[1] });
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return proxyList;
}
public static void main(String[] args) throws IOException {
ProxyPool proxyPool = new ProxyPool(getProxyList());
proxyPool.setReuseInterval(10000);
// proxyPool.saveProxyList();
while (true) {
List<HttpHost> httphostList = new ArrayList<HttpHost>();
System.in.read();
int i = 0;
while (proxyPool.getIdleNum() > 2) {
HttpHost httphost = proxyPool.getProxy();
httphostList.add(httphost);
// proxyPool.proxyPool.use(httphost);
proxyPool.logger.info("borrow object>>>>" + i + ">>>>" + httphostList.get(i).toString());
i++;
}
System.out.println(proxyPool.allProxyStutus());
System.in.read();
for (i = 0; i < httphostList.size(); i++) {
proxyPool.returnProxy(httphostList.get(i), 200);
proxyPool.logger.info("return object>>>>" + i + ">>>>" + httphostList.get(i).toString());
}
System.out.println(proxyPool.allProxyStutus());
System.in.read();
}
}
public void enable(boolean isEnable) {
this.isEnable = isEnable;
}
public boolean isEnable() {
return isEnable;
}
}
package us.codecraft.webmagic.proxy;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
import java.util.Enumeration;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ClassName:ProxyUtil
*
* @see
* @author ch
* @version Ver 1.0
* @Date 2014-2-16 下午04:20:07
*/
public class ProxyUtil {
// TODO 改为单例
private static InetAddress localAddr;
private static final Logger logger = LoggerFactory.getLogger(ProxyUtil.class);
static {
init();
}
private static void init() {
Enumeration<InetAddress> localAddrs;
try {
NetworkInterface ni = NetworkInterface.getByName("eth7");
if (ni == null) {
logger.error("choose NetworkInterface\n" + getNetworkInterface());
}
localAddrs = ni.getInetAddresses();
while (localAddrs.hasMoreElements()) {
InetAddress tmp = localAddrs.nextElement();
if (!tmp.isLoopbackAddress() && !tmp.isLinkLocalAddress() && !(tmp instanceof Inet6Address)) {
localAddr = tmp;
logger.info("local IP:" + localAddr.getHostAddress());
break;
}
}
} catch (Exception e) {
logger.error("Failure when init ProxyUtil", e);
logger.error("choose NetworkInterface\n" + getNetworkInterface());
}
}
public static boolean validateProxy(HttpHost p) {
if (localAddr == null) {
logger.error("cannot get local ip");
return false;
}
boolean isReachable = false;
Socket socket = null;
try {
socket = new Socket();
socket.bind(new InetSocketAddress(localAddr, 0));
InetSocketAddress endpointSocketAddr = new InetSocketAddress(p.getAddress().getHostAddress(), p.getPort());
socket.connect(endpointSocketAddr, 3000);
logger.debug("SUCCESS - connection established! Local: " + localAddr.getHostAddress() + " remote: " + p);
isReachable = true;
} catch (IOException e) {
logger.warn("FAILRE - CAN not connect! Local: " + localAddr.getHostAddress() + " remote: " + p);
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
logger.warn("Error occurred while closing socket of validating proxy", e);
}
}
}
return isReachable;
}
private static String getNetworkInterface() {
String networkInterfaceName = "";
Enumeration<NetworkInterface> enumeration = null;
try {
enumeration = NetworkInterface.getNetworkInterfaces();
} catch (SocketException e1) {
e1.printStackTrace();
}
while (enumeration.hasMoreElements()) {
NetworkInterface networkInterface = enumeration.nextElement();
networkInterfaceName += networkInterface.toString() + '\n';
Enumeration<InetAddress> addr = networkInterface.getInetAddresses();
while (addr.hasMoreElements()) {
networkInterfaceName += "\tip:" + addr.nextElement().getHostAddress() + "\n";
}
}
return networkInterfaceName;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment