我的流媒体jsondata在这里:
{
"visibility": "public",
"response": "yes",
"guests": 0,
"member": {
"member_id": 145170662,
"photo": "http:\/\/photos2.meetupstatic.com\/photos\/member\/b\/8\/1\/7\/thumb_255947127.jpeg",
"member_name": "Claude"
},
"rsvp_id": 1645188104,
"mtime": 1483074100082,
"event": {
"event_name": "Kayakers' 65th (Sun) : FishMkt Reserve to Ascot(Garvey Pk)",
"event_id": "236414110",
"time": 1483833600000,
"event_url": "https:\/\/www.meetup.com\/swankayakers\/events\/236414110\/"
},
"group": {
"group_topics": [{
"urlkey": "kayaking",
"topic_name": "Kayaking"
}, {
"urlkey": "newintown",
"topic_name": "New In Town"
}, {
"urlkey": "socialnetwork",
"topic_name": "Social Networking"
}, {
"urlkey": "wellness",
"topic_name": "Wellness"
}, {
"urlkey": "outdoors",
"topic_name": "Outdoors"
}, {
"urlkey": "multicultural-couples",
"topic_name": "Multicultural Couples"
}, {
"urlkey": "scuba-diving",
"topic_name": "Scuba Diving"
}, {
"urlkey": "singles-over-50",
"topic_name": "Singles Over 50"
}, {
"urlkey": "scuba-diving-adventures",
"topic_name": "Scuba Diving Adventures"
}, {
"urlkey": "female-fitness",
"topic_name": "female fitness"
}, {
"urlkey": "fun-females-over-40",
"topic_name": "Fun Females Over 40"
}, {
"urlkey": "singles-over-40",
"topic_name": "Singles Over 40"
}, {
"urlkey": "couples-over-40",
"topic_name": "Couples over 40+"
}, {
"urlkey": "kayaking-and-canoeing",
"topic_name": "Kayaking and Canoeing"
}, {
"urlkey": "nature",
"topic_name": "Nature"
}],
"group_city": "Perth",
"group_country": "au",
"group_id": 18906617,
"group_name": "Swan River* Kayaking",
"group_lon": 115.84,
"group_urlname": "swankayakers",
"group_lat": -31.96
}
}
我从 Kafka 读取 JSON 数据以触发流式处理,并在下面的代码中使用 Spark SQL(Dataset)将 JSON 值插入到 Hive 数据库中
data.write().mode("overwrite").saveAsTable("dk.jsondata");
我正在使用这些版本:
spark 2.0.0,
kafka_2.11-0.8.2.1,
Hive 1.2.1
所以请给出一个使用 Spark 的 Java API 的解决方案
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/*import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalog.Function;*/
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.AbstractJavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
// Kafka -> Spark Streaming: consume JSON RSVP messages, parse them with Spark SQL,
// and persist selected fields into the Hive table dk.jsondata.
public class KafkaToSparkStreaming
{
    /**
     * Entry point. Builds a 2-second micro-batch streaming job that reads JSON
     * strings from the Kafka topic "RsvpsJsonData2" and appends the parsed rows
     * into the Hive table {@code dk.jsondata}.
     *
     * @param arr command-line arguments (unused)
     * @throws InterruptedException if the streaming context is interrupted while waiting
     */
    public static void main(String arr[]) throws InterruptedException
    {
        SparkConf conf = new SparkConf();
        conf.set("spark.app.name", "SparkReceiver"); // Application name shown in the UI and in log data.
        conf.set("dynamicAllocation.enabled", "false"); // Disable executor auto-scaling for this job.
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); // Kryo for network/cache serialization.
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true"); // Finish in-flight batches before stopping.

        JavaSparkContext sc = new JavaSparkContext(conf);
        // Streaming context with a 2-second batch interval.
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("zookeeper.connect", "localhost:2181"); // ZooKeeper quorum used by the 0.8 consumer API.
        kafkaParams.put("group.id", "testgroup"); // Consumer-group identifier.
        kafkaParams.put("metadata.broker.list", "localhost:9092"); // Bootstrap broker(s) for topic metadata.
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
        kafkaParams.put("request.required.acks", "1"); // Require broker acknowledgement of each message.

        Set<String> topics = Collections.singleton("RsvpsJsonData2");

        // Direct (receiver-less) Kafka stream of (key, value) string pairs.
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(
                ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams, topics);
        directKafkaStream.print();

        // Per-batch record count, printed for monitoring.
        JavaDStream<Long> count = directKafkaStream.count();
        count.print();

        // Keep only the message payload (the JSON document); the Kafka key is ignored.
        JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) throws Exception {
                return message._2();
            }
        });

        // FIX: enableHiveSupport() is required for saveAsTable() to write into the Hive
        // metastore. Without it Spark 2.x uses its default (in-memory/Derby) catalog and
        // the data never reaches the Hive database "dk".
        final SparkSession spark = SparkSession
                .builder()
                .appName("Json Parsing Example")
                .enableHiveSupport()
                .getOrCreate();

        json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) {
                if (!rdd.isEmpty()) {
                    // Infer the schema from the JSON strings and keep the "mtime" column.
                    Dataset<Row> data = spark.read().json(rdd).select("mtime");
                    data.printSchema();
                    data.show();
                    data.createOrReplaceTempView("jsonData");
                    // FIX: the original data.filter("rsvp_id") is removed — "rsvp_id" was
                    // dropped by select("mtime"), the argument is not a boolean condition,
                    // and the filtered result was discarded anyway (it would throw an
                    // AnalysisException at runtime).
                    // FIX: append instead of overwrite — each micro-batch must add rows
                    // rather than replace everything written by previous batches.
                    data.write().mode("append").saveAsTable("dk.jsondata");
                }
            }
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
请给出解决方案
2条答案
按热度按时间lnlaulya1#
它在spark 2.2.0和kafka 0.10.0.0上发生了变化
imzjd6km2#
在创建sparksession时,是否可以尝试使用.enablehivesupport()