elasticsearch激发流媒体

bwitn5fc  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(325)

我在分析日志,我有这样的架构:
Kafka->Spark流->ElasticSearch
我的主要目标是在流媒体中创建机器学习模型。我想我可以做两件事:
1) Kafka->spark streaming(ml)->ElasticSearch
2) Kafka->spark streaming->elasticsearch->spark streaming(ml)
-我认为第二种架构是最好的,因为spark流媒体将直接使用索引数据。你怎么认为?对吗-我们能否轻松地将spark流媒体与elasticsearch实时连接起来-如果我们在spark streaming(elasticsearch之后)中创建一个模型,我们必须在这个地方使用这个模型(elasticsearch之后)还是可以在spark streaming(kafka之后的directery)中使用它#use==predict in real time-在elasticsearch之后创建模型是否会使我们的模型保持静态(或者不是在real time approach中)
谢谢您。

6vl6ewon

6vl6ewon1#

你是说这个?
Kafka->spark流媒体->elasticsearch数据库

val sqlContext = new SQLContext(sc)

//kafka group
val group_id = "receiveScanner"
// kafka topic
val topic = Map("testStreaming"-> 1)
// zk connect
val zkParams = Map(
  "zookeeper.connect" ->"localhost",
  "zookeeper.connection.timeout.ms" -> "10000",
  "group.id" -> group_id)

// Kafka
val kafkaConsumer = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,zkParams,topic,StorageLevel.MEMORY_ONLY_SER)
val receiveData = kafkaConsumer.map(_._2 )
// printer kafka data
receiveData.print()
receiveData.foreachRDD{ rdd=>
  val transform = rdd.map{ line =>
    val data = Json.parse(line)
    // play json parse
    val id = (data \ "id").asOpt[Int] match { case Some(x) => x; case None => 0}
    val name = ( data \ "name"  ).asOpt[String] match { case Some(x)=> x ; case None => "" }
    val age = (data \ "age").asOpt[Int] match { case Some(x) => x; case None => 0}
    val address = ( data \ "address"  ).asOpt[String] match { case Some(x)=> x ; case None => "" }
    Row(id,name,age,address)
  }

  val transfromrecive = sqlContext.createDataFrame(transform,schameType)
  import org.apache.spark.sql.functions._
  import org.elasticsearch.spark.sql._
  //filter age < 20 , to ES database
  transfromrecive.where(col("age").<(20)).orderBy(col("age").asc)
    .saveToEs("member/user",Map("es.mapping.id" -> "id"))
}

}

/*Dataframe名称/

def schameType =  StructType(
  StructField("id",IntegerType,false)::
  StructField("name",StringType,false)::
  StructField("age",IntegerType,false)::
  StructField("address",StringType,false)::
  Nil
)

相关问题