Unable to write data consumed by a Kafka consumer to HDFS

enxuqcxy  posted on 2021-06-08  in  Kafka

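I am trying to consume data from partition 0 and write the received data to a file on HDFS, but the program throws an exception and I cannot see any data being written to the HDFS file.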

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConverters._

object WeatherCons {

  def main(args: Array[String]): Unit = {

    val TOPIC = "again"

    val props = new Properties()
    props.put("consumer.timeout.ms", "1500")
    props.put("bootstrap.servers", "104.197.102.208:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "something")

    val consumer = new KafkaConsumer[String, String](props)

    consumer.subscribe(util.Collections.singletonList(TOPIC))
   /* val topicPartition = new TopicPartition(TOPIC, 0)
    consumer.seekToBeginning(topicPartition)*/
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://104.197.102.208:8020")
    val fs = FileSystem.get(conf)
    import org.apache.hadoop.fs.FSDataOutputStream
    val fin: FSDataOutputStream = fs.create(new Path("/prash/mySample3.txt"))
    while (true) {
      val records = consumer.poll(100)
      for (record <- records.asScala) {

        val co = record.value().toString
        fin.writeUTF(co)
        fin.writeUTF("\n")

        println(co)
      }
      fin.close()
      fin.flush()
    }
  }
}

It throws the following exception, and no data is written to HDFS:

Exception in thread "main" java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1940)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
        at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)

3pmvbmvn (answer #1)

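Try flushing fin inside the while loop rather than closing it, and close it only after the loop. As written, fin.close() runs at the end of the first iteration, so the next fin.writeUTF call hits a closed stream and throws ClosedChannelException.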
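
The flush-inside, close-after pattern could look roughly like the sketch below. It is not the asker's full program: writeBatches and FlushThenClose are hypothetical names, the iterator of batches stands in for the results of successive consumer.poll calls, and a ByteArrayOutputStream stands in for HDFS so the example runs without a cluster. Since FSDataOutputStream extends java.io.DataOutputStream, the stream returned by fs.create(...) should be usable in the same way. The sketch also writes raw UTF-8 bytes instead of calling writeUTF, because writeUTF prepends a two-byte length to every string, which is probably not wanted in a plain text file.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

object FlushThenClose {

  // Hypothetical helper: writes every value of every batch as one line.
  // `out` stands in for the FSDataOutputStream returned by fs.create(...);
  // `batches` stands in for the records returned by successive poll calls.
  def writeBatches(out: DataOutputStream, batches: Iterator[Seq[String]]): Unit = {
    try {
      for (batch <- batches) {
        for (value <- batch) {
          // Raw UTF-8 bytes; writeUTF would add a 2-byte length prefix.
          out.write((value + "\n").getBytes(StandardCharsets.UTF_8))
        }
        // Flush after each batch; the stream stays open for the next one.
        out.flush()
      }
    } finally {
      // Close exactly once, after the loop has finished (or failed).
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    writeBatches(new DataOutputStream(buffer), Iterator(Seq("a", "b"), Seq("c")))
    print(new String(buffer.toByteArray, StandardCharsets.UTF_8))
  }
}
```

In the original program the loop is while (true), so code placed after it is never reached; a real consumer would need some exit condition (for example a shutdown flag, which is an assumption here) for the close in the finally block to run.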
