Running an R distance function on Hadoop

qij5mzcb  posted on 2021-06-04 in Hadoop

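I have a series of R data frames (thousands of them). Each one has a categorical variable (productId) and a continuous variable (sales). I have also written a distance function (my_distance) that computes the distance between two productIds within the same data frame. Because each data frame contains hundreds of productIds and there are thousands of data frames, I would like to explore whether Hadoop can speed this up. Right now I use a for loop to iterate over all the data frames and mcmapply to compute all the distances between productIds within a given data frame. I would like to know whether this can be implemented in Hadoop so that the computation runs in parallel across the nodes of a cluster. Don't pay attention to the content of the distance function, as it is only an example.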

library(parallel)
library(reshape2)

calcDist <- function(x1, x2) {
  return(sqrt(sum(x1^2-x2^2)))
}

my_distance <- function(df, id1, id2) {
  x1 <- df[df$productId==id1,c('sales')]  
  x2 <- df[df$productId==id2,c('sales')]
  distx <- calcDist(x1, x2)
  return(distx)
}

productId <- c(1,1,1,1,2,2,2,2,3,3,3,3)
sales <- runif(length(productId), min=0, max=100)

df <-data.frame(productId,sales)

...mcmapply()

gg0vcinb #1

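Here is a working solution. Note that your function sometimes returns NaN; I did not investigate, since your question seems to be more about the overall program.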
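As an aside that is not part of the original post, a likely cause of the NaN values is that calcDist takes the square root of sum(x1^2 - x2^2), and that sum is negative whenever the second product's squared sales outweigh the first's. The sketch below reproduces this with made-up numbers and contrasts it with an ordinary Euclidean distance. euclidDist is a hypothetical name introduced only for illustration; whether it matches the intended metric is an assumption.

```r
# calcDist as posted in the question
calcDist <- function(x1, x2) {
  return(sqrt(sum(x1^2 - x2^2)))
}

# Hypothetical alternative: the usual Euclidean distance, which squares
# the differences and so never takes the root of a negative number
euclidDist <- function(x1, x2) {
  sqrt(sum((x1 - x2)^2))
}

x1 <- c(1, 2, 3)   # made-up sales for one product
x2 <- c(4, 5, 6)   # made-up sales for another product

# sum(x1^2 - x2^2) is 14 - 77 = -63, so sqrt() yields NaN with a warning
d_posted <- suppressWarnings(calcDist(x1, x2))

# sum((x1 - x2)^2) is 27, so the result is sqrt(27)
d_euclid <- euclidDist(x1, x2)

print(is.nan(d_posted))
print(d_euclid)
```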
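I used localhost as the worker; you only need to replace hostNumbers with a character vector of the node names on your cluster, or initialize the cluster however you prefer. As long as you use it in place of devClust, the map will work.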


# define supporting data structures
productId <- rep(c(1, 2, 3), each = 4)
sales.gen <- function() runif(length(productId), min = 0, max = 100)
df.gen <- function(x) data.frame(productId, sales = sales.gen())
dfList <- lapply(as.list(1:10), df.gen)

library(parallel)

# here you should use a vector of your nodes' names
hostNumbers <- c("localhost")

# builds a list structure of host names, one per node
hostFrame <- lapply(hostNumbers, function(x) list(host = x))

# replicates each node `kCPUs` times, so number of cpus on each node is equal
# NOTE: total number of workers cannot exceed 128 in base R!
kCPUs <- 2
hostList <- rep(hostFrame, kCPUs)

# initialize the socket cluster; adding outfile = "" to this call would print
# each worker's commands and logs to stdout, which would then be our log file for R
devClust <- makePSOCKcluster(hostList)

# define functions
calcDist <- function(x1, x2) {
  return(sqrt(sum(x1^2-x2^2)))
}

my_distance <- function(df, id1, id2) {
  x1 <- df[df$productId==id1,c('sales')]
  x2 <- df[df$productId==id2,c('sales')]
  distx <- calcDist(x1, x2)
  return(distx)
}

# export function definitions to cluster
functionDefs <- Filter(function(x) is.function(get(x, .GlobalEnv)), ls(.GlobalEnv))
clusterExport(devClust, functionDefs)

# run distance calcs. Note that we quote the function because it has
# already been exported to the workers so there is no need to serialize again
list.of.distance.calcs <- clusterMap(devClust, 'my_distance', dfList, MoreArgs = list(id1 = 1, id2 = 2))

# shut the workers down once the results are back
# (on.exit() only takes effect inside a function, so call stopCluster() directly)
stopCluster(devClust)
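
The clusterMap call above computes only the distance between products 1 and 2, whereas the question asks for all pairs of products in every data frame. One way to extend the same pattern, offered as a sketch under assumptions rather than as the answerer's method, is to enumerate the pairs with combn() and let each worker handle one whole data frame. all_pairs is a hypothetical helper, the data are randomly generated, and two localhost workers stand in for real cluster nodes.

```r
library(parallel)

# Distance functions as posted in the question
calcDist <- function(x1, x2) sqrt(sum(x1^2 - x2^2))

my_distance <- function(df, id1, id2) {
  x1 <- df[df$productId == id1, "sales"]
  x2 <- df[df$productId == id2, "sales"]
  calcDist(x1, x2)
}

# Hypothetical helper: distances for every unordered pair of products
# in one data frame, returned as a data frame with one row per pair
all_pairs <- function(df) {
  pairs <- t(combn(unique(df$productId), 2))
  dists <- mapply(function(a, b) my_distance(df, a, b),
                  pairs[, 1], pairs[, 2])
  data.frame(id1 = pairs[, 1], id2 = pairs[, 2], distance = dists)
}

# Made-up input: 10 data frames, 3 products with 4 sales values each
productId <- rep(c(1, 2, 3), each = 4)
dfList <- lapply(1:10, function(i) {
  data.frame(productId, sales = runif(length(productId), 0, 100))
})

# Two local workers; replace with the node names of a real cluster
cl <- makePSOCKcluster(rep("localhost", 2))

# all_pairs travels with the parLapply call, but the functions it
# calls must already exist on the workers
clusterExport(cl, c("calcDist", "my_distance"))

# One task per data frame
results <- parLapply(cl, dfList, all_pairs)
stopCluster(cl)

print(results[[1]])
```

The number of pairs grows quadratically with the number of products, so with hundreds of products per data frame it may be worth comparing this static split with clusterApplyLB, which hands tasks to workers as they become free.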
