een parallele batch scheduler in shell
This commit is contained in:
parent
ef13833b4e
commit
c20c39f12d
4 changed files with 327 additions and 0 deletions
285
pbatch/pb
Executable file
285
pbatch/pb
Executable file
|
|
@ -0,0 +1,285 @@
|
|||
#!/bin/bash
#
# pb - a parallel batch scheduler in shell.
#
# All state (counter files, lock files and the queue directories) lives in
# the directory pb is started from. waittime and maxproc can be preset
# through the environment; a config file in that directory may override
# them later (see readconfig).

# Job-number counter and the lock file that serialises access to it.
counterfile=jobscounter
counterlock=counterlock

# Counter of currently running processors and its lock file.
runningfile=runningcounter
runninglock=runninglock

# Directory holding the queue directories: the cwd at start-up.
basedir=$(pwd)

# Permission bits given to queued job files so they can be executed.
runmode=0755

# Defaults, applied only when the variable is unset or empty.
: "${waittime:=3}" # seconds the master sleeps per scheduling round
: "${maxproc:=3}"  # maximum number of parallel processes

# How this script was invoked; the master uses it to start workers.
pbcommand=$0
|
||||
|
||||
# Print the given message on stderr and terminate with exit status 1.
# printf is used instead of echo so that a message such as "-n" or "-e"
# is printed literally rather than being taken as an echo option.
die() {
  printf '%s\n' "$*" >&2
  exit 1
}
|
||||
|
||||
# We need unique job numbers, preferably sequential.
# So just use a counter file.
#
# Increments the number stored in ${counterfile} under an exclusive flock
# on ${counterlock} and prints the new value on stdout. A missing or empty
# counter file counts as 0.
# Globals:  counterfile, counterlock (read)
# Outputs:  the newly allocated job number
# Returns:  non-zero if the lock cannot be taken or the counter file holds
#           something that is not a non-negative integer (the file is then
#           left untouched).
increasejobscounter() {
  (
    flock -x 200 || exit 1

    # Always start from a known value; never inherit a stray variable.
    current=0
    if [[ -e "${counterfile}" ]]; then
      current=$(<"${counterfile}")
    fi
    if [[ -z "${current}" ]]; then
      current=0
    fi
    if [[ ! "${current}" =~ ^[0-9]+$ ]]; then
      printf 'Corrupt job counter in %s\n' "${counterfile}" >&2
      exit 1
    fi

    # 10# forces base 10 so a value with leading zeros is not read as octal.
    next=$((10#${current} + 1))
    printf '%s\n' "${next}" > "${counterfile}"

    # Needed so the calling functions know which job number they may use.
    printf '%s\n' "${next}"
  ) 200> "${counterlock}"
  # XXX how do you cleanly remove the lock file again?
  # The answer is probably: you don't.
}
|
||||
|
||||
# Queue a command line as a new job.
# Arguments: the command; all arguments are joined into one line.
# Globals:   basedir, runmode (read)
# Outputs:   "New job: <number>" on stdout
# Exits (via die) when no command is given, no job number could be
# allocated, the job file already exists or it cannot be written.
addjob() {
  local jobnumber jobfile

  [[ -n "$*" ]] || die "No command given"

  # Without a job number the job would be written to the jobs/ directory
  # path itself, so stop here.
  jobnumber=$(increasejobscounter) || jobnumber=
  [[ -n "${jobnumber}" ]] || die "Couldn't allocate a job number"

  jobfile="${basedir}/jobs/${jobnumber}"
  echo "New job: ${jobnumber}"
  if [[ -e "${jobfile}" ]]; then
    die "Job ${jobnumber} already scheduled. This should not happen."
  fi
  printf '%s\n' "$*" > "${jobfile}" || die "Couldn't write job: ${jobnumber}"
  chmod "${runmode}" "${jobfile}"
}
|
||||
|
||||
# Queue a copy of an existing file as a new job.
# Arguments: $1 - path of the file to queue
# Globals:   basedir, runmode (read)
# Outputs:   "New job: <number>" on stdout
# Exits (via die) when the file does not exist, no job number could be
# allocated, the job already exists or the copy fails.
addjobfromfile() {
  local filename=$1
  local jobnumber jobfile

  # Check the input first so a wrong path does not use up a job number.
  [[ -f "${filename}" ]] || die "file not found: ${filename}"

  jobnumber=$(increasejobscounter) || jobnumber=
  [[ -n "${jobnumber}" ]] || die "Couldn't allocate a job number"

  jobfile="${basedir}/jobs/${jobnumber}"
  echo "New job: ${jobnumber}"
  if [[ -e "${jobfile}" ]]; then
    die "Job ${jobnumber} already scheduled. This should not happen."
  fi
  cp -- "${filename}" "${jobfile}" || die "Couldn't copy job file: ${filename}"
  chmod "${runmode}" "${jobfile}"
}
|
||||
|
||||
# Remove a job from the queue.
# Arguments: $1 - job number
# Globals:   basedir (read)
# Outputs:   "Job removed: <number>" on stdout
# Exits (via die) when the job is not queued or cannot be removed.
deljob() {
  local jobnumber=$1
  local jobfile="${basedir}/jobs/${jobnumber}"

  # A name containing "/" could point outside the queue directory
  # (e.g. ../config); treat it as not queued.
  if [[ "${jobnumber}" == */* || ! -f "${jobfile}" ]]; then
    die "Job '${jobnumber}' not in queue"
  fi

  if rm -- "${jobfile}"; then
    echo "Job removed: ${jobnumber}"
  else
    die "Couldn't remove job: ${jobnumber}"
  fi
}
|
||||
|
||||
# Move every queued job from jobs/ to park/.
# Globals: basedir (read)
# Does nothing, silently, when the queue is empty.
parkjobs() {
  # XXX probably needs locking too, so it doesn't clash with starting jobs
  local -a queued=("${basedir}/jobs/"*)

  # An unmatched glob stays as the literal pattern, which does not exist.
  if [[ -e "${queued[0]}" ]]; then
    mv -- "${queued[@]}" "${basedir}/park/"
  fi
}
|
||||
|
||||
# Move every parked job from park/ back to jobs/.
# Globals: basedir (read)
# Does nothing, silently, when nothing is parked.
unparkjobs() {
  local -a parked=("${basedir}/park/"*)

  # An unmatched glob stays as the literal pattern, which does not exist.
  if [[ -e "${parked[0]}" ]]; then
    mv -- "${parked[@]}" "${basedir}/jobs/"
  fi
}
|
||||
|
||||
# Put a failed job back in the queue and discard its failure directory,
# including any output stored there.
# Arguments: $1 - job number
# Globals:   basedir (read)
# Exits (via die) when the job has no failure/<number>/job file or when it
# cannot be moved back; in the latter case the failure directory is kept.
unfail() {
  local jobnumber=$1
  local faildir="${basedir}/failure/${jobnumber}"

  if [[ -z "${jobnumber}" || "${jobnumber}" == */* || ! -f "${faildir}/job" ]]; then
    die "Job '${jobnumber}' not in failure list"
  fi

  # Only remove the failure directory once the job file is safely queued.
  mv -- "${faildir}/job" "${basedir}/jobs/${jobnumber}" \
    || die "Couldn't reschedule job: ${jobnumber}"
  rm -rf -- "${faildir}"
}
|
||||
|
||||
# Change the priority of a job. Not implemented yet: only prints a notice.
# Intended arguments (currently ignored):
#   $1 - prio
#   $2 - jobnumber
changeprio() {
  echo unimplemented
}
|
||||
|
||||
# Print the number of directory entries (as the shell glob sees them, so
# without dot files) in directory $1.
_countentries() {
  local -a entries=("$1"/*)

  # An unmatched glob stays as the literal pattern, which does not exist.
  if [[ -e "${entries[0]}" || -L "${entries[0]}" ]]; then
    printf '%s\n' "${#entries[@]}"
  else
    printf '0\n'
  fi
}

# Show an overview of running, queued, parked, finished and failed jobs.
# The config file is re-read first so the maxproc shown is current.
# Globals: basedir, maxproc (read)
status() {
  readconfig

  # show which jobs are running
  echo "Running jobs ($(_countentries "${basedir}/processors")/${maxproc}):"
  ls "${basedir}/processors/"

  # show queues
  echo "Jobs queued ($(_countentries "${basedir}/jobs")):"
  ls "${basedir}/jobs/"
  echo "Jobs parked: $(_countentries "${basedir}/park")"
  echo "Jobs finished: $(_countentries "${basedir}/results")"
  echo "Jobs failed ($(_countentries "${basedir}/failure")):"
  ls "${basedir}/failure/"
}
|
||||
|
||||
# Stop a single job. Not implemented yet: only prints a notice.
# Intended argument (currently ignored): $1 - job number
stop() {
  echo unimplemented
}
|
||||
|
||||
# Stop all jobs and the scheduler. Not implemented yet: only prints a
# notice.
stopall() {
  echo unimplemented
}
|
||||
|
||||
# Print the command overview on stdout.
usage() {
  # The delimiter is quoted so the text is printed literally.
  cat <<'ENDTXT'
Usage: pb <option>

  add <command>         Add the given command as a job.
  addfile|af <file>     Add the given file as a job.
  createconfig          Write a default configuration file.
  run                   Start the master scheduler process.
  del <jobnr>           Delete the given job from the queue.
  help|-h               Show this help.
  park                  Place all queued jobs in the parkinglot.
  unpark                Place all jobs from the parkinglot back in the queue.
  retry <jobnr>         Reschedule a failed job. Destroys all output.
  prio <h|n|l> <jobnr>  Change the priority of the given job. [unimplemented]
  status|st             Show the status of the current jobs.
  stop <jobnr>          Stop the given job. [unimplemented]
  stopall               Stop all running jobs. [unimplemented]
ENDTXT
}
|
||||
|
||||
# Source ${basedir}/config when it exists, so settings in that file (such
# as maxproc and waittime, see createconfig) override the current values.
# Globals: basedir (read); whatever the config file assigns (written)
readconfig() {
  if [[ -f "${basedir}/config" ]]; then
    # shellcheck source=/dev/null
    source "${basedir}/config"
  fi
}
|
||||
|
||||
# Create the working directories under ${basedir} that do not exist yet:
# jobs, processors, results, failure, monitor and park.
# Globals: basedir (read)
# Exits (via die, message on stderr) when a directory cannot be created.
createdirs() {
  local dir

  for dir in jobs processors results failure monitor park; do
    if [[ ! -d "${basedir}/${dir}" ]]; then
      mkdir -- "${basedir}/${dir}" \
        || die "couldn't create directory ${basedir}/${dir}"
    fi
  done
}
|
||||
|
||||
# Write a config file holding the current maxproc and waittime values.
# An existing ${basedir}/config is never overwritten.
# Globals: basedir, maxproc, waittime (read)
createconfig() {
  local config="${basedir}/config"

  if [[ -e "${config}" ]]; then
    return 0
  fi

  {
    echo "# how many parallel processes?"
    echo "maxproc=${maxproc}"
    echo "# take a break for how long between process end and starting a new one"
    echo "waittime=${waittime}"
  } > "${config}"
}
|
||||
|
||||
# Add one to the number stored in ${runningfile}, under an exclusive flock
# on ${runninglock}. Nothing is written when the file does not exist.
# Content that is not a non-negative integer is treated as 0.
# Globals: runningfile, runninglock (read)
incrementrunning() {
  (
    flock -x 210 || exit 1
    [[ -e "${runningfile}" ]] || exit 0

    running=$(<"${runningfile}")
    [[ "${running}" =~ ^[0-9]+$ ]] || running=0

    # 10# forces base 10 so leading zeros are not read as octal.
    printf '%s\n' "$((10#${running} + 1))" > "${runningfile}"
  ) 210> "${runninglock}"
}
|
||||
|
||||
# Subtract one from the number stored in ${runningfile}, under an
# exclusive flock on ${runninglock}. The result never drops below 0. A
# missing file or non-numeric content counts as 0, so the file ends up
# holding 0 in those cases.
# Globals: runningfile, runninglock (read)
decrementrunning() {
  (
    flock -x 220 || exit 1

    # Always start from a known value; never inherit a stray variable.
    running=0
    if [[ -e "${runningfile}" ]]; then
      running=$(<"${runningfile}")
    fi
    [[ "${running}" =~ ^[0-9]+$ ]] || running=0

    # 10# forces base 10 so leading zeros are not read as octal.
    running=$((10#${running} - 1))
    if ((running < 0)); then
      running=0
    fi
    printf '%s\n' "${running}" > "${runningfile}"
  ) 220> "${runninglock}"
}
|
||||
|
||||
# Claim one queued job for execution.
# The first job in glob order that can be claimed is moved from
# jobs/<name> to processors/<name>/job and its name is printed on stdout.
# Nothing is printed when the queue is empty or no job could be claimed.
# Globals: basedir (read)
getjob() {
  local candidate name

  for candidate in "${basedir}/jobs/"*; do
    # An unmatched glob stays as the literal pattern; skip it. This also
    # skips a job that another processor took in the meantime.
    [[ -e "${candidate}" ]] || continue
    name=${candidate##*/}

    # mkdir fails when the directory already exists, so a successful mkdir
    # serves as the claim on this job; otherwise try the next one.
    mkdir -- "${basedir}/processors/${name}" 2>/dev/null || continue

    if mv -- "${candidate}" "${basedir}/processors/${name}/job" 2>/dev/null; then
      # Report the job only after it has really been claimed.
      printf '%s\n' "${name}"
      return 0
    fi

    # The job vanished between the check and the move: undo the claim.
    rmdir -- "${basedir}/processors/${name}" 2>/dev/null
  done
  return 0
}
|
||||
|
||||
# Move the directory of a job from processors/ to failure/.
# Arguments: $1 - job number
# Globals:   basedir (read)
# Returns:   non-zero when no job number is given or the move fails.
failjob() {
  local jobnumber=$1

  # With an empty name the whole processors/ directory would be moved.
  [[ -n "${jobnumber}" ]] || return 1
  mv -- "${basedir}/processors/${jobnumber}" "${basedir}/failure"
}
|
||||
|
||||
# Move the directory of a job from processors/ to results/.
# Arguments: $1 - job number
# Globals:   basedir (read)
# Returns:   non-zero when no job number is given or the move fails.
finishjob() {
  local jobnumber=$1

  # With an empty name the whole processors/ directory would be moved.
  [[ -n "${jobnumber}" ]] || return 1
  mv -- "${basedir}/processors/${jobnumber}" "${basedir}/results"
}
|
||||
|
||||
# Worker: claim one job, run it and file the result.
# The job runs with processors/<job> as working directory; its stdout and
# stderr are both written to processors/<job>/output. Exit status 0 moves
# the job to results/, anything else (including a failed cd) to failure/.
# The running counter is raised on entry and lowered on exit.
# Globals: basedir (read)
# Outputs: "Starting job", "Finished job" / "Failed job" lines on stdout
runprocessor() {
  local runjob jobdir

  incrementrunning
  # NOTE(review): the reason for this one-second pause is not evident from
  # this file; it is kept as it was.
  sleep 1

  runjob=$(getjob)
  if [[ -n "${runjob}" ]]; then
    jobdir="${basedir}/processors/${runjob}"
    echo "Starting job: ${runjob}"

    # The subshell keeps the cd local, so the counter files (relative
    # paths) are still reached from the original directory afterwards.
    # "> file 2>&1" in this order sends stderr to the output file as well.
    if (cd "${jobdir}" && "${jobdir}/job") > "${jobdir}/output" 2>&1; then
      finishjob "${runjob}"
      echo "Finished job: ${runjob}"
    else
      failjob "${runjob}"
      echo "Failed job: ${runjob}"
    fi
  fi

  decrementrunning
}
|
||||
|
||||
# Master scheduler loop.
# Resets the running counter to 0, then once per ${waittime} seconds:
# re-reads the config and, when fewer than ${maxproc} processors are
# counted as running and the queue is not empty, starts one background
# "process" worker. The loop ends when sleep fails (e.g. on a signal).
# Globals: basedir, runningfile, pbcommand (read); waittime, maxproc
#          (read, possibly changed by readconfig)
master() {
  local running
  local -a queued

  echo 0 > "${basedir}/${runningfile}"
  # Read the config up front so the first wait already honours it.
  readconfig

  while sleep "${waittime}"; do
    # Re-read every round, also when all slots are taken, so that a
    # changed maxproc or waittime is picked up without waiting for a slot.
    readconfig

    running=$(<"${basedir}/${runningfile}")
    # The file may be momentarily empty while a worker rewrites it.
    [[ "${running}" =~ ^[0-9]+$ ]] || running=0
    running=$((10#${running}))
    [[ "${running}" -lt "${maxproc}" ]] || continue

    # An unmatched glob stays as the literal pattern, which does not exist.
    queued=("${basedir}/jobs/"*)
    if [[ -e "${queued[0]}" ]]; then
      "${pbcommand}" process &
    fi
  done
}
|
||||
|
||||
###################################
# Command-line dispatch: the first argument selects the action, the
# remaining arguments are handed to it.

command=${1:-}
# shift fails when there is nothing to shift, so only do it when needed.
if (($# > 0)); then
  shift
fi

# Asking for help (or giving no command at all) needs no working
# directories, so handle that before they are created.
case "${command}" in
  help | -h)
    usage
    exit 0
    ;;
  '')
    usage >&2
    exit 1
    ;;
esac

createdirs

case "${command}" in
  add) addjob "$*" ;;
  addfile | af) addjobfromfile "$*" ;;
  createconfig) createconfig ;;
  run) master ;;
  del) deljob "$*" ;;
  park) parkjobs ;;
  unpark) unparkjobs ;;
  retry) unfail "$*" ;;
  # prio takes two separate arguments (priority and job number).
  prio) changeprio "$@" ;;
  process) runprocessor "$*" ;;
  status | st) status ;;
  stop) stop "$*" ;;
  stopall) stopall ;;
  *) die "Unknown command '${command}'. Try: pb help" ;;
esac
|
||||
Loading…
Add table
Add a link
Reference in a new issue