
Reposted by x33g5p2x on 2022-02-05 in Other



A lock under a given ZooKeeper session. This is a one-time lock and is not reusable: if locking fails, the ZooKeeper session expires, or #unlock is called, the lock transitions to the expired or closed state. The locking procedure is described below.

  0. If it is an immediate lock, it gets the lock waiters first. If the lock is already held by someone, it fails immediately with com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException carrying the current owner. If there are no lock waiters, it starts the locking procedure from step 1.
  1. prepare: create a sequential znode to identify the lock.
  2. check lock waiters: get all lock waiters to check after prepare. If it is the first waiter, it claims the ownership. If it is not the first waiter but the first waiter is itself (same client id and same session id), it claims the ownership too. Otherwise, it sets a watcher on its sibling and waits for that sibling to disappear.
[Figure: lock state diagram with the states INIT, PREPARING, PREPARED, WAITING, CLAIMED, EXPIRED, and CLOSING.]
Metrics:
 tryAcquire: opstats. Latency spent on try-lock operations, including timeouts.
 tryTimeouts: counter. The number of timeouts on try-lock operations.
 unlock: opstats. Latency spent on unlock operations.
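
Step 2 of the locking procedure ("check lock waiters") can be sketched as a small decision function. This is only an illustrative sketch and not the ZKSessionLock implementation: the class `WaiterCheckSketch`, the helper `id`, and the method `siblingToWatch` are hypothetical names, lock ids are modelled as (client id, session id) pairs, and the choice of which sibling to watch (the waiter immediately ahead) is an assumption.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the "check lock waiters" step; not the ZKSessionLock code. */
final class WaiterCheckSketch {
    /** Marker meaning "no sibling to watch: claim ownership now". */
    static final int CLAIM = -1;

    /** Builds a lock id from a client id and a ZooKeeper session id. */
    static Map.Entry<String, Long> id(String clientId, long sessionId) {
        return new SimpleImmutableEntry<String, Long>(clientId, sessionId);
    }

    /**
     * @param waiters lock ids of all waiters, ordered by znode sequence number
     * @param myIndex position of this lock's own znode in that list
     * @return CLAIM, or the index of the waiter whose znode should be watched
     */
    static int siblingToWatch(List<Map.Entry<String, Long>> waiters, int myIndex) {
        Map.Entry<String, Long> me = waiters.get(myIndex);
        Map.Entry<String, Long> first = waiters.get(0);
        // Claim if we are the first waiter, or if the first waiter carries
        // the same client id and session id as we do.
        if (myIndex == 0 || first.equals(me)) {
            return CLAIM;
        }
        // Assumption: the sibling to watch is the waiter immediately ahead of us.
        return myIndex - 1;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> waiters =
            Arrays.asList(id("client-a", 1L), id("client-b", 2L), id("client-a", 1L));
        for (int i = 0; i < waiters.size(); i++) {
            int r = siblingToWatch(waiters, i);
            System.out.println(i + " -> " + (r == CLAIM ? "claim" : "watch waiter " + r));
        }
    }
}
```

In the sketch, the third waiter claims ownership even though it is not first in line, because the first waiter has the same client id and session id.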




Code example source: origin: twitter/distributedlog

private void handleNodeDelete(int lockEpoch, final WatchedEvent event) {
  executeLockAction(lockEpoch, new LockAction() {
    public void execute() {
      // The lock is either expired or closed
      if (!lockState.inState(State.WAITING)) {
        LOG.info("{} ignore watched node {} deleted event, since lock state has moved to {}.",
            new Object[] { lockId, event.getPath(), lockState.getState() });
        return;
      }
      lockState.transition(State.PREPARED);
      // we don't need to wait and check the result, since:
      // 1) if it claimed the ownership, it would notify the waiters when claimed ownerships
      // 2) if it failed, it would also notify the waiters, the waiters would cleanup the state.
      checkLockOwnerAndWaitIfPossible(watcher, true);
    }
    public String getActionName() {
      return "handleNodeDelete(path=" + event.getPath() + ")";
    }
  });
}

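Code example source: origin: twitter/distributedlog

public int compare(String o1, String o2) {
    int l1 = parseMemberID(o1);
    int l2 = parseMemberID(o2);
    // Integer.compare avoids the overflow that l1 - l2 can hit for extreme values
    return Integer.compare(l1, l2);
}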
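
To show how such a comparator orders lock waiters, here is a minimal sketch. It assumes that a lock znode name ends in `_<sequence number>`, and that names without a numeric suffix sort last. The class `MemberOrderSketch` and its `parseMemberId` are hypothetical stand-ins, not the library's `parseMemberID`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of ordering lock waiters by sequence number. */
final class MemberOrderSketch {
    /** Stand-in parser: treats the text after the last '_' as the sequence number. */
    static int parseMemberId(String nodeName) {
        String suffix = nodeName.substring(nodeName.lastIndexOf('_') + 1);
        try {
            return Integer.parseInt(suffix);
        } catch (NumberFormatException e) {
            // Assumption: unparsable names sort last, so they never look like the first waiter.
            return Integer.MAX_VALUE;
        }
    }

    /** Orders node names by their parsed sequence number. */
    static final Comparator<String> MEMBER_COMPARATOR = new Comparator<String>() {
        public int compare(String o1, String o2) {
            return Integer.compare(parseMemberId(o1), parseMemberId(o2));
        }
    };

    /** Returns a sorted copy of the children of the lock path. */
    static List<String> sortWaiters(List<String> children) {
        List<String> sorted = new ArrayList<String>(children);
        Collections.sort(sorted, MEMBER_COMPARATOR);
        return sorted;
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
            "member_b_s2_0000000010", "member_bad", "member_a_s1_0000000002");
        System.out.println(sortWaiters(children));
    }
}
```

The first element of the sorted list is the waiter that currently owns, or is next to own, the lock.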

Code example source: origin: twitter/distributedlog

public void tryLock(long timeout, TimeUnit unit) throws LockingException {
  final Stopwatch stopwatch = Stopwatch.createStarted();
  Future<LockWaiter> tryFuture = asyncTryLock(timeout, unit);
  LockWaiter waiter = waitForTry(stopwatch, tryFuture);
  boolean acquired = waiter.waitForAcquireQuietly();
  if (!acquired) {
    throw new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner());
  }
}

Code example source: origin: twitter/distributedlog

@Test(timeout = 60000)
public void testLockWhenPreviousLockZnodeStillExists() throws Exception {
  String lockPath = "/test-lock-when-previous-lock-znode-still-exists-" +
      System.currentTimeMillis();
  String clientId = "client-id";
  ZooKeeper zk = zkc.get();
  createLockPath(zk, lockPath);
  final ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId, lockStateExecutor);
  // lock0 lock
  lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
  // simulate lock0 expires but znode still exists
  final DistributedLockContext context1 = new DistributedLockContext();
  context1.addLockId(lock0.getLockId());
  final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
      60000, NullStatsLogger.INSTANCE, context1);
  lock1.tryLock(0L, TimeUnit.MILLISECONDS);
  assertEquals(State.CLAIMED, lock1.getLockState());
  final DistributedLockContext context2 = new DistributedLockContext();
  context2.addLockId(lock0.getLockId());
  final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
      60000, NullStatsLogger.INSTANCE, context2);
  lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
  assertEquals(State.CLAIMED, lock2.getLockState());
}

Code example source: origin: twitter/distributedlog

assertEquals(((ZKSessionLock) lock0.getInternalLock()).getLockId(),
    Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId(),
    Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
try {
  // ...
  fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
  assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
      oafe.getCurrentOwner());
}
try {
  // ...
  fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
  assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
      oafe.getCurrentOwner());
}
try {
  // ...
  fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
  assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
      oafe.getCurrentOwner());
}
try {
  // ...
  fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
  assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
      oafe.getCurrentOwner());
}

Code example source: origin: twitter/distributedlog

String clientId = "test-lock-on-non-existed-lock";
ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
try {
  lock.tryLock(0, TimeUnit.MILLISECONDS);
  fail("Should fail on locking a non-existed lock.");
} catch (LockingException le) {
  Throwable cause = le.getCause();
  assertEquals(KeeperException.Code.NONODE, ((KeeperException) cause).code());
}
assertEquals(State.CLOSED, lock.getLockState());
// a second attempt on the failed lock must be rejected
try {
  lock.tryLock(0, TimeUnit.MILLISECONDS);
  fail("Should fail on locking a failure lock.");
} catch (LockStateChangedException lsce) {
  // expected
}
assertEquals(State.CLOSED, lock.getLockState());

Code example source: origin: twitter/distributedlog

try {
  // ...
} catch (TimeoutException toe) {
  throw new LockingException(lockPath, "Timeout during try phase of lock acquire", toe);
} catch (Exception ex) {
  String message = getLockId() + " failed to lock " + lockPath;
  throw new LockingException(lockPath, message, ex);
} finally {
  // ...
}

Code example source: origin: twitter/distributedlog

ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
  public void execute() { /* ... */ }
  // ...
});
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
  public void execute() { /* ... */ }
  // ...
});
lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
  public void execute() { /* ... */ }
  // ...
});
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
  public void execute() { /* ... */ }
  // ...
});
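
The calls above pass either the current epoch or the current epoch plus one, which suggests that an action runs only when the epoch it was submitted with still matches the lock's epoch. A minimal sketch of that guard follows. It rests on that assumption, and `EpochGuardSketch` with all of its methods are hypothetical names rather than the library's API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of epoch-guarded action execution. */
final class EpochGuardSketch {
    private final AtomicInteger epoch = new AtomicInteger(0);
    // A single thread keeps submitted actions ordered.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    int currentEpoch() {
        return epoch.get();
    }

    /** Invalidates every action that was submitted with an older epoch. */
    int advanceEpoch() {
        return epoch.incrementAndGet();
    }

    /** Runs the action on the executor only if the epoch still matches. */
    void executeAction(final int expectedEpoch, final Runnable action) {
        executor.submit(new Runnable() {
            public void run() {
                if (epoch.get() == expectedEpoch) {
                    action.run();
                }
                // Otherwise the action is stale and is silently skipped.
            }
        });
    }

    /** Stops accepting actions and waits for the queued ones to finish. */
    boolean shutdownAndWait() {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        EpochGuardSketch guard = new EpochGuardSketch();
        final AtomicInteger counter = new AtomicInteger(0);
        Runnable increment = new Runnable() {
            public void run() {
                counter.incrementAndGet();
            }
        };
        guard.executeAction(guard.currentEpoch(), increment);     // same epoch
        guard.executeAction(guard.currentEpoch() + 1, increment); // different epoch
        guard.shutdownAndWait();
        System.out.println("executed actions: " + counter.get());
    }
}
```

Guarding by epoch lets callbacks that were registered before a state change (for example, an old watcher firing late) be dropped without extra bookkeeping.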

Code example source: origin: twitter/distributedlog

final ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
// ...
assertEquals(State.INIT, lock.getLockState());
try {
  lock.tryLock(timeout, TimeUnit.MILLISECONDS);
  fail("Should fail locking using an expired lock");
} catch (LockingException le) {
  assertTrue(le.getCause() instanceof KeeperException.SessionExpiredException);
}
assertEquals(State.CLOSED, lock.getLockState());
List<String> children = getLockWaiters(zkc, lockPath);
assertEquals(0, children.size());

Code example source: origin: twitter/distributedlog

@Test(timeout = 60000)
public void testParseClientID() throws Exception {
  ZooKeeper zk = zkc.get();
  String lockPath = "/test-parse-clientid";
  String clientId = "test-parse-clientid-" + System.currentTimeMillis();
  Pair<String, Long> lockId = Pair.of(clientId, zk.getSessionId());
  createLockPath(zk, lockPath);
  // Correct data
  String node1 = getLockIdFromPath(createLockNodeV1(zk, lockPath, clientId));
  String node2 = getLockIdFromPath(createLockNodeV2(zk, lockPath, clientId));
  String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node1)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node2)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node3)));
  // Bad Lock Node Name
  String node4 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member"));
  String node5 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode"));
  String node6 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode"));
  String node7 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode"));
  String node8 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode_badnode"));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node4)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node5)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node6)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node7)));
  assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node8)));
  // Malformed Node Name
  String node9 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_malformed_s12345678_999999"));
  assertEquals(Pair.of("malformed", 12345678L), Await.result(asyncParseClientID(zk, lockPath, node9)));
}
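
The last assertion in the test above implies that a node name such as member_malformed_s12345678_999999 carries the client id and the session id. A minimal sketch of parsing such a name follows, assuming the layout `member_<clientId>_s<sessionId>_<sequence>`. The class `LockIdParseSketch` and the regular expression are illustrative assumptions. The sketch returns an empty result for names that do not match, whereas the test shows the library still resolving the lock id for bad node names by some other means that is not modelled here.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of extracting (client id, session id) from a lock node name. */
final class LockIdParseSketch {
    // Assumed layout: member_<clientId>_s<sessionId>_<sequence>
    private static final Pattern NODE_NAME = Pattern.compile("member_(.+)_s(\\d+)_(\\d+)");

    static Optional<Map.Entry<String, Long>> parse(String nodeName) {
        Matcher m = NODE_NAME.matcher(nodeName);
        if (!m.matches()) {
            // Not in the assumed layout; a caller would need another source for the id.
            return Optional.empty();
        }
        try {
            Map.Entry<String, Long> lockId =
                new SimpleImmutableEntry<String, Long>(m.group(1), Long.valueOf(m.group(2)));
            return Optional.of(lockId);
        } catch (NumberFormatException e) {
            // The session id has too many digits to fit in a long.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("member_malformed_s12345678_999999"));
        System.out.println(parse("member_badnode_badnode"));
    }
}
```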

Code example source: origin: twitter/distributedlog

public Future<BoxedUnit> asyncUnlock() {
  return asyncUnlock(new LockClosedException(lockPath, lockId, lockState.getState()));
}

Code example source: origin: twitter/distributedlog

private Future<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
                                                       final boolean wait) {
  final Promise<String> promise = new Promise<String>();
  checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise);
  return promise;
}

Code example source: origin: twitter/distributedlog

final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout;
if (wait) {
  asyncTryLock(wait, result);
} else {
  // ...
}

Code example source: origin: twitter/distributedlog

asyncTryLockWithoutCleanup(wait, lockResult);

Code example source: origin: twitter/distributedlog

/**
 * Test lock after unlock is called.
 * @throws Exception
 */
@Test(timeout = 60000)
public void testLockAfterUnlock() throws Exception {
  String lockPath = "/test-lock-after-unlock";
  String clientId = "test-lock-after-unlock";
  ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
  lock.unlock();
  assertEquals(State.CLOSED, lock.getLockState());
  try {
    lock.tryLock(0, TimeUnit.MILLISECONDS);
    fail("Should fail on tryLock since lock state has changed.");
  } catch (LockStateChangedException lsce) {
    // expected
  }
  assertEquals(State.CLOSED, lock.getLockState());
  try {
    lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    fail("Should fail on tryLock immediately if lock state has changed.");
  } catch (LockStateChangedException lsce) {
    // expected
  }
  assertEquals(State.CLOSED, lock.getLockState());
}
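
The test above exercises the one-time nature of the lock: once it has been closed, every later tryLock is rejected. A minimal sketch of that behaviour follows, using a reduced set of states. `OneTimeLockSketch` is a hypothetical class, and `IllegalStateException` stands in for the library's LockStateChangedException.

```java
/** Hypothetical sketch of a one-time lock; not the ZKSessionLock state machine. */
final class OneTimeLockSketch {
    enum State { INIT, CLAIMED, CLOSED }

    private State state = State.INIT;

    /** Succeeds only on a fresh lock; a used or closed lock is never reusable. */
    synchronized void tryLock() {
        if (state != State.INIT) {
            // Stand-in for LockStateChangedException.
            throw new IllegalStateException("lock state has changed to " + state);
        }
        state = State.CLAIMED;
    }

    /** Closes the lock regardless of whether it was ever claimed. */
    synchronized void unlock() {
        state = State.CLOSED;
    }

    synchronized State getState() {
        return state;
    }

    public static void main(String[] args) {
        OneTimeLockSketch lock = new OneTimeLockSketch();
        lock.unlock();
        try {
            lock.tryLock();
        } catch (IllegalStateException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
        System.out.println("state: " + lock.getState());
    }
}
```

A caller that needs the lock again creates a new lock object instead of reusing the closed one.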

Code example source: origin: twitter/distributedlog

Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertEquals(lockId0_1,
    Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
// ...
Pair<String, Long> lockId0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertFalse("New lock should be created under different session", lockId0_1.equals(lockId0_2));
assertEquals(lockId0_2,
    Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));

Code example source: origin: twitter/distributedlog

/**
 * Test try-create after close race condition.
 * @throws Exception
 */
@Test(timeout = 60000)
public void testTryCloseRaceCondition() throws Exception {
  String name = testNames.getMethodName();
  String lockPath = "/" + name;
  String clientId = name;
  createLockPath(zkc.get(), lockPath);
  ZKSessionLock lock = new ZKSessionLock(
      zkc, lockPath, clientId, lockStateExecutor,
      1*1000 /* op timeout */, NullStatsLogger.INSTANCE,
      new DistributedLockContext());
  try {
    // ...
    lock.tryLock(0, TimeUnit.MILLISECONDS);
  } catch (LockClosedException ex) {
    // expected
  } finally {
    // ...
  }
  assertEquals(State.CLOSED, lock.getLockState());
  List<String> children = getLockWaiters(zkc, lockPath);
  assertEquals(0, children.size());
}

Code example source: origin: twitter/distributedlog

public void unlock() {
  Future<BoxedUnit> unlockResult = asyncUnlock();
  try {
    Await.result(unlockResult, Duration.fromMilliseconds(lockOpTimeout));
  } catch (TimeoutException toe) {
    // This shouldn't happen unless we lose a watch, and may result in a leaked lock.
    LOG.error("Timeout unlocking {} owned by {} : ", new Object[] { lockPath, lockId, toe });
  } catch (Exception e) {
    LOG.warn("{} failed to unlock {} : ", new Object[] { lockId, lockPath, e });
  }
}

Code example source: origin: twitter/distributedlog

ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId0, lockStateExecutor);
ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor);
lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock0.getLockState());
List<String> children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
try {
  lock1.tryLock(timeout, TimeUnit.MILLISECONDS);
  fail("lock1 should fail on locking since lock0 is holding the lock.");
} catch (OwnershipAcquireFailedException oafe) {
  assertEquals(lock0.getLockId().getLeft(), oafe.getCurrentOwner());
}
// verification after lock1 failed
assertEquals(State.CLAIMED, lock0.getLockState());
assertEquals(State.CLOSED, lock1.getLockState());
children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
lock0.unlock();
// verification after lock0 is unlocked
assertEquals(State.CLOSED, lock0.getLockState());
assertEquals(0, getLockWaiters(zkc, lockPath).size());
// a new lock can be acquired once lock0 is released
ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId2, lockStateExecutor);
lock2.tryLock(timeout, TimeUnit.MILLISECONDS);

Code example source: origin: twitter/distributedlog

Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertEquals(lockId0_1,
    Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
// ...
assertEquals(1, children.size());
Pair<String, Long> lock0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertEquals(lock0_2,
    Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
assertEquals(clientId, lock0_2.getLeft());
