使用sparkscala读取restapi json响应

axkjgtzd  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(463)

关闭。这个问题需要细节或清晰。它目前不接受答案。
**想改进这个问题吗?**通过编辑这个帖子来添加细节并澄清问题。

7个月前关门了。
改进这个问题
我想通过从dataframe应用一些参数来实现api,获取json响应主体,然后从主体中提取特定键的所有不同值。然后我需要将此列添加到第一个Dataframe中。
假设我有一个如下的Dataframe:

  1. df1:
  2. +-----+-------+--------+
  3. | DB | User | UserID |
  4. +-----+-------+--------+
  5. | db1 | user1 | 123 |
  6. | db2 | user2 | 456 |
  7. +-----+-------+--------+

我想通过提供 Df1 作为参数。
如果我的url参数是 db=db1 以及 User=user1 (第一条记录) df1 ),响应将是以下格式的json格式:

  1. {
  2. "data":[
  3. {
  4. "db": "db1"
  5. "User": "User1"
  6. "UserID": 123
  7. "Query": "Select * from A"
  8. "Application": "App1"
  9. },
  10. {
  11. "db": "db1"
  12. "User": "User1"
  13. "UserID": 123
  14. "Query": "Select * from B"
  15. "Application": "App2"
  16. }
  17. ]
  18. }

从这个json文件中,我想得到 Application 作为数组或列表键,并将其作为新列附加到 Df1 我的输出如下所示:

  1. Final df:
  2. +-----+-------+--------+-------------+
  3. | DB | User | UserID | Apps |
  4. +-----+-------+--------+-------------+
  5. | db1 | user1 | 123 | {App1,App2} |
  6. | db2 | user2 | 456 | {App3,App3} |
  7. +-----+-------+--------+-------------+

我已经就如何实现这一目标提出了一个高层次的计划。
添加一个名为response url的新列,该列由input中的多个列构建。
定义一个scala函数,该函数接收url并返回一个应用程序数组,然后将其转换为udf。
通过传递响应url应用udf来创建另一列。
由于我是scala spark的新手,而且从未使用过RESTAPI,请有人在这里帮助我实现这个结果。
任何其他的想法或建议都是受欢迎的。
我用的是spark 1.6。

1tu0hz3e

1tu0hz3e1#

检查下面的代码,您可能需要编写逻辑来调用reset api。一旦你们得到结果,下一个过程就简单了。

  1. scala> val df = Seq(("db1","user1",123),("db2","user2",456)).toDF("db","user","userid")
  2. df: org.apache.spark.sql.DataFrame = [db: string, user: string, userid: int]
  3. scala> df.show(false)
  4. +---+-----+------+
  5. |db |user |userid|
  6. +---+-----+------+
  7. |db1|user1|123 |
  8. |db2|user2|456 |
  9. +---+-----+------+
  10. scala> :paste
  11. // Entering paste mode (ctrl-D to finish)
  12. def invokeRestAPI(db:String,user: String) = {
  13. import org.json4s._
  14. import org.json4s.jackson.JsonMethods._
  15. implicit val formats = DefaultFormats
  16. // Write your invoke logic & for now I am hardcoding your sample json here.
  17. val json_data = parse("""{"data":[ {"db": "db1","User": "User1","UserID": 123,"Query": "Select * from A","Application": "App1"},{"db": "db1","User": "User1","UserID": 123,"Query": "Select * from B","Application": "App2"}]}""")
  18. (json_data \\ "data" \ "Application").extract[Set[String]].toList
  19. }
  20. // Exiting paste mode, now interpreting.
  21. invokeRestAPI: (db: String, user: String)List[String]
  22. scala> val fetch = udf(invokeRestAPI _)
  23. fetch: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(StringType,true),List(StringType, StringType))
  24. scala> df.withColumn("apps",fetch($"db",$"user")).show(false)
  25. +---+-----+------+------------+
  26. |db |user |userid|apps |
  27. +---+-----+------+------------+
  28. |db1|user1|123 |[App1, App2]|
  29. |db2|user2|456 |[App1, App2]|
  30. +---+-----+------+------------+
展开查看全部

相关问题