Given a file containing the list of tables to Sqoop, the script launches a sqoop import command with a list of options. The intelligence here is in the "scheduler", which I borrowed from here: I want the script to spawn no more than a maximum number of child processes (defined in a variable), monitor them, and as soon as one finishes, launch another one to keep the queue full. This goes on until the end of the list of tables to Sqoop.
The script and the scheduler work fine, except that the script ends before the subshells have finished their job.
I tried putting wait at the end of the script, but then it waits for me to press Enter.
Sorry, I cannot disclose the full script. I hope you can understand.
Thank you for your help.
#!/bin/bash
# Script to parallel offloading RDB tables to Hive via Sqoop
confFile=$1
listOfTables=$2
# Source configuration values
. "$confFile"
# This file contains various configuration options, as well as "parallels",
# which is the number of concurrent jobs I want to launch
# Some nice functions.
usage () {
...
}
doSqoop() {
# This function launches a Sqoop command built from the information extracted
# in the while loop. It also writes 2 log files and checks the Sqoop return code.
}
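# Append a child PID to the queue and bump the running-job counter.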
queue() {
queue="$queue $1"
num=$(($num+1))
}
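# Rebuild the queue, keeping only PIDs that still have an entry under /proc (i.e. are still running).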
regeneratequeue() {
oldrequeue=$queue
queue=""
num=0
for PID in $oldrequeue
do
if [ -d /proc/"$PID" ] ; then
queue="$queue $PID"
num=$(($num+1))
fi
done
}
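# If at least one queued PID has finished, rebuild the queue to refresh the job count.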
checkqueue() {
oldchqueue=$queue
for PID in $oldchqueue
do
if [ ! -d /proc/"$PID" ] ; then
regeneratequeue # at least one PID has finished
break
fi
done
}
# Check for mandatory values.
...
#### HeavyLifting ####
# Since I have a file containing the list of tables to Sqoop along with other
# useful arguments like sourceDB, sourceTable, hiveDB, HiveTable, number of parallels,
# etc, all in the same line, I use awk to grab them and then
# I pass them to the function doSqoop().
# So, here I:
# 1. create a temp folder
# 2. grab values from line with awk
# 3. launch doSqoop() as below:
# 4. Monitor spawned jobs
awk '!/^($|#)/' < "$listOfTables" | { while read -r line;
do
# look for the folder or create it
# .....
# extract values from line with awk
# ....
# launch doSqoop() with this line:
(doSqoop) &
PID=$!
queue $PID
while [[ "$num" -ge "$parallels" ]]; do
checkqueue
sleep 0.5
done
done; }
# Here I tried to put wait, without success.
Edit (2)
Ok, I managed to implement Dibi's suggestion, which as far as I can tell is correct. I did not implement what Duffy said because I did not fully understand it and I ran out of time.
The problem now is that I moved some code into the doSqoop function and it is unable to create the /tmp folder it needs for the logs.
I do not understand what is wrong. Below is the code, followed by the error. Please consider that the query parameter is very long and contains spaces.
Script
#!/bin/bash
# Script to download lot of tables in parallel with Sqoop and write them to Hive
confFile=$1
listOfTables=$2
# Source configuration values
. "$confFile"
# TODO: delete the sqoop tmp directory after the jobs end #
doSqoop() {
local origSchema="$1"
local origTable="$2"
local hiveSchema="$3"
local hiveTable="$4"
local splitColumn="$5"
local sqoopParallels="$6"
local query="$7"
databaseBaseDir="$baseDir"/"$origSchema"-"$hiveSchema"
local logFileSummary="$databaseBaseDir"/"$hiveTable"-summary.log
local logFileRaw="$databaseBaseDir"/"$hiveTable"-raw.log
[ -d "$databaseBaseDir" ] || mkdir -p "$databaseBaseDir"
if [[ $? -ne 0 ]]; then
echo -e "Unable to complete the process. \n
Cannot create logs folder $databaseBaseDir"
exit 1
fi
echo "#### [$(date +%Y-%m-%dT%T)] Creating Hive table $hiveSchema.$hiveTable from source table $origSchema.$origTable ####" | tee -a "$logFileSummary" "$logFileRaw"
echo -e "\n\n"
quote="'"
sqoop import -Dmapred.job.queuename="$yarnQueue" -Dmapred.job.name="$jobName" \
--connect "$origServer" \
--username SQOOP --password-file file:///"$passwordFile" \
--delete-target-dir \
--target-dir "$targetTmpHdfsDir"/"$hiveTable" \
--outdir "$dirJavaCode" \
--hive-import \
--hive-database "$hiveSchema" \
--hive-table "$hiveTable" \
--hive-partition-key "$hivePartitionName" --hive-partition-value "$hivePartitionValue" \
--query "$quote $query where \$CONDITIONS $quote" \
--null-string '' --null-non-string '' \
--num-mappers 1 \
--fetch-size 2000000 \
--as-textfile \
-z --compression-codec org.apache.hadoop.io.compress.SnappyCodec |& tee -a "$logFileRaw"
sqoopRc=$?
if [[ $sqoopRc -ne 0 ]]; then
echo "[$(date +%Y-%m-%dT%T)] Error importing $hiveSchema.$hiveTable !" | tee -a "$logFileSummary" "$logFileRaw"
echo "$hiveSchema.$hiveTable" >> $databaseBaseDir/failed_imports.txt
fi
echo "Tail of : $logFileRaw" >> "$logFileSummary"
tail -10 "$logFileRaw" >> "$logFileSummary"
}
export -f doSqoop
# Check for mandatory values.
if [[ ! -f "$confFile" ]]; then
echo -e " $confFile does not appear to be a valid file.\n"
usage
fi
if [[ ! -f "$listOfTables" ]]; then
echo -e " $listOfTables does not appear to be a valid file.\n"
usage
fi
if [[ -z "${username+x}" ]]; then
echo -e " A valid username is required to access the Source.\n"
usage
fi
if [[ ! -f "$passwordFile" ]]; then
echo -e " Password File $password does not appear to be a valid file.\n"
usage
fi
if [[ -z "${origServer+x}" ]]; then
echo -e " Sqoop connection string is required.\n"
usage
fi
#### HeavyLifting ####
awk -F"|" '!/^($|#)/ {print $1 $2 $3 $4 $5 $6 $7}' < "$listOfTables" | xargs -n7 -P$parallels bash -c "doSqoop {}"
Error
mkdir: cannot create directory `/{}-'mkdir: : Permission deniedcannot create directory `/{}-'
mkdir: : Permission denied
cannot create directory `/{}-': Permission denied
Unable to complete the process.
Cannot create logs folder /{}-
mkdir: cannot create directory `/{}-': Permission denied
Unable to complete the process.
Cannot create logs folder /{}-
Unable to complete the process.
Cannot create logs folder /{}-
Unable to complete the process.
Cannot create logs folder /{}-
mkdir: cannot create directory `/{}-': Permission denied
Unable to complete the process.
Cannot create logs folder /{}-
mkdir: cannot create directory `/{}-': Permission denied
mkdir: cannot create directory `/{}-': Permission denied
Unable to complete the process.
Cannot create logs folder /{}-
mkdir: mkdir: cannot create directory `/{}-'cannot create directory `/{}-': Permission denied: Permission denied
Unable to complete the process.
Cannot create logs folder /{}-
Unable to complete the process.
Cannot create logs folder /{}-
Unable to complete the process.
Cannot create logs folder /{}-
3 Answers

Answer 1
Since you push doSqoop into the background with &, the only thing limiting the script's running time is the sleep 0.5, regardless of how long checkqueue takes to run. Have you considered using xargs to run the function in parallel? An example that I think is close to your use case, together with the output it produces:
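A minimal sketch of that idea (doWork and the generated table names are placeholders, not your actual script): export the function, then let xargs feed it one argument per job with a bounded number of parallel slots.
#!/bin/bash
# Stand-in for doSqoop: pretend to import one table.
doWork() {
    echo "start $1"
    sleep 1
    echo "done  $1"
}
export -f doWork   # make the function visible to the child bash processes
# One argument per invocation (-n1), at most 4 jobs running at a time (-P4).
printf '%s\n' table_{01..10} | xargs -n1 -P4 bash -c 'doWork "$1"' _
The output is the interleaved start/done lines of the ten jobs, four at a time, and xargs itself only returns once every child has exited, so nothing is left running when the pipeline ends.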
Sometimes it is easier to let xargs do that work for you.
Edit: the example below passes three arguments into the function and runs at most eight operations in parallel:
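A sketch under the same assumptions; the input file fields.txt, with three whitespace-separated fields per record, is hypothetical.
#!/bin/bash
# Stand-in function that consumes three positional arguments.
doWork3() {
    echo "schema=$1 table=$2 target=$3"
    sleep 1
}
export -f doWork3
# xargs groups three tokens per invocation (-n3) and keeps up to eight
# invocations running in parallel (-P8).
xargs -n3 -P8 bash -c 'doWork3 "$1" "$2" "$3"' _ < fields.txt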
Answer 2
With GNU Parallel, you could probably do:
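A hedged sketch using the script's own variable names (it assumes doSqoop is exported with export -f and that listOfTables holds one pipe-separated record per line):
export -f doSqoop
# Split each line on "|" and hand the seven fields to doSqoop,
# running $parallels jobs at a time.
parallel --colsep '\|' -j "$parallels" \
    doSqoop {1} {2} {3} {4} {5} {6} {7} :::: "$listOfTables"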
If you only want one process per CPU core:
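GNU Parallel defaults to one job slot per CPU core, so you can simply drop -j, or spell it out as in this sketch:
# -j 100% means as many job slots as there are CPU cores (the default).
parallel --colsep '\|' -j 100% \
    doSqoop {1} {2} {3} {4} {5} {6} {7} :::: "$listOfTables"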
Answer 3
Some time has passed and I now have a moment to answer my own question, because I really would not want anyone else to fall into this kind of problem.
I ran into more than one issue, related both to bugs in my code and to the use of xargs. With hindsight, and based on this experience, I can definitely recommend not using xargs for this kind of work. Bash is not the best language for it, but if you are forced to do it in Bash, consider using GNU Parallel. I will move my script to it shortly.
Regarding the specific problems:
- I had trouble passing arguments to the function: first because they contained special characters I had not noticed, and then because I was not using -I args. I solved it with xargs options, namely cleaning the input lines of embedded newlines and using -L1 -I args. This way xargs takes each whole line as a single argument and passes it to the function, where I parse the fields with awk (see the sketch after this list).
- The scheduler I tried to implement did not work. In the end I used xargs to parallelise the execution of the function, and inside the function I added custom code that writes a few control files, which help me understand (at the end of the script) what failed and what worked.
- xargs offers no facility for collecting the output of the individual jobs; it simply dumps everything on standard output. I work with Hadoop, I get a lot of output, and it was just a mess.
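A reduced sketch of that -I pattern (not the final production script; only two fields are parsed, and parallels and listOfTables come from the sourced config as above): each whole line is handed to the function as one argument and split inside it.
#!/bin/bash
# Reduced stand-in for the real doSqoop: receives the WHOLE record as $1.
doSqoop() {
    local line="$1"
    local origSchema origTable
    origSchema=$(awk -F'|' '{print $1}' <<< "$line")
    origTable=$(awk -F'|' '{print $2}' <<< "$line")
    echo "would import ${origSchema}.${origTable}"
}
export -f doSqoop
# -I args reads one line at a time and substitutes it, as a single token,
# where "args" appears; -P keeps $parallels jobs running concurrently.
awk '!/^($|#)/' "$listOfTables" | xargs -I args -P "$parallels" bash -c 'doSqoop "$1"' _ args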
Again, xargs is fine if you use it together with other shell commands such as find, cat, zip and so on. If your use case looks like mine, do not use it; you will go grey. Instead, spend some time learning GNU Parallel or, better, use a fully featured language if you can.