我有一个Producer,它生成一个带有属性type的POJO。只能有两种类型,“A”和“B”。我有一个消费者线程池。每当我从生产者那里收到“B”类型的消息时,在我可以继续执行之前,我需要确保池中所有其他线程都已完成执行(目前是默认 Thread.sleep)。然后消费者线程应该拾取“B”类型的消息并运行它。直到这个线程运行,才能从队列中弹出任何消息。
示例:
class POJO_Message{
String type; //This will contain the type of message "A" or "B"
}
您可以使用LinkedBlockingDeque。例如:
public class ProducerConsumer {
public static void main(String[] args) {
final LinkedBlockingDeque<Message> queue = new LinkedBlockingDeque<>(10);
final AtomicLong id = new AtomicLong(0);
final Timer producer = new Timer(true);
producer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
queue.add(new Message( String.format("msg: %s" , id.incrementAndGet() ) ) );
}
}, 10, 10);
// consume
for(;;) {
try {
Message msg = queue.take();
System.out.println( msg );
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private static class Message {
private final String id;
public Message(String id) {
this.id = id;
}
public String getId() {
return id;
}
@Override
public String toString() {
return String.format("Message [id=%s]", id);
}
}
}
您可以使用ReadWriteLock work.when 消息类型为B,尝试获取写锁,其他类型的消息获取读 lock.one 这样的简单代码。
public class ConsumerProducerQueue {
ExecutorService executor = Executors.newFixedThreadPool(10);
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void addMessage(Message message) {
if ("B".equals(message.getType())) {
lock.writeLock().lock();
Future<?> result = executor.submit(new Task(message));
try {
result.get();
} catch (Exception e) {
} finally {
lock.writeLock().unlock();
}
} else {
lock.readLock().lock();
Future<?> result = executor.submit(new Task(message));
try {
result.get();
} catch (Exception e) {
} finally {
lock.readLock().unlock();
}
}
}
}
这种方法的性能不好。