如何在网络连接(tcp ip)关闭时停止spark流上下文?

ffscu2ro  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(328)

换句话说,我不想在spark streaming上下文中将duration设置为一个值,而是想将其设置为(socket close time-socket open time)

t3psigkw

t3psigkw1#

通过在流侦听器中执行ssc.stop(),当您的流出错或终止时,此函数将触发停止。还有另一个触发器,比如批处理何时开始、何时完成等。

ssc.addStreamingListener(new StreamingListener() {

    @Override
      public void onReceiverError(StreamingListenerReceiverError receiverError) {
      System.out.println("Do what u want");
      ssc.stop();
      }

    @Override
      public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
      System.out.println("Do what u want");
      ssc.stop(true,true); 
      }
    }

    ssc.start();
bejyjqdl

bejyjqdl2#

您可以使用streaminglistner接口监听正在断开连接的接收器,然后关闭流上下文。
这是用作

// define listener
class MyListener extends StreamingListener {
  override def onReceiverStopped(...) {
    streamingContext.stop()
  }
} 

// attach listener
streamingContext. addStreamingListener(new MyListener())

相关问题