Commit 44794282 authored by yihua.huang's avatar yihua.huang

add multithread support

parent b5f2498c
#!/bin/sh #!/bin/sh
VERSION="0.4.1-SNAPTHOS" VERSION="0.4.1-SNAPSHOT"
mvn clean package mvn clean package
cp target/webmagic-scripts-${VERSION}.jar /usr/local/webmagic/webmagic-console.jar cp target/webmagic-scripts-${VERSION}.jar /usr/local/webmagic/webmagic-console.jar
rsync -avz --delete target/lib/ /usr/local/webmagic/lib/ rsync -avz --delete target/lib/ /usr/local/webmagic/lib/
...@@ -2,6 +2,8 @@ package us.codecraft.webmagic.scripts; ...@@ -2,6 +2,8 @@ package us.codecraft.webmagic.scripts;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.commons.cli.*; import org.apache.commons.cli.*;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import us.codecraft.webmagic.Spider; import us.codecraft.webmagic.Spider;
import java.util.HashMap; import java.util.HashMap;
...@@ -85,7 +87,7 @@ public class ScriptConsole { ...@@ -85,7 +87,7 @@ public class ScriptConsole {
private static void startSpider(Params params) { private static void startSpider(Params params) {
ScriptProcessor pageProcessor = ScriptProcessorBuilder.custom() ScriptProcessor pageProcessor = ScriptProcessorBuilder.custom()
.language(params.getLanguage()).scriptFromFile(params.getScriptFileName()).build(); .language(params.getLanguage()).scriptFromFile(params.getScriptFileName()).thread(params.getThread()).build();
pageProcessor.getSite().setSleepTime(params.getSleepTime()); pageProcessor.getSite().setSleepTime(params.getSleepTime());
pageProcessor.getSite().setAcceptStatCode(Sets.<Integer>newHashSet(200, 404, 500)); pageProcessor.getSite().setAcceptStatCode(Sets.<Integer>newHashSet(200, 404, 500));
Spider spider = Spider.create(pageProcessor).thread(params.getThread()); Spider spider = Spider.create(pageProcessor).thread(params.getThread());
...@@ -100,13 +102,15 @@ public class ScriptConsole { ...@@ -100,13 +102,15 @@ public class ScriptConsole {
spider.run(); spider.run();
} }
private static Params parseCommand(String[] args) { private static Params parseCommand(String[] args) {
try { try {
Options options = new Options(); Options options = new Options();
options.addOption(new Option("l", true, "language")); options.addOption(new Option("l", "language", true, "language"));
options.addOption(new Option("t", true, "thread")); options.addOption(new Option("t", "thread", true, "thread"));
options.addOption(new Option("f", true, "script file")); options.addOption(new Option("f", "file", true, "script file"));
options.addOption(new Option("s", true, "sleep time")); options.addOption(new Option("s", "sleep", true, "sleep time"));
options.addOption(new Option("g", "logger", true, "sleep time"));
CommandLineParser commandLineParser = new PosixParser(); CommandLineParser commandLineParser = new PosixParser();
CommandLine commandLine = commandLineParser.parse(options, args); CommandLine commandLine = commandLineParser.parse(options, args);
return readOptions(commandLine); return readOptions(commandLine);
...@@ -143,7 +147,27 @@ public class ScriptConsole { ...@@ -143,7 +147,27 @@ public class ScriptConsole {
Integer thread = Integer.parseInt(commandLine.getOptionValue("t")); Integer thread = Integer.parseInt(commandLine.getOptionValue("t"));
params.setThread(thread); params.setThread(thread);
} }
if (commandLine.hasOption("g")) {
configLogger(commandLine.getOptionValue("g"));
}
params.setUrls(commandLine.getArgList()); params.setUrls(commandLine.getArgList());
return params; return params;
} }
private static void configLogger(String value) {
Logger rootLogger = Logger.getRootLogger();
if ("debug".equalsIgnoreCase(value)) {
rootLogger.setLevel(Level.DEBUG);
} else if ("info".equalsIgnoreCase(value)) {
rootLogger.setLevel(Level.INFO);
} else if ("warn".equalsIgnoreCase(value)) {
rootLogger.setLevel(Level.WARN);
} else if ("trace".equalsIgnoreCase(value)) {
rootLogger.setLevel(Level.TRACE);
} else if ("off".equalsIgnoreCase(value)) {
rootLogger.setLevel(Level.OFF);
} else if ("error".equalsIgnoreCase(value)) {
rootLogger.setLevel(Level.ERROR);
}
}
} }
package us.codecraft.webmagic.scripts;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author code4crafter@gmail.com
* @since 0.4.1
*/
public class ScriptEnginePool {
private final int size;
private final AtomicInteger availableCount;
private final LinkedBlockingQueue<ScriptEngine> scriptEngines = new LinkedBlockingQueue<ScriptEngine>();
public ScriptEnginePool(Language language,int size) {
this.size = size;
this.availableCount = new AtomicInteger(size);
for (int i=0;i<size;i++){
ScriptEngineManager manager = new ScriptEngineManager();
ScriptEngine engine = manager.getEngineByName(language.getEngineName());
scriptEngines.add(engine);
}
}
public ScriptEngine getEngine() {
availableCount.decrementAndGet();
return scriptEngines.poll();
}
public void release(ScriptEngine scriptEngine){
scriptEngines.add(scriptEngine);
}
}
...@@ -7,7 +7,6 @@ import us.codecraft.webmagic.processor.PageProcessor; ...@@ -7,7 +7,6 @@ import us.codecraft.webmagic.processor.PageProcessor;
import javax.script.ScriptContext; import javax.script.ScriptContext;
import javax.script.ScriptEngine; import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException; import javax.script.ScriptException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
...@@ -18,7 +17,7 @@ import java.io.InputStream; ...@@ -18,7 +17,7 @@ import java.io.InputStream;
*/ */
public class ScriptProcessor implements PageProcessor { public class ScriptProcessor implements PageProcessor {
private ScriptEngine engine; private ScriptEnginePool enginePool;
private String defines; private String defines;
...@@ -28,13 +27,12 @@ public class ScriptProcessor implements PageProcessor { ...@@ -28,13 +27,12 @@ public class ScriptProcessor implements PageProcessor {
private Site site = Site.me(); private Site site = Site.me();
public ScriptProcessor(Language language, String script) { public ScriptProcessor(Language language, String script, int threadNum) {
if (language == null || script == null) { if (language == null || script == null) {
throw new IllegalArgumentException("language and script must not be null!"); throw new IllegalArgumentException("language and script must not be null!");
} }
this.language = language; this.language = language;
ScriptEngineManager manager = new ScriptEngineManager(); enginePool = new ScriptEnginePool(language, threadNum);
engine = manager.getEngineByName(language.getEngineName());
InputStream resourceAsStream = this.getClass().getClassLoader().getResourceAsStream(language.getDefineFile()); InputStream resourceAsStream = this.getClass().getClassLoader().getResourceAsStream(language.getDefineFile());
try { try {
defines = IOUtils.toString(resourceAsStream); defines = IOUtils.toString(resourceAsStream);
...@@ -46,6 +44,8 @@ public class ScriptProcessor implements PageProcessor { ...@@ -46,6 +44,8 @@ public class ScriptProcessor implements PageProcessor {
@Override @Override
public void process(Page page) { public void process(Page page) {
ScriptEngine engine = enginePool.getEngine();
try {
ScriptContext context = engine.getContext(); ScriptContext context = engine.getContext();
context.setAttribute("page", page, ScriptContext.ENGINE_SCOPE); context.setAttribute("page", page, ScriptContext.ENGINE_SCOPE);
context.setAttribute("config", site, ScriptContext.ENGINE_SCOPE); context.setAttribute("config", site, ScriptContext.ENGINE_SCOPE);
...@@ -67,6 +67,9 @@ public class ScriptProcessor implements PageProcessor { ...@@ -67,6 +67,9 @@ public class ScriptProcessor implements PageProcessor {
} catch (ScriptException e) { } catch (ScriptException e) {
e.printStackTrace(); e.printStackTrace();
} }
} finally {
enginePool.release(engine);
}
} }
@Override @Override
......
...@@ -18,6 +18,8 @@ public class ScriptProcessorBuilder { ...@@ -18,6 +18,8 @@ public class ScriptProcessorBuilder {
private String script; private String script;
private int threadNum = 1;
private ScriptProcessorBuilder() { private ScriptProcessorBuilder() {
} }
...@@ -57,8 +59,13 @@ public class ScriptProcessorBuilder { ...@@ -57,8 +59,13 @@ public class ScriptProcessorBuilder {
return this; return this;
} }
public ScriptProcessorBuilder thread(int threadNum) {
this.threadNum = threadNum;
return this;
}
public ScriptProcessor build(){ public ScriptProcessor build(){
return new ScriptProcessor(language,script); return new ScriptProcessor(language,script,threadNum);
} }
} }
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
</appender> </appender>
<logger name="org.apache" additivity="false"> <logger name="org.apache" additivity="false">
<level value="warn" /> <level value="error" />
<appender-ref ref="stdout" /> <appender-ref ref="stdout" />
</logger> </logger>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment