本文整理了Java中kafka.utils.ZkUtils
类的一些代码示例,展示了ZkUtils
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZkUtils
类的具体详情如下:
包路径:kafka.utils.ZkUtils
类名称:ZkUtils
暂无
代码示例来源:origin: apache/flink
public ZkUtils getZkUtils() {
LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
return ZkUtils.apply(creator, false);
}
代码示例来源:origin: apache/flink
@Override
public void deleteTestTopic(String topic) {
ZkUtils zkUtils = getZkUtils();
try {
LOG.info("Deleting topic {}", topic);
ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
AdminUtils.deleteTopic(zkUtils, topic);
zk.close();
} finally {
zkUtils.close();
}
}
代码示例来源:origin: apache/incubator-gobblin
ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), false);
int partitions = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.PARTITION_COUNT, KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
int replication = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.REPLICATION_COUNT, KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
Properties topicConfig = new Properties();
if(AdminUtils.topicExists(zkUtils, topicName)) {
log.debug("Topic"+topicName+" already Exists with replication: "+replication+" and partitions :"+partitions);
return;
AdminUtils.createTopic(zkUtils, topicName, partitions, replication, topicConfig);
} catch (RuntimeException e) {
throw new RuntimeException(e);
代码示例来源:origin: OryxProject/oryx
/**
* @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
* @param topic topic to check for existence
* @return {@code true} if and only if the given topic exists
*/
public static boolean topicExists(String zkServers, String topic) {
ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
try {
return AdminUtils.topicExists(zkUtils, topic);
} finally {
zkUtils.close();
}
}
代码示例来源:origin: linkedin/kafka-monitor
/**
* @param zkUrl zookeeper connection url
* @return number of brokers in this cluster
*/
public static int getBrokerCount(String zkUrl) {
ZkUtils zkUtils = ZkUtils.apply(zkUrl, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
try {
return zkUtils.getAllBrokersInCluster().size();
} finally {
zkUtils.close();
}
}
代码示例来源:origin: OryxProject/oryx
/**
* @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
* @param topic topic to delete, if it exists
*/
public static void deleteTopic(String zkServers, String topic) {
ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
try {
if (AdminUtils.topicExists(zkUtils, topic)) {
log.info("Deleting topic {}", topic);
AdminUtils.deleteTopic(zkUtils, topic);
log.info("Deleted Zookeeper topic {}", topic);
} else {
log.info("No need to delete topic {} as it does not exist", topic);
}
} finally {
zkUtils.close();
}
}
代码示例来源:origin: confluentinc/kafka-streams-examples
/**
* Creates and starts the cluster.
*/
public void start() throws Exception {
log.debug("Initiating embedded Kafka cluster startup");
log.debug("Starting a ZooKeeper instance...");
zookeeper = new ZooKeeperEmbedded();
log.debug("ZooKeeper instance is running at {}", zookeeper.connectString());
zkUtils = ZkUtils.apply(
zookeeper.connectString(),
30000,
30000,
JaasUtils.isZkSecurityEnabled());
Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig, zookeeper);
log.debug("Starting a Kafka instance on port {} ...",
effectiveBrokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
broker = new KafkaEmbedded(effectiveBrokerConfig);
log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
broker.brokerList(), broker.zookeeperConnect());
Properties schemaRegistryProps = new Properties();
schemaRegistryProps.put(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG, KAFKASTORE_OPERATION_TIMEOUT_MS);
schemaRegistryProps.put(SchemaRegistryConfig.DEBUG_CONFIG, KAFKASTORE_DEBUG);
schemaRegistryProps.put(SchemaRegistryConfig.KAFKASTORE_INIT_TIMEOUT_CONFIG, KAFKASTORE_INIT_TIMEOUT);
schemaRegistry = new RestApp(0, zookeeperConnect(), KAFKA_SCHEMAS_TOPIC, AVRO_COMPATIBILITY_TYPE, schemaRegistryProps);
schemaRegistry.start();
running = true;
}
代码示例来源:origin: apache/incubator-gobblin
Properties props = new Properties();
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, topicReplicationCount);
props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, topicPartitionCount );
props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, liveZookeeper);
ZkClient zkClient = new ZkClient(
liveZookeeper,
sessionTimeoutMs,
ZKStringSerializer$.MODULE$);
boolean isSecureKafkaCluster = false;
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(liveZookeeper), isSecureKafkaCluster);
AdminUtils.fetchTopicMetadataFromZk(topic,zkUtils);
Assert.assertEquals(metaData.partitionsMetadata().size(), Integer.parseInt(topicPartitionCount));
代码示例来源:origin: apache/drill
public static void createTopicHelper(final String topicName, final int partitions) {
Properties topicProps = new Properties();
topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
topicProps.put(TopicConfig.RETENTION_MS_CONFIG, "-1");
ZkUtils zkUtils = new ZkUtils(zkClient,
new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
AdminUtils.createTopic(zkUtils, topicName, partitions, 1,
topicProps, RackAwareMode.Disabled$.MODULE$);
org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk =
AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
logger.info("Topic Metadata: " + fetchTopicMetadataFromZk);
}
代码示例来源:origin: apache/phoenix
@Before
public void setUp() throws IOException, SQLException {
// setup Zookeeper
zkServer = new EmbeddedZookeeper();
String zkConnect = ZKHOST + ":" + zkServer.port();
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
// setup Broker
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
brokerProps.setProperty("log.dirs",
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
KafkaConfig config = new KafkaConfig(brokerProps);
Time mock = new MockTime();
kafkaServer = TestUtils.createServer(config, mock);
kafkaServer.startup();
// create topic
AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties());
pConsumer = new PhoenixConsumer();
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
conn = DriverManager.getConnection(getUrl(), props);
}
代码示例来源:origin: OryxProject/oryx
/**
* @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
* @param topic topic to create (if not already existing)
* @param partitions number of topic partitions
* @param topicProperties optional topic config properties
*/
public static void maybeCreateTopic(String zkServers,
String topic,
int partitions,
Properties topicProperties) {
ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
try {
if (AdminUtils.topicExists(zkUtils, topic)) {
log.info("No need to create topic {} as it already exists", topic);
} else {
log.info("Creating topic {} with {} partition(s)", topic, partitions);
try {
AdminUtils.createTopic(
zkUtils, topic, partitions, 1, topicProperties, RackAwareMode.Enforced$.MODULE$);
log.info("Created topic {}", topic);
} catch (TopicExistsException re) {
log.info("Topic {} already exists", topic);
}
}
} finally {
zkUtils.close();
}
}
代码示例来源:origin: com.hotels.road/road-kafka-store
@SuppressWarnings({ "rawtypes", "unchecked" })
private static void verifyTopic(ZkUtils zkUtils, String topic) {
Set topics = new HashSet();
topics.add(topic);
// check # partition and the replication factor
scala.collection.mutable.Map partitionAssignmentForTopics = zkUtils
.getPartitionAssignmentForTopics(JavaConversions.asScalaSet(topics).toSeq());
scala.collection.Map partitionAssignment = (scala.collection.Map) partitionAssignmentForTopics.get(topic).get();
if (partitionAssignment.size() != 1) {
throw new RuntimeException(String.format("The schema topic %s should have only 1 partition.", topic));
}
// check the retention policy
Properties prop = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);
String retentionPolicy = prop.getProperty(LogConfig.CleanupPolicyProp());
if (retentionPolicy == null || "compact".compareTo(retentionPolicy) != 0) {
throw new RuntimeException(String.format("The retention policy of the schema topic %s must be compact.", topic));
}
}
}
代码示例来源:origin: apache/incubator-gobblin
public void provisionTopic(String topic) {
if (_topicConsumerMap.containsKey(topic)) {
// nothing to do: return
} else {
// provision topic
AdminUtils.createTopic(ZkUtils.apply(_kafkaServerSuite.getZkClient(), false),
topic, 1, 1, new Properties());
List<KafkaServer> servers = new ArrayList<>();
servers.add(_kafkaServerSuite.getKafkaServer());
kafka.utils.TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);
KafkaConsumerSuite consumerSuite = new KafkaConsumerSuite(_kafkaServerSuite.getZkConnectString(), topic);
_topicConsumerMap.put(topic, consumerSuite);
}
}
代码示例来源:origin: apache/tajo
public void createTopic(int partitions, int replication, String topic) {
checkState(started.get(), "not started!");
ZkClient zkClient = new ZkClient(getZookeeperConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
try {
AdminUtils.createTopic(ZkUtils.apply(zkClient, false), topic, partitions, replication, new Properties(),
RackAwareMode.Enforced$.MODULE$);
} finally {
zkClient.close();
}
}
代码示例来源:origin: reactor/reactor-kafka
public String createNewTopic(String newTopic, int partitions) {
ZkUtils zkUtils = new ZkUtils(embeddedKafka.zkClient(), null, false);
Properties props = new Properties();
AdminUtils.createTopic(zkUtils, newTopic, partitions, 1, props, null);
waitForTopic(newTopic, partitions, true);
return newTopic;
}
代码示例来源:origin: homeaway/stream-registry
private ZkUtils initZkUtils(Properties config) {
String zkConnect = config.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM);
ZkClient zkClient = new ZkClient(zkConnect);
zkClient.setZkSerializer(ZKStringSerializer$.MODULE$);
ZkConnection zkConnection = new ZkConnection(zkConnect);
ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
return zkUtils;
}
代码示例来源:origin: linkedin/kafka-monitor
ZkUtils zkUtils = ZkUtils.apply(zkUrl, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
try {
if (AdminUtils.topicExists(zkUtils, topic)) {
return getPartitionNumForTopic(zkUrl, topic);
int brokerCount = zkUtils.getAllBrokersInCluster().size();
int partitionCount = Math.max((int) Math.ceil(brokerCount * partitionToBrokerRatio), minPartitionNum);
AdminUtils.createTopic(zkUtils, topic, partitionCount, replicationFactor, topicConfig, RackAwareMode.Enforced$.MODULE$);
} catch (TopicExistsException e) {
+ topicConfig.get(KafkaConfig.MinInSyncReplicasProp()) + " and replication factor of " + replicationFactor + ".");
zkUtils.close();
代码示例来源:origin: org.apache.kafka/kafka_2.10
public int run(final String[] args, final Properties config) {
consumerConfig.clear();
consumerConfig.putAll(config);
zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
30000,
30000,
allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
zkUtils.close();
代码示例来源:origin: org.apache.apex/malhar-contrib
/**
* There is always only one string in zkHost
* @param zkHost
* @return
*/
public static Set<String> getBrokers(Set<String> zkHost){
ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$);
Set<String> brokerHosts = new HashSet<String>();
for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
brokerHosts.add(b.connectionString());
}
zkclient.close();
return brokerHosts;
}
代码示例来源:origin: linkedin/cruise-control
public BrokerFailureDetector(KafkaCruiseControlConfig config,
LoadMonitor loadMonitor,
Queue<Anomaly> anomalies,
Time time,
KafkaCruiseControl kafkaCruiseControl) {
String zkUrl = config.getString(KafkaCruiseControlConfig.ZOOKEEPER_CONNECT_CONFIG);
ZkConnection zkConnection = new ZkConnection(zkUrl, 30000);
_zkClient = new ZkClient(zkConnection, 30000, new ZkStringSerializer());
// Do not support secure ZK at this point.
_zkUtils = new ZkUtils(_zkClient, zkConnection, false);
_failedBrokers = new HashMap<>();
_failedBrokersZkPath = config.getString(KafkaCruiseControlConfig.FAILED_BROKERS_ZK_PATH_CONFIG);
_loadMonitor = loadMonitor;
_anomalies = anomalies;
_time = time;
_kafkaCruiseControl = kafkaCruiseControl;
_allowCapacityEstimation = config.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG);
}
内容来源于网络,如有侵权,请联系作者删除!