parSapply and progress bars

Asked by 7tofc5zh on 2023-02-27

I am using the parSapply function to run a simulation in a parallel environment. Here is my code:

library(parallel)

runpar <- function(i) MonteCarloKfun(i=i)

# Detect number of cores available
ncores <- detectCores(logical=TRUE)

# Set up parallel environment
cl <- makeCluster(ncores, methods=FALSE)

# Export objects to parallel environment
clusterSetRNGStream(cl,1234567) # not necessary since we do not sample
clusterExport(cl, c("kfunctions","frq","dvec","case","control","polygon", "MonteCarloKfun", "khat", 
                    "as.points", "secal"))

# For 1 parameter use parSapply (`i` is assumed to be the vector of simulation indices)
outpar <- parSapply(cl, i, runpar)

# close parallel environment
stopCluster(cl)

Does anyone know if it is possible to add a progress bar to the parSapply function? Ideally, I would like something similar to pbapply from the pbapply library.
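As an aside, the pbapply package mentioned above can itself drive a cluster: its pbsapply function accepts a cl argument. A minimal sketch, assuming i is the vector of simulation indices and reusing the export list from the question:

library(pbapply)
library(parallel)

# Set up a cluster and export the objects the simulation needs.
cl <- makeCluster(detectCores(logical=TRUE))
clusterExport(cl, c("kfunctions","frq","dvec","case","control","polygon",
                    "MonteCarloKfun", "khat", "as.points", "secal"))

# pbsapply displays a progress bar and dispatches the work to `cl`.
outpar <- pbsapply(i, function(i) MonteCarloKfun(i=i), cl=cl)

stopCluster(cl)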

Answer 1 (d8tt03nd)

The parSapply function doesn't support a progress bar, and I don't think there is any really good way to implement one by adding extra code to the task function, although people have made valiant efforts.
The doSNOW package does support progress bars, so you can either use it directly or write a wrapper function that works like parSapply.

# This function is similar to "parSapply", but doesn't preschedule
# tasks and doesn't support "simplify" and "USE.NAMES" options
pbSapply <- function(cl, X, FUN, ...) {
  registerDoSNOW(cl)
  pb <- txtProgressBar(max=length(X))
  on.exit(close(pb))
  progress <- function(n) setTxtProgressBar(pb, n)
  opts <- list(progress=progress)
  foreach(i=X, .combine='c', .options.snow=opts) %dopar% {
    FUN(i, ...)
  }
}

You can easily modify this function to use either tkProgressBar or winProgressBar.
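For instance, a minimal sketch of the tkProgressBar variant (pbSapplyTk is a made-up name; it requires the tcltk package):

library(tcltk)

# Same wrapper as above, with the text bar swapped for a Tk widget.
pbSapplyTk <- function(cl, X, FUN, ...) {
  registerDoSNOW(cl)
  pb <- tkProgressBar(title="pbSapplyTk", max=length(X))
  on.exit(close(pb))
  progress <- function(n) setTkProgressBar(pb, n)
  opts <- list(progress=progress)
  foreach(i=X, .combine='c', .options.snow=opts) %dopar% {
    FUN(i, ...)
  }
}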
Here is an example of using pbSapply:

library(doSNOW)
cl <- makeSOCKcluster(3)
x <- pbSapply(cl, 1:100, function(i, j) {Sys.sleep(1); i + j}, 100)

Note that this doesn't use prescheduling, so the performance won't be as good as parSapply when the tasks are small.
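One way to soften that penalty (a sketch, not from the original answer; the name and the chunking factor are arbitrary choices) is to group X into a few chunks per worker, so each %dopar% task is larger and the bar advances once per completed chunk:

# Chunked variant: fewer, larger tasks; coarser progress granularity.
pbSapplyChunked <- function(cl, X, FUN, ..., chunks.per.worker=4) {
  registerDoSNOW(cl)
  n_chunks <- min(length(X), chunks.per.worker * length(cl))
  chunk_size <- ceiling(length(X) / n_chunks)
  chunks <- split(X, ceiling(seq_along(X) / chunk_size))
  pb <- txtProgressBar(max=length(chunks))
  on.exit(close(pb))
  progress <- function(n) setTxtProgressBar(pb, n)
  opts <- list(progress=progress)
  foreach(chunk=chunks, .combine='c', .options.snow=opts) %dopar% {
    sapply(chunk, FUN, ...)
  }
}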

Answer 2 (7gcisfzg)

Update (February 20, 2023)

You can use the parabar package for this. Disclaimer: I am the author of the package.
The package can be used in an interactive R session as follows.

# Load the package.
library(parabar)

# Define a task to run in parallel.
task <- function(x) {
    # Sleep a bit.
    Sys.sleep(0.01)

    # Return the result of a computation.
    return(x + 1)
}

# Start a backend that supports progress tracking (i.e., `async`).
backend <- start_backend(cores = 4, cluster_type = "psock", backend_type = "async")

# Configure the bar if necessary, or change the bar engine.
configure_bar(
    format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)

# Run the task.
results <- par_sapply(backend, 1:1000, task)

# Update the progress bar options.
configure_bar(
    format = "[:bar] :percent"
)

# Run the task again.
results <- par_sapply(backend, 1:1000, task)

# Stop the backend.
stop_backend(backend)

If you need more flexibility (e.g., when building an R package), there is also a low-level API based on R6 classes.

# Create a specification object.
specification <- Specification$new()

# Set the number of cores.
specification$set_cores(cores = 4)

# Set the cluster type.
specification$set_type(type = "psock")

# Create a progress tracking context.
context <- ProgressDecorator$new()

# Get a backend that supports progress-tracking.
backend <- AsyncBackend$new()

# Register the backend with the context.
context$set_backend(backend)

# Start the backend.
context$start(specification)

# Get a modern bar instance.
bar <- ModernBar$new()

# Register the bar with the context.
context$set_bar(bar)

# Configure the bar.
context$configure_bar(
    show_after = 0,
    format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)

# Run a task in parallel (i.e., approx. 1000 * 0.01 s / 4 cores = 2.5 seconds).
context$sapply(x = 1:1000, fun = task)

# Get the task output.
output <- backend$get_output()

# Close the backend.
context$stop()

Below is a suggested workflow that perhaps fits Steve Weston's characterization of "valiant efforts". However, at the cost of some overhead, it accomplishes the things I was mainly interested in: (1) a cross-platform solution, (2) no reliance on low-level parallel implementation details, and (3) frugality with respect to the dependencies used.
In short, the code below does the following:

  • The function prepare_file_for_logging creates a temporary file (i.e., at an OS-specific location) that will later be used for reporting and tracking the progress of the parallel task execution.
  • The function par_sapply_with_progress starts an R session in the background (i.e., without blocking the main session).
  • In that background session, it sets up a cluster (i.e., either PSOCK or FORK) and runs the task in parallel via the function parallel::parSapply.
  • During each task run, the workers (i.e., the cluster nodes) report progress to the temporary file (i.e., strictly by appending new lines, to avoid race conditions).
  • Back in the main process, the function track_progress monitors the temporary file and displays and updates a progress bar based on its contents.
  • The main process remains blocked until the progress bar completes and the background process exits.

The libraries used are parallel and callr, plus a few functions from base and utils. The code below is explicitly commented for clarity.

Usage

# Load libraries.
library(parallel)
library(callr)

# How many times to run?
runs <- 40

# Prepare file for logging the progress.
file_name <- prepare_file_for_logging()

# Run the task in parallel without blocking the main process.
process <- par_sapply_with_progress(
    # Cluster specifications.
    cores = 4,
    type = "PSOCK",

    # Where to write the progress.
    file_name = file_name,

    # Task specifications (i.e., just like in `parallel::parSapply`).
    x = 1:runs,
    fun = function(x, y) {
        # Wait a little.
        Sys.sleep(0.5)

        # Do something useful.
        return(x * y)
    },
    args = list(
        y = 10
    )
)

# Monitor the progress (i.e., blocking the main process until completion).
track_progress(
    process = process,
    iterations = runs,
    file_name = file_name,
    cleanup = TRUE
)

# Show the results.
print(process$get_result())

# |=====================================================================| 100%
#  [1]  10  20  30  40  50  60  70  80  90 100 110 120 130 140 150 160 170 180
# [19] 190 200 210 220 230 240 250 260 270 280 290 300 310 320 330 340 350 360
# [37] 370 380 390 400

Implementation

The function to prepare the temporary file

# Create and get temporary file name.
prepare_file_for_logging <- function(file_name) {
    # If the file name is missing.
    if(missing(file_name)) {
        # Get a temporary file name (i.e., OS specific).
        file_name <- tempfile()
    }

    # Create the actual file to avoid race conditions.
    file_created <- file.create(file_name)

    # Indicate if something went wrong creating the file.
    stopifnot("Failed to create file." = file_created)

    return(file_name)
}

The function to run the task in parallel

# Run task in parallel and log the progress.
par_sapply_with_progress <- function(cores, type, file_name, x, fun, args) {
    # Decorate the task function to enable progress tracking.
    get_decorated_task <- function(task) {
        # Evaluate symbol.
        force(task)

        # Create wrapper.
        return(function(x, file_name, ...) {
            # Update the progress on exit.
            on.exit({
                # Write processed element to file.
                cat("\n", file = file_name, append = TRUE)
            })

            return(task(x, ...))
        })
    }

    # Get the decorated task.
    fun_decorated <- get_decorated_task(fun)

    # Start a background process.
    background_process <- callr::r_bg(function(cores, type, file_name, x, fun_decorated, args) {
        # Make cluster.
        cluster <- parallel::makeCluster(cores, type = type)

        # Close the cluster on exit.
        on.exit({
            # Stop the cluster.
            parallel::stopCluster(cluster)
        })

        # Output.
        output <- do.call(parallel::parSapply, c(
            list(cluster, x, fun_decorated, file_name), args
        ))

        # Return the results to the background process.
        return(output)
    }, args = list(cores, type, file_name, x, fun_decorated, args))

    # Return the background process `R6` object.
    return(background_process)
}

The function to track the progress

# Track progress and keep the main process busy.
track_progress <- function(process, iterations, file_name, cleanup = TRUE) {
    if (cleanup) {
        on.exit({
            # Remove the file (i.e., just in case).
            unlink(file_name)
        })
    }

    # Create a progress bar.
    bar <- txtProgressBar(min = 0, max = iterations, initial = NA, style = 3)

    # Record the number of processed iterations (i.e., runs).
    n_tasks_processed <- 0

    # While the process is alive.
    while(n_tasks_processed < iterations) {
        # Get the number of tasks processed.
        n_tasks_processed <- length(scan(file_name, blank.lines.skip = FALSE, quiet = TRUE))

        # If the process that started the workers is finished.
        if(!process$is_alive()) {
            # Indicate that all tasks have been processed.
            n_tasks_processed <- iterations
        }

        # Update the progress bar.
        setTxtProgressBar(bar, n_tasks_processed)
    }

    # Close the progress bar.
    close(bar)

    # Wait for the process to close.
    process$wait()
}

Things to consider

Regarding logging and reading the progress from the temporary file, I can think of two ways to reduce the overhead:

  • First, consider reducing the granularity of the reporting (i.e., perhaps it is not necessary to log the progress after every single task run, but rather after every five or so).
  • Second, consider reducing how often the progress bar is updated. Currently, track_progress scans the temporary file and updates the progress bar continuously, which is probably unnecessary; a better approach may be to add a timeout between subsequent file scans and progress bar updates. Both ideas are sketched below.
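A rough sketch of both ideas, reusing the names from the implementation above (the every argument and the 0.25-second polling interval are arbitrary choices, not part of the original code):

# (1) Coarser reporting: a variant of the task decorator that logs once
# every `every` completed tasks. Each worker holds its own copy of `count`,
# and writing `every` newlines at once keeps the line count consistent.
# Leftover tasks at the end are never flushed, but `track_progress` already
# treats the exit of the background process as completion.
get_decorated_task <- function(task, every = 5) {
    # Evaluate symbols.
    force(task)
    force(every)

    # Per-worker counter of completed tasks.
    count <- 0

    return(function(x, file_name, ...) {
        on.exit({
            count <<- count + 1

            # Flush a batch of newlines every `every` tasks.
            if (count %% every == 0) {
                cat(strrep("\n", every), file = file_name, append = TRUE)
            }
        })

        return(task(x, ...))
    })
}

# (2) Throttled polling: the same loop as in `track_progress`, but sleeping
# between subsequent file scans instead of spinning continuously.
while (n_tasks_processed < iterations) {
    # Get the number of tasks processed.
    n_tasks_processed <- length(scan(file_name, blank.lines.skip = FALSE, quiet = TRUE))

    # If the process that started the workers is finished.
    if (!process$is_alive()) {
        n_tasks_processed <- iterations
    }

    # Update the progress bar, then wait before the next scan.
    setTxtProgressBar(bar, n_tasks_processed)
    Sys.sleep(0.25)
}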

Finally, I personally prefer to open a cluster once and then reuse it across different parts of the code. In that scenario, I would switch from callr::r_bg (i.e., short-lived background R processes) to callr::r_session (i.e., a permanent R session) for more control (i.e., see this question); a rough sketch of that switch follows.
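A minimal sketch using callr's r_session methods (call(), poll_process(), read(), and close()); the one-second task is just a placeholder:

library(callr)

# A persistent background session that can be reused across calls.
session <- r_session$new()

# Non-blocking call; the session stays alive afterwards for reuse.
session$call(function() {
    # Simulate some work.
    Sys.sleep(1)

    # Return something.
    42
})

# Later: wait (up to 5000 ms) for the call to finish, then collect the result.
session$poll_process(5000)
result <- session$read()$result

# Close the session when it is no longer needed.
session$close()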
I hope this also helps others who are struggling with this problem.
