在删除条目时迭代concurrenthashmap

xmjla07d  于 2021-07-03  发布在  Java
关注(0)|答案(3)|浏览(358)

我想周期性地迭代 ConcurrentHashMap 删除条目时,如下所示:

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    // do something
    iter.remove();
}

问题是,在我迭代时,另一个线程可能正在更新或修改值。如果发生这种情况,这些更新可能会永远丢失,因为我的线程在迭代时只看到过时的值,但是 remove() 将删除live条目。
经过一番考虑,我想出了这个解决办法:

map.forEach((key, value) -> {
    // delete if value is up to date, otherwise leave for next round
    if (map.remove(key, value)) {
        // do something
    }
});

这样做的一个问题是,它不会捕获对不实现的可变值的修改 equals() (例如 AtomicInteger ). 有没有更好的方法安全地删除并发修改?

4ktjp1zp

4ktjp1zp1#

你的解决方案是可行的,但是有一个潜在的场景。如果某些条目具有常量更新,则map.remove(key,value)可能永远不会返回true,直到更新结束。
如果您使用jdk8,这里是我的解决方案

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    Map.compute(entry.getKey(), (k, v) -> f(v));
    //do something for prevValue
}
....
private Integer prevValue;

private Integer f(Integer v){
    prevValue = v;
    return null;
}

compute()将f(v)应用于该值,在本例中,将该值赋给全局变量并删除该项。
根据javadoc,它是原子的。
尝试计算指定键及其当前Map值的Map(如果没有当前Map,则为null)。整个方法调用是以原子方式执行的。在计算过程中,其他线程在此Map上尝试的某些更新操作可能会被阻止,因此计算应该简短,并且不能尝试更新此Map的任何其他Map。

u4vypkhs

u4vypkhs2#

你的变通方法其实很好。在其他设施的基础上,您可以构建类似的解决方案(例如,使用 computeIfPresent() 和逻辑删除值),但它们有自己的注意事项,我在稍微不同的用例中使用了它们。
至于使用不实现 equals() 对于Map值,可以在相应类型的顶部使用自己的 Package 器。这是将对象相等的自定义语义注入由提供的原子replace/remove操作的最直接的方法 ConcurrentMap .
更新
下面是一个草图,展示了如何在 ConcurrentMap.remove(Object key, Object value) 应用程序编程接口:
在用于值的可变类型之上定义一个 Package 器类型,同时还定义自定义类型 equals() 方法构建在当前可变值之上。
在你的 BiConsumer (你要传给的羔羊) forEach ),创建值的深度副本(属于新 Package 器类型)并执行逻辑以确定是否需要在副本上删除该值。
如果需要删除该值,请调用 remove(myKey, myValueCopy) .
如果在计算是否需要删除值时发生了一些并发更改, remove(myKey, myValueCopy) 会回来的 false (aba问题除外,这是一个单独的主题)。
下面是一些代码来说明这一点:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Playground {

   private static class AtomicIntegerWrapper {
      private final AtomicInteger value;

      AtomicIntegerWrapper(int value) {
         this.value = new AtomicInteger(value);
      }

      public void set(int value) {
         this.value.set(value);
      }

      public int get() {
         return this.value.get();
      }

      @Override
      public boolean equals(Object obj) {
         if (this == obj) {
            return true;
         }
         if (!(obj instanceof AtomicIntegerWrapper)) {
            return false;
         }
         AtomicIntegerWrapper other = (AtomicIntegerWrapper) obj;
         if (other.value.get() == this.value.get()) {
            return true;
         }
         return false;
      }

      public static AtomicIntegerWrapper deepCopy(AtomicIntegerWrapper wrapper) {
         int wrapped = wrapper.get();
         return new AtomicIntegerWrapper(wrapped);
      }
   }

   private static final ConcurrentMap<Integer, AtomicIntegerWrapper> MAP
         = new ConcurrentHashMap<>();

   private static final int NUM_THREADS = 3;

   public static void main(String[] args) throws InterruptedException {
      for (int i = 0; i < 10; ++i) {
         MAP.put(i, new AtomicIntegerWrapper(1));
      }

      Thread.sleep(1);

      for (int i = 0; i < NUM_THREADS; ++i) {
         new Thread(() -> {
            Random rnd = new Random();
            while (!MAP.isEmpty()) {
               MAP.forEach((key, value) -> {
                  AtomicIntegerWrapper elem = MAP.get(key);
                  if (elem == null) {
                     System.out.println("Oops...");
                  } else if (elem.get() == 1986) {
                     elem.set(1);
                  } else if ((rnd.nextInt() & 128) == 0) {
                     elem.set(1986);
                  }
               });
            }
         }).start();
      }

      Thread.sleep(1);

      new Thread(() -> {
         Random rnd = new Random();
         while (!MAP.isEmpty()) {
            MAP.forEach((key, value) -> {
               AtomicIntegerWrapper elem =
                     AtomicIntegerWrapper.deepCopy(MAP.get(key));
               if (elem.get() == 1986) {
                  try {
                     Thread.sleep(10);
                  } catch (Exception e) {}
                  boolean replaced = MAP.remove(key, elem);
                  if (!replaced) {
                     System.out.println("Bailed out!");
                  } else {
                     System.out.println("Replaced!");
                  }
               }
            });
         }
      }).start();
   }
}

你会看到打印出来的“保释!”,混杂着“替换!”(删除成功,因为没有您关心的并发更新),计算将在某个点停止。
如果你删除自定义 equals() 方法并继续使用副本,您将看到源源不断的“bailed out!”,因为拷贝永远不会被认为等于Map中的值。
如果不使用副本,就不会看到“bailed out!”打印出来后,您将遇到您要解释的问题—值将被删除,而不考虑并发更改。

zmeyuzjn

zmeyuzjn3#

让我们考虑一下你有什么选择。
创建自己的容器类 isUpdated() 操作并使用自己的解决方法。
如果您的Map只包含几个元素,并且与put/delete操作相比,您非常频繁地迭代Map。这可能是一个很好的选择
CopyOnWriteArrayList CopyOnWriteArrayList<Entry<Integer, Integer>> lookupArray = ...; 另一个选择是实现自己的 CopyOnWriteMap ```
public class CopyOnWriteMap<K, V> implements Map<K, V>{

private volatile Map<K, V> currentMap;

public V put(K key, V value) {
    synchronized (this) {
        Map<K, V> newOne = new HashMap<K, V>(this.currentMap);
        V val = newOne.put(key, value);
        this.currentMap = newOne; // atomic operation
        return val;
    }
}

public V remove(Object key) {
    synchronized (this) {
        Map<K, V> newOne = new HashMap<K, V>(this.currentMap);
        V val = newOne.remove(key);
        this.currentMap = newOne; // atomic operation
        return val;
    }
}

[...]

}

有负面的副作用。如果您使用的是copy-on-write集合,您的更新将永远不会丢失,但您可以再次看到一些以前删除的条目。
最坏情况:如果Map被复制,每次删除的条目都会被恢复。

相关问题