storm-bolt执行的比其父代多

3zwjbxry  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(335)

我有一个拓扑结构,其中包含一个Kafka和2螺栓。
boltparsejsoninput及其执行方法:

public void execute(Tuple input) {
    // TODO Auto-generated method stub
    String data = input.getString(4);
    js = new JSONObject(data);

    String userId = js.getString("userId");
    String timestamp = js.getString("timestamp");
    counter++;
    System.out.println(counter);
    collector.emit(input, new Values(userId, timestamp));
    collector.ack(input);
}

boltinsertredis及其执行方法

public void execute(Tuple input) {
    // TODO Auto-generated method stub
    String userId = input.getStringByField("userId");
    int timestamp = 0;
    try {

        timestamp = convertTimestampToEpoch(input.getStringByField("timestamp"));
    } catch (ParseException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }

    String timestep = this.prefix + timestamp/10;
    String curTimestamp = jedis.hget(timestep, userId);
    if(curTimestamp == null || Integer.parseInt(curTimestamp) < timestamp) {
        jedis.hset(timestep, userId, Integer.toString(timestamp));
    }
    collector.ack(input);
}

boltinsertredis从boltparsejsoninput获取输入

builder.setBolt("ParseJsonInput-Bolt", new BoltParseJsonInput()).shuffleGrouping("Kafka-Spout");
    builder.setBolt("BoltRedisUserLastActive-Bolt", new BoltRedisUserLastActive()).shuffleGrouping("ParseJsonInput-Bolt");

但是当我将这个拓扑提交到storm中时,boltinsertredis执行的比boltparsejsoninput更多

你能解释一下这里有什么问题吗?

e4eetjau

e4eetjau1#

我发现我的parsejsonbolt在消息25700处出现了一个异常,并在该点上不断重放执行。当我试着接球时,效果很好

相关问题