[英]A lock under a given zookeeper session. This is a one-time lock. It is not reusable: if lock failed, if zookeeper session is expired, if #unlock is called, it would be transitioned to expired or closed state. The Locking Procedure is described as below.
| INIT | ------------------------------+
+--------+--------+ |
| |
| |
+--------v--------+ |
| PREPARING |----------------------------+ |
+--------+--------+ | |
| | |
| | |
+--------v--------+ | |
+-------------| PREPARED |--------------+ | |
| +-----^---------+-+ | | |
| | | | | | |
| | | | | | |
| | | | | | |
+------V-----------+ | | | +--------v----------+ | |
| WAITING |-------+ | | | CLAIMED | | |
+------+-----+-----+ | | +--+----------+-----+ | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
| | | +-v----------v----+ | | |
| +-------------------->| EXPIRED | | | |
| | +--+--------------+ | | |
| | | | | |
| | | | | |
| +--------V-------V-+ | | |
+------------>| CLOSING |
tryAcquire: opstats. latency spent on try locking operations. it includes timeouts.
tryTimeouts: counter. the number of timeouts on try locking operations
unlock: opstats. latency spent on unlock operations.
| INIT | ------------------------------+
+--------+--------+ |
| |
| |
+--------v--------+ |
| PREPARING |----------------------------+ |
+--------+--------+ | |
| | |
| | |
+--------v--------+ | |
+-------------| PREPARED |--------------+ | |
| +-----^---------+-+ | | |
| | | | | | |
| | | | | | |
| | | | | | |
+------V-----------+ | | | +--------v----------+ | |
| WAITING |-------+ | | | CLAIMED | | |
+------+-----+-----+ | | +--+----------+-----+ | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
| | | +-v----------v----+ | | |
| +-------------------->| EXPIRED | | | |
| | +--+--------------+ | | |
| | | | | |
| | | | | |
| +--------V-------V-+ | | |
+------------>| CLOSING |
tryAcquire: opstats. latency spent on try locking operations. it includes timeouts.
tryTimeouts: counter. the number of timeouts on try locking operations
unlock: opstats. latency spent on unlock operations.
代码示例来源:origin: twitter/distributedlog
private void handleNodeDelete(int lockEpoch, final WatchedEvent event) {
executeLockAction(lockEpoch, new LockAction() {
public void execute() {
// The lock is either expired or closed
if (!lockState.inState(State.WAITING)) {
LOG.info("{} ignore watched node {} deleted event, since lock state has moved to {}.",
new Object[] { lockId, event.getPath(), lockState.getState() });
// we don't need to wait and check the result, since:
// 1) if it claimed the ownership, it would notify the waiters when claimed ownerships
// 2) if it failed, it would also notify the waiters, the waiters would cleanup the state.
checkLockOwnerAndWaitIfPossible(watcher, true);
public String getActionName() {
return "handleNodeDelete(path=" + event.getPath() + ")";
代码示例来源:origin: twitter/distributedlog
public int compare(String o1, String o2) {
int l1 = parseMemberID(o1);
int l2 = parseMemberID(o2);
return l1 - l2;
代码示例来源:origin: twitter/distributedlog
public void tryLock(long timeout, TimeUnit unit) throws LockingException {
final Stopwatch stopwatch = Stopwatch.createStarted();
Future<LockWaiter> tryFuture = asyncTryLock(timeout, unit);
LockWaiter waiter = waitForTry(stopwatch, tryFuture);
boolean acquired = waiter.waitForAcquireQuietly();
if (!acquired) {
throw new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner());
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testLockWhenPreviousLockZnodeStillExists() throws Exception {
String lockPath = "/test-lock-when-previous-lock-znode-still-exists-" +
String clientId = "client-id";
ZooKeeper zk = zkc.get();
createLockPath(zk, lockPath);
final ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId, lockStateExecutor);
// lock0 lock
lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
// simulate lock0 expires but znode still exists
final DistributedLockContext context1 = new DistributedLockContext();
final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
60000, NullStatsLogger.INSTANCE, context1);
lock1.tryLock(0L, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock1.getLockState());
final DistributedLockContext context2 = new DistributedLockContext();
final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
60000, NullStatsLogger.INSTANCE, context2);
lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock2.getLockState());
代码示例来源:origin: twitter/distributedlog
assertEquals(((ZKSessionLock) lock0.getInternalLock()).getLockId(),
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId(),
Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
代码示例来源:origin: twitter/distributedlog
String clientId = "test-lock-on-non-existed-lock";
ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
lock.tryLock(0, TimeUnit.MILLISECONDS);
fail("Should fail on locking a non-existed lock.");
} catch (LockingException le) {
assertEquals(KeeperException.Code.NONODE, ((KeeperException) cause).code());
assertEquals(State.CLOSED, lock.getLockState());
lock.tryLock(0, TimeUnit.MILLISECONDS);
fail("Should fail on locking a failure lock.");
} catch (LockStateChangedException lsce) {
assertEquals(State.CLOSED, lock.getLockState());
代码示例来源:origin: twitter/distributedlog
throw new LockingException(lockPath, "Timeout during try phase of lock acquire", toe);
} catch (Exception ex) {
String message = getLockId() + " failed to lock " + lockPath;
throw new LockingException(lockPath, message, ex);
} finally {
代码示例来源:origin: twitter/distributedlog
new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
public void execute() {
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
public void execute() {
lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
public void execute() {
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
public void execute() {
代码示例来源:origin: twitter/distributedlog
final ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor)
assertEquals(State.INIT, lock.getLockState());
try {
lock.tryLock(timeout, TimeUnit.MILLISECONDS);
fail("Should fail locking using an expired lock");
} catch (LockingException le) {
assertTrue(le.getCause() instanceof KeeperException.SessionExpiredException);
assertEquals(State.CLOSED, lock.getLockState());
List<String> children = getLockWaiters(zkc, lockPath);
assertEquals(0, children.size());
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testParseClientID() throws Exception {
ZooKeeper zk = zkc.get();
String lockPath = "/test-parse-clientid";
String clientId = "test-parse-clientid-" + System.currentTimeMillis();
Pair<String, Long> lockId = Pair.of(clientId, zk.getSessionId());
createLockPath(zk, lockPath);
// Correct data
String node1 = getLockIdFromPath(createLockNodeV1(zk, lockPath, clientId));
String node2 = getLockIdFromPath(createLockNodeV2(zk, lockPath, clientId));
String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node1)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node2)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node3)));
// Bad Lock Node Name
String node4 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member"));
String node5 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode"));
String node6 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode"));
String node7 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode"));
String node8 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode_badnode"));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node4)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node5)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node6)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node7)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node8)));
// Malformed Node Name
String node9 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_malformed_s12345678_999999"));
assertEquals(Pair.of("malformed", 12345678L), Await.result(asyncParseClientID(zk, lockPath, node9)));
代码示例来源:origin: twitter/distributedlog
public Future<BoxedUnit> asyncUnlock() {
return asyncUnlock(new LockClosedException(lockPath, lockId, lockState.getState()));
代码示例来源:origin: twitter/distributedlog
private Future<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
final boolean wait) {
final Promise<String> promise = new Promise<String>();
checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise);
return promise;
代码示例来源:origin: twitter/distributedlog
final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout;
if (wait) {
asyncTryLock(wait, result);
} else {
代码示例来源:origin: twitter/distributedlog
asyncTryLockWithoutCleanup(wait, lockResult);
代码示例来源:origin: twitter/distributedlog
* Test lock after unlock is called.
* @throws Exception
@Test(timeout = 60000)
public void testLockAfterUnlock() throws Exception {
String lockPath = "/test-lock-after-unlock";
String clientId = "test-lock-after-unlock";
ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
assertEquals(State.CLOSED, lock.getLockState());
try {
lock.tryLock(0, TimeUnit.MILLISECONDS);
fail("Should fail on tryLock since lock state has changed.");
} catch (LockStateChangedException lsce) {
// expected
assertEquals(State.CLOSED, lock.getLockState());
try {
lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
fail("Should fail on tryLock immediately if lock state has changed.");
} catch (LockStateChangedException lsce) {
// expected
assertEquals(State.CLOSED, lock.getLockState());
代码示例来源:origin: twitter/distributedlog
Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
Pair<String, Long> lockId0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertFalse("New lock should be created under different session", lockId0_1.equals(lockId0_2));
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
代码示例来源:origin: twitter/distributedlog
* Test try-create after close race condition.
* @throws Exception
@Test(timeout = 60000)
public void testTryCloseRaceCondition() throws Exception {
String name = testNames.getMethodName();
String lockPath = "/" + name;
String clientId = name;
createLockPath(zkc.get(), lockPath);
ZKSessionLock lock = new ZKSessionLock(
zkc, lockPath, clientId, lockStateExecutor,
1*1000 /* op timeout */, NullStatsLogger.INSTANCE,
new DistributedLockContext());
try {
lock.tryLock(0, TimeUnit.MILLISECONDS);
} catch (LockClosedException ex) {
} finally {
assertEquals(State.CLOSED, lock.getLockState());
List<String> children = getLockWaiters(zkc, lockPath);
assertEquals(0, children.size());
代码示例来源:origin: twitter/distributedlog
public void unlock() {
Future<BoxedUnit> unlockResult = asyncUnlock();
try {
Await.result(unlockResult, Duration.fromMilliseconds(lockOpTimeout));
} catch (TimeoutException toe) {
// This shouldn't happen unless we lose a watch, and may result in a leaked lock.
LOG.error("Timeout unlocking {} owned by {} : ", new Object[] { lockPath, lockId, toe });
} catch (Exception e) {
LOG.warn("{} failed to unlock {} : ", new Object[] { lockId, lockPath, e });
代码示例来源:origin: twitter/distributedlog
ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId0, lockStateExecutor);
ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor);
lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock0.getLockState());
List<String> children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
lock1.tryLock(timeout, TimeUnit.MILLISECONDS);
fail("lock1 should fail on locking since lock0 is holding the lock.");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(lock0.getLockId().getLeft(), oafe.getCurrentOwner());
assertEquals(State.CLAIMED, lock0.getLockState());
assertEquals(State.CLOSED, lock1.getLockState());
children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(State.CLOSED, lock0.getLockState());
assertEquals(0, getLockWaiters(zkc, lockPath).size());
ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId2, lockStateExecutor);
lock2.tryLock(timeout, TimeUnit.MILLISECONDS);
代码示例来源:origin: twitter/distributedlog
Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(1, children.size());
Pair<String, Long> lock0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
assertEquals(clientId, lock0_2.getLeft());