Usage and code examples of the com.twitter.distributedlog.lock.ZKSessionLock class

Reposted by x33g5p2x on 2022-02-05 in Other

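This article collects code examples for the Java class com.twitter.distributedlog.lock.ZKSessionLock and shows how the class is used. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of the ZKSessionLock class:
Package path: com.twitter.distributedlog.lock.ZKSessionLock
Class name: ZKSessionLock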

Introduction to ZKSessionLock

A lock held under a given ZooKeeper session. This is a one-time lock and is not reusable: if locking fails, if the ZooKeeper session expires, or if #unlock is called, the lock transitions to the expired or closed state. The locking procedure is as follows.

  0. If it is an immediate lock, it gets the lock waiters first. If the lock is already held by someone, it fails immediately with com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException carrying the current owner. If there are no lock waiters, it starts the locking procedure from step 1.
  1. Prepare: create a sequential znode to identify the lock.
  2. Check lock waiters: after prepare, get all lock waiters. If this lock is the first waiter, it claims ownership; if it is not the first waiter but the first waiter is itself (same client id and same session id), it claims ownership too; otherwise, it sets a watcher on its sibling and waits for it to disappear.
                     +-----------------+
                     |       INIT      | ------------------------------+
                     +--------+--------+                               |
                              |                                        |
                              |                                        |
                     +--------v--------+                               |
                     |    PREPARING    |----------------------------+  |
                     +--------+--------+                            |  |
                              |                                     |  |
                              |                                     |  |
                     +--------v--------+                            |  |
       +-------------|    PREPARED     |--------------+             |  |
       |             +-----^---------+-+              |             |  |
       |                   |  |      |                |             |  |
       |                   |  |      |                |             |  |
       |                   |  |      |                |             |  |
+------V-----------+       |  |      |       +--------v----------+  |  |
|     WAITING      |-------+  |      |       |    CLAIMED        |  |  |
+------+-----+-----+          |      |       +--+----------+-----+  |  |
       |     |                |      |          |        |          |  |
       |     |                |      |          |        |          |  |
       |     |                |      |          |        |          |  |
       |     |                |    +-v----------v----+   |          |  |
       |     +-------------------->|     EXPIRED     |   |          |  |
       |                      |    +--+--------------+   |          |  |
       |                      |       |                  |          |  |
       |                      |       |                  |          |  |
       |             +--------V-------V-+                |          |  |
       +------------>|     CLOSING      |

Metrics:

  tryAcquire: opstats. Latency spent on try-locking operations; it includes timeouts.
  tryTimeouts: counter. The number of timeouts on try-locking operations.
  unlock: opstats. Latency spent on unlock operations.
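
Steps 0 and 2 of the locking procedure can be read as a small decision function. The following is a minimal sketch under stated assumptions, not the library's code: the class LockDecisionSketch, the Waiter type, the Decision enum, and both methods are hypothetical names introduced for illustration, and ZooKeeper is replaced by an in-memory list of waiters that is already sorted by sequence number.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative model of steps 0 and 2; every name here is hypothetical. */
class LockDecisionSketch {

  /** A lock waiter: who created the znode and in which order. */
  static final class Waiter {
    final String clientId;
    final long sessionId;
    final int sequence; // position assigned when the sequential znode was created

    Waiter(String clientId, long sessionId, int sequence) {
      this.clientId = clientId;
      this.sessionId = sessionId;
      this.sequence = sequence;
    }

    boolean sameOwner(Waiter other) {
      return clientId.equals(other.clientId) && sessionId == other.sessionId;
    }
  }

  enum Decision { CLAIM, WAIT_ON_SIBLING }

  /** Step 0: an immediate lock only proceeds when nobody is waiting yet. */
  static boolean immediateLockMayProceed(List<Waiter> existingWaiters) {
    return existingWaiters.isEmpty();
  }

  /** Step 2: the waiters are sorted by sequence and include our own node. */
  static Decision checkWaiters(List<Waiter> sortedWaiters, Waiter me) {
    Waiter first = sortedWaiters.get(0);
    // Claim if we are first, or if the first waiter has our client id and session id.
    if (first.sequence == me.sequence || first.sameOwner(me)) {
      return Decision.CLAIM;
    }
    return Decision.WAIT_ON_SIBLING;
  }

  public static void main(String[] args) {
    Waiter a = new Waiter("client-a", 1L, 0);
    Waiter b = new Waiter("client-b", 2L, 1);
    List<Waiter> waiters = Arrays.asList(a, b);
    System.out.println(checkWaiters(waiters, a)); // the first waiter claims
    System.out.println(checkWaiters(waiters, b)); // the second waiter watches its sibling
  }
}
```

Keeping the decision separate from the ZooKeeper calls makes the claim-or-wait rule easy to test without a running ensemble.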

Code examples

Code example source: twitter/distributedlog

private void handleNodeDelete(int lockEpoch, final WatchedEvent event) {
  executeLockAction(lockEpoch, new LockAction() {
    @Override
    public void execute() {
      // The lock is either expired or closed
      if (!lockState.inState(State.WAITING)) {
        LOG.info("{} ignore watched node {} deleted event, since lock state has moved to {}.",
            new Object[] { lockId, event.getPath(), lockState.getState() });
        return;
      }
      lockState.transition(State.PREPARED);
      // we don't need to wait and check the result, since:
      // 1) if it claimed the ownership, it would notify the waiters when claimed ownerships
      // 2) if it failed, it would also notify the waiters, the waiters would cleanup the state.
      checkLockOwnerAndWaitIfPossible(watcher, true);
    }
    @Override
    public String getActionName() {
      return "handleNodeDelete(path=" + event.getPath() + ")";
    }
  });
}

Code example source: twitter/distributedlog

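// Orders lock member znodes by the ID parsed from their names.
// The declaration line is missing from the excerpt; the field name below is assumed.
static final Comparator<String> MEMBER_COMPARATOR = new Comparator<String>() {
  @Override
  public int compare(String o1, String o2) {
    int l1 = parseMemberID(o1);
    int l2 = parseMemberID(o2);
    // Integer.compare avoids the overflow that plain subtraction can cause.
    return Integer.compare(l1, l2);
  }
};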
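
To make the ordering concrete, here is a self-contained sketch that sorts a few waiter names by a numeric suffix. It is an illustration under assumptions: parseSequence is a hypothetical stand-in for parseMemberID (whose body is not shown in the excerpt), and the sample node names are invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class MemberOrderSketch {

  /** Hypothetical parser: the sequence is the text after the last underscore. */
  static int parseSequence(String nodeName) {
    String suffix = nodeName.substring(nodeName.lastIndexOf('_') + 1);
    try {
      return Integer.parseInt(suffix);
    } catch (NumberFormatException e) {
      return Integer.MAX_VALUE; // unparsable names sort last in this sketch
    }
  }

  static final Comparator<String> BY_SEQUENCE =
      Comparator.comparingInt(MemberOrderSketch::parseSequence);

  /** Returns a sorted copy; the caller's list is left untouched. */
  static List<String> sortWaiters(List<String> children) {
    List<String> sorted = new ArrayList<>(children);
    sorted.sort(BY_SEQUENCE);
    return sorted;
  }

  public static void main(String[] args) {
    List<String> children = Arrays.asList(
        "member_b_s2_0000000002", "member_bad", "member_a_s1_0000000001");
    System.out.println(sortWaiters(children));
  }
}
```

Sending unparsable names to the end is a design choice of this sketch: a malformed node can then never appear to be the first waiter.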

Code example source: twitter/distributedlog

@Override
public void tryLock(long timeout, TimeUnit unit) throws LockingException {
  final Stopwatch stopwatch = Stopwatch.createStarted();
  Future<LockWaiter> tryFuture = asyncTryLock(timeout, unit);
  LockWaiter waiter = waitForTry(stopwatch, tryFuture);
  boolean acquired = waiter.waitForAcquireQuietly();
  if (!acquired) {
    throw new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner());
  }
}

Code example source: twitter/distributedlog

@Test(timeout = 60000)
public void testLockWhenPreviousLockZnodeStillExists() throws Exception {
  String lockPath = "/test-lock-when-previous-lock-znode-still-exists-" +
      System.currentTimeMillis();
  String clientId = "client-id";
  ZooKeeper zk = zkc.get();
  createLockPath(zk, lockPath);
  final ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId, lockStateExecutor);
  // lock0 lock
  lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
  // simulate lock0 expires but znode still exists
  final DistributedLockContext context1 = new DistributedLockContext();
  context1.addLockId(lock0.getLockId());
  final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
      60000, NullStatsLogger.INSTANCE, context1);
  lock1.tryLock(0L, TimeUnit.MILLISECONDS);
  assertEquals(State.CLAIMED, lock1.getLockState());
  lock1.unlock();
  final DistributedLockContext context2 = new DistributedLockContext();
  context2.addLockId(lock0.getLockId());
  final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
      60000, NullStatsLogger.INSTANCE, context2);
  lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
  assertEquals(State.CLAIMED, lock2.getLockState());
  lock2.unlock();
  lock0.unlock();
}

Code example source: twitter/distributedlog

assertTrue(lock0.haveLock());
assertFalse(lock1.haveLock());
assertEquals(((ZKSessionLock) lock0.getInternalLock()).getLockId(),
    Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId(),
    Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
    fail("Should fail check write lock since lock is already held by other people");
  } catch (OwnershipAcquireFailedException oafe) {
    assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
        oafe.getCurrentOwner());
    fail("Should fail check write lock since lock is already held by other people");
  } catch (OwnershipAcquireFailedException oafe) {
    assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
        oafe.getCurrentOwner());
    fail("Should fail check write lock since lock is already held by other people");
  } catch (OwnershipAcquireFailedException oafe) {
    assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
        oafe.getCurrentOwner());
    fail("Should fail check write lock since lock is already held by other people");
  } catch (OwnershipAcquireFailedException oafe) {
    assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
        oafe.getCurrentOwner());
assertFalse(lock0.haveLock());
assertTrue(lock1.haveLock());

Code example source: twitter/distributedlog

String clientId = "test-lock-on-non-existed-lock";
ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
  lock.tryLock(0, TimeUnit.MILLISECONDS);
  fail("Should fail on locking a non-existed lock.");
} catch (LockingException le) {
  assertEquals(KeeperException.Code.NONODE, ((KeeperException) cause).code());
assertEquals(State.CLOSED, lock.getLockState());
  lock.tryLock(0, TimeUnit.MILLISECONDS);
  fail("Should fail on locking a failure lock.");
} catch (LockStateChangedException lsce) {
assertEquals(State.CLOSED, lock.getLockState());

Code example source: twitter/distributedlog

throw new LockingException(lockPath, "Timeout during try phase of lock acquire", toe);
} catch (Exception ex) {
  String message = getLockId() + " failed to lock " + lockPath;
  throw new LockingException(lockPath, message, ex);
} finally {
    unlock();

Code example source: twitter/distributedlog

new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
  @Override
  public void execute() {
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
  @Override
  public void execute() {
lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
  @Override
  public void execute() {
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
  @Override
  public void execute() {

Code example source: twitter/distributedlog

final ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor)
    .setLockListener(listener);
expiredLatch.await();
assertEquals(State.INIT, lock.getLockState());
try {
  lock.tryLock(timeout, TimeUnit.MILLISECONDS);
  fail("Should fail locking using an expired lock");
} catch (LockingException le) {
  assertTrue(le.getCause() instanceof KeeperException.SessionExpiredException);
assertEquals(State.CLOSED, lock.getLockState());
List<String> children = getLockWaiters(zkc, lockPath);
assertEquals(0, children.size());

Code example source: twitter/distributedlog

@Test(timeout = 60000)
public void testParseClientID() throws Exception {
  ZooKeeper zk = zkc.get();
  String lockPath = "/test-parse-clientid";
  String clientId = "test-parse-clientid-" + System.currentTimeMillis();
  Pair<String, Long> lockId = Pair.of(clientId, zk.getSessionId());
  createLockPath(zk, lockPath);
  // Correct data
  String node1 = getLockIdFromPath(createLockNodeV1(zk, lockPath, clientId));
  String node2 = getLockIdFromPath(createLockNodeV2(zk, lockPath, clientId));
  String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node1)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node2)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node3)));
  // Bad Lock Node Name
  String node4 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member"));
  String node5 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode"));
  String node6 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode"));
  String node7 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode"));
  String node8 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode_badnode"));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node4)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node5)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node6)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node7)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node8)));
  // Malformed Node Name
  String node9 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_malformed_s12345678_999999"));
  assertEquals(Pair.of("malformed", 12345678L), Await.result(asyncParseClientID(zk, lockPath, node9)));
}
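
The last assertion in the test suggests that a node name such as member_malformed_s12345678_999999 yields the client id "malformed" and the session id 12345678. The sketch below parses names of that shape. It is an illustration inferred only from that test expectation: LockNodeNameSketch and parse are hypothetical names, the client id is assumed to contain no underscore, and the real asyncParseClientID may work differently.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;

class LockNodeNameSketch {

  /**
   * Parses names shaped like member_CLIENTID_sSESSIONID_SEQUENCE.
   * Returns empty when the name does not fit, so that a caller can fall
   * back to another source for the lock id.
   */
  static Optional<Map.Entry<String, Long>> parse(String nodeName) {
    String[] parts = nodeName.split("_");
    if (parts.length != 4 || !parts[0].equals("member") || !parts[2].startsWith("s")) {
      return Optional.empty();
    }
    try {
      long sessionId = Long.parseLong(parts[2].substring(1));
      Map.Entry<String, Long> lockId = new SimpleImmutableEntry<>(parts[1], sessionId);
      return Optional.of(lockId);
    } catch (NumberFormatException e) {
      return Optional.empty(); // the session part is not a decimal number
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("member_malformed_s12345678_999999"));
    System.out.println(parse("member_badnode_badnode"));
  }
}
```

Returning an empty Optional for names that do not match mirrors the "Bad Lock Node Name" cases in the test, where the expected lock id evidently comes from somewhere other than the node name.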

Code example source: twitter/distributedlog

@Override
public Future<BoxedUnit> asyncUnlock() {
  return asyncUnlock(new LockClosedException(lockPath, lockId, lockState.getState()));
}

Code example source: twitter/distributedlog

private Future<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
                            final boolean wait) {
  final Promise<String> promise = new Promise<String>();
  checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise);
  return promise;
}

Code example source: twitter/distributedlog

final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout;
if (wait) {
  asyncTryLock(wait, result);
} else {

Code example source: twitter/distributedlog

asyncTryLockWithoutCleanup(wait, lockResult);

Code example source: twitter/distributedlog

/**
 * Test lock after unlock is called.
 *
 * @throws Exception
 */
@Test(timeout = 60000)
public void testLockAfterUnlock() throws Exception {
  String lockPath = "/test-lock-after-unlock";
  String clientId = "test-lock-after-unlock";
  ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
  lock.unlock();
  assertEquals(State.CLOSED, lock.getLockState());
  try {
    lock.tryLock(0, TimeUnit.MILLISECONDS);
    fail("Should fail on tryLock since lock state has changed.");
  } catch (LockStateChangedException lsce) {
    // expected
  }
  assertEquals(State.CLOSED, lock.getLockState());
  try {
    lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    fail("Should fail on tryLock immediately if lock state has changed.");
  } catch (LockStateChangedException lsce) {
    // expected
  }
  assertEquals(State.CLOSED, lock.getLockState());
}
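
The test above depends on the lock being one-time: once unlock has moved it to CLOSED, every later tryLock is rejected. A minimal in-memory model of that rule is sketched below. It is not the library's implementation: OneTimeLockSketch is a hypothetical class, only three of the states are modeled, and IllegalStateException stands in for LockStateChangedException.

```java
/** Hypothetical model of a one-time lock; no ZooKeeper involved. */
class OneTimeLockSketch {
  enum State { INIT, CLAIMED, CLOSED }

  private State state = State.INIT;

  /** Succeeds only on a fresh lock; any other state is rejected. */
  synchronized void tryLock() {
    if (state != State.INIT) {
      throw new IllegalStateException("lock state has changed to " + state);
    }
    state = State.CLAIMED;
  }

  /** Closing is terminal: the lock never returns to INIT. */
  synchronized void unlock() {
    state = State.CLOSED;
  }

  synchronized State getState() {
    return state;
  }

  public static void main(String[] args) {
    OneTimeLockSketch lock = new OneTimeLockSketch();
    lock.unlock();
    try {
      lock.tryLock();
    } catch (IllegalStateException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
    System.out.println(lock.getState());
  }
}
```

A caller that needs the lock again would create a new lock object instead of reusing a closed one.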

Code example source: twitter/distributedlog

FutureUtils.result(lock0.asyncAcquire());
Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertTrue(lock0.haveLock());
assertEquals(lockId0_1,
    Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
Pair<String, Long> lockId0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertFalse("New lock should be created under different session", lockId0_1.equals(lockId0_2));
assertTrue(lock0.haveLock());
assertEquals(lockId0_2,
    Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));

Code example source: twitter/distributedlog

/**
 * Test try-create after close race condition.
 *
 * @throws Exception
 */
@Test(timeout = 60000)
public void testTryCloseRaceCondition() throws Exception {
  String name = testNames.getMethodName();
  String lockPath = "/" + name;
  String clientId = name;
  createLockPath(zkc.get(), lockPath);
  ZKSessionLock lock = new ZKSessionLock(
      zkc, lockPath, clientId, lockStateExecutor,
      1*1000 /* op timeout */, NullStatsLogger.INSTANCE,
      new DistributedLockContext());
  try {
    FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition,
                  FailpointUtils.DEFAULT_ACTION);
    lock.tryLock(0, TimeUnit.MILLISECONDS);
  } catch (LockClosedException ex) {
    // expected: the lock may be closed while tryLock is in progress
  } finally {
    FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition);
  }
  assertEquals(State.CLOSED, lock.getLockState());
  List<String> children = getLockWaiters(zkc, lockPath);
  assertEquals(0, children.size());
}

Code example source: twitter/distributedlog

@Override
public void unlock() {
  Future<BoxedUnit> unlockResult = asyncUnlock();
  try {
    Await.result(unlockResult, Duration.fromMilliseconds(lockOpTimeout));
  } catch (TimeoutException toe) {
    // This shouldn't happen unless we lose a watch, and may result in a leaked lock.
    LOG.error("Timeout unlocking {} owned by {} : ", new Object[] { lockPath, lockId, toe });
  } catch (Exception e) {
    LOG.warn("{} failed to unlock {} : ", new Object[] { lockId, lockPath, e });
  }
}
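
The unlock() method above waits for the asynchronous unlock for a bounded time and reports problems without rethrowing them. The same pattern can be sketched with the JDK alone. In this illustration java.util.concurrent.CompletableFuture stands in for the Twitter Future and Await.result used by the library, and BoundedWaitSketch, Outcome, and awaitQuietly are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {
  enum Outcome { COMPLETED, TIMED_OUT, FAILED }

  /** Waits at most timeoutMs for the operation and never rethrows its failure. */
  static Outcome awaitQuietly(CompletableFuture<Void> operation, long timeoutMs) {
    try {
      operation.get(timeoutMs, TimeUnit.MILLISECONDS);
      return Outcome.COMPLETED;
    } catch (TimeoutException e) {
      return Outcome.TIMED_OUT; // the operation may still finish later
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      return Outcome.FAILED;
    } catch (ExecutionException e) {
      return Outcome.FAILED; // the operation itself failed
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Void> done = CompletableFuture.<Void>completedFuture(null);
    CompletableFuture<Void> pending = new CompletableFuture<>();
    System.out.println(awaitQuietly(done, 100));
    System.out.println(awaitQuietly(pending, 10));
  }
}
```

Returning an outcome instead of only logging lets the caller decide how to react to a timeout, which the original comment notes may leave a leaked lock.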

Code example source: twitter/distributedlog

ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId0, lockStateExecutor);
ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor);
lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock0.getLockState());
List<String> children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
  lock1.tryLock(timeout, TimeUnit.MILLISECONDS);
  fail("lock1 should fail on locking since lock0 is holding the lock.");
} catch (OwnershipAcquireFailedException oafe) {
  assertEquals(lock0.getLockId().getLeft(), oafe.getCurrentOwner());
assertEquals(State.CLAIMED, lock0.getLockState());
assertEquals(State.CLOSED, lock1.getLockState());
children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
lock0.unlock();
assertEquals(State.CLOSED, lock0.getLockState());
assertEquals(0, getLockWaiters(zkc, lockPath).size());
ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId2, lockStateExecutor);
lock2.tryLock(timeout, TimeUnit.MILLISECONDS);

Code example source: twitter/distributedlog

FutureUtils.result(lock0.asyncAcquire());
Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertTrue(lock0.haveLock());
assertEquals(lockId0_1,
    Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(1, children.size());
assertTrue(lock0.haveLock());
Pair<String, Long> lock0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertEquals(lock0_2,
    Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
assertEquals(clientId, lock0_2.getLeft());
assertFalse(lockId0_1.equals(lock0_2));
