flume+spark+hbase does not work

zzzyeukh · posted 2021-06-04 in Hadoop

I am trying to use Flume to send data to Spark and then write it into HBase. I have already tried Flume + Spark + HDFS, and that works. Here is the source code:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import com.google.common.collect.Lists;
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

import scala.Tuple2;
import scala.Tuple4;

public class JavaFlumeEventTest {
    private static final Pattern SPACE = Pattern.compile(" ");
    private static Configuration conf = null;

    /**
     * Initialize the HBase configuration from the cluster config files.
     */
    static {
        conf = HBaseConfiguration.create();         
        conf.addResource(new Path("file:///etc/hbase/conf/hbase-site.xml"));
        conf.addResource(new Path("file:///etc/hadoop/conf/hdfs-site.xml"));
        conf.addResource(new Path("file:///etc/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("file:///etc/hadoop/conf/mapred-site.xml"));
        conf.addResource(new Path("file:///etc/hadoop/conf/yarn-site.xml"));
        conf.set("hbase.zookeeper.quorum", "elephant,tiger,horse");
        conf.set("hbase.zookeeper.property.clientPort","2181");
        conf.set("hbase.master", "elephant" + ":60000");
        conf.set("hbase.cluster.distributed", "true");
        conf.set("hbase.rootdir", "hdfs://elephant:8020/hbase");
    }
    /**
     * Add a new record to HBase.
     * @param tableName HBase table name
     * @param rowKey    row key of the record
     * @param family    column family
     * @param qualifier column qualifier
     * @param value     cell value
     */
    public static void addRecord (String tableName, String rowKey, String family, String qualifier, String value){
        try {
            System.out.println("===========HTable =========="+conf);
            HTable table = new HTable(conf, tableName);
            System.out.println("===========put ==========");
            Put put = new Put(Bytes.toBytes(rowKey));
            System.out.println("===========put Add==========");
            put.add(Bytes.toBytes(family),Bytes.toBytes(qualifier),Bytes.toBytes(value));
            System.out.println("===========table put ==========");
            table.put(put);
            System.out.println("insert recored " + rowKey + " to table " + tableName +" ok.");
        } catch (IOException e) {
            System.out.println("===========IOException ==========");
            e.printStackTrace();
        }
    }
  private JavaFlumeEventTest() {
  }

  public static void main(String[] args) {

    String host = args[0];
    int port = Integer.parseInt(args[1]);
    Duration batchInterval = new Duration(Integer.parseInt(args[2]));
    final String tableName = args[3];
    final String columnFamily = args[4];
    SparkConf sparkConf = new SparkConf()
    .setAppName("JavaFlumeEventTest")
    .set("spark.executor.memory", "256m");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

    final Broadcast<String> broadcastTableName = ssc.sparkContext().broadcast(tableName);
    final Broadcast<String> broadcastColumnFamily = ssc.sparkContext().broadcast(columnFamily);

    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
    JavaDStream<String> words = flumeStream.flatMap(new FlatMapFunction<SparkFlumeEvent,String>(){

        @Override
        public Iterable<String> call(SparkFlumeEvent arg0) throws Exception {
            String body = new String(arg0.event().getBody().array(), Charset.forName("UTF-8"));
            return Lists.newArrayList(SPACE.split(body));
        }

    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
              new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                  return new Tuple2<String, Integer>(s, 1);
                }
              }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                  return i1 + i2;
                }
              });

    wordCounts.print();

    wordCounts.foreach(new Function2<JavaPairRDD<String,Integer>, Time, Void>() {

        @Override
        public Void call(JavaPairRDD<String, Integer> values,
                Time time) throws Exception {

            values.foreach(new VoidFunction<Tuple2<String, Integer>> () {

                @Override
                public void call(Tuple2<String, Integer> tuple){
                    System.out.println("===========insert record========"+tuple._1()+"=="+tuple._2().toString());
                    JavaFlumeEventTest.addRecord("mytable","PutInpu",columnFamily,tuple._1(),tuple._2().toString());
                    System.out.println("===========Done record========"+tuple._1());
                }} );

            return null;
        }});

    flumeStream.count().map(new Function<Long, String>() {
      @Override
      public String call(Long in) {
        return "Received " + in + " flume events.";
      }
    }).print();

    ssc.start();
  }
}

I exported it as a runnable jar and launched it with spark-submit (the arguments are the host, the port, the batch interval in milliseconds, the HBase table name, and the column family):

./bin/spark-submit --class JavaFlumeEventTest --master local[15] /home/training/software/JavaFlumeEventTest3.jar elephant 11000 5000 mytable cf

There is no exception, but no data is added to HBase. I found that the thread stops at

HTable table = new HTable(conf, tableName);

Here is the Spark terminal log:

15/02/04 21:36:05 INFO DAGScheduler: Job 72 finished: print at JavaFlumeEventTest.java:139, took 0.056793 s
    -------------------------------------------
    Time: 1423103765000 ms
    -------------------------------------------
    (have,3)
    (example,,1)
    (dependencies,1)
    (linked,1)
    (1111,28)
    (non-Spark,1)
    (do,1)
    (some,1)
    (Hence,,1)
    (from,2)
    ...

    15/02/04 21:36:05 INFO JobScheduler: Finished job streaming job 1423103765000 ms.0 from job set of time 1423103765000 ms
    15/02/04 21:36:05 INFO JobScheduler: Starting job streaming job 1423103765000 ms.1 from job set of time 1423103765000 ms
    15/02/04 21:36:05 INFO SparkContext: Starting job: foreach at JavaFlumeEventTest.java:141
    15/02/04 21:36:05 INFO DAGScheduler: Got job 73 (foreach at JavaFlumeEventTest.java:141) with 15 output partitions (allowLocal=false)
    15/02/04 21:36:05 INFO DAGScheduler: Final stage: Stage 146(foreach at JavaFlumeEventTest.java:141)
    15/02/04 21:36:05 INFO DAGScheduler: Parents of final stage: List(Stage 145)
    15/02/04 21:36:05 INFO DAGScheduler: Missing parents: List()
    15/02/04 21:36:05 INFO DAGScheduler: Submitting Stage 146 (ShuffledRDD[114] at reduceByKey at JavaFlumeEventTest.java:132), which has no missing parents
    15/02/04 21:36:05 INFO MemoryStore: ensureFreeSpace(2544) called with curMem=141969, maxMem=280248975
    15/02/04 21:36:05 INFO MemoryStore: Block broadcast_86 stored as values in memory (estimated size 2.5 KB, free 267.1 MB)
    15/02/04 21:36:05 INFO MemoryStore: ensureFreeSpace(1862) called with curMem=144513, maxMem=280248975
    15/02/04 21:36:05 INFO MemoryStore: Block broadcast_86_piece0 stored as bytes in memory (estimated size 1862.0 B, free 267.1 MB)
    15/02/04 21:36:05 INFO BlockManagerInfo: Added broadcast_86_piece0 in memory on localhost:41505 (size: 1862.0 B, free: 267.2 MB)
    15/02/04 21:36:05 INFO BlockManagerMaster: Updated info of block broadcast_86_piece0
    15/02/04 21:36:05 INFO SparkContext: Created broadcast 86 from getCallSite at DStream.scala:294
    15/02/04 21:36:05 INFO DAGScheduler: Submitting 15 missing tasks from Stage 146 (ShuffledRDD[114] at reduceByKey at JavaFlumeEventTest.java:132)
    15/02/04 21:36:05 INFO TaskSchedulerImpl: Adding task set 146.0 with 15 tasks
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 0.0 in stage 146.0 (TID 466, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 1.0 in stage 146.0 (TID 467, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 2.0 in stage 146.0 (TID 468, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 3.0 in stage 146.0 (TID 469, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 4.0 in stage 146.0 (TID 470, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 5.0 in stage 146.0 (TID 471, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 6.0 in stage 146.0 (TID 472, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 7.0 in stage 146.0 (TID 473, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 8.0 in stage 146.0 (TID 474, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 9.0 in stage 146.0 (TID 475, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 10.0 in stage 146.0 (TID 476, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 11.0 in stage 146.0 (TID 477, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 12.0 in stage 146.0 (TID 478, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO TaskSetManager: Starting task 13.0 in stage 146.0 (TID 479, localhost, PROCESS_LOCAL, 1122 bytes)
    15/02/04 21:36:05 INFO Executor: Running task 0.0 in stage 146.0 (TID 466)
    15/02/04 21:36:05 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
    15/02/04 21:36:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
    ===========insert record========have==3
    ===========HTable ==========Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hbase-default.xml, hbase-site.xml, file:/etc/hbase/conf/hbase-site.xml, file:/etc/hadoop/conf/hdfs-site.xml, file:/etc/hadoop/conf/core-site.xml, file:/etc/hadoop/conf/mapred-site.xml, file:/etc/hadoop/conf/yarn-site.xml
    15/02/04 21:36:05 INFO Executor: Running task 1.0 in stage 146.0 (TID 467)
    15/02/04 21:36:05 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
    15/02/04 21:36:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
    ===========insert record========1111==28
    ===========HTable ==========Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hbase-default.xml, hbase-site.xml, file:/etc/hbase/conf/hbase-site.xml, file:/etc/hadoop/conf/hdfs-site.xml, file:/etc/hadoop/conf/core-site.xml, file:/etc/hadoop/conf/mapred-site.xml, file:/etc/hadoop/conf/yarn-site.xml
    15/02/04 21:36:05 INFO Executor: Running task 2.0 in stage 146.0 (TID 468)
...
...
    15/02/04 21:36:05 INFO ContextCleaner: Cleaned shuffle 1
    15/02/04 21:36:05 INFO ContextCleaner: Cleaned shuffle 0
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:host.name=elephant
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.version=1.7.0_45
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.vendor=Oracle Corporation
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.home=/usr/java/jdk1.7.0_45-cloudera/jre
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.class.path=::/home/training/software/spark-1.2.0-bin-hadoop2.3/conf:/home/training/software/spark-1.2.0-bin-hadoop2.3/lib/spark-assembly-1.2.0-hadoop2.3.0.jar:/home/training/software/spark-1.2.0-bin-hadoop2.3/lib/datanucleus-api-jdo-3.2.6.jar:/home/training/software/spark-1.2.0-bin-hadoop2.3/lib/datanucleus-core-3.2.10.jar:/home/training/software/spark-1.2.0-bin-hadoop2.3/lib/datanucleus-rdbms-3.2.9.jar
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.io.tmpdir=/tmp
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.compiler=<NA>
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:os.name=Linux
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:os.arch=amd64
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:os.version=2.6.32-279.el6.x86_64
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:user.name=training
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:user.home=/home/training
    15/02/04 21:36:05 INFO ZooKeeper: Client environment:user.dir=/home/training/software/spark-1.2.0-bin-hadoop2.3
    15/02/04 21:36:05 INFO ZooKeeper: Initiating client connection, connectString=tiger:2181,elephant:2181,horse:2181 sessionTimeout=90000 watcher=hconnection-0x575b43dd, quorum=tiger:2181,elephant:2181,horse:2181, baseZNode=/hbase
    15/02/04 21:36:05 INFO RecoverableZooKeeper: Process identifier=hconnection-0x575b43dd connecting to ZooKeeper ensemble=tiger:2181,elephant:2181,horse:2181
    15/02/04 21:36:05 INFO ClientCnxn: Opening socket connection to server tiger/192.168.137.12:2181. Will not attempt to authenticate using SASL (unknown error)
    15/02/04 21:36:05 INFO ClientCnxn: Socket connection established to tiger/192.168.137.12:2181, initiating session
    15/02/04 21:36:05 INFO ClientCnxn: Session establishment complete on server tiger/192.168.137.12:2181, sessionid = 0x24b573f71f00007, negotiated timeout = 40000
    15/02/04 21:36:10 INFO JobScheduler: Added jobs for time 1423103770000 ms
    15/02/04 21:36:15 INFO JobScheduler: Added jobs for time 1423103775000 ms
    15/02/04 21:36:20 INFO JobScheduler: Added jobs for time 1423103780000 ms

By the way, I can add data to HBase with plain Java; it only fails with Flume and Spark. Can you help me solve this? Thanks.
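For reference, the kind of standalone HBase put that works for me in plain Java looks roughly like the sketch below. This is an illustrative reconstruction, not my exact program; the row key, qualifier, and value are placeholders, and it uses the same HTable/Put API and ZooKeeper quorum as the code above.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseStandalonePut {
    public static void main(String[] args) throws IOException {
        // Same client configuration as in JavaFlumeEventTest
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "elephant,tiger,horse");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // The same call that appears to hang inside the Spark executor
        HTable table = new HTable(conf, "mytable");
        Put put = new Put(Bytes.toBytes("row1"));                               // placeholder row key
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1")); // placeholder cell
        table.put(put);
        table.close();
        System.out.println("standalone put ok");
    }
}

Run on its own with the HBase client jars on the classpath, a put like this completes normally, which is what makes the hang inside the Spark job confusing.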
