java.lang.string不能转换为java.util.list从一个r螺栓向下一个螺栓发送数据时显示错误

swvgeqrz  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(313)

我正在使用r bolt处理我的数据。我想将处理后的数据从r bolt发送到我的下一个bolt,即countbolt。但是从r bolt发送时,java.lang.string不能转换为java.util.list错误显示。

ERROR o.a.s.t.ShellBolt - Halting process: ShellBolt died. Command: [Rscript, permute.R], ProcessInfo pid:7169, name:python-split-sentence exitCode:-1, errorString: 
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) [storm-core-1.2.2.jar:1.2.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
7055 [Thread-77] ERROR o.a.s.d.executor - 
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) [storm-core-1.2.2.jar:1.2.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
7145 [Thread-58-sentence-spout-executor[27 27]] INFO  o.a.s.d.task - Emitting: sentence-spout default [the cow jumped over the moon]
7145 [Thread-58-sentence-spout-executor[27 27]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 25 tuple: source: sentence-spout:27, stream: default, id: {}, [the cow jumped over the moon]]
7148 [Thread-70-python-split-sentence-executor[25 25]] INFO  o.a.s.d.executor - Processing received message FOR 25 TUPLE: source: sentence-spout:27, stream: default, id: {}, [the cow jumped over the moon]
7149 [Thread-70-python-split-sentence-executor[25 25]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.task.ShellBolt.execute(ShellBolt.java:175) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
Caused by: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.task.ShellBolt.die(ShellBolt.java:292) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt.access$400(ShellBolt.java:72) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:364) ~[storm-core-1.2.2.jar:1.2.2]
    ... 1 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) ~[storm-core-1.2.2.jar:1.2.2]
    ... 1 more
7150 [Thread-70-python-split-sentence-executor[25 25]] ERROR o.a.s.d.executor - 
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.task.ShellBolt.execute(ShellBolt.java:175) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
Caused by: java.lang.RuntimeException: pid:7212, name:python-split-sentence exitCode:-1, errorString: 
    at org.apache.storm.task.ShellBolt.die(ShellBolt.java:292) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt.access$400(ShellBolt.java:72) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:364) ~[storm-core-1.2.2.jar:1.2.2]
    ... 1 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) ~[storm-core-1.2.2.jar:1.2.2]
    ... 1 more
7246 [Thread-58-sentence-spout-executor[27 27]] INFO  o.a.s.d.task - Emitting: sentence-spout default [an apple a day keeps the doctor away]
7246 [Thread-58-sentence-spout-executor[27 27]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 24 tuple: source: sentence-spout:27, stream: default, id: {}, [an apple a day keeps the doctor away]]
7248 [Thread-30-python-split-sentence-executor[24 24]] INFO  o.a.s.d.executor - Processing received message FOR 24 TUPLE: source: sentence-spout:27, stream: default, id: {}, [an apple a day keeps the doctor away]
7248 [Thread-30-python-split-sentence-executor[24 24]] INFO  o.a.s.d.executor - Execute done TUPLE source: sentence-spout:27, stream: default, id: {}, [an apple a day keeps the doctor away] TASK: 24 DELTA: -1
7254 [Thread-75] ERROR o.a.s.t.ShellBolt - Halting process: ShellBolt died. Command: [Rscript, /home/uvionics/Downloads/Dixon/Rstorm/src/jvm/udacity/storm/resources/permute.R], ProcessInfo pid:7170, name:python-split-sentence exitCode:-1, errorString: 
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) [storm-core-1.2.2.jar:1.2.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
7254 [Thread-75] ERROR o.a.s.d.executor - 
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
    at org.apache.storm.multilang.JsonSerializer.readShellMsg(JsonSerializer.java:135) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:125) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:330) [storm-core-1.2.2.jar:1.2.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]

下面的代码表示sentencewordcounttopology的代码

package***.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import org.apache.storm.utils.Utils;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import***.storm.spout.RandomSentenceSpout;
import***.storm.SplitSentence;

public class SentenceWordCountTopology {

  private SentenceWordCountTopology() { }

  static class CountBolt extends BaseRichBolt {

    // To output tuples from this bolt to the next stage bolts, if any
    private OutputCollector collector;
    PrintWriter fOut;
    String outFile;

    // Map to store the count of the words
    private Map<String, Integer> countMap;

    @Override
    public void execute(Tuple tuple)
    {
         try {
              if(fOut == null) {
                   fOut = new PrintWriter(new FileWriter(outFile));
               }

              String key = tuple.getStringByField("key");
              String value = tuple.getStringByField("value");

              //fOut.println("FROM STORM:" + count + ":" + tuple.toString()); 
              fOut.println("FROM STORM:" + ":(" + key + "," + value + ")");
              fOut.flush();

      } catch (IOException ioe) {
             System.out.println("Writing to file:" + outFile + " failed");
      }
        System.err.println("++++++++++++++++++ " + tuple);

        countMap = new HashMap<String, Integer>();

      //Syntax to get the word from the 1st column of incoming tuple
      String word = tuple.getString(0);

      // check if the word is present in the map
      if (countMap.get(word) == null) {

      // not present, add the word with a count of 1
      countMap.put(word, 1);
      } else {

      // already there, hence get the count
      Integer val = countMap.get(word);

      // increment the count and save it to the map
      countMap.put(word, ++val);

    }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
    {
      outputFieldsDeclarer.declare(new Fields("word","count"));
    }

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        outFile = config.get((Object)"ALLFILE").toString(); 

    }
  }

  public static void main(String[] args) throws Exception
  {
    // create the topology
      Config config = new Config();
        config.setDebug(true);
        config.setNumWorkers(1);
                config.put("ALLFILE",(Object)"/path/new.txt");

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("sentence-spout", new RandomSentenceSpout(), 1);
    builder.setBolt("python-split-sentence", new SplitSentence(), 10).shuffleGrouping("sentence-spout");
    builder.setBolt("count-bolt", new CountBolt(), 15).shuffleGrouping("python-split-sentence");

    LocalCluster cluster=new LocalCluster();
    cluster.submitTopology("pc", config, builder.createTopology());
    Thread.sleep(40000);
    cluster.shutdown();
    StormSubmitter.submitTopology("pc", config, builder.createTopology());

  }
}

下面的代码表示splitsentence的代码

package***.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

  public class SplitSentence extends ShellBolt implements IRichBolt {

    public SplitSentence() {
        super("Rscript", "/path/permute.R");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields());
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

下面的代码表示permute.r的代码


# !/usr/bin/Rscript

library(permute,quietly=TRUE)

# library(Storm,quietly=TRUE)

source("/path/Storm.R")

storm = Storm$new();

storm$lambda = function(s) {

  t = s$tuple;

  words = strsplit(unlist(t$input[1]),"\\s+",perl=TRUE)[[1]];
  words1 = toJSON(words)
  words2 = charToRaw(words1)

  t$output = vector(mode="character",length=1);
  t$output[1] = paste(c("PERMUTE",words[shuffle(length(words))]),collapse=" ");

  s$emit(t);

};

 storm$run();
5cg8jx4n

5cg8jx4n1#

您发布的堆栈跟踪有来自的异常https://github.com/apache/storm/blob/v1.2.2/storm-core/src/jvm/org/apache/storm/multilang/jsonserializer.java#l135. 这一行是试图读取“元组”属性在您的喷口。
storm multilang通过stdin和stdout与您的bolt对话。当bolt进程发出一个元组时,它应该向stdout写入如下内容:

{
    "command": "emit",
    // The ids of the tuples this output tuples should be anchored to
    "anchors": ["1231231", "-234234234"],
    // The id of the stream this tuple was emitted to. Leave this empty to emit to default stream.
    "stream": "1",
    // If doing an emit direct, indicate the task to send the tuple to
    "task": 9,
    // All the values in this tuple
    "tuple": ["field1", 2, 3]
}

注意“tuple”字段是如何成为一个列表的。
您看到的问题是,当您的bolt写入stdout时,它会在“tuple”字段中写入一个字符串。您需要查看stormr包来找出发生这种情况的原因。
由于multilang协议是基于文本的,因此可以通过手动启动r程序并通过stdin向其写入消息来调试bolt。协议描述见http://storm.apache.org/releases/1.1.2/multilang-protocol.html,参见“螺栓”标题。
原始答案:
有两件事不对:
您需要声明希望输出字段具有的名称。所以呢 declarer.declare(new Fields()); 必须是 declarer.declare(new Fields("key", "value")); . 为了解释这是为了什么,输出字段名用在引用字段的下游螺栓中,用在按字段分组的字段中。拆分螺栓中的输出字段应与您在countbolt中读取的字段相匹配(即“key”和“value”)
更新:别在意这一点,你没有正确发射是错误的。这个 emit 方法签名为 emit(Tuple, List<Object>) ,其中元组是锚定(用于将新元组绑定到输入元组以进行确认),并且列表包含希望新元组包含的值。还有其他的 emit 重载,但它们遵循这个基本思想。因为您已经设置了拓扑,所以拆分螺栓将发射到计数螺栓中,所以您希望从拆分螺栓发射一个键和一个值。例如 emit("myKey", 52) .

相关问题