使用多个线程(在池中)访问blockingqueue没有按预期并行工作

bnlyeluc  于 2021-07-06  发布在  Java
关注(0)|答案(1)|浏览(421)

所以我有一个阻塞队列实现。其中一个调度程序将一个随机数以1秒的延迟放入队列,我已经实现了另一个调度程序,其中包含10个线程池,用于从消息队列调用take()。
重要的一点是,我正在尝试实现的场景是,在从队列中获取单个项目之后,线程等待20秒(线程睡眠),而我的理解是,线程池中的其他9个线程将开始并行工作,而第一个线程等待20秒(其他线程也将等待20秒)但事实并非如此。池中的其他线程似乎根本没有启动。我是一个新手封锁队列和任何帮助将不胜感激。
我的代码如下。
公共类blockingqueueimpl{

public Queue<Integer> messageQueue = new ConcurrentLinkedDeque();

private void putNumber(Integer number){
   try{
       System.out.println("putting number to the queue: " + number);
       messageQueue.add(number);
       System.out.println("size of the queue: " +messageQueue.size());

   } catch (Exception e){
       e.printStackTrace();
   }
}

private void getNumber(){

}

private class RunnableGetImpl implements Runnable {

    @Override
    public void run() {
        try{
            Integer num = messageQueue.poll();
            System.out.println("Polling from queue, number - "+ num);
            if(num!=null){
                System.out.println("Sleeping thread for 20 sec"+Thread.activeCount());
                Thread.sleep(20000);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

private class RunnablePutImpl implements Runnable {

    @Override
    public void run() {
        Random rand = new Random();
        int n = rand.nextInt(100);
        n += 1;
        putNumber(n);

    }

}

public static void main(String[] args){

    BlockingQueueImpl blockingQueue = new BlockingQueueImpl();

    ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(1);
    executor1.scheduleAtFixedRate(blockingQueue.new RunnablePutImpl(), 0, 1000, TimeUnit.MILLISECONDS);

    ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(20);
    executor2.scheduleAtFixedRate(blockingQueue.new RunnableGetImpl(), 0, 100, TimeUnit.MILLISECONDS);

}

}

ndasle7k

ndasle7k1#

来自 ScheduledThreadPoolExecutor.scheduleAtFixedRate :
如果此任务的任何执行时间长于其周期,则后续执行可能会延迟开始,但不会同时执行。
因此,你需要开始(安排)尽可能多的工人。
为了寻找更好的解决方案,请注意,您实际上并没有使用 BlockingQueue 你不执行 java.util.concurrent.Blockingqueue ,也没有使用它的实现。 ConcurrentLinkedDeque 只是一个集合,它甚至没有实现 Queue . ConcurrentLinkedDeque.poll() 不会阻塞,只会返回 null 如果队列为空。
这些是 BlockingQueue 接口:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/blockingqueue.html
使用 put() 向队列提供值。如果 BlockingQueue 已达到最大容量。使用 take() 删除元素。如果队列为空,这将阻塞。
正确使用这些类将提高应用程序的性能,因为您不会一直轮询某个值。
关于一个类似问题的答案中提供了更多细节:如何使用concurrentlinkedqueue?
更新:具有多个生产者/消费者的示例代码
下面的示例代码是从https://riptutorial.com/java/example/13011/multiple-producer-consumer-example-with-shared-global-queue 与我没有任何联系:
下面的代码展示了多个生产者/消费者程序。生产者线程和使用者线程共享相同的全局队列。

import java.util.concurrent.*;
import java.util.Random;

public class ProducerConsumerWithES {
    public static void main(String args[]) {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

        ExecutorService pes = Executors.newFixedThreadPool(2);
        ExecutorService ces = Executors.newFixedThreadPool(2);

        pes.submit(new Producer(sharedQueue, 1));
        pes.submit(new Producer(sharedQueue, 2));
        ces.submit(new Consumer(sharedQueue, 1));
        ces.submit(new Consumer(sharedQueue, 2));

        pes.shutdown();
        ces.shutdown();
    }
}

/* Different producers produces a stream of integers continuously to a shared queue, 
which is shared between all Producers and consumers */

class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    private Random random = new Random();
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        // Producer produces a continuous stream of numbers for every 200 milli seconds
        while (true) {
            try {
                int number = random.nextInt(1000);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
                Thread.sleep(200);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        // Consumer consumes numbers generated from Producer threads continuously
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

输出:

Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1

请注意如何将多个生产者和消费者添加到池中—您需要尽可能多的生产者和消费者才能并行工作。这是您的代码缺少的关键—多个工人。调度器将对它们进行调度,但它不会神奇地将您要求它调度的单个示例相乘。
很明显,您需要根据您的需求调整生产商和消费者的数量。

相关问题