`
vortexchoo
  • 浏览: 64290 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

线程池+队列

    博客分类:
  • java
阅读更多
笔记:自己实现的线程池+队列。

package org.vic.thread.core;

import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A minimal fixed-size worker pool fed by a bounded FIFO queue.
 *
 * <p>Producers hand elements to the pool through {@link #putCollections(Collection)};
 * each worker thread started by {@link #start()} repeatedly takes one element from
 * the queue and passes it to {@link #doProcess(Object)}. Producers block while the
 * queue is full and workers block while it is empty.
 *
 * <p>All access to the queue is guarded by a single {@link ReentrantLock}. The
 * optional put/take delays are applied <em>before</em> the lock is acquired, so a
 * throttled caller never stalls the other producers and consumers.
 *
 * <p>Worker threads are ordinary (non-daemon) threads and run until interrupted.
 *
 * @param <T> type of the elements processed by the pool
 * @author Vic.Chu
 */
public abstract class ThreadPool<T> implements Runnable {

    /** Number of workers used when the caller supplies no positive pool size. */
    private static final int DEFAULT_POOL_SIZE = 10;

    /** Queue capacity used when the caller supplies no positive queue size. */
    private static final int DEFAULT_QUEUE_SIZE = 10;

    private final int workerCount;
    private final int queueCapacity;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    // Delays are volatile because the setters may be called from any thread
    // while workers and producers are already reading them.
    private volatile long takeWaitingMillis = 0L;
    private volatile long putWaitingMillis = 0L;

    /** Pending elements; only touched while {@link #lock} is held. */
    private final LinkedList<T> queue = new LinkedList<T>();

    /** Whether the workers have been launched; only touched while {@link #lock} is held. */
    private boolean started = false;

    /**
     * Sets the pause applied before every take from the queue.
     *
     * @param val delay in milliseconds; {@code null} or a negative value disables the delay
     */
    public void take_waittingMills(Long val) {
        this.takeWaitingMillis = sanitizeDelay(val);
    }

    /**
     * Sets the pause applied before every put into the queue.
     *
     * @param val delay in milliseconds; {@code null} or a negative value disables the delay
     */
    public void put_waittingMills(Long val) {
        this.putWaitingMillis = sanitizeDelay(val);
    }

    /**
     * Creates a pool with explicit sizing and throttling.
     *
     * @param poolSize           number of worker threads; {@code null} or non-positive
     *                           falls back to the default of 10
     * @param queueSize          maximum number of queued elements; {@code null} or
     *                           non-positive falls back to the default of 10
     * @param takeWaittingMillis delay before each take, in milliseconds; {@code null}
     *                           or non-positive means no delay
     * @param putWaittingMillis  delay before each put, in milliseconds; {@code null}
     *                           or non-positive means no delay
     */
    public ThreadPool(Integer poolSize, Integer queueSize, Long takeWaittingMillis, Long putWaittingMillis) {
        this.workerCount = (poolSize != null && poolSize > 0) ? poolSize : DEFAULT_POOL_SIZE;
        this.queueCapacity = (queueSize != null && queueSize > 0) ? queueSize : DEFAULT_QUEUE_SIZE;
        // Fields are assigned directly: calling the overridable public setters
        // from a constructor would run subclass code on a half-built object.
        this.takeWaitingMillis = sanitizeDelay(takeWaittingMillis);
        this.putWaitingMillis = sanitizeDelay(putWaittingMillis);
    }

    /**
     * Creates a pool with the default pool size, default queue size and no delays.
     */
    public ThreadPool() {
        this(null, null, null, null);
    }

    /**
     * Worker loop: takes elements from the queue and processes them until the
     * executing thread is interrupted.
     *
     * <p>A {@link RuntimeException} thrown by {@link #doProcess(Object)} is reported
     * and the worker keeps running, so one bad element cannot shrink the pool.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                T item = take();
                if (item == null) {
                    // A null element was queued; there is nothing to process.
                    continue;
                }
                try {
                    doProcess(item);
                } catch (RuntimeException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can see the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Processes one element taken from the queue. Implemented by the user of the pool;
     * invoked on a worker thread.
     *
     * @param t the element to process, never {@code null}
     * @return the processing result (ignored by the pool itself)
     */
    public abstract T doProcess(T t);

    /**
     * Launches the worker threads. Only the first call has an effect; later or
     * concurrent calls return without starting additional threads.
     */
    public void start() {
        lock.lock();
        try {
            if (started) {
                return;
            }
            started = true;
        } finally {
            lock.unlock();
        }
        for (int i = 0; i < workerCount; i++) {
            new Thread(this).start();
        }
    }

    /**
     * Enqueues every element of the given collection, blocking whenever the queue
     * is full until a worker frees a slot.
     *
     * @param collection the elements to process
     * @throws IllegalArgumentException if the collection is {@code null} or empty
     * @throws InterruptedException     if the calling thread is interrupted while
     *                                  waiting; elements not yet enqueued are skipped
     * @throws Exception                declared for source compatibility with existing callers
     */
    public void putCollections(Collection<? extends T> collection) throws Exception {
        if (collection == null || collection.isEmpty()) {
            throw new IllegalArgumentException("Collection is null or is empty!");
        }
        for (T element : collection) {
            put(element);
        }
    }

    /**
     * Appends one element to the queue, waiting for free space if necessary.
     *
     * @param t the element to enqueue
     * @throws InterruptedException if interrupted while sleeping or waiting for space
     */
    private void put(T t) throws InterruptedException {
        pause(putWaitingMillis);
        lock.lockInterruptibly();
        try {
            // Loop, not "if": the condition must be re-checked after every wake-up
            // (spurious wake-ups, or another producer taking the freed slot first).
            while (queue.size() >= queueCapacity) {
                notFull.await();
            }
            queue.add(t);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting until an element is available.
     *
     * @return the removed element; {@code null} only if a {@code null} element was queued
     * @throws InterruptedException if interrupted while sleeping or waiting for an element
     */
    private T take() throws InterruptedException {
        pause(takeWaitingMillis);
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            T head = queue.removeFirst();
            notFull.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently waiting in the queue.
     *
     * @return the current queue length
     */
    public Integer currentQueueSize() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of worker threads this pool starts.
     *
     * @return the pool size
     */
    public Integer poolSize() {
        return this.workerCount;
    }

    /**
     * Returns the maximum number of elements the queue may hold.
     *
     * @return the queue capacity
     */
    public Integer queueSize() {
        return this.queueCapacity;
    }

    /** Maps {@code null} and negative delays to zero so {@code Thread.sleep} never rejects them. */
    private static long sanitizeDelay(Long val) {
        return (val == null || val < 0) ? 0L : val;
    }

    /** Sleeps for the given number of milliseconds; does nothing for a non-positive delay. */
    private static void pause(long millis) throws InterruptedException {
        if (millis > 0) {
            Thread.sleep(millis);
        }
    }

    /**
     * Small demo: processes the letters of "string" on a pool of five workers.
     * The worker threads are non-daemon, so the JVM keeps running afterwards.
     *
     * @param args unused
     * @throws Exception if enqueuing fails
     */
    public static void main(String[] args) throws Exception {
        ThreadPool<String> tp = new ThreadPool<String>(5, 5, 0L, 0L) {
            @Override
            public String doProcess(String t) {
                System.out.println("current process is "
                        + Thread.currentThread().getName());
                System.out.println(t);
                return t;
            }
        };
        tp.start();
        tp.putCollections(Arrays.asList("s", "t", "r", "i", "n", "g"));
    }
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics