storm与druid类com.fasterxml.jackson.module.scala.ser.scalaiteratorserializer的集成用已解决的错误重写最终方法

46qrfjad  于 2021-06-21  发布在  Storm
关注(0)|答案(0)|浏览(241)

我对风暴和Druid都是新手。从最近几天开始,我一直在讨论这个问题。我正在把Kafka的数据发送给斯托姆,然后再发送给Druid。我认为druidbeambolt正在接收数据,但在传输到druid之前无法将其转换为json。查看我的druidoltfactory代码以获取更多详细信息。如果每个人都需要更多关于代码的信息,那么请告诉我,提前谢谢
这就是错误

java.lang.NoClassDefFoundError: Could not initialize class com.metamx.tranquility.druid.DruidBeams$
    at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
    at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.961+0000 b.s.util [ERROR] Async loop died!
java.lang.NoClassDefFoundError: Could not initialize class com.metamx.tranquility.druid.DruidBeams$
    at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
    at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.961+0000 b.s.util [ERROR] Async loop died!
java.lang.VerifyError: class com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides final method withResolved.(Lcom/fasterxml/jackson/databind/BeanProperty;Lcom/fasterxml/jackson/databind/jsontype/TypeSerializer;Lcom/fasterxml/jackson/databind/JsonSerializer;)Lcom/fasterxml/jackson/databind/ser/std/AsArraySerializerBase;
    at java.lang.ClassLoader.defineClass1(Native Method) ~[na:1.8.0_91]
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[na:1.8.0_91]
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[na:1.8.0_91]
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[na:1.8.0_91]
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[na:1.8.0_91]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[na:1.8.0_91]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[na:1.8.0_91]
    at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_91]
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) ~[na:1.8.0_91]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_91]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
    at com.fasterxml.jackson.module.scala.ser.IteratorSerializerModule$class.$init$(IteratorSerializerModule.scala:70) ~[stormjar.jar:na]
    at com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19) ~[stormjar.jar:na]
    at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35) ~[stormjar.jar:na]
    at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:70) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:68) ~[stormjar.jar:na]
    at com.metamx.common.scala.Predef$EffectOps.withEffect(Predef.scala:44) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:67) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:64) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$class.$init$(Jackson.scala:14) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$.<init>(Jackson.scala:10) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$.<clinit>(Jackson.scala) ~[stormjar.jar:na]
    at com.metamx.tranquility.druid.DruidBeams$.<init>(DruidBeams.scala:107) ~[stormjar.jar:na]
    at com.metamx.tranquility.druid.DruidBeams$.<clinit>(DruidBeams.scala) ~[stormjar.jar:na]
    at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
    at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.962+0000 b.s.d.executor [ERROR] 
java.lang.NoClassDefFoundError: Could not initialize class com.metamx.tranquility.druid.DruidBeams$
    at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
    at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.962+0000 b.s.d.executor [ERROR] 
java.lang.VerifyError: class com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides final method withResolved.(Lcom/fasterxml/jackson/databind/BeanProperty;Lcom/fasterxml/jackson/databind/jsontype/TypeSerializer;Lcom/fasterxml/jackson/databind/JsonSerializer;)Lcom/fasterxml/jackson/databind/ser/std/AsArraySerializerBase;
    at java.lang.ClassLoader.defineClass1(Native Method) ~[na:1.8.0_91]
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[na:1.8.0_91]
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[na:1.8.0_91]
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[na:1.8.0_91]
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[na:1.8.0_91]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[na:1.8.0_91]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[na:1.8.0_91]
    at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_91]
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) ~[na:1.8.0_91]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_91]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
    at com.fasterxml.jackson.module.scala.ser.IteratorSerializerModule$class.$init$(IteratorSerializerModule.scala:70) ~[stormjar.jar:na]
    at com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19) ~[stormjar.jar:na]
    at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35) ~[stormjar.jar:na]
    at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:70) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:68) ~[stormjar.jar:na]
    at com.metamx.common.scala.Predef$EffectOps.withEffect(Predef.scala:44) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:67) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:64) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$class.$init$(Jackson.scala:14) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$.<init>(Jackson.scala:10) ~[stormjar.jar:na]
    at com.metamx.common.scala.Jackson$.<clinit>(Jackson.scala) ~[stormjar.jar:na]
    at com.metamx.tranquility.druid.DruidBeams$.<init>(DruidBeams.scala:107) ~[stormjar.jar:na]
    at com.metamx.tranquility.druid.DruidBeams$.<clinit>(DruidBeams.scala) ~[stormjar.jar:na]
    at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
    at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.965+0000 b.s.d.executor [ERROR] 
java.lang.NoClassDefFoundError: Could not initialize class com.metamx.tranquility.druid.DruidBeams$
    at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBoltFactory.makeBeam(DruidBoltFactory.java:57) ~[stormjar.jar:na]
    at io.weblytics.backend.enricher.bolt.DruidBeamBolt.prepare(DruidBeamBolt.java:48) ~[stormjar.jar:na]
    at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:38.967+0000 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-zkserver1/172.10.1.133:6703... [7]
2016-09-26T07:12:39.006+0000 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$fn__3812$fn__3813.invoke(worker.clj:456) [storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:39.011+0000 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$fn__3812$fn__3813.invoke(worker.clj:456) [storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-09-26T07:12:39.011+0000 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$fn__3812$fn__3813.invoke(worker.clj:456) [storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
  • 这是我的暴风拓扑课*
package io.web.backend.enricher;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import io.weblytics.backend.enricher.bolt.DruidBeamBolt;
import io.weblytics.backend.enricher.bolt.DruidBoltFactory;
import io.weblytics.backend.enricher.bolt.GenderBolt;
import io.weblytics.backend.enricher.druid.DruidBeamFactory;
import io.weblytics.backend.enricher.druid.DruidConfig;
import io.weblytics.backend.enricher.druid.ITupleDruidEventMapper;
import io.weblytics.backend.enricher.druid.TupleDruidEventMapper;
import io.weblytics.backend.enricher.spout.KafkaSpoutBuilder;

import java.util.HashMap;
import java.util.Map;

public class StormTopology {
    public static void main(String[] args) throws Exception {

//        BeamBolt<Map<String, Object>> beamBolt = new BeamBolt<>(new DruidBoltFactory());

        DruidBeamFactory druidBeamFactory = new DruidBoltFactory(new HashMap<String, Object>());
        DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();

        ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);

        DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);

        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpoutBuilder ksb = new KafkaSpoutBuilder();

        builder.setSpout("kafka-spout", ksb.buildKafkaSpout(), 1);
        builder.setBolt("gender-bolt", new GenderBolt(), 1)
                .shuffleGrouping("kafka-spout");
        //     builder.setBolt("age-bolt", new AgeBolt(), 10)
        //           .shuffleGrouping("gender-bolt");
        builder.setBolt("druid-bolt", druidBolt, 10)
                .shuffleGrouping("gender-bolt");
        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("we-enricher", conf, builder.createTopology());

            Thread.sleep(1000);

            cluster.shutdown();
        }
    }
}

这是我的Druid光束螺栓

package io.web.backend.enricher.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.metamx.tranquility.tranquilizer.MessageDroppedException;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import io.weblytics.backend.enricher.druid.DruidBeamFactory;
import io.weblytics.backend.enricher.druid.DruidConfig;
import io.weblytics.backend.enricher.druid.ITupleDruidEventMapper;

import java.util.Map;

public class DruidBeamBolt<E> extends BaseRichBolt {

    private volatile OutputCollector collector;
    private DruidBeamFactory<E> beamFactory = null;
    private DruidConfig druidConfig = null;
    private Tranquilizer<E> tranquilizer = null;
    private ITupleDruidEventMapper<E> druidEventMapper = null;

    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig druidConfig) {
        this.beamFactory = beamFactory;
        this.druidConfig = druidConfig;
        this.druidEventMapper = druidEventMapper;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        tranquilizer = Tranquilizer.builder()
                .maxBatchSize(druidConfig.getMaxBatchSize())
                .maxPendingBatches(druidConfig.getMaxPendingBatches())
                .lingerMillis(druidConfig.getLingerMillis())
                .blockOnFull(druidConfig.isBlockOnFull())
                .build(beamFactory.makeBeam(stormConf, context));
        this.tranquilizer.start();
    }

    @Override
    public void execute(final Tuple tuple) {
        System.out.print("inside if 12");
        Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
        future.addEventListener(new FutureEventListener() {
            @Override
            public void onFailure(Throwable cause) {
                if (cause instanceof MessageDroppedException) {
                    System.out.print("inside if");
                    collector.ack(tuple);
                    if (druidConfig.getDiscardStreamId() != null)
                        collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
                }
                else {
                    System.out.print("inside else");
                    collector.fail(tuple);
                }
            }

            @Override
            public void onSuccess(Object value) {
                System.out.print("wking");
                collector.ack(tuple);
            }
        });

    }

    @Override
    public void cleanup() {
        tranquilizer.stop();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(druidConfig.getDiscardStreamId(), new Fields("tuple", "timestamp"));
    }
}

这是我的Druid螺栓工厂,我想问题出在这个螺栓上。尤其是在创建对象生成器时。生成器通过jackson将事件类型在内部序列化为druid可以理解的json。

package io.weblytics.backend.enricher.bolt;

import backtype.storm.task.IMetricsContext;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.storm.BeamFactory;
import com.metamx.tranquility.typeclass.Timestamper;
import io.druid.granularity.QueryGranularities;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.weblytics.backend.enricher.druid.DruidBeamFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
import org.joda.time.DateTime;
import org.joda.time.Period;

import java.util.List;
import java.util.Map;

public class DruidBoltFactory implements DruidBeamFactory<Map<String, Object>> {

    Map<String, Object> factoryConf = null;

    public DruidBoltFactory(Map<String, Object> factoryConf){
        this.factoryConf = factoryConf;
    }

    @Override
    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
        try {
            final CuratorFramework curator = CuratorFrameworkFactory.newClient(
                    "172.10.1.252:2181", new BoundedExponentialBackoffRetry(100, 1000, 5));
            curator.start();
            final List<String> dimensions = ImmutableList.of("interests", "origin", "user_id",
                    "geo_country", "timestamp", "age", "site_id", "value", "server",
                    "http_method", "cust_id", "gender","device","http_code","geo_region","page","unit");

            final List<AggregatorFactory> aggregators = ImmutableList
                    .<AggregatorFactory>of(new CountAggregatorFactory("cnt"));
            final String dataSource = "events-test";
            System.out.print("beam is working");

final DruidBeams.Builder<Map<String, Object>, Map<String,

Object>> builder = DruidBeams.builder(
                    new Timestamper<Map<String, Object>>() {
                        @Override
                        public DateTime timestamp(Map<String, Object> theMap) {
                            return new DateTime(theMap.get("timestamp"));
                        }
                    }
            )
                    .curator(curator)
                    .discoveryPath("/druid/discovery")
                    .location(
                            DruidLocation.create(
                                    "druid:local:indexer",
                                    "druid:local:firehose:%s",
                                    dataSource
                            )
                    )
                    .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularities.MINUTE))
                    .tuning(
                            ClusteredBeamTuning.builder()
                                    .segmentGranularity(Granularity.HOUR)
                                    .windowPeriod(new Period("PT1M"))
                                    .build()
                    );

                System.out.print("beam is not working");
                final Beam<Map<String, Object>> beam = builder.buildBeam();
                return beam;
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }
    }

这是我的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>datasystem</artifactId>
        <groupId>io.weblytics.backend</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>enricher</artifactId>
    <dependencies>

    <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.10.0</version>
    </dependency>

     <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.10.0.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
        <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.3</version>
        </dependency>

        <dependency>
            <groupId>io.druid</groupId>
            <artifactId>tranquility-core_2.11</artifactId>
            <version>0.8.2</version>
        </dependency>

        <!-- tranquility library depends on jackson 2.4.6 version -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.8.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.8.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-smile</artifactId>
            <version>2.8.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.10</artifactId>
            <version>2.8.2</version>
        </dependency>

        <dependency>
            <groupId>io.druid</groupId>
            <artifactId>druid-server</artifactId>
            <version>0.9.1</version>
        </dependency>

        <dependency>
            <groupId>io.druid</groupId>
            <artifactId>tranquility-storm_2.11</artifactId>
            <version>0.8.2</version>
        </dependency>
        <dependency>
            <groupId>io.weblytics.backend</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/net.sourceforge.wurfl/wurfl -->
        <dependency>
            <groupId>net.sourceforge.wurfl</groupId>
            <artifactId>wurfl</artifactId>
            <version>1.3.1.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.maxmind.geoip2/geoip2 -->
        <dependency>
            <groupId>com.maxmind.geoip2</groupId>
            <artifactId>geoip2</artifactId>
            <version>2.7.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>io.weblytics.backend.enricher.StormTopology</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题