我正在尝试使用spark\u apply对spark集群计算由两列组成的数据的kmeans。数据是从配置单元中查询出来的,如下所示
> samplog1
# Source: lazy query [?? x 6]
# Database: spark_connection
id time1 latitude longitude timestamp hr
<chr> <dbl> <dbl> <dbl> <chr> <int>
1 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509338e+12 1.373545 104.1265 2017-10-30 04:29:59 4
2 fffc7412-deb1-4587-9c22-29ca833865ed 1.509332e+12 5.701320 117.4892 2017-10-30 02:49:47 2
3 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509338e+12 5.334012 100.2172 2017-10-30 04:25:44 4
4 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509338e+12 1.373545 104.1265 2017-10-30 04:29:44 4
5 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509332e+12 5.334061 100.2173 2017-10-30 02:58:30 2
6 fffd16d5-83f1-4ea1-95de-34b1fcad392b 1.509339e+12 5.334012 100.2172 2017-10-30 04:55:41 4
7 fffc7412-deb1-4587-9c22-29ca833865ed 1.509339e+12 5.729879 117.5787 2017-10-30 04:49:07 4
8 fffc68e3-866e-4be5-b1bc-5d21b89622ae 1.509340e+12 1.373545 104.1265 2017-10-30 05:02:08 5
9 fffc7412-deb1-4587-9c22-29ca833865ed 1.509325e+12 5.701320 117.4892 2017-10-30 00:53:12 0
10 fffc7412-deb1-4587-9c22-29ca833865ed 1.509336e+12 5.670300 117.4990 2017-10-30 04:08:12 4
我传递给spark\u apply的函数如下。它应该按id和hr获取数据组,计算每个组的kmeans,计算每个组代表的行的分数(置信度),并返回成员数和置信度最高的中心:
kms <- function(idLogs){
tryCatch({
km <- sparklyr::ml_kmeans(idLogs, centers = 3, features = c("latitude","longitude"))
km1 <- copy_to(sc, km$centers, overwrite = T)
cluster <- sdf_predict(km)
clustCounts <- cluster %>% group_by(prediction) %>%
tally %>%
mutate(conf=n/sum(n),
prediction=prediction+1)
clustCounts1 <- merge(clustCounts, km1, by.x=3, by.y=0)
clustCounts1 <- copy_to(sc, clustCounts1, overwrite = T)
clustCounts2 <- clustCounts1 %>% filter(., conf==max(conf)) %>% select(latitude, longitude, conf)
return(data.frame(clustCounts2))
}, error = function(e) {
return(
data.frame(string_id = c(0), string_categories = c("error"))
)
})
}
我把它当作
spark_apply(x = samplog1, f = kms, group_by = c("id","hr"))
但是,我收到一个关于“id”列不明确的错误。
Error: org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#1569, id#1571.;
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:171)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4$$anonfun$26.apply(Analyzer.scala:470)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4$$anonfun$26.apply(Analyzer.scala:470)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:470)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:466)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:108)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:122)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:122)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:466)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:346)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:346)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:327)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:37)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:37)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:35)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2141)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:721)
at org.apache.spark.sql.DataFrame.selectExpr(DataFrame.scala:754)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at sparklyr.Invoke$.invoke(invoke.scala:102)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
at sparklyr.StreamHandler$.read(stream.scala:62)
at sparklyr.BackendHandler.channelRead0(handler.scala:52)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToM
从我所看到的解释来看,当连接共享id的Dataframe时,可能会发生这种情况。在这种情况下,我没有加入任何Dataframe。唯一可能的罪魁祸首是merge函数,但组成Dataframe没有任何id列。我是sparkyr和spark\u应用程序的新手,非常感谢我可以写我的函数完全错误。我把整个脚本贴在下面,以防它可能会暴露出其他问题。我希望这不会把事情搞得一团糟:
Sys.setenv(HIVE_HOME="/opt/cloudera/parcels/CDH/lib/hive/")
kms <- function(idLogs){
tryCatch({
km <- sparklyr::ml_kmeans(idLogs, centers = 3, features = c("latitude","longitude"))
km1 <- copy_to(sc, km$centers, overwrite = T)
cluster <- sdf_predict(km)
clustCounts <- cluster %>% group_by(prediction) %>%
tally %>%
mutate(conf=n/sum(n),
prediction=prediction+1)
clustCounts1 <- merge(clustCounts, km1, by.x=3, by.y=0)
clustCounts1 <- copy_to(sc, clustCounts1, overwrite = T)
clustCounts2 <- clustCounts1 %>% filter(., conf==max(conf)) %>% select(latitude, longitude, conf)
return(data.frame(clustCounts2))
}, error = function(e) {
return(
data.frame(string_id = c(0), string_categories = c("error"))
)
})
}
sc <- spark_connect(master = "yarn-client",
version="1.6.0",
spark_home = '/opt/cloudera/parcels/CDH/lib/spark/')
tbl_change_db(sc, "clustergps")
samplog <- tbl(sc, "part6")
samplog <- mutate(samplog, timestamp = from_unixtime(time1/1000))
samplog <- mutate(samplog, hr = hour(timestamp))
samplog1 <- samplog %>% filter(id == 'fffd16d5-83f1-4ea1-95de-34b1fcad392b' |
id == 'fffc7412-deb1-4587-9c22-29ca833865ed' |
id == 'fffc68e3-866e-4be5-b1bc-5d21b89622ae')
likelyLocs <- spark_apply(samplog1, kms, group_by = list("id","hr"))
1条答案
按热度按时间x3naxklr1#
所以我想给你一些反馈。我可以通过设置spark\u apply的“columns”参数来解决这个问题,该参数定义输出列的名称。我发现将它设置为任何字符串/字符串值向量都有效。