Spark Streaming custom receiver: unknown host exception

rsl1atfo  posted on 2021-06-02 in Hadoop

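I am new to Spark Streaming. I want to stream data from a URL so that I can retrieve information from it, and I am using JavaCustomReceiver to do so.
This is the code I am using (source):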

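import com.google.common.io.Closeables;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
import scala.Tuple2;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;

public class JavaCustomReceiver extends Receiver<String> {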

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {

        SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

        JavaReceiverInputDStream<String> lines = ssc.receiverStream(
            new JavaCustomReceiver("http://stream.meetup.com/2/rsvps", 80));

        JavaDStream<String> words = lines.flatMap(
              new FlatMapFunction<String, String>() {

                 @Override
                 public Iterator<String> call(String x) {
                     return Arrays.asList(SPACE.split(x)).iterator();
                 }
              });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
              new PairFunction<String, String, Integer>() {

                 @Override
                 public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<>(s, 1);
                 }
              }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }

    String host = null;
    int port = -1;

    public JavaCustomReceiver(String host_, int port_) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        host = host_;
        port = port_;
    }

    public void onStart() {

        new Thread() {
            @Override
            public void run() {
                receive();
            }
        }.start();
    }

    public void onStop() {

    }

    private void receive() {
        try {
            Socket socket = null;
            BufferedReader reader = null;
            String userInput = null;
            try {
                // connect to the server
                socket = new Socket(host, port);
                reader = new BufferedReader(
                        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
                // Until stopped or connection broken continue reading
                while (!isStopped() && (userInput = reader.readLine()) != null) {
                    System.out.println("Received data '" + userInput + "'");
                    store(userInput);
                }
            } finally {
                Closeables.close(reader, /* swallowIOException = */ true);
                Closeables.close(socket, /* swallowIOException = */ true);
            }

            restart("Trying to connect again");
        } catch (ConnectException ce) {
            // restart if could not connect to server
            restart("Could not connect", ce);
        } catch (Throwable t) {
            restart("Error receiving data", t);
        }
    }
}

However, I keep getting a java.net.UnknownHostException.
How can I fix this? What is wrong with the code I am using?

yquaqz18  #1

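After reading the code of the custom receiver you referenced, it is clear that it is a plain socket receiver that connects to host:port, not an HTTP receiver that can accept a URL. The Socket constructor treats the whole string http://stream.meetup.com/2/rsvps as a hostname, the name lookup fails, and that is where the java.net.UnknownHostException comes from. You will have to change the code so that it reads from an HTTP endpoint instead.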
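
As a rough illustration of that change, here is a minimal sketch that uses only the JDK. HttpLineReader and its methods hostOf, pump and stream are hypothetical names, not part of Spark or of the original code, and the sketch assumes the endpoint returns newline-delimited text over a plain HTTP GET.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical helper (not part of Spark): reads a line-delimited HTTP stream.
final class HttpLineReader {

    // Returns only the hostname part of a URL. This is the kind of value
    // java.net.Socket expects as its host argument, not the full URL.
    static String hostOf(String url) {
        return URI.create(url).getHost();
    }

    // Forwards each line from the reader to the sink until the input ends
    // or the caller signals a stop. Returns the number of lines forwarded.
    static long pump(Reader in, Consumer<String> sink, BooleanSupplier stopped) {
        long count = 0;
        try {
            BufferedReader reader = new BufferedReader(in);
            String line;
            while (!stopped.getAsBoolean() && (line = reader.readLine()) != null) {
                sink.accept(line);
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    // Issues an HTTP GET for the URL and pumps the response body line by line.
    // Assumes the server answers with UTF-8, newline-delimited text.
    static long stream(String url, Consumer<String> sink, BooleanSupplier stopped) {
        try {
            HttpURLConnection conn =
                    (HttpURLConnection) URI.create(url).toURL().openConnection();
            conn.setRequestMethod("GET");
            try (Reader in = new InputStreamReader(
                    conn.getInputStream(), StandardCharsets.UTF_8)) {
                return pump(in, sink, stopped);
            } finally {
                conn.disconnect();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Inside the receiver, receive() could then keep the URL in a field and call something like HttpLineReader.stream(url, line -> store(line), () -> isStopped()) in place of opening a Socket, leaving the existing restart(...) handling around it. Whether this is enough depends on the endpoint (redirects, HTTPS, authentication), so treat it as a starting point rather than a finished receiver.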
