Unable to receive data in Solr from Kafka

nom7f22z · Posted on 2021-06-06 in Kafka
Follow (0) | Answers (1) | Views (301)

I am trying to automatically insert data from Kafka into Solr and Banana, but I cannot, because of the following error.

Error in the "convert to SolrDocuments" step:

java.lang.NumberFormatException: For input string: "2007 "
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Integer.parseInt(Integer.java:580)
        at java.lang.Integer.valueOf(Integer.java:766)
        at com.example.streaming.EventParseUtil.convertData(EventParseUtil.java:24)
        at com.example.streaming.CarEventsProcessor.lambda$main$91ca40fe$1(CarEventsProcessor.java:76)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
        at com.lucidworks.spark.SolrSupport$5.call(SolrSupport.java:216)
        at com.lucidworks.spark.SolrSupport$5.call(SolrSupport.java:210)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1857)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1857)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
18/10/06 01:10:08 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NumberFormatException: For input string: "2007 "
        ... (same stack trace as above)
18/10/06 01:10:08 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NumberFormatException: For input string: "2007 "
        ... (same stack trace as above)
18/10/06 01:10:08 ERROR spark.SolrSupport: Sending batch to collection connectedcardata failed due to: org.apache.solr.common.SolrException: Collection not found: connectedcardata
I am attaching the complete code.
Does anyone have an idea of what the failure is?

package com.example.streaming;

// Package and imports reconstructed from the stack trace and the APIs used below
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.scala.DefaultScalaModule;
import com.lucidworks.spark.SolrSupport;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class CarEventsProcessor {

 private CarEventsProcessor() {}

 public static void main(String[] args) throws JsonParseException, JsonMappingException, IOException {
  if (args.length < 4) {
   System.err
    .println("Usage: CarEventsProcessor <brokers> <topics> <zk_url> <index_name>\n" +
     "  <brokers> is a list of one or more Kafka brokers\n" +
     "  <topics> is a list of one or more Kafka topics to consume from\n" +
     "  <zk_url> ZooKeeper URL\n" +
     "  <index_name> name of the Solr index\n\n");
   System.exit(1);
  }

  String brokers = args[0];
  String topics = args[1];
  String zk_url = args[2];
  String index_name = args[3];

  ObjectMapper objectMapper = new ObjectMapper();
  objectMapper.registerModule(new DefaultScalaModule());

  // Create context with a 60-second batch interval
  SparkConf sparkConf = new SparkConf()
   .setAppName("CarEventsProcessor");
  sparkConf.setMaster("local[4]");

  JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
  jssc.sparkContext().setLogLevel("ERROR");

  HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
  HashMap<String, String> kafkaParams = new HashMap<String, String>();
  kafkaParams.put("metadata.broker.list", brokers);

  // Create a direct Kafka stream with the given brokers and topics
  JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
   StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

  // Get the messages and extract the payload
  JavaDStream<String> events = messages
   .map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
     return tuple2._2();
    }
   });

  // Convert to SolrInputDocuments
  JavaDStream<SolrInputDocument> parsedSolrEvents = events.map(incomingRecord -> EventParseUtil.convertData(incomingRecord));

  // Send to Solr
  SolrSupport.indexDStreamOfDocs(zk_url, index_name, 10, parsedSolrEvents);

  parsedSolrEvents.print();
  jssc.start();
  jssc.awaitTermination();
 }
}

zdwk9cvp 1#

NumberFormatException: For input string: "2007 " ... at com.example.streaming.EventParseUtil.convertData(EventParseUtil.java:24) tells you that you are calling Integer.parseInt on a string that contains whitespace.
You must trim the string argument before passing it to that method, as in the sketch below.
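A minimal, self-contained sketch of the failing parse and the fix (the rawYear name is made up for illustration; the real EventParseUtil is not shown in the question):

public class TrimParseExample {
 public static void main(String[] args) {
  String rawYear = "2007 "; // value with a trailing space, as in the stack trace
  // Integer.valueOf(rawYear) would throw java.lang.NumberFormatException;
  // trim() strips the surrounding whitespace, so the parse succeeds
  Integer year = Integer.valueOf(rawYear.trim());
  System.out.println(year); // prints 2007
 }
}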
At the bottom of the error output you also have a "Collection not found" failure: the connectedcardata collection does not exist in SolrCloud, so it must be created before the job can index into it (see the sketch below).
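On a local install, bin/solr create -c connectedcardata is enough. A hedged SolrJ sketch doing the same programmatically (assuming SolrJ 7+ on the classpath; the ZooKeeper address, the _default configset, and the 1-shard/1-replica sizing are assumptions for a local test, not taken from the question):

import java.util.Collections;
import java.util.Optional;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

public class CreateCollectionExample {
 public static void main(String[] args) throws Exception {
  // Same ZooKeeper address that the Spark job receives as <zk_url>
  try (CloudSolrClient client = new CloudSolrClient.Builder(
    Collections.singletonList("localhost:2181"), Optional.empty()).build()) {
   // "connectedcardata" is the collection name from the error message;
   // "_default" configset and 1 shard / 1 replica are placeholder sizing
   CollectionAdminRequest.createCollection("connectedcardata", "_default", 1, 1)
    .process(client);
  }
 }
}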
In general, the HDP-recommended way to move this data between Kafka and Solr is to use NiFi.
