java 允许单个线程使用Long值

zpf6vheq  于 11个月前  发布在  Java
关注(0)|答案(4)|浏览(67)

假设RequestID是一个Long值
我有2个线程不断被调用来处理一个“RequestID”。
如果这两个线程处理不同的RequestID,但不能同时处理相同的RequestID,则它们可以保持并行工作。
我想在RequestID上获得某种锁,这样其他线程就不能在它上面工作,除非第一个线程完成了RequestID。
做这件事的最好方法是什么?

sbdsn5lh

sbdsn5lh1#

注意:我没有试过显示的代码。关闭责任不在RequestProcessor中。你可以在这个类中添加一个shutdown方法,这个方法可以委托给被 Package 的执行器。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        final ExecutorService executor = Executors.newCachedThreadPool();
        final long requestId = 5;
        
        executor.execute(() -> {
            //you could create processors for request which returns different types
            //For instance Boolean or any custom type
            //You could provide different implementation of ExecutorService
            final var booleanProcessor = new RequestProcessor<Boolean>(executor);
            
            final Callable<Boolean> aTask = new Callable<>() {

                @Override
                public Boolean call() throws Exception {
                    System.out.println("TASK 1 TRUE wait 5 seconds");
                    
                    Thread.sleep(5000);
                    
                    return true;
                }
                
            };
            
            booleanProcessor.runATaskForId(aTask, requestId);
            
            booleanProcessor.runATaskForId(() ->  {
                System.out.println("TASK 2 FALSE wait 4 seconds" );
                
                Thread.sleep(4000);
                
                return false;
            }, requestId);
            
    
        });
        
        executor.submit(()-> {
            final var stringProcessor = new RequestProcessor<String>(executor);
            
            //another tusk with lambda expression
            stringProcessor.runATaskForId(() -> {
                    System.out.println("Another Task That Returns String For Request Id Given");
                    
                    System.out.println("TASK 3 wait 10 seconds" );
                    
                    Thread.sleep(10000);
                    
                    return "";
                },
                requestId
            );
            
        });
        
        System.out.println("END");
    }

}

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class RequestProcessor<T> {
    private class RequestTask implements Callable<T>{
        private final long requestId;
        private final Callable<T> wrappedCallable;
        
        private T result;
        
        public RequestTask(long requestId, Callable<T> wrappedCallable) {
            this.requestId = requestId;
            this.wrappedCallable = wrappedCallable;
        }
        
        public long getRequestId() {
            return requestId;
        }
        
        @Override
        public T call() throws Exception {
            return wrappedCallable.call();
        }

        public void setResult(T result) {
            this.result = result;
        }

        public T getResult() {
            return result;
        }
    }

    private static final ConcurrentHashMap<Long, Future<?>> inProgressRequestIds = new ConcurrentHashMap<>();
    
    private final ExecutorService executor;

    public RequestProcessor(ExecutorService executor) {
        this.executor = executor;
    }
    
    public T runATaskForId(Callable<T> task, long Id) {
        return processRequest(new RequestTask(Id, task));
    }
    
    private T processRequest(RequestTask task) {
        inProgressRequestIds.compute(
            task.getRequestId(), 
            (Long key, Future<?> existingFuture) -> {
                task.setResult(retrieveResultOf(executor.submit(task)));
                
                return null;
            }
        );
    
        return task.getResult();
    }
    
    private T retrieveResultOf(Future<T> future) {
        boolean isInterrupted = false;
        T value = null;
        
        while(true) {
            try {
                value = future.get();
                break;
            } catch (InterruptedException e) {
                 isInterrupted = true;
            } catch (Exception e) {
                throw new RequestProcessingException(e);
            }
        }
        
        if(isInterrupted)
            Thread.currentThread().interrupt();
        
        return value;
    }

}

public class RequestProcessingException extends RuntimeException{

    /**
     * 
     */
    private static final long serialVersionUID = 1775615938643533445L;

    public RequestProcessingException(String message) {
        super(message);
    }

    public RequestProcessingException(String message, Throwable cause) {
        super(message, cause);
    }

    public RequestProcessingException(Throwable cause) {
        super(cause);
    }
    
}

字符串

3qpi33ja

3qpi33ja2#

编辑:经过一些讨论,这是不安全的使用!:)
我已经写了一些类似的东西,但它的明确未经测试的生产。我做了一些测试,但很难测试这样的东西。
这个想法是有一个内部静态并发散列表,它为每个键存储“信号量”。每个线程将尝试在这个Map中寻找信号量的外观,如果它不存在,则创建它。

public class Blocking {

  private static final ConcurrentHashMap<String, Semaphore> internalMap = new ConcurrentHashMap<>();

  public static <O> O byKey(String keys, Supplier<O> action) {

    var semaphores = new Semaphore[1];
    try {
      semaphores[0] = internalMap.computeIfAbsent(keys, k -> new Semaphore(1));
      semaphores[0].acquire();
      return action.get();
    } finally {
      internalMap.remove(keys);
      semaphores[0].release();
    }
  }
}

字符串
用途:

Blocking.byKey("KEY", () -> doYourStuff())

mcdcgff0

mcdcgff03#

您需要执行2个操作

  • 检查requestId是否被其他线程使用
  • 如果未使用,则将requestId添加为“in process”

上述2个操作需要是原子的,可以通过使用锁(使用同步的隐式)或外部锁来实现。无论哪种方式,都将导致争用,因为每个线程都需要在执行任何操作之前获取锁
这里使用ConcurrentHashMap很有用。因为putIfAbsent是原子的,并且它在内部使用桶级锁,这可以减少每个requestId的争用。您可以在下面引用其中一个实现的代码片段

public class LongThreadSafe implements Runnable{

    ConcurrentHashMap<Long,Long> map;

    public LongThreadSafe(ConcurrentHashMap map) {
        this.map = map;
    }

    @Override
    public void run() {

        List<Long> list = Arrays.asList(2L, 3L, 4L, 5L, 23L, 43L);

        for (Long requestId:list) {
            //we don't have any problem if multiple threads are updating value
            Long previousValue = map.putIfAbsent(requestId, requestId);
            if (previousValue == null){
                //run your task
                //update database record using (requestId)
                map.remove(requestId);
            }else {
                System.out.println("Current requestId: "+requestId+" is being processed by another thread");
            }
        }

    }
}
class client{

    public static void main(String[] args) {

        ConcurrentHashMap<Long, Long> map = new ConcurrentHashMap<>();

        Thread t1 = new Thread(new LongThreadSafe(map));
        Thread t2 = new Thread(new LongThreadSafe(map));
        t1.start();
        t2.start();
    }

}

字符串

fnx2tebb

fnx2tebb4#

一个简单的方法来控制线程之间对某个对象的访问,并拥有无限的此类对象的供应,就是使用条带锁。
带区锁是一个锁数组(或者只是对象)。
因此,根据对象的散列,您可以确定这个锁数组中的索引,然后获取它。
举例说明该方法:

// e.g. part of a constructor
   locks = new Object[1024];
   for(int k=0;k<locks.length;k++){
      locks[k]=new Object();
   }

   // part of the processing function
   int hash = requestId.hashCode();
   int lockIndex = hashToIndex(hash, locks.length);
   synchronized(locks[lockIndex)){
      ... do your stuff
   }

   public static int hashToIndex(int hash, int length) {
        if(length <= 0) {
           throw new IllegalArgumentException();
        }

        if (hash == Integer.MIN_VALUE) {
            return 0;
        }

        return abs(hash) % length;
    }

字符串
条带锁的最大优点是您不需要处理锁的销毁,因为它们可以在程序运行期间保留。所以你不会得到任何垃圾,你有很好的和简单的代码。
这是一个非常基本的解决方案;如果锁被保持很长一段时间,那么这可能导致争用(甚至是错误争用,因为不同的requestIdMap到相同的锁)。
在这种情况下,重组该计划可能是一个更好的解决方案。例如,你可以有一个单线程执行器的数组,并根据上面的hashToIndex函数对requestId的哈希值将requestId抛入执行器。这样就保证了相同的requestId由相同的线程处理,因此不需要锁定。

相关问题