In my application I need to receive streaming data from Kafka and apply a time series model to that stream in Spark.
I am able to read the streaming data from Kafka, but I don't know how to apply a time series model to it.
Could anyone please explain how such a time series model works and suggest a use case?
Dataset:
725030:14732,2008,01,01,00,5.0,-3.9,1020.4,270,4.6,2,0.0,0.0
725030:14732,2008,01,01,01,5.0,-3.3,1020.6,290,4.1,2,0.0,0.0
725030:14732,2008,01,01,02,5.0,-3.3,1020.0,310,3.1,2,0.0,0.0
725030:14732,2008,01,01,03,4.4,-2.8,1020.1,300,1.5,2,0.0,0.0
725030:14732,2008,01,01,04,3.3,-4.4,1020.5,240,2.6,0,0.0,0.0
The Spark Java code is as follows:
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
// Kafka to Spark Streaming working code
/* This code converts the Kafka stream into a Dataset using Spark Java. */
public class a {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf();
        conf.set("spark.app.name", "SparkReceiver"); // The name of the application; appears in the UI and in log data.
        //conf.set("spark.ui.port", "7077"); // Port for the application's dashboard, which shows memory and workload data.
        conf.set("spark.dynamicAllocation.enabled", "false"); // Scales the number of executors registered with this application up and down based on the workload.
        //conf.set("spark.cassandra.connection.host", "localhost"); // Cassandra host address/IP.
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); // For serializing objects that are sent over the network or cached in serialized form.
        conf.setMaster("local");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Create the streaming context with a 2-second batch interval.
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("zookeeper.connect", "localhost:2181"); // Makes all Kafka data for this cluster appear under a particular ZooKeeper path.
        kafkaParams.put("group.id", "testgroup"); // String that uniquely identifies the group of consumer processes this consumer belongs to.
        kafkaParams.put("metadata.broker.list", "localhost:9092"); // The producer can contact one or more brokers to determine the leader for each topic.
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); // Serializer to use when preparing the message for transmission to the broker.
        kafkaParams.put("request.required.acks", "1"); // The producer requires an acknowledgement from the broker that the message was received.
        Set<String> topics = Collections.singleton("ny-2008.csv");
        // Create an input DStream that receives data from Kafka.
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams, topics);
        // Create a JavaDStream<String> holding only the message values.
        JavaDStream<String> msgDataStream = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        // For each micro-batch, create a JavaRDD<Row>.
        msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) {
                JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                    @Override
                    public Row call(String msg) {
                        Row row = RowFactory.create(msg);
                        return row;
                    }
                });
                // Create the schema: a single string column holding the raw message.
                StructType schema = DataTypes.createStructType(new StructField[] {
                        DataTypes.createStructField("Message", DataTypes.StringType, true)});
                // Get the Spark 2.0 session.
                SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
                Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
                msgDataFrame.show();
                msgDataFrame.createOrReplaceTempView("weatherTemporaryData");
                msgDataFrame.select("Message").show(); // The schema defines only the "Message" column.
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession
                    .builder()
                    .config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}
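One note on the code above: the DataFrame built inside foreachRDD has a single string column, so before any time-series processing each line would have to be split into typed columns. Below is a minimal sketch of such a parsing step, meant to slot into the foreachRDD block and reusing its rdd and spark variables; the column names (stationId, year, month, day, hour, temperature) are assumptions, since the question does not describe the fields.

// Hypothetical parsing step for the foreachRDD block above (reuses its rdd and spark variables).
// The column names below are assumptions; the question does not say what each field means.
JavaRDD<Row> parsedRDD = rdd.map(new Function<String, Row>() {
    @Override
    public Row call(String line) {
        String[] f = line.split(",");
        return RowFactory.create(
                f[0],                      // stationId, e.g. "725030:14732"
                Integer.parseInt(f[1]),    // year
                Integer.parseInt(f[2]),    // month
                Integer.parseInt(f[3]),    // day
                Integer.parseInt(f[4]),    // hour
                Double.parseDouble(f[5])); // first measurement, assumed here to be a temperature
    }
});

StructType weatherSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("stationId", DataTypes.StringType, true),
        DataTypes.createStructField("year", DataTypes.IntegerType, true),
        DataTypes.createStructField("month", DataTypes.IntegerType, true),
        DataTypes.createStructField("day", DataTypes.IntegerType, true),
        DataTypes.createStructField("hour", DataTypes.IntegerType, true),
        DataTypes.createStructField("temperature", DataTypes.DoubleType, true)});

Dataset<Row> weatherDF = spark.createDataFrame(parsedRDD, weatherSchema);
weatherDF.show();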
1 Answer
From a statistical point of view, a time series model works on a time-shifted window: for example, you try to predict a future value from the n previous values. If that is what you want to achieve, you should look at window functions. Jacek wrote a good article on this topic: the WindowAggregate operator.
In short, to help you understand what is going on, you have to create a WindowSpec instance that specifies:
a partitioning scheme, for example based on the event class of the record;
a local ordering within each partition, in your case the event timestamp.
Then you use the lag function to fetch the events that happened before a given event (see the sketch just below). For time series model inference you have to be careful about stationarity, because consecutive events are not independent. In some cases better stationarity can be obtained through differencing (it removes the trend): instead of the event values themselves, you consider their differences from the previous event.
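A minimal sketch of that idea in Java, assuming the hypothetical weatherDF built in the parsing sketch after the question code, with stationId as the partitioning key, the date/hour columns as the ordering, and temperature as the value to model (all of these names are assumptions):

// Required imports for this sketch:
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.lag;

// Partition by the "event class" (here: the weather station) and order by event time.
WindowSpec w = Window.partitionBy("stationId").orderBy("year", "month", "day", "hour");

// Add the n previous values as extra columns (here n = 3); the first rows of each
// partition have no lagged values, so they are dropped.
Dataset<Row> withLags = weatherDF
        .withColumn("temp_lag1", lag("temperature", 1).over(w))
        .withColumn("temp_lag2", lag("temperature", 2).over(w))
        .withColumn("temp_lag3", lag("temperature", 3).over(w))
        .na().drop();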
The lag function is again your friend here, but you need n+1 values. Once the differencing and lagging are done, you can try to apply any statistical modeling approach from the Spark ML library.
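Continuing the same hypothetical sketch, differencing via the lagged columns and a simple Spark ML regression on the lagged differences could look like this (LinearRegression is only one possible choice of model):

// Required imports for this sketch:
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import static org.apache.spark.sql.functions.col;

// Differencing: replace each raw value by its change since the previous event
// (this often improves stationarity by removing the trend).
Dataset<Row> diffed = withLags
        .withColumn("temp_diff",  col("temperature").minus(col("temp_lag1")))
        .withColumn("temp_diff1", col("temp_lag1").minus(col("temp_lag2")))
        .withColumn("temp_diff2", col("temp_lag2").minus(col("temp_lag3")));

// Assemble the lagged differences into a feature vector and fit a simple model.
VectorAssembler assembler = new VectorAssembler()
        .setInputCols(new String[] {"temp_diff1", "temp_diff2"})
        .setOutputCol("features");

LinearRegression lr = new LinearRegression()
        .setLabelCol("temp_diff")
        .setFeaturesCol("features");
LinearRegressionModel model = lr.fit(assembler.transform(diffed));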