jenkins 在看起来不是阶段或闭包的管道中运行并行线程

bnlyeluc  于 2023-11-17  发布在  Jenkins
关注(0)|答案(1)|浏览(146)

我有一个流水线,它可以运行50个并行阶段,在每个阶段,我都启动一个Python脚本(使用sh,或bat步骤),它运行该阶段所需的所有逻辑。同一个python脚本创建了一个summary.json文件,其中包括任务及其进度/status。我想从Jenkins轮询这个文件,以更新在python脚本运行时显示在构建页面上的报告。

可能的解决方案:

1.对于每一个并行阶段,我可以生成一个额外的“监视”阶段,它会不断轮询工作区中的文件,以监视和更新报告。这里的问题是,我将使我的阶段数量增加一倍,这在管道视图中看起来很糟糕。
1.对于每一个并行阶段,我可以启动另一个“线程”,它的行为与前面的选项完全相同,只是它不会在管道视图或蓝海中显示为一个阶段。
1.在python模式下运行主python脚本(在后台启动进程,并带有一个dont-kill-me标志),然后继续从同一阶段轮询文件。这里的问题是,我需要监视进程PID,或者有某种方法来检测原始进程是否已终止。另一个问题是,我很可能无法获得日志。
任何人都有任何建议或可能的解决方案?谢谢。

w51jfk4q

w51jfk4q1#

对于每一个并行阶段,我可以生成一个额外的“monitor”阶段,该阶段不断轮询工作区中的文件以查找它所监视的阶段并更新报告

+----------------+                 +-------------------+
 |                |                 |                   |
 |   Parallel     |                 |   Monitor/Report  |
 |   Stages (50)  |---generates---> |   Update Thread   |
 |                |   summary.json  |                   |
 +----------------+                 +-------------------+

字符串

  • 作为每个阶段一部分的脚本在一个单独的节点上执行。因此线程需要在它监视的同一个节点上运行。
  • 所有的groovy代码(包括这个Thread)都在内置节点上运行,所以它不会找到文件。也许我们可以为每个阶段调整这个线程和一个线程,但这会导致控制器上的速度变慢,因为我们会消耗所有的线程。

您可以使用Jenkins的原生parallel步骤来并发运行任务,而不是手动创建线程。Jenkins将适当地处理执行线程。
由于每个阶段都在单独的节点上运行,因此可以使用agent指令为每个阶段或并行分支指定节点。
同一个python脚本创建了一个summary.json文件,其中包括任务及其进度/状态。我想从Jenkins轮询这个文件,以更新在python脚本运行时显示在构建页面上的报告。
为了解决在Python脚本运行时轮询summary.json文件以获取更新的需求,而不需要将阶段数加倍,也不需要使Jenkins控制器过载,我不应该使用while循环来连续轮询Python脚本的状态并检查summary.json文件。该操作是轻量级的,但当扩展到许多阶段时,如果管理不当,它可能会消耗Jenkins控制器上的大量资源。
我最初使用sleep(15)暂停来执行池化,这应该可以防止可能导致高CPU利用率的紧密循环。然而,这仍然意味着每个阶段每15秒独立执行一次I/O操作,这会增加并行阶段的数量。
为了尽量减少对Jenkins控制器的影响,您可以考虑:

  • 如果不需要近实时监控,则增加每次轮询之间的轮询间隔以减少负载。
  • 确保轮询的文件I/O操作使用异步I/O(如果可用)完成。
  • 实施节流机制以限制在特定时间范围内跨所有阶段的文件I/O操作的数量。
  • 将监控过程从Jenkins的外部移动到一个单独的系统,该系统负责监视summary.json文件并更新报告。这意味着您的Jenkins作业需要与此外部系统进行通信以提供更新。
def runInBackground(stageName) {
    // This function runs a Python script in the background
    // and ensures it will not be killed after the Jenkins step ends.
    sh script: "nohup python ${stageName}.py > ${stageName}_output.log 2>&1 & echo \$! > ${stageName}_pid.txt", returnStdout: false
}

pipeline {
    agent any
    stages {
        stage('Execute and Monitor') {
            steps {
                script {
                    def parallelStagesMap = [:]
                    for(int i = 0; i < 50; i++) {
                        def stageName = "Stage-${i}"
                        parallelStagesMap[stageName] = {
                            node {
                                // Run the Python script in the background
                                runInBackground(stageName)
                                // Read the PID
                                def pid = readFile("${stageName}_pid.txt").trim()
                                // Poll the summary.json file and logs for updates
                                while (sh(script: "ps -p ${pid}", returnStatus: true) == 0) {
                                    if (fileExists("${stageName}_summary.json")) {
                                        // Logic to update the report here
                                        // ...
                                    }
                                    // Optionally, read the latest logs and echo them for Jenkins output
                                    def logContent = readFile("${stageName}_output.log").trim()
                                    echo logContent
                                    // Sleep before polling again
                                    sleep(15)
                                }
                            }
                        }
                    }
                    // Execute all stages in parallel
                    parallel parallelStagesMap
                }
            }
        }
    }
    post {
        always {
            // Steps to collect and archive the logs from all stages
            script {
                for(int i = 0; i < 50; i++) {
                    def stageName = "Stage-${i}"
                    archiveArtifacts artifacts: "${stageName}_output.log"
                }
            }
        }
    }
}


每个Python脚本都在后台使用nohup启动,以防止在Jenkins步骤结束时被杀死。输出被重定向到日志文件。
后台进程的PID立即写入文件,以确保以后可以访问它。
while循环用于检查进程是否仍在运行,如果是,则执行summary.json和日志文件的必要轮询。
日志文件被定期读取并回显到Jenkins控制台,以实时保留日志输出。
最后,在构建后的步骤中,每个阶段的日志文件都会被存档。
如果这些更改后负载仍然过高,则可能需要外部监视解决方案来满足所有约束。
注意事项:我不认为Jenkins本身就支持实时更新构建页面。如果你想在Jenkins构建页面上显示实时更新,你可能需要使用支持此功能的Jenkins插件,或者,写入一个文件,然后通过HTTP提供服务,并使用JavaScript轮询该文件,并在真实的更新网页-可以利用HTML Publisher Plugin等插件来显示报告,但对于实时更新,您可能需要自定义解决方案。
虽然这个解决方案应该有效,但它有两个问题:

  • 由于我们没有在读取之间重置日志文件,所以我们将一遍又一遍地打印整个日志。
  • 由于我们在每次迭代中都要阅读和打印日志,因此在管道中会有很多很多的打印,每个打印都会添加额外的“步骤”。结合前面的观点,我们几乎永远不会得到整个日志的步骤。

关于这两个问题:
1.防止日志重复打印:您可以跟踪最后一次读取的位置,只打印新的内容,而不是在每次迭代中阅读并打印整个日志文件。
这可以通过维护一个存储最后读取行号的变量并使用tailsed等命令从该行向前读取来实现。
1.管理流水线步骤:为了避免流水线中创建过多的步骤,您可以限制日志阅读和回显的频率。
与其在循环的每次迭代中执行此操作,不如引入一个计数器,并仅在一定数量的迭代之后执行此操作。
修改后的文本将是:

def runInBackground(stageName) {
    sh script: "nohup python ${stageName}.py > ${stageName}_output.log 2>&1 & echo \$! > ${stageName}_pid.txt", returnStdout: false
}

def readNewLogContent(stageName, lastLineRead) {
    def newLastLineRead = sh(script: "wc -l < ${stageName}_output.log", returnStdout: true).trim().toInteger()
    if (newLastLineRead > lastLineRead) {
        def newContent = sh(script: "tail -n +${lastLineRead + 1} ${stageName}_output.log", returnStdout: true).trim()
        echo newContent
    }
    return newLastLineRead
}

pipeline {
    agent any
    stages {
        stage('Execute and Monitor') {
            steps {
                script {
                    def parallelStagesMap = [:]
                    for(int i = 0; i < 50; i++) {
                        def stageName = "Stage-${i}"
                        parallelStagesMap[stageName] = {
                            node {
                                runInBackground(stageName)
                                def pid = readFile("${stageName}_pid.txt").trim()
                                def lastLineRead = 0
                                def iterationCounter = 0
                                while (sh(script: "ps -p ${pid}", returnStatus: true) == 0) {
                                    if (fileExists("${stageName}_summary.json")) {
                                        // Logic to update the report here
                                        // ...
                                    }
                                    if (iterationCounter++ % 5 == 0) { // Adjust the modulus value as needed
                                        lastLineRead = readNewLogContent(stageName, lastLineRead)
                                    }
                                    sleep(15)
                                }
                            }
                        }
                    }
                    parallel parallelStagesMap
                }
            }
        }
    }
    post {
        always {
            script {
                for(int i = 0; i < 50; i++) {
                    def stageName = "Stage-${i}"
                    archiveArtifacts artifacts: "${stageName}_output.log"
                }
            }
        }
    }
}


readNewLogContent函数只从日志文件中读取自上次读取位置以来的新内容。

iterationCounter变量用于控制日志阅读的频率。可以根据读取日志的频率调整模数值。调整if (iterationCounter++ % 5 == 0)条件中的模数值,以平衡日志更新和管道步骤创建的频率。
lastLineRead变量跟踪日志文件已读取的行号,以确保每次只打印新内容。

相关问题