本文整理了 Java 中 com.twitter.util.Future.collect() 方法的一些代码示例,展示了 Future.collect() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Future.collect() 方法的具体详情如下:
包路径:com.twitter.util.Future
类名称:Future
方法名:collect
方法描述:暂无
代码示例来源:origin: twitter/distributedlog
/**
 * Asks every sub-namespace for its logs and aggregates the pending answers.
 *
 * @return the result of {@code Future.collect} over one {@code getLogs()} future per
 *         element of {@code subNamespaces.values()}
 */
private Future<List<Set<String>>> retrieveLogs() {
    final Collection<SubNamespace> namespaces = subNamespaces.values();
    final int expectedCount = namespaces.size();
    final List<Future<Set<String>>> pendingLogs = Lists.newArrayListWithExpectedSize(expectedCount);
    for (SubNamespace namespace : namespaces) {
        final Future<Set<String>> logsOfNamespace = namespace.getLogs();
        pendingLogs.add(logsOfNamespace);
    }
    return Future.collect(pendingLogs);
}
代码示例来源:origin: twitter/distributedlog
/**
 * Requests a close on each given stream and aggregates the pending close results.
 *
 * @param streamsToClose streams to request a close on
 * @param rateLimiter when present, {@code acquire()} is called on it before each close request
 * @return {@code Future.collect} over all close requests, or a future of an empty list
 *         when {@code streamsToClose} is empty
 */
private Future<List<Void>> closeStreams(Set<Stream> streamsToClose, Optional<RateLimiter> rateLimiter) {
    if (!streamsToClose.isEmpty()) {
        // Whether a limiter was supplied does not change between iterations, so check once.
        final boolean throttled = rateLimiter.isPresent();
        final List<Future<Void>> closeRequests = new ArrayList<Future<Void>>(streamsToClose.size());
        for (Stream target : streamsToClose) {
            if (throttled) {
                rateLimiter.get().acquire();
            }
            closeRequests.add(target.requestClose("Close Streams"));
        }
        return Future.collect(closeRequests);
    }

    logger.info("No streams to close.");
    List<Void> nothingClosed = new ArrayList<Void>();
    return Future.value(nothingClosed);
}
代码示例来源:origin: twitter/distributedlog
/**
 * Forwards the accept-new-stream flag to the service of every client currently
 * returned by {@code clientManager.getAllClients()}.
 *
 * @param enabled value passed to each service's {@code setAcceptNewStream}
 * @return the collected per-client futures, mapped to a single {@code null} value
 */
@Override
public Future<Void> setAcceptNewStream(boolean enabled) {
    final Map<SocketAddress, ProxyClient> clients = clientManager.getAllClients();
    final List<Future<Void>> acks = new ArrayList<Future<Void>>(clients.size());
    // Only the clients themselves are needed; the socket addresses are not used.
    for (ProxyClient client : clients.values()) {
        acks.add(client.getService().setAcceptNewStream(enabled));
    }
    // The per-client results carry no information, so collapse them into one Void.
    final Function<List<Void>, Void> discardResults = new Function<List<Void>, Void>() {
        @Override
        public Void apply(List<Void> ignored) {
            return null;
        }
    };
    return Future.collect(acks).map(discardResults);
}
代码示例来源:origin: twitter/distributedlog
/**
 * Issues a {@code Utils.zkGetData} read for each metadata path of a log and aggregates
 * the reads.
 *
 * <p>Reads are added in a fixed order: parent of the log root, log root, max txid,
 * version, lock, read lock, log segments and, only when {@code ownAllocator} is set,
 * allocation. NOTE(review): this order presumably matches the {@code MetadataIndex}
 * constants used to size the list - confirm before reordering.
 *
 * @param zk ZooKeeper handle passed to every read
 * @param logRootPath root path of the log; all other paths are derived from it
 * @param ownAllocator whether the allocation path is read as well
 * @return {@code Future.collect} over the reads, in the order listed above
 */
static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
                                                             String logRootPath,
                                                             boolean ownAllocator) {
    // Note on persistent lock state initialization: the read lock's persistent state (path)
    // is initialized here although only the read handler uses it, because managing the whole
    // stream structure in one place is more convenient and less error prone.
    final String[] alwaysCheckedPaths = {
        new File(logRootPath).getParent(),
        logRootPath,
        logRootPath + MAX_TXID_PATH,
        logRootPath + VERSION_PATH,
        logRootPath + LOCK_PATH,
        logRootPath + READ_LOCK_PATH,
        logRootPath + LOGSEGMENTS_PATH,
    };

    final int numPaths;
    if (ownAllocator) {
        numPaths = MetadataIndex.ALLOCATION + 1;
    } else {
        numPaths = MetadataIndex.LOGSEGMENTS + 1;
    }

    final List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
    for (String path : alwaysCheckedPaths) {
        checkFutures.add(Utils.zkGetData(zk, path, false));
    }
    if (ownAllocator) {
        checkFutures.add(Utils.zkGetData(zk, logRootPath + ALLOCATION_PATH, false));
    }
    return Future.collect(checkFutures);
}
代码示例来源:origin: twitter/distributedlog
/**
 * Looks up the last commit position of each subscriber and completes {@code result}
 * with a subscriber-to-position map.
 *
 * <p>If the aggregated lookup fails, {@code result} is completed with that failure.
 * Previously only the success path was handled, so a failed lookup left the promise
 * unsatisfied and its waiters pending.
 *
 * @param result promise completed with the map on success or the cause on failure
 * @param subscribers ids of the subscribers whose positions are looked up
 */
private void getLastCommitPositions(final Promise<Map<String, DLSN>> result,
                                    List<String> subscribers) {
    List<Future<Pair<String, DLSN>>> futures =
            new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size());
    for (final String subscriber : subscribers) {
        Future<Pair<String, DLSN>> future =
                // Get the last commit position from zookeeper
                getSubscriber(subscriber).getLastCommitPositionFromZK().map(
                        new AbstractFunction1<DLSN, Pair<String, DLSN>>() {
                            @Override
                            public Pair<String, DLSN> apply(DLSN dlsn) {
                                // Pair the position with its subscriber so the map can be built later.
                                return Pair.of(subscriber, dlsn);
                            }
                        });
        futures.add(future);
    }

    Future<List<Pair<String, DLSN>>> collected = Future.collect(futures);
    collected.foreach(
            new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() {
                @Override
                public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) {
                    Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>();
                    for (Pair<String, DLSN> pair : subscriptions) {
                        subscriptionMap.put(pair.getLeft(), pair.getRight());
                    }
                    result.setValue(subscriptionMap);
                    return BoxedUnit.UNIT;
                }
            });
    // foreach only runs on success; hand failures to the promise so callers are not left waiting.
    collected.onFailure(
            new AbstractFunction1<Throwable, BoxedUnit>() {
                @Override
                public BoxedUnit apply(Throwable cause) {
                    result.setException(cause);
                    return BoxedUnit.UNIT;
                }
            });
}
代码示例来源:origin: twitter/distributedlog
// Fragment: queue the location lookup for (uri, logName), then attach a listener to the
// collected result of everything in fetchFutures.
fetchFutures.add(fetchLogLocation(uri, logName));
Future.collect(fetchFutures).addEventListener(new FutureEventListener<List<Optional<URI>>>() {
@Override
public void onSuccess(List<Optional<URI>> fetchResults) {
代码示例来源:origin: twitter/distributedlog
// Fragment: record the uri, then attach a listener to the collected result of futureList.
uriList.add(uri);
Future.collect(futureList).addEventListener(new FutureEventListener<List<Set<String>>>() {
@Override
public void onSuccess(List<Set<String>> resultList) {
代码示例来源:origin: twitter/distributedlog
// Fragment: wait for the collected futures and keep their combined results.
segmentCandidates = Await.result(Future.collect(futures));
} catch (Exception e) {
// Any failure is rethrown as an IOException that names the stream and keeps the cause.
throw new IOException("Failed on checking stream " + streamName, e);
代码示例来源:origin: twitter/distributedlog
// Fragment: register processSearchResultsListener on the collected search results, wrapped
// by FutureEventListenerRunnable.of together with executorService.
// NOTE(review): presumably the wrapper runs the callbacks on executorService - confirm.
Future.collect(searchResults).addEventListener(
FutureEventListenerRunnable.of(processSearchResultsListener, executorService));
代码示例来源:origin: twitter/distributedlog
// Fragment: obtain the collected result of all write futures before closing the client.
FutureUtils.result(Future.collect(writeFutures));
client.close();
代码示例来源:origin: twitter/distributedlog
/**
 * Creates ten log segments in one transaction, then checks that the names returned by
 * the store equal the sorted ZooKeeper children of the root path and that fetching each
 * name yields the segment that was created at the same index.
 */
@Test(timeout = 60000)
public void testGetLogSegmentNames() throws Exception {
    final int numSegments = 10;

    // Create all segments within a single transaction.
    final Transaction<Object> createTxn = lsmStore.transaction();
    final List<LogSegmentMetadata> expectedSegments = Lists.newArrayListWithExpectedSize(numSegments);
    for (int idx = 0; idx < numSegments; idx++) {
        final LogSegmentMetadata created = createLogSegment(idx);
        expectedSegments.add(created);
        lsmStore.createLogSegment(createTxn, created);
    }
    FutureUtils.result(createTxn.execute());

    // Names read directly from ZooKeeper serve as the reference.
    final String rootPath = "/" + runtime.getMethodName();
    final List<String> zkChildren = zkc.get().getChildren(rootPath, false);
    Collections.sort(zkChildren);
    assertEquals("Should find 10 log segments", numSegments, zkChildren.size());

    // Names read through the store must match the reference once sorted.
    final List<String> storeNames = FutureUtils.result(lsmStore.getLogSegmentNames(rootPath));
    Collections.sort(storeNames);
    assertEquals("Should find 10 log segments", numSegments, storeNames.size());
    assertEquals(zkChildren, storeNames);

    // Fetch every segment by name; the size assertion above guarantees ten names.
    final List<Future<LogSegmentMetadata>> pendingSegments = Lists.newArrayListWithExpectedSize(numSegments);
    for (String name : storeNames) {
        pendingSegments.add(lsmStore.getLogSegment(rootPath + "/" + name));
    }
    final List<LogSegmentMetadata> fetchedSegments = FutureUtils.result(Future.collect(pendingSegments));
    for (int idx = 0; idx < numSegments; idx++) {
        assertEquals(expectedSegments.get(idx), fetchedSegments.get(idx));
    }
}
代码示例来源:origin: twitter/distributedlog
// Fragment: wait for all plain writes, then check the DLSNs at result indices i and 6 + i.
List<DLSN> writeResults = Await.result(Future.collect(writePromiseList));
for (int i = 0; i < 5; i++) {
Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
Assert.assertEquals(new DLSN(1L, 1L, (10 + i)), writeResults.get(6 + i));
// Wait for the record-set writes; their expected third DLSN component is 5 + i.
List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetPromiseList));
for (int i = 0; i < 5; i++) {
Assert.assertEquals(new DLSN(1L, 1L, (5 + i)), recordSetWriteResults.get(i));
代码示例来源:origin: twitter/distributedlog
// Fragment: wait for every write and expect numRecords + 1 results.
// NOTE(review): the message says 11, so numRecords is presumably 10 - confirm.
List<DLSN> dlsns = Await.result(Future.collect(futureList));
assertEquals("All 11 records should be written",
numRecords + 1, dlsns.size());
代码示例来源:origin: twitter/distributedlog
// Fragment: the writer's position within the log segment is expected to be 10.
assertEquals("Position should still be " + numRecords,
10, writer.getPositionWithinLogSegment());
// Wait for every write and expect exactly numRecords results.
List<DLSN> dlsns = Await.result(Future.collect(futureList));
assertEquals("All records should be written",
numRecords, dlsns.size());
代码示例来源:origin: twitter/distributedlog
// Fragment: wait for every write and expect exactly numRecords results.
List<DLSN> dlsns = Await.result(Future.collect(futureList));
assertEquals("All first 10 records should be written",
numRecords, dlsns.size());
代码示例来源:origin: twitter/distributedlog
// Fragment: obtain the results of all plain writes. The loop that declares i for the next
// assertion is not part of this excerpt.
List<DLSN> writeResults = FutureUtils.result(Future.collect(writeFutures));
Assert.assertEquals(new DLSN(1L, 6L + i, 0L), writeResults.get(6 + i));
// Wait for the record-set writes; each is expected at DLSN(1, 5, i).
List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetFutures));
for (int i = 0; i < 5; i++) {
Assert.assertEquals(new DLSN(1L, 5L, i), recordSetWriteResults.get(i));
代码示例来源:origin: twitter/distributedlog
// Fragment: wait for all writes; the first ten results are expected at DLSN(1, 1, i).
List<DLSN> writeResults = Await.result(Future.collect(writePromiseList));
for (int i = 0; i < 10; i++) {
Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
代码示例来源:origin: org.apache.distributedlog/distributedlog-service
/**
 * Issues a close request to every stream in the set and collects the pending results.
 *
 * @param streamsToClose streams to request a close on
 * @param rateLimiter when present, {@code acquire()} is called on it before each close request
 * @return {@code Future.collect} over all close requests, or a future of an empty list
 *         when {@code streamsToClose} is empty
 */
private Future<List<Void>> closeStreams(Set<Stream> streamsToClose, Optional<RateLimiter> rateLimiter) {
    if (!streamsToClose.isEmpty()) {
        // The limiter's presence is fixed for the whole call, so evaluate it once.
        final boolean limited = rateLimiter.isPresent();
        final List<Future<Void>> pendingCloses = new ArrayList<Future<Void>>(streamsToClose.size());
        for (Stream current : streamsToClose) {
            if (limited) {
                rateLimiter.get().acquire();
            }
            pendingCloses.add(current.requestClose("Close Streams"));
        }
        return Future.collect(pendingCloses);
    }

    logger.info("No streams to close.");
    List<Void> noResults = new ArrayList<Void>();
    return Future.value(noResults);
}
代码示例来源:origin: com.twitter/distributedlog-client
/**
 * Applies the accept-new-stream flag to the service of each client returned by
 * {@code clientManager.getAllClients()}.
 *
 * @param enabled value passed to each service's {@code setAcceptNewStream}
 * @return the collected per-client futures, mapped to a single {@code null} value
 */
@Override
public Future<Void> setAcceptNewStream(boolean enabled) {
    final Map<SocketAddress, ProxyClient> allClients = clientManager.getAllClients();
    final List<Future<Void>> pendingUpdates = new ArrayList<Future<Void>>(allClients.size());
    // The addresses are irrelevant here, so walk the clients directly.
    for (ProxyClient proxy : allClients.values()) {
        pendingUpdates.add(proxy.getService().setAcceptNewStream(enabled));
    }
    // Reduce the list of Void results to a single Void.
    final Function<List<Void>, Void> toVoid = new Function<List<Void>, Void>() {
        @Override
        public Void apply(List<Void> unused) {
            return null;
        }
    };
    return Future.collect(pendingUpdates).map(toVoid);
}
代码示例来源:origin: org.apache.distributedlog/distributedlog-client
/**
 * Sends the accept-new-stream flag to the service behind every client in the snapshot
 * returned by {@code clientManager.getAllClients()}.
 *
 * @param enabled value passed to each service's {@code setAcceptNewStream}
 * @return the collected per-client futures, mapped to a single {@code null} value
 */
@Override
public Future<Void> setAcceptNewStream(boolean enabled) {
    final Map<SocketAddress, ProxyClient> clientsByAddress = clientManager.getAllClients();
    final List<Future<Void>> responses = new ArrayList<Future<Void>>(clientsByAddress.size());
    // Only the map's values are used; the keys play no part in the call.
    for (ProxyClient proxyClient : clientsByAddress.values()) {
        responses.add(proxyClient.getService().setAcceptNewStream(enabled));
    }
    // Callers only care about completion, so drop the list of Void results.
    final Function<List<Void>, Void> dropResults = new Function<List<Void>, Void>() {
        @Override
        public Void apply(List<Void> results) {
            return null;
        }
    };
    return Future.collect(responses).map(dropResults);
}
内容来源于网络,如有侵权,请联系作者删除!