co.cask.coopr.common.queue.internal.ZKQueueGroup类的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(3.4k)|赞(0)|评价(0)|浏览(80)

本文整理了Java中co.cask.coopr.common.queue.internal.ZKQueueGroup类的一些代码示例,展示了ZKQueueGroup类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZKQueueGroup类的具体详情如下:
包路径:co.cask.coopr.common.queue.internal.ZKQueueGroup
类名称:ZKQueueGroup

ZKQueueGroup介绍

[英]Implementation of a QueueGroup that uses queues built on zookeeper. Whenever a queue name is referenced in a method, the queue is cached and the physical zookeeper queue is created if it does not already exist. Watches zookeeper to make sure queues added or deleted by another instance of the group are reflected in this group.
[中]使用zookeeper上构建的队列的QueueGroup的实现。每当在方法中引用队列名称时,就会缓存该队列,并创建物理zookeeper队列(如果该队列尚不存在)。监视zookeeper以确保由组的另一个实例添加或删除的队列反映在此组中。

代码示例

代码示例来源:origin: caskdata/coopr

@Inject
private ZKQueueService(ZKClient zkClient) {
 ImmutableMap.Builder<QueueType, QueueGroup> builder = ImmutableMap.builder();
 for (QueueType type : QueueType.GROUP_TYPES) {
  builder.put(type, new ZKQueueGroup(zkClient, type));
 }
 this.queueGroups = builder.build();
}

代码示例来源:origin: caskdata/coopr

@Override
protected void startUp() throws Exception {
 Futures.getUnchecked(ZKClientExt.ensureExists(zkClient, queueType.getPath()));
 refreshQueues(Futures.getUnchecked(zkClient.getChildren(queueType.getPath())));
 ZKOperations.watchChildren(zkClient, queueType.getPath(), new ZKOperations.ChildrenCallback() {
  @Override
  public void updated(NodeChildren nodeChildren) {
   refreshQueues(nodeChildren);
  }
 });
}

代码示例来源:origin: caskdata/coopr

@Test
public void testInstanceInitializedWithExistingData() throws Exception {
 QueueGroup instance1 = new ZKQueueGroup(zkClient, QueueType.PROVISIONER);
 instance1.startAndWait();
 instance1.add("tenant1", new Element("val1"));
 instance1.add("tenant2", new Element("val2"));
 QueueGroup instance2 = new ZKQueueGroup(zkClient, QueueType.PROVISIONER);
 instance2.startAndWait();
 waitForQueueNames(Sets.newHashSet("tenant1", "tenant2"), instance2);
 instance1.stop();
 instance2.stop();
}

代码示例来源:origin: caskdata/coopr

@Test
public void testChangesSeenAcrossInstances() throws Exception {
 QueueGroup instance1 = new ZKQueueGroup(zkClient, QueueType.PROVISIONER);
 QueueGroup instance2 = new ZKQueueGroup(zkClient, QueueType.PROVISIONER);
 instance1.startAndWait();
 instance2.startAndWait();
 // add a queue for tenant3 with 2 elements
 String tenant = "tenantX";
 Set<String> expectedQueueNames = Sets.newHashSet(tenant);
 instance1.add(tenant, new Element("id3-1", "val1"));
 instance1.add(tenant, new Element("id3-2", "val2"));
 // check both instances see tenant3
 Assert.assertEquals(expectedQueueNames, instance1.getQueueNames());
 waitForQueueNames(expectedQueueNames, instance2);
 // make sure each instance gets an accurate picture of the queue
 Iterator<GroupElement> queuesIter1 = instance1.takeIterator("consumer1");
 Iterator<GroupElement> queuesIter2 = instance1.takeIterator("consumer2");
 GroupElement gelement = queuesIter1.next();
 Assert.assertEquals(tenant, gelement.getQueueName());
 Assert.assertEquals("id3-1", gelement.getElement().getId());
 Assert.assertEquals("val1", gelement.getElement().getValue());
 gelement = queuesIter2.next();
 Assert.assertEquals(tenant, gelement.getQueueName());
 Assert.assertEquals("id3-2", gelement.getElement().getId());
 Assert.assertEquals("val2", gelement.getElement().getValue());
 Assert.assertFalse(queuesIter1.hasNext());
 Assert.assertFalse(queuesIter2.hasNext());
 instance1.stop();
 instance2.stop();
}

相关文章

ZKQueueGroup类方法