r - How to force Hive to distribute data evenly among different reducers?

Posted by cbeh67ev on 2021-06-02 in Hadoop

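Suppose I want to send the iris dataset, stored as a Hive table, to different reducers so that the same R task runs on each of them in parallel. I can execute my R script through the transform function and use a lateral view explode in Hive to take the Cartesian product of the iris dataset with an array containing the "partition" variable, as in the query below: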

    set source_table = iris;

    set x_column_names = "sepallenght|sepalwidth|petallength|petalwidth";
    set y_column_name = "species";
    set output_dir = "/r_output";
    set model_name = "paralelism_test";
    set param_var = params;
    set param_array = array(1,2,3);

    set mapreduce.job.reduces=3;

    select transform(id, sepallenght, sepalwidth, petallength, petalwidth, species, ${hiveconf:param_var})
    using 'controlScript script.R ${hiveconf:x_column_names} ${hiveconf:y_column_name} ${hiveconf:output_dir} ${hiveconf:model_name} ${hiveconf:param_var}'
    as (script_result string)
    from 
    (select * 
    from ${hiveconf:source_table} 
    lateral view explode ( ${hiveconf:param_array} ) temp_table 
    as ${hiveconf:param_var}
    distribute by ${hiveconf:param_var}
    ) data_query;
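The effect of the lateral view on the data can be reproduced in plain R, which is handy for checking row counts before going through Hive. A minimal sketch, assuming the Hive table holds the same 150 rows as R's built-in iris data set (the Hive table's id column is omitted here); merge(..., by = NULL) is R's cross join and only stands in for lateral view explode:

```r
# Every iris row is repeated once per element of param_array, which is what
# "lateral view explode(array(1,2,3)) temp_table as params" does in Hive.
param_array <- c(1L, 2L, 3L)
replicated <- merge(iris, data.frame(params = param_array), by = NULL)  # cross join

print(dim(replicated))           # 450 rows, 6 columns (5 iris columns + params)
print(table(replicated$params))  # 150 rows for each params value
```

With distribute by params, each group of 150 rows is meant to reach its own reducer, so each script.R instance should report 150 lines.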

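controlScript is a memory-control script that I call, so for the sake of objectivity please ignore it. What my script.R returns is the list of unique parameters it received (the "params" column, filled with the values of the param_var array) together with the number of rows in the partition it got, as shown below: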


# The aim of this script is to validate the parallel computation of R scripts through Hive.

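compute_model <- function(data){
  # Distinct values of the last column (the "params" key) and the row count of this partition
  paste0("parameter ", paste(unique(data[[ncol(data)]]), collapse = ","), ", ", nrow(data), " lines")
}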

main <- function(args){

  #Reading the input parameters
  #These inputs were passed along the transform's "using" clause, on Hive.
  x_column_names <- as.character(unlist(strsplit(gsub(' ','',args[1]),'\\|')))
  y_column_name <- as.character(args[2])
  target_dir <- as.character(args[3])
  model_name <- as.character(args[4])
  param_var_name <- as.character(args[5])

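  #Reading the data table from stdin (tab-separated rows sent by Hive)
  f <- file("stdin")
  open(f)
  data <- tryCatch(
    withCallingHandlers(
      read.table(f, header = FALSE, sep = '\t', stringsAsFactors = TRUE, dec = '.'),
      # Send warnings to stderr (stdout is reserved for the Hive output) and keep reading;
      # a tryCatch warning handler would abort read.table and lose the data
      warning = function(w) {
        message(conditionMessage(w))
        invokeRestart("muffleWarning")
      }
    ),
    finally = close(f)
  )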

  #Computes the model. Here, the model can be any computation.
  instance_result <- as.character(compute_model(data))

  #writes the result to "stdout" separated by '\t'. This output must be a data frame where
  #each column represents a Hive Table column.
  write.table(instance_result,
              quote = FALSE,
              row.names = FALSE,
              col.names = FALSE,
              sep = "\t",
              dec='.'
  )
}

# Main code

############################################################### 

main(commandArgs(trailingOnly=TRUE))
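The script's parsing and summary step can also be replayed locally, without Hive, to see what a reducer prints when it is handed several keys at once. A minimal sketch: the key values 100, 1000 and 10000 are purely illustrative, and summary_line is a hypothetical, tidied version of what compute_model builds (the distinct values of the last column, sorted, plus the row count):

```r
# Hypothetical local dry run: build the tab-separated rows one reducer would
# receive for three keys (illustrative values) and parse them the way script.R does.
rows <- merge(iris, data.frame(params = c(100L, 1000L, 10000L)), by = NULL)
tsv <- capture.output(
  write.table(rows, sep = "\t", quote = FALSE, row.names = FALSE, col.names = FALSE)
)
data <- read.table(text = tsv, header = FALSE, sep = "\t", stringsAsFactors = TRUE)

# Tidied summary in the spirit of compute_model: sorted distinct keys + row count
keys <- sort(unique(data[[ncol(data)]]))
summary_line <- paste0("parameter ", paste(keys, collapse = ","), ", ", nrow(data), " lines")
cat(summary_line, sep = "\n")
```

A reducer that reports more than one parameter value, or three times the expected number of lines, is the symptom of several keys being sent to the same reducer.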

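What I want Hive to do is replicate the iris dataset evenly across these reducers. It works fine when I put sequential values in the param_array variable, but for values such as array(10, 100, 1000, 10000) with mapreduce.job.reduces=4, or array(-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5) with mapreduce.job.reduces=11, some reducers receive no data at all while others receive more than one key.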
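This pattern is consistent with Hadoop-style hash partitioning, in which a record goes to reducer (hash(key) & Integer.MAX_VALUE) mod numReduceTasks (the formula of Hadoop's HashPartitioner). The sketch below is a hypothetical model, not Hive's actual code: it assumes the hash of a single INT distribution key is the integer value itself, as in older Hive releases; a release that uses a different hash function (for example a Murmur3-based one) would give different numbers. reducer_for and show_assignment are illustrative helper names:

```r
# Hypothetical model of reducer assignment for a single INT "distribute by" key:
# reducer = (hash & Integer.MAX_VALUE) mod num_reducers, assuming hash(key) == key.
reducer_for <- function(keys, num_reducers) {
  bitwAnd(keys, .Machine$integer.max) %% num_reducers
}

# Hypothetical helper: list, for every reducer 0..num_reducers-1, the keys it would receive.
show_assignment <- function(keys, num_reducers) {
  r <- reducer_for(keys, num_reducers)
  split(keys, factor(r, levels = 0:(num_reducers - 1L)))
}

sequential <- show_assignment(c(1L, 2L, 3L), 3L)            # one key per reducer
powers     <- show_assignment(c(10L, 100L, 1000L, 10000L), 4L)
symmetric  <- show_assignment(-5:5, 11L)

str(powers)     # reducer 0 gets 100, 1000, 10000; reducer 2 gets 10; 1 and 3 are empty
str(symmetric)  # -2 and 0 share reducer 0, -1 and 1 share reducer 1; 6 and 7 are empty
```

Under this model, array(1,2,3) with 3 reducers lands on reducers 1, 2 and 0, while the two other arrays collide exactly as described. It also suggests why sequential values work: n consecutive non-negative integers have n distinct remainders modulo n, so they fill n reducers with one key each. Ranges that cross zero do not have this property once the sign bit is masked off.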
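The question is: is there a way to make sure Hive assigns each partition to a different reducer?
Am I being clear? Doing this may look silly, but I want to run a grid search on Hadoop, and I have some restrictions on using other technologies that are better suited to this task.
Thank you!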
