本文整理了Java中com.twitter.distributedlog.lock.ZKSessionLock
类的一些代码示例,展示了ZKSessionLock
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZKSessionLock
类的具体详情如下:
包路径:com.twitter.distributedlog.lock.ZKSessionLock
类名称:ZKSessionLock
[英]A lock under a given zookeeper session. This is a one-time lock. It is not reusable: if lock failed, if zookeeper session is expired, if #unlock is called, it would be transitioned to expired or closed state. The Locking Procedure is described as below.
+-----------------+
| INIT | ------------------------------+
+--------+--------+ |
| |
| |
+--------v--------+ |
| PREPARING |----------------------------+ |
+--------+--------+ | |
| | |
| | |
+--------v--------+ | |
+-------------| PREPARED |--------------+ | |
| +-----^---------+-+ | | |
| | | | | | |
| | | | | | |
| | | | | | |
+------V-----------+ | | | +--------v----------+ | |
| WAITING |-------+ | | | CLAIMED | | |
+------+-----+-----+ | | +--+----------+-----+ | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
| | | +-v----------v----+ | | |
| +-------------------->| EXPIRED | | | |
| | +--+--------------+ | | |
| | | | | |
| | | | | |
| +--------V-------V-+ | | |
+------------>| CLOSING |
Metrics
tryAcquire: opstats. latency spent on try locking operations. it includes timeouts.
tryTimeouts: counter. the number of timeouts on try locking operations
unlock: opstats. latency spent on unlock operations.
[中]给定zookeeper会话下的锁。这是一次性锁。它不可重复使用:如果锁定失败,如果zookeeper会话过期,如果调用#unlock,它将转换为过期或关闭状态。锁定程序如下所述。
0.如果是即时锁,它会首先得到锁服务人员。如果锁已经被某人持有。如果使用com,它将立即失败。啁啾分布式日志。例外。OwnershipAcquireFailedException与当前所有者。如果没有锁侍者,它将从1开始锁程序。1.准备:创建一个顺序znode来识别锁。2.检查锁侍者:准备好后让所有锁侍者检查。如果是第一个侍者,要求所有权;如果不是第一个服务员,但第一个服务员本身(相同的客户id和相同的会话id)也拥有所有权;否则,它会让观察者监视它的兄弟,并等待它消失。
+-----------------+
| INIT | ------------------------------+
+--------+--------+ |
| |
| |
+--------v--------+ |
| PREPARING |----------------------------+ |
+--------+--------+ | |
| | |
| | |
+--------v--------+ | |
+-------------| PREPARED |--------------+ | |
| +-----^---------+-+ | | |
| | | | | | |
| | | | | | |
| | | | | | |
+------V-----------+ | | | +--------v----------+ | |
| WAITING |-------+ | | | CLAIMED | | |
+------+-----+-----+ | | +--+----------+-----+ | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
| | | +-v----------v----+ | | |
| +-------------------->| EXPIRED | | | |
| | +--+--------------+ | | |
| | | | | |
| | | | | |
| +--------V-------V-+ | | |
+------------>| CLOSING |
Metrics
tryAcquire: opstats. latency spent on try locking operations. it includes timeouts.
tryTimeouts: counter. the number of timeouts on try locking operations
unlock: opstats. latency spent on unlock operations.
代码示例来源:origin: twitter/distributedlog
private void handleNodeDelete(int lockEpoch, final WatchedEvent event) {
executeLockAction(lockEpoch, new LockAction() {
@Override
public void execute() {
// The lock is either expired or closed
if (!lockState.inState(State.WAITING)) {
LOG.info("{} ignore watched node {} deleted event, since lock state has moved to {}.",
new Object[] { lockId, event.getPath(), lockState.getState() });
return;
}
lockState.transition(State.PREPARED);
// we don't need to wait and check the result, since:
// 1) if it claimed the ownership, it would notify the waiters when claimed ownerships
// 2) if it failed, it would also notify the waiters, the waiters would cleanup the state.
checkLockOwnerAndWaitIfPossible(watcher, true);
}
@Override
public String getActionName() {
return "handleNodeDelete(path=" + event.getPath() + ")";
}
});
}
代码示例来源:origin: twitter/distributedlog
public int compare(String o1, String o2) {
int l1 = parseMemberID(o1);
int l2 = parseMemberID(o2);
return l1 - l2;
}
};
代码示例来源:origin: twitter/distributedlog
@Override
public void tryLock(long timeout, TimeUnit unit) throws LockingException {
final Stopwatch stopwatch = Stopwatch.createStarted();
Future<LockWaiter> tryFuture = asyncTryLock(timeout, unit);
LockWaiter waiter = waitForTry(stopwatch, tryFuture);
boolean acquired = waiter.waitForAcquireQuietly();
if (!acquired) {
throw new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner());
}
}
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testLockWhenPreviousLockZnodeStillExists() throws Exception {
String lockPath = "/test-lock-when-previous-lock-znode-still-exists-" +
System.currentTimeMillis();
String clientId = "client-id";
ZooKeeper zk = zkc.get();
createLockPath(zk, lockPath);
final ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId, lockStateExecutor);
// lock0 lock
lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
// simulate lock0 expires but znode still exists
final DistributedLockContext context1 = new DistributedLockContext();
context1.addLockId(lock0.getLockId());
final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
60000, NullStatsLogger.INSTANCE, context1);
lock1.tryLock(0L, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock1.getLockState());
lock1.unlock();
final DistributedLockContext context2 = new DistributedLockContext();
context2.addLockId(lock0.getLockId());
final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
60000, NullStatsLogger.INSTANCE, context2);
lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock2.getLockState());
lock2.unlock();
lock0.unlock();
}
代码示例来源:origin: twitter/distributedlog
assertTrue(lock0.haveLock());
assertFalse(lock1.haveLock());
assertEquals(((ZKSessionLock) lock0.getInternalLock()).getLockId(),
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId(),
Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
oafe.getCurrentOwner());
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
oafe.getCurrentOwner());
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
oafe.getCurrentOwner());
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
oafe.getCurrentOwner());
assertFalse(lock0.haveLock());
assertTrue(lock1.haveLock());
代码示例来源:origin: twitter/distributedlog
String clientId = "test-lock-on-non-existed-lock";
ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
lock.tryLock(0, TimeUnit.MILLISECONDS);
fail("Should fail on locking a non-existed lock.");
} catch (LockingException le) {
assertEquals(KeeperException.Code.NONODE, ((KeeperException) cause).code());
assertEquals(State.CLOSED, lock.getLockState());
lock.tryLock(0, TimeUnit.MILLISECONDS);
fail("Should fail on locking a failure lock.");
} catch (LockStateChangedException lsce) {
assertEquals(State.CLOSED, lock.getLockState());
代码示例来源:origin: twitter/distributedlog
throw new LockingException(lockPath, "Timeout during try phase of lock acquire", toe);
} catch (Exception ex) {
String message = getLockId() + " failed to lock " + lockPath;
throw new LockingException(lockPath, message, ex);
} finally {
unlock();
代码示例来源:origin: twitter/distributedlog
new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
@Override
public void execute() {
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
@Override
public void execute() {
lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
@Override
public void execute() {
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
@Override
public void execute() {
代码示例来源:origin: twitter/distributedlog
final ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor)
.setLockListener(listener);
expiredLatch.await();
assertEquals(State.INIT, lock.getLockState());
try {
lock.tryLock(timeout, TimeUnit.MILLISECONDS);
fail("Should fail locking using an expired lock");
} catch (LockingException le) {
assertTrue(le.getCause() instanceof KeeperException.SessionExpiredException);
assertEquals(State.CLOSED, lock.getLockState());
List<String> children = getLockWaiters(zkc, lockPath);
assertEquals(0, children.size());
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testParseClientID() throws Exception {
ZooKeeper zk = zkc.get();
String lockPath = "/test-parse-clientid";
String clientId = "test-parse-clientid-" + System.currentTimeMillis();
Pair<String, Long> lockId = Pair.of(clientId, zk.getSessionId());
createLockPath(zk, lockPath);
// Correct data
String node1 = getLockIdFromPath(createLockNodeV1(zk, lockPath, clientId));
String node2 = getLockIdFromPath(createLockNodeV2(zk, lockPath, clientId));
String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node1)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node2)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node3)));
// Bad Lock Node Name
String node4 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member"));
String node5 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode"));
String node6 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode"));
String node7 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode"));
String node8 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode_badnode"));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node4)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node5)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node6)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node7)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node8)));
// Malformed Node Name
String node9 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_malformed_s12345678_999999"));
assertEquals(Pair.of("malformed", 12345678L), Await.result(asyncParseClientID(zk, lockPath, node9)));
}
代码示例来源:origin: twitter/distributedlog
@Override
public Future<BoxedUnit> asyncUnlock() {
return asyncUnlock(new LockClosedException(lockPath, lockId, lockState.getState()));
}
代码示例来源:origin: twitter/distributedlog
private Future<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
final boolean wait) {
final Promise<String> promise = new Promise<String>();
checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise);
return promise;
}
代码示例来源:origin: twitter/distributedlog
final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout;
if (wait) {
asyncTryLock(wait, result);
} else {
代码示例来源:origin: twitter/distributedlog
asyncTryLockWithoutCleanup(wait, lockResult);
代码示例来源:origin: twitter/distributedlog
/**
* Test lock after unlock is called.
*
* @throws Exception
*/
@Test(timeout = 60000)
public void testLockAfterUnlock() throws Exception {
String lockPath = "/test-lock-after-unlock";
String clientId = "test-lock-after-unlock";
ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
lock.unlock();
assertEquals(State.CLOSED, lock.getLockState());
try {
lock.tryLock(0, TimeUnit.MILLISECONDS);
fail("Should fail on tryLock since lock state has changed.");
} catch (LockStateChangedException lsce) {
// expected
}
assertEquals(State.CLOSED, lock.getLockState());
try {
lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
fail("Should fail on tryLock immediately if lock state has changed.");
} catch (LockStateChangedException lsce) {
// expected
}
assertEquals(State.CLOSED, lock.getLockState());
}
代码示例来源:origin: twitter/distributedlog
FutureUtils.result(lock0.asyncAcquire());
Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertTrue(lock0.haveLock());
assertEquals(lockId0_1,
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
Pair<String, Long> lockId0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertFalse("New lock should be created under different session", lockId0_1.equals(lockId0_2));
assertTrue(lock0.haveLock());
assertEquals(lockId0_2,
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
代码示例来源:origin: twitter/distributedlog
/**
* Test try-create after close race condition.
*
* @throws Exception
*/
@Test(timeout = 60000)
public void testTryCloseRaceCondition() throws Exception {
String name = testNames.getMethodName();
String lockPath = "/" + name;
String clientId = name;
createLockPath(zkc.get(), lockPath);
ZKSessionLock lock = new ZKSessionLock(
zkc, lockPath, clientId, lockStateExecutor,
1*1000 /* op timeout */, NullStatsLogger.INSTANCE,
new DistributedLockContext());
try {
FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition,
FailpointUtils.DEFAULT_ACTION);
lock.tryLock(0, TimeUnit.MILLISECONDS);
} catch (LockClosedException ex) {
;
} finally {
FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition);
}
assertEquals(State.CLOSED, lock.getLockState());
List<String> children = getLockWaiters(zkc, lockPath);
assertEquals(0, children.size());
}
代码示例来源:origin: twitter/distributedlog
@Override
public void unlock() {
Future<BoxedUnit> unlockResult = asyncUnlock();
try {
Await.result(unlockResult, Duration.fromMilliseconds(lockOpTimeout));
} catch (TimeoutException toe) {
// This shouldn't happen unless we lose a watch, and may result in a leaked lock.
LOG.error("Timeout unlocking {} owned by {} : ", new Object[] { lockPath, lockId, toe });
} catch (Exception e) {
LOG.warn("{} failed to unlock {} : ", new Object[] { lockId, lockPath, e });
}
}
代码示例来源:origin: twitter/distributedlog
ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId0, lockStateExecutor);
ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor);
lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock0.getLockState());
List<String> children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
lock1.tryLock(timeout, TimeUnit.MILLISECONDS);
fail("lock1 should fail on locking since lock0 is holding the lock.");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(lock0.getLockId().getLeft(), oafe.getCurrentOwner());
assertEquals(State.CLAIMED, lock0.getLockState());
assertEquals(State.CLOSED, lock1.getLockState());
children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
lock0.unlock();
assertEquals(State.CLOSED, lock0.getLockState());
assertEquals(0, getLockWaiters(zkc, lockPath).size());
ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId2, lockStateExecutor);
lock2.tryLock(timeout, TimeUnit.MILLISECONDS);
代码示例来源:origin: twitter/distributedlog
FutureUtils.result(lock0.asyncAcquire());
Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertTrue(lock0.haveLock());
assertEquals(lockId0_1,
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(1, children.size());
assertTrue(lock0.haveLock());
Pair<String, Long> lock0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertEquals(lock0_2,
Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
assertEquals(clientId, lock0_2.getLeft());
assertFalse(lockId0_1.equals(lock0_2));
内容来源于网络,如有侵权,请联系作者删除!