streamparse/python-custom fail()方法不适用于错误元组

klr1opcd  于 2021-06-21  发布在  Storm
关注(0)|答案(0)|浏览(142)

我使用storm实时处理kafka的消息,并使用streamparse构建我的拓扑结构。对于这个用例,我们必须100%保证进入storm的任何消息都得到处理和确认。我已经使用try/catch在bolt上实现了逻辑(见下文),除了将其写入kafka中的另一个“错误”主题之外,我还想让storm重播这些消息。
在我的kafkaspout中,我将tup\u id指定为等于我的消费者从中输入的kafka主题的偏移量id。但是,当我使用错误的变量引用强制bolt出错时,我看不到消息被重放。我确实看到有人在写“错误”Kafka主题,但只有一次——这意味着元组永远不会被重新提交到我的bolt中。我对拓扑消息的设置\u timeout \u sec=60,我希望storm每60秒继续重播一次失败的消息,并让我的错误捕获一直写入错误主题。
Kafka斯波特.py

class kafkaSpout(Spout):

    def initialize(self, stormconf, context):

        self.kafka = KafkaClient(str("host:6667"))#,offsets_channel_socket_timeout_ms=60000)
        self.topic = self.kafka.topics[str("topic-1")]
        self.consumer = self.topic.get_balanced_consumer(consumer_group=str("consumergroup"),auto_commit_enable=False,zookeeper_connect=str("host:2181"))

    def next_tuple(self):
        for message in self.consumer:
            self.emit([json.loads(message.value)],tup_id=message.offset)
            self.log("spout emitting tuple ID (offset): "+str(message.offset))
            self.consumer.commit_offsets()

    def fail(self, tup_id):
        self.log("failing logic for consumer. resubmitting tup id: ",str(tup_id))
        self.emit([json.loads(message.value)],tup_id=message.offset)

工艺螺栓.py

class processBolt(Bolt):

  auto_ack = False
  auto_fail = False

  def initialize(self, conf, ctx):
      self.counts = Counter()
      self.kafka = KafkaClient(str("host:6667"),offsets_channel_socket_timeout_ms=60000)
      self.topic = self.kafka.topics[str("topic-2")]
      self.producer = self.topic.get_producer()

      self.failKafka = KafkaClient(str("host:6667"),offsets_channel_socket_timeout_ms=60000)
      self.failTopic = self.failKafka.topics[str("topic-error")]
      self.failProducer = self.failTopic.get_producer()

  def process(self, tup):
      try:
          self.log("found tup.")
          docId = tup.values[0]
          url = "solrserver.host.com/?id="+str(docId)

          thisIsMyForcedError = failingThisOnPurpose ####### this is what im using to fail my bolt consistent

          data = json.loads(requests.get(url).text)

          if len(data['response']['docs']) > 0:
              self.producer.produce(json.dumps(docId))
              self.log("record FOUND {0}.".format(docId))

          else:
              self.log('record NOT found {0}.'.format(docId)) 

          self.ack(tup)

      except:
          docId = tup.values[0]
          self.failProducer.produce( json.dumps(docId), partition_key=str("ERROR"))
          self.log("TUP FAILED IN PROCESS BOLT: "+str(docId))
          self.fail(tup)

我将感谢任何帮助,如何正确地实现这个案件的自定义失败逻辑。提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题