如何使用java在storm bolts中编写prepare()方法?

t0ybt7op  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(311)

我想从中读取一个文件 Bolt.prepare() 并将读取的文件的对象传递给execute方法。有没有人能给我发一个示例代码,告诉我如何使用java在Storm中实现它?我正在使用:

public void prepare(Map stormConf, TopologyContext context) {
    try {
        this.context = context;
        this.fileReader = new FileInputStream("/home/anji/all-users.csv");
    } catch (FileNotFoundException e) {
        /*throw new RuntimeException("Error reading file "
                + conf.get("inputFile"));*/
    }
    this.collector = collector;

}

我打电话给 FileReaderexecute() 方法: User p = new User(fileReader); 错误消息:

java.io.IOException: Stream Closed
  at java.io.FileInputStream.readBytes(Native Method)
  at java.io.FileInputStream.read(FileInputStream.java:272)
  at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
  at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
  at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
  at java.io.InputStreamReader.read(InputStreamReader.java:184)
  at com.csvreader.CsvReader.checkDataLength(Unknown Source)
  at com.csvreader.CsvReader.readRecord(Unknown Source)
  at com.csvreader.CsvReader.readHeaders(Unknown Source)
6rqinv9w

6rqinv9w1#

只需在bolt中使用包含所有 User . 一次只能读取一次文件 prepare() :

public class MyClass implements IRichBolt {
    private final List<User> users = new ArrayList<User>();

    public void prepare(...) {
        File f = new File(...);
        // open file and create objects (or similar code)
       String line;
       while((line = reader.readLine()) != null) {
           users.add(new Users(line));
       }
    }

    // other methods including .execute(...) can use users 
}

相关问题