[英]TODO: move ZKException to distributedlog-protocol
代码示例来源:origin: twitter/distributedlog
public static boolean isRetryableZKException(ZKException zke) {
KeeperException.Code code = zke.getKeeperExceptionCode();
return KeeperException.Code.CONNECTIONLOSS == code ||
KeeperException.Code.OPERATIONTIMEOUT == code ||
KeeperException.Code.SESSIONEXPIRED == code ||
KeeperException.Code.SESSIONMOVED == code;
代码示例来源:origin: twitter/distributedlog
public Boolean handle(ZooKeeperClient zkc) throws IOException {
// check existence after syncing
try {
return null != Utils.sync(zkc, logRootPath).exists(logRootPath, false);
} catch (KeeperException e) {
throw new ZKException("Error on checking if log " + logRootPath + " exists", e.code());
} catch (InterruptedException e) {
throw new DLInterruptedException("Interrupted on checking if log " + logRootPath + " exists", e);
}, conf, uri);
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testAclModifyPermsDlmConflict() throws Exception {
String streamName = "test-stream";
// Reopening and writing again with the same un will succeed.
initDlogMeta("/" + runtime.getMethodName(), "test-un", streamName);
try {
// Reopening and writing again with a different un will fail.
initDlogMeta("/" + runtime.getMethodName(), "not-test-un", streamName);
fail("write should have failed due to perms");
} catch (ZKException ex) {
LOG.info("caught exception trying to write with no perms {}", ex);
assertEquals(KeeperException.Code.NOAUTH, ex.getKeeperExceptionCode());
} catch (Exception ex) {
LOG.info("caught wrong exception trying to write with no perms {}", ex);
fail("wrong exception " + ex.getClass().getName() + " expected " + LockingException.class.getName());
// Should work again.
initDlogMeta("/" + runtime.getMethodName(), "test-un", streamName);
代码示例来源:origin: twitter/distributedlog
protected Future<List<LogSegmentMetadata>> asyncGetLedgerListWithRetries(Comparator<LogSegmentMetadata> comparator,
LogSegmentFilter segmentFilter,
Watcher watcher) {
final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
asyncGetLedgerListWithRetries(comparator, segmentFilter, watcher, new GenericCallback<List<LogSegmentMetadata>>() {
public void operationComplete(int rc, List<LogSegmentMetadata> segments) {
if (KeeperException.Code.OK.intValue() == rc) {
} else if (KeeperException.Code.NONODE.intValue() == rc) {
promise.setException(new LogNotFoundException("Log " + getFullyQualifiedName() + " not found"));
} else {
String errMsg = "ZK Exception " + rc + " reading ledger list for " + getFullyQualifiedName();
promise.setException(new ZKException(errMsg, KeeperException.Code.get(rc)));
return promise;
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testToolCreateZkAclId() throws Exception {
createStream(defaultUri, "0", "CreateAclStream", defaultPrivilegedZkAclId);
try {
DistributedLogManager dlm = DLMTestUtil.createNewDLM("0CreateAclStream", conf, defaultUri);
DLMTestUtil.generateCompletedLogSegments(dlm, conf, 3, 1000);
} catch (ZKException ex) {
assertEquals(KeeperException.Code.NOAUTH, ex.getKeeperExceptionCode());
代码示例来源:origin: twitter/distributedlog
throw new ZKException("Failed to get list of pools from " + rootPath, e);
} catch (InterruptedException e) {
throw new DLInterruptedException("Interrupted on collecting ledgers from allocation pool " + poolPath, e);
} catch (KeeperException e) {
throw new ZKException("Failed to collect ledgers from allocation pool " + poolPath, e.code());
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testUpdateNonExistentLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> updateTxn = lsmStore.transaction();
lsmStore.updateLogSegment(updateTxn, segment);
try {
fail("Should fail update if log segment doesn't exist");
} catch (Throwable t) {
assertTrue("Should throw NoNodeException if log segment doesn't exist",
t instanceof ZKException);
ZKException zke = (ZKException) t;
assertEquals("Should throw NoNodeException if log segment doesn't exist",
KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
代码示例来源:origin: twitter/distributedlog
} else {
promise.setException(new ZKException("Error reading namespace " + nsRootPath,
} else {
promise.setException(new ZKException("Error reading namespace " + nsRootPath,
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testDeleteNonExistentLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> deleteTxn = lsmStore.transaction();
lsmStore.deleteLogSegment(deleteTxn, segment);
try {
fail("Should fail deletion if log segment doesn't exist");
} catch (Throwable t) {
assertTrue("Should throw NoNodeException if log segment doesn't exist",
t instanceof ZKException);
ZKException zke = (ZKException) t;
assertEquals("Should throw NoNodeException if log segment doesn't exist",
KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
代码示例来源:origin: twitter/distributedlog
promise.setException(new ZKException("Failed to create log " + logRootPath,
代码示例来源:origin: twitter/distributedlog
fail("Should fail allocating on second allocator as allocator1 is starting allocating something.");
} catch (ZKException zke) {
assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
代码示例来源:origin: twitter/distributedlog
throw new ZKException("Error syncing zookeeper connection ",
代码示例来源:origin: twitter/distributedlog
fail("Should fail on storing log segment sequence number if providing bad version");
} catch (ZKException zke) {
assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
代码示例来源:origin: twitter/distributedlog
* Wait for the result for a given <i>duration</i>.
* <p>If the result is not ready within `duration`, an IOException will thrown wrapping with
* corresponding {@link com.twitter.util.TimeoutException}.
* @param result result to wait
* @param duration duration to wait
* @return the result
* @throws IOException when encountered exceptions on the result or waiting for the result.
public static <T> T result(Future<T> result, Duration duration)
throws IOException {
try {
return Await.result(result, duration);
} catch (KeeperException ke) {
throw new ZKException("Encountered zookeeper exception on waiting result", ke);
} catch (BKException bke) {
throw new BKTransmitException("Encountered bookkeeper exception on waiting result", bke.getCode());
} catch (IOException ioe) {
throw ioe;
} catch (InterruptedException ie) {
throw new DLInterruptedException("Interrupted on waiting result", ie);
} catch (Exception e) {
throw new IOException("Encountered exception on waiting result", e);
代码示例来源:origin: twitter/distributedlog
fail("Should fail on storing log record transaction id if providing bad version");
} catch (ZKException zke) {
assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
代码示例来源:origin: twitter/distributedlog
private void checkLogStreamExists() throws IOException {
try {
if (null == Utils.sync(zooKeeperClient, logMetadata.getLogSegmentsPath())
.exists(logMetadata.getLogSegmentsPath(), false)) {
throw new LogNotFoundException("Log " + getFullyQualifiedName() + " doesn't exist");
} catch (InterruptedException ie) {
LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie);
throw new DLInterruptedException("Interrupted while checking "
+ logMetadata.getLogSegmentsPath(), ie);
} catch (KeeperException ke) {
LOG.error("Error checking existence for {} : ", logMetadata.getLogSegmentsPath(), ke);
throw new ZKException("Error checking existence for " + getFullyQualifiedName() + " : ", ke);
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testCreateLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> createTxn = lsmStore.transaction();
lsmStore.createLogSegment(createTxn, segment);
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
zkc.get().exists(segment.getZkPath(), false));
LogSegmentMetadata segment2 = createLogSegment(1L);
Transaction<Object> createTxn2 = lsmStore.transaction();
lsmStore.createLogSegment(createTxn2, segment2);
try {
fail("Should fail if log segment exists");
} catch (Throwable t) {
// expected
assertTrue("Should throw NodeExistsException if log segment exists",
t instanceof ZKException);
ZKException zke = (ZKException) t;
assertEquals("Should throw NodeExistsException if log segment exists",
KeeperException.Code.NODEEXISTS, zke.getKeeperExceptionCode());
代码示例来源:origin: twitter/distributedlog
public void processResult(int syncRc, String path, Object syncCtx) {
if (KeeperException.Code.NONODE.intValue() == syncRc) {
promise.setException(new LogNotFoundException(
String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
} else if (KeeperException.Code.OK.intValue() != syncRc){
promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(),
zk.exists(logMetadata.getLogSegmentsPath(), false, new AsyncCallback.StatCallback() {
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (KeeperException.Code.OK.intValue() == rc) {
} else if (KeeperException.Code.NONODE.intValue() == rc) {
promise.setException(new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
} else {
promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(),
}, null);
}, null);
代码示例来源:origin: twitter/distributedlog
ZKException zke = (ZKException) t;
assertEquals("Transaction is aborted",
KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
代码示例来源:origin: twitter/distributedlog
synchronized void store(ZooKeeperClient zkc, String path, long logSegmentSeqNo) throws IOException {
try {
Stat stat = zkc.get().setData(path,
DLUtils.serializeLogSegmentSequenceNumber(logSegmentSeqNo), getZkVersion());
update(stat.getVersion(), logSegmentSeqNo);
} catch (KeeperException ke) {
throw new ZKException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
+ path + " : ", ke);
} catch (ZooKeeperClient.ZooKeeperConnectionException zce) {
throw new IOException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
+ path + " : ", zce);
} catch (InterruptedException e) {
throw new DLInterruptedException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
+ path + " : ", e);