我对风暴和Druid都是新手。从最近几天开始,我一直在讨论这个问题。我正在把Kafka的数据发送给斯托姆,然后再发送给Druid。我认为druidbeambolt正在接收数据,但在传输到druid之前无法将其转换为json。查看我的druidoltfactory代码以获取更多详细信息。如果每个人都需要更多关于代码的信息,那么请告诉我,提前谢谢
这就是错误
java.lang.NoClassDefFoundError: Could not initialize class com.metamx.tranquility.druid.DruidBeams$
at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.961+0000 b.s.util [ERROR] Async loop died!
java.lang.NoClassDefFoundError: Could not initialize class com.metamx.tranquility.druid.DruidBeams$
at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.961+0000 b.s.util [ERROR] Async loop died!
java.lang.VerifyError: class com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides final method withResolved.(Lcom/fasterxml/jackson/databind/BeanProperty;Lcom/fasterxml/jackson/databind/jsontype/TypeSerializer;Lcom/fasterxml/jackson/databind/JsonSerializer;)Lcom/fasterxml/jackson/databind/ser/std/AsArraySerializerBase;
at java.lang.ClassLoader.defineClass1(Native Method) ~[na:1.8.0_91]
at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[na:1.8.0_91]
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[na:1.8.0_91]
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[na:1.8.0_91]
at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[na:1.8.0_91]
at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[na:1.8.0_91]
at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[na:1.8.0_91]
at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_91]
at java.net.URLClassLoader.findClass(URLClassLoader.java:361) ~[na:1.8.0_91]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_91]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
at com.fasterxml.jackson.module.scala.ser.IteratorSerializerModule$class.$init$(IteratorSerializerModule.scala:70) ~[stormjar.jar:na]
at com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19) ~[stormjar.jar:na]
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35) ~[stormjar.jar:na]
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:70) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:68) ~[stormjar.jar:na]
at com.metamx.common.scala.Predef$EffectOps.withEffect(Predef.scala:44) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:67) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:64) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$class.$init$(Jackson.scala:14) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$.<init>(Jackson.scala:10) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$.<clinit>(Jackson.scala) ~[stormjar.jar:na]
at com.metamx.tranquility.druid.DruidBeams$.<init>(DruidBeams.scala:107) ~[stormjar.jar:na]
at com.metamx.tranquility.druid.DruidBeams$.<clinit>(DruidBeams.scala) ~[stormjar.jar:na]
at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.962+0000 b.s.d.executor [ERROR]
java.lang.NoClassDefFoundError: Could not initialize class com.metamx.tranquility.druid.DruidBeams$
at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.962+0000 b.s.d.executor [ERROR]
java.lang.VerifyError: class com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides final method withResolved.(Lcom/fasterxml/jackson/databind/BeanProperty;Lcom/fasterxml/jackson/databind/jsontype/TypeSerializer;Lcom/fasterxml/jackson/databind/JsonSerializer;)Lcom/fasterxml/jackson/databind/ser/std/AsArraySerializerBase;
at java.lang.ClassLoader.defineClass1(Native Method) ~[na:1.8.0_91]
at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[na:1.8.0_91]
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[na:1.8.0_91]
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[na:1.8.0_91]
at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[na:1.8.0_91]
at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[na:1.8.0_91]
at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[na:1.8.0_91]
at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_91]
at java.net.URLClassLoader.findClass(URLClassLoader.java:361) ~[na:1.8.0_91]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_91]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
at com.fasterxml.jackson.module.scala.ser.IteratorSerializerModule$class.$init$(IteratorSerializerModule.scala:70) ~[stormjar.jar:na]
at com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19) ~[stormjar.jar:na]
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35) ~[stormjar.jar:na]
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:70) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:68) ~[stormjar.jar:na]
at com.metamx.common.scala.Predef$EffectOps.withEffect(Predef.scala:44) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:67) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:64) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$class.$init$(Jackson.scala:14) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$.<init>(Jackson.scala:10) ~[stormjar.jar:na]
at com.metamx.common.scala.Jackson$.<clinit>(Jackson.scala) ~[stormjar.jar:na]
at com.metamx.tranquility.druid.DruidBeams$.<init>(DruidBeams.scala:107) ~[stormjar.jar:na]
at com.metamx.tranquility.druid.DruidBeams$.<clinit>(DruidBeams.scala) ~[stormjar.jar:na]
at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.965+0000 b.s.d.executor [ERROR]
java.lang.NoClassDefFoundError: Could not initialize class com.metamx.tranquility.druid.DruidBeams$
at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.967+0000 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-zkserver1/172.10.1.133:6703... [7]
2016-09-26T07:12:39.006+0000 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__3812$fn__3813.invoke(worker.clj:456) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:39.011+0000 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__3812$fn__3813.invoke(worker.clj:456) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:39.011+0000 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__3812$fn__3813.invoke(worker.clj:456) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
- 这是我的暴风拓扑课*
package io.web.backend.enricher;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import io.weblytics.backend.enricher.bolt.DruidBeamBolt;
import io.weblytics.backend.enricher.bolt.DruidBoltFactory;
import io.weblytics.backend.enricher.bolt.GenderBolt;
import io.weblytics.backend.enricher.druid.DruidBeamFactory;
import io.weblytics.backend.enricher.druid.DruidConfig;
import io.weblytics.backend.enricher.druid.ITupleDruidEventMapper;
import io.weblytics.backend.enricher.druid.TupleDruidEventMapper;
import io.weblytics.backend.enricher.spout.KafkaSpoutBuilder;
import java.util.HashMap;
import java.util.Map;
public class StormTopology {
public static void main(String[] args) throws Exception {
// BeamBolt<Map<String, Object>> beamBolt = new BeamBolt<>(new DruidBoltFactory());
DruidBeamFactory druidBeamFactory = new DruidBoltFactory(new HashMap<String, Object>());
DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutBuilder ksb = new KafkaSpoutBuilder();
builder.setSpout("kafka-spout", ksb.buildKafkaSpout(), 1);
builder.setBolt("gender-bolt", new GenderBolt(), 1)
.shuffleGrouping("kafka-spout");
// builder.setBolt("age-bolt", new AgeBolt(), 10)
// .shuffleGrouping("gender-bolt");
builder.setBolt("druid-bolt", druidBolt, 10)
.shuffleGrouping("gender-bolt");
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("we-enricher", conf, builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
}
}
}
这是我的Druid光束螺栓
package io.web.backend.enricher.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.metamx.tranquility.tranquilizer.MessageDroppedException;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import io.weblytics.backend.enricher.druid.DruidBeamFactory;
import io.weblytics.backend.enricher.druid.DruidConfig;
import io.weblytics.backend.enricher.druid.ITupleDruidEventMapper;
import java.util.Map;
public class DruidBeamBolt<E> extends BaseRichBolt {
private volatile OutputCollector collector;
private DruidBeamFactory<E> beamFactory = null;
private DruidConfig druidConfig = null;
private Tranquilizer<E> tranquilizer = null;
private ITupleDruidEventMapper<E> druidEventMapper = null;
public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig druidConfig) {
this.beamFactory = beamFactory;
this.druidConfig = druidConfig;
this.druidEventMapper = druidEventMapper;
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
tranquilizer = Tranquilizer.builder()
.maxBatchSize(druidConfig.getMaxBatchSize())
.maxPendingBatches(druidConfig.getMaxPendingBatches())
.lingerMillis(druidConfig.getLingerMillis())
.blockOnFull(druidConfig.isBlockOnFull())
.build(beamFactory.makeBeam(stormConf, context));
this.tranquilizer.start();
}
@Override
public void execute(final Tuple tuple) {
System.out.print("inside if 12");
Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
future.addEventListener(new FutureEventListener() {
@Override
public void onFailure(Throwable cause) {
if (cause instanceof MessageDroppedException) {
System.out.print("inside if");
collector.ack(tuple);
if (druidConfig.getDiscardStreamId() != null)
collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
}
else {
System.out.print("inside else");
collector.fail(tuple);
}
}
@Override
public void onSuccess(Object value) {
System.out.print("wking");
collector.ack(tuple);
}
});
}
@Override
public void cleanup() {
tranquilizer.stop();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(druidConfig.getDiscardStreamId(), new Fields("tuple", "timestamp"));
}
}
这是我的Druid螺栓工厂,我想问题出在这个螺栓上。尤其是在创建对象生成器时。生成器通过jackson将事件类型在内部序列化为druid可以理解的json。
package io.weblytics.backend.enricher.bolt;
import backtype.storm.task.IMetricsContext;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.storm.BeamFactory;
import com.metamx.tranquility.typeclass.Timestamper;
import io.druid.granularity.QueryGranularities;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.weblytics.backend.enricher.druid.DruidBeamFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
import org.joda.time.DateTime;
import org.joda.time.Period;
import java.util.List;
import java.util.Map;
public class DruidBoltFactory implements DruidBeamFactory<Map<String, Object>> {
Map<String, Object> factoryConf = null;
public DruidBoltFactory(Map<String, Object> factoryConf){
this.factoryConf = factoryConf;
}
@Override
public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
try {
final CuratorFramework curator = CuratorFrameworkFactory.newClient(
"172.10.1.252:2181", new BoundedExponentialBackoffRetry(100, 1000, 5));
curator.start();
final List<String> dimensions = ImmutableList.of("interests", "origin", "user_id",
"geo_country", "timestamp", "age", "site_id", "value", "server",
"http_method", "cust_id", "gender","device","http_code","geo_region","page","unit");
final List<AggregatorFactory> aggregators = ImmutableList
.<AggregatorFactory>of(new CountAggregatorFactory("cnt"));
final String dataSource = "events-test";
System.out.print("beam is working");
final DruidBeams.Builder<Map<String, Object>, Map<String,
Object>> builder = DruidBeams.builder(
new Timestamper<Map<String, Object>>() {
@Override
public DateTime timestamp(Map<String, Object> theMap) {
return new DateTime(theMap.get("timestamp"));
}
}
)
.curator(curator)
.discoveryPath("/druid/discovery")
.location(
DruidLocation.create(
"druid:local:indexer",
"druid:local:firehose:%s",
dataSource
)
)
.rollup(DruidRollup.create(dimensions, aggregators, QueryGranularities.MINUTE))
.tuning(
ClusteredBeamTuning.builder()
.segmentGranularity(Granularity.HOUR)
.windowPeriod(new Period("PT1M"))
.build()
);
System.out.print("beam is not working");
final Beam<Map<String, Object>> beam = builder.buildBeam();
return beam;
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
这是我的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datasystem</artifactId>
<groupId>io.weblytics.backend</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>enricher</artifactId>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>tranquility-core_2.11</artifactId>
<version>0.8.2</version>
</dependency>
<!-- tranquility library depends on jackson 2.4.6 version -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.10</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>tranquility-storm_2.11</artifactId>
<version>0.8.2</version>
</dependency>
<dependency>
<groupId>io.weblytics.backend</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/net.sourceforge.wurfl/wurfl -->
<dependency>
<groupId>net.sourceforge.wurfl</groupId>
<artifactId>wurfl</artifactId>
<version>1.3.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.maxmind.geoip2/geoip2 -->
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>io.weblytics.backend.enricher.StormTopology</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
暂无答案!
目前还没有任何答案,快来回答吧!