Commit 5ecd909e authored by yihua.huang's avatar yihua.huang

add timeout for wait/notify #111

parent 964e6372
......@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
......@@ -104,6 +105,8 @@ public class Spider implements Runnable, Task {
private Date startTime;
private int emptySleepTime = 30000;
/**
* create a spider with pageProcessor.
*
......@@ -524,7 +527,7 @@ public class Spider implements Runnable, Task {
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
return;
}
newUrlCondition.await();
newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.warn("waitNewUrl - interrupted, error {}", e);
} finally {
......@@ -716,4 +719,13 @@ public class Spider implements Runnable, Task {
public Scheduler getScheduler() {
return scheduler;
}
/**
* Set wait time when no url is polled.<br></br>
*
* @param emptySleepTime In MILLISECONDS.
*/
public void setEmptySleepTime(int emptySleepTime) {
this.emptySleepTime = emptySleepTime;
}
}
......@@ -51,9 +51,10 @@ public class CountableThreadPool {
private ExecutorService executorService;
public void execute(final Runnable runnable) {
try {
if (threadAlive.get() >= threadNum) {
if (threadAlive.get() >= threadNum) {
try {
reentrantLock.lock();
while (threadAlive.get() >= threadNum) {
try {
......@@ -61,29 +62,27 @@ public class CountableThreadPool {
} catch (InterruptedException e) {
}
}
} finally {
reentrantLock.unlock();
}
threadAlive.incrementAndGet();
executorService.execute(new Runnable() {
@Override
public void run() {
}
threadAlive.incrementAndGet();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} finally {
try {
runnable.run();
reentrantLock.lock();
threadAlive.decrementAndGet();
condition.signal();
} finally {
try {
reentrantLock.lock();
threadAlive.decrementAndGet();
condition.signal();
} finally {
reentrantLock.unlock();
}
reentrantLock.unlock();
}
}
});
} finally {
if (reentrantLock.isLocked()) {
reentrantLock.unlock();
}
}
});
}
public boolean isShutdown() {
......
......@@ -37,7 +37,7 @@ public class SpiderTest {
@Test
public void testWaitAndNotify() throws InterruptedException {
for (int i = 0; i < 10000; i++) {
System.out.println("round" + i);
System.out.println("round " + i);
testRound();
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment