I am trying to use Flume to send data to Spark, and then add the data to HBase. I have already tried Flume + Spark + HDFS, and that works. Here is the source code:
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import com.google.common.collect.Lists;
import scala.Tuple2;
public class JavaFlumeEventTest {
    private static final Pattern SPACE = Pattern.compile(" ");
    private static Configuration conf = null;

    /**
     * Initialize the HBase configuration.
     */
    static {
        conf = HBaseConfiguration.create();
        conf.addResource(new Path("file:///etc/hbase/conf/hbase-site.xml"));
        conf.addResource(new Path("file:///etc/hadoop/conf/hdfs-site.xml"));
        conf.addResource(new Path("file:///etc/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("file:///etc/hadoop/conf/mapred-site.xml"));
        conf.addResource(new Path("file:///etc/hadoop/conf/yarn-site.xml"));
        conf.set("hbase.zookeeper.quorum", "elephant,tiger,horse");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.master", "elephant" + ":60000");
        conf.set("hbase.cluster.distributed", "true");
        conf.set("hbase.rootdir", "hdfs://elephant:8020/hbase");
    }
    /**
     * Add a new record.
     * @param tableName
     * @param rowKey
     * @param family
     * @param qualifier
     * @param value
     */
    public static void addRecord(String tableName, String rowKey, String family, String qualifier, String value) {
        try {
            System.out.println("===========HTable ==========" + conf);
            HTable table = new HTable(conf, tableName);
            System.out.println("===========put ==========");
            Put put = new Put(Bytes.toBytes(rowKey));
            System.out.println("===========put Add==========");
            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            System.out.println("===========table put ==========");
            table.put(put);
            System.out.println("insert record " + rowKey + " to table " + tableName + " ok.");
        } catch (IOException e) {
            System.out.println("===========IOException ==========");
            e.printStackTrace();
        }
    }

    private JavaFlumeEventTest() {
    }
    public static void main(String[] args) {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        Duration batchInterval = new Duration(Integer.parseInt(args[2]));
        final String tableName = args[3];
        final String columnFamily = args[4];

        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaFlumeEventTest")
                .set("spark.executor.memory", "256m");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

        final Broadcast<String> broadcastTableName = ssc.sparkContext().broadcast(tableName);
        final Broadcast<String> broadcastColumnFamily = ssc.sparkContext().broadcast(columnFamily);

        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

        JavaDStream<String> words = flumeStream.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
            @Override
            public Iterable<String> call(SparkFlumeEvent arg0) throws Exception {
                String body = new String(arg0.event().getBody().array(), Charset.forName("UTF-8"));
                return Lists.newArrayList(SPACE.split(body));
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        wordCounts.print();

        wordCounts.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> values, Time time) throws Exception {
                values.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    @Override
                    public void call(Tuple2<String, Integer> tuple) {
                        System.out.println("===========insert record========" + tuple._1() + "==" + tuple._2().toString());
                        JavaFlumeEventTest.addRecord("mytable", "PutInpu", columnFamily, tuple._1(), tuple._2().toString());
                        System.out.println("===========Done record========" + tuple._1());
                    }
                });
                return null;
            }
        });

        flumeStream.count().map(new Function<Long, String>() {
            @Override
            public String call(Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
    }
}
I exported it as a runnable jar and launched it from Spark with spark-submit (the positional arguments are host, port, batch interval in ms, table name, and column family):
./bin/spark-submit --class JavaFlumeEventTest --master local[15] /home/training/software/JavaFlumeEventTest3.jar elephant 11000 5000 mytable cf
There are no exceptions, but no data gets added to HBase. I found that the thread stops at
HTable table = new HTable(conf, tableName);
Here is the Spark terminal log:
15/02/04 21:36:05 INFO DAGScheduler: Job 72 finished: print at JavaFlumeEventTest.java:139, took 0.056793 s
-------------------------------------------
Time: 1423103765000 ms
-------------------------------------------
(have,3)
(example,,1)
(dependencies,1)
(linked,1)
(1111,28)
(non-Spark,1)
(do,1)
(some,1)
(Hence,,1)
(from,2)
...
15/02/04 21:36:05 INFO JobScheduler: Finished job streaming job 1423103765000 ms.0 from job set of time 1423103765000 ms
15/02/04 21:36:05 INFO JobScheduler: Starting job streaming job 1423103765000 ms.1 from job set of time 1423103765000 ms
15/02/04 21:36:05 INFO SparkContext: Starting job: foreach at JavaFlumeEventTest.java:141
15/02/04 21:36:05 INFO DAGScheduler: Got job 73 (foreach at JavaFlumeEventTest.java:141) with 15 output partitions (allowLocal=false)
15/02/04 21:36:05 INFO DAGScheduler: Final stage: Stage 146(foreach at JavaFlumeEventTest.java:141)
15/02/04 21:36:05 INFO DAGScheduler: Parents of final stage: List(Stage 145)
15/02/04 21:36:05 INFO DAGScheduler: Missing parents: List()
15/02/04 21:36:05 INFO DAGScheduler: Submitting Stage 146 (ShuffledRDD[114] at reduceByKey at JavaFlumeEventTest.java:132), which has no missing parents
15/02/04 21:36:05 INFO MemoryStore: ensureFreeSpace(2544) called with curMem=141969, maxMem=280248975
15/02/04 21:36:05 INFO MemoryStore: Block broadcast_86 stored as values in memory (estimated size 2.5 KB, free 267.1 MB)
15/02/04 21:36:05 INFO MemoryStore: ensureFreeSpace(1862) called with curMem=144513, maxMem=280248975
15/02/04 21:36:05 INFO MemoryStore: Block broadcast_86_piece0 stored as bytes in memory (estimated size 1862.0 B, free 267.1 MB)
15/02/04 21:36:05 INFO BlockManagerInfo: Added broadcast_86_piece0 in memory on localhost:41505 (size: 1862.0 B, free: 267.2 MB)
15/02/04 21:36:05 INFO BlockManagerMaster: Updated info of block broadcast_86_piece0
15/02/04 21:36:05 INFO SparkContext: Created broadcast 86 from getCallSite at DStream.scala:294
15/02/04 21:36:05 INFO DAGScheduler: Submitting 15 missing tasks from Stage 146 (ShuffledRDD[114] at reduceByKey at JavaFlumeEventTest.java:132)
15/02/04 21:36:05 INFO TaskSchedulerImpl: Adding task set 146.0 with 15 tasks
15/02/04 21:36:05 INFO TaskSetManager: Starting task 0.0 in stage 146.0 (TID 466, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 1.0 in stage 146.0 (TID 467, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 2.0 in stage 146.0 (TID 468, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 3.0 in stage 146.0 (TID 469, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 4.0 in stage 146.0 (TID 470, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 5.0 in stage 146.0 (TID 471, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 6.0 in stage 146.0 (TID 472, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 7.0 in stage 146.0 (TID 473, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 8.0 in stage 146.0 (TID 474, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 9.0 in stage 146.0 (TID 475, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 10.0 in stage 146.0 (TID 476, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 11.0 in stage 146.0 (TID 477, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 12.0 in stage 146.0 (TID 478, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO TaskSetManager: Starting task 13.0 in stage 146.0 (TID 479, localhost, PROCESS_LOCAL, 1122 bytes)
15/02/04 21:36:05 INFO Executor: Running task 0.0 in stage 146.0 (TID 466)
15/02/04 21:36:05 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
15/02/04 21:36:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
===========insert record========have==3
===========HTable ==========Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hbase-default.xml, hbase-site.xml, file:/etc/hbase/conf/hbase-site.xml, file:/etc/hadoop/conf/hdfs-site.xml, file:/etc/hadoop/conf/core-site.xml, file:/etc/hadoop/conf/mapred-site.xml, file:/etc/hadoop/conf/yarn-site.xml
15/02/04 21:36:05 INFO Executor: Running task 1.0 in stage 146.0 (TID 467)
15/02/04 21:36:05 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
15/02/04 21:36:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
===========insert record========1111==28
===========HTable ==========Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hbase-default.xml, hbase-site.xml, file:/etc/hbase/conf/hbase-site.xml, file:/etc/hadoop/conf/hdfs-site.xml, file:/etc/hadoop/conf/core-site.xml, file:/etc/hadoop/conf/mapred-site.xml, file:/etc/hadoop/conf/yarn-site.xml
15/02/04 21:36:05 INFO Executor: Running task 2.0 in stage 146.0 (TID 468)
...
...
15/02/04 21:36:05 INFO ContextCleaner: Cleaned shuffle 1
15/02/04 21:36:05 INFO ContextCleaner: Cleaned shuffle 0
15/02/04 21:36:05 INFO ZooKeeper: Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
15/02/04 21:36:05 INFO ZooKeeper: Client environment:host.name=elephant
15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.version=1.7.0_45
15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.vendor=Oracle Corporation
15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.home=/usr/java/jdk1.7.0_45-cloudera/jre
15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.class.path=::/home/training/software/spark-1.2.0-bin-hadoop2.3/conf:/home/training/software/spark-1.2.0-bin-hadoop2.3/lib/spark-assembly-1.2.0-hadoop2.3.0.jar:/home/training/software/spark-1.2.0-bin-hadoop2.3/lib/datanucleus-api-jdo-3.2.6.jar:/home/training/software/spark-1.2.0-bin-hadoop2.3/lib/datanucleus-core-3.2.10.jar:/home/training/software/spark-1.2.0-bin-hadoop2.3/lib/datanucleus-rdbms-3.2.9.jar
15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.io.tmpdir=/tmp
15/02/04 21:36:05 INFO ZooKeeper: Client environment:java.compiler=<NA>
15/02/04 21:36:05 INFO ZooKeeper: Client environment:os.name=Linux
15/02/04 21:36:05 INFO ZooKeeper: Client environment:os.arch=amd64
15/02/04 21:36:05 INFO ZooKeeper: Client environment:os.version=2.6.32-279.el6.x86_64
15/02/04 21:36:05 INFO ZooKeeper: Client environment:user.name=training
15/02/04 21:36:05 INFO ZooKeeper: Client environment:user.home=/home/training
15/02/04 21:36:05 INFO ZooKeeper: Client environment:user.dir=/home/training/software/spark-1.2.0-bin-hadoop2.3
15/02/04 21:36:05 INFO ZooKeeper: Initiating client connection, connectString=tiger:2181,elephant:2181,horse:2181 sessionTimeout=90000 watcher=hconnection-0x575b43dd, quorum=tiger:2181,elephant:2181,horse:2181, baseZNode=/hbase
15/02/04 21:36:05 INFO RecoverableZooKeeper: Process identifier=hconnection-0x575b43dd connecting to ZooKeeper ensemble=tiger:2181,elephant:2181,horse:2181
15/02/04 21:36:05 INFO ClientCnxn: Opening socket connection to server tiger/192.168.137.12:2181. Will not attempt to authenticate using SASL (unknown error)
15/02/04 21:36:05 INFO ClientCnxn: Socket connection established to tiger/192.168.137.12:2181, initiating session
15/02/04 21:36:05 INFO ClientCnxn: Session establishment complete on server tiger/192.168.137.12:2181, sessionid = 0x24b573f71f00007, negotiated timeout = 40000
15/02/04 21:36:10 INFO JobScheduler: Added jobs for time 1423103770000 ms
15/02/04 21:36:15 INFO JobScheduler: Added jobs for time 1423103775000 ms
15/02/04 21:36:20 INFO JobScheduler: Added jobs for time 1423103780000 ms
By the way, I can add data to HBase with plain Java, just not through Flume and Spark. Can you help me solve this problem? Thanks!
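For reference, the standalone Java path that does work for me looks roughly like the following minimal sketch (same pre-1.0 HBase client API as in the job above; the class name, row key, qualifier, and value here are placeholders for illustration, not my actual test data):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBasePutTest {
    public static void main(String[] args) throws Exception {
        // Same connection settings as in the Spark job above.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "elephant,tiger,horse");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // This constructor returns immediately when run standalone,
        // but is where the thread stops inside the Spark job.
        HTable table = new HTable(conf, "mytable");
        Put put = new Put(Bytes.toBytes("row1"));                               // placeholder row key
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1")); // placeholder cell
        table.put(put);
        table.close();
        System.out.println("standalone insert ok");
    }
}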