publicscripts/pbatch/pb.rb
2009-02-11 09:20:35 +00:00

345 lines
8.8 KiB
Ruby
Executable file

#!/usr/bin/env ruby
# $Id$
# $URL$
require 'fileutils'
# Working directories; createdirs makes each of them below the base directory.
JOBSDIR = "jobs"          # queued jobs, one file per job
PARKDIR = "park"          # jobs moved out of the queue by "park"
PROCDIR = "processors"    # one sub-directory per job that was taken from the queue
RESULTDIR = "results"     # job directories moved here by finishjob
FAILDIR = "failure"       # job directories moved here by failjob
MONITORDIR = "monitor"    # created by createdirs; not used elsewhere in this file

# Permission bits applied to every queued job file (rwxr-xr-x).
RUNMODE = 0o755

# Name of the optional configuration file inside the base directory.
CONFIG = "config"

# Built-in settings; readconfig overrides them from the CONFIG file.
@config = { "waittime" => 3, "maxproc" => 3 }

# Bookkeeping files, resolved relative to the current working directory.
COUNTERFILE = "jobscounter"
RUNNINGFILE = "runningcounter"
# Print the command overview on standard output and terminate the program.
#
# Never returns normally: +exit+ is called once the text has been printed.
# The ">--" runs in the text are kept exactly as they appear in the source.
def usage
  puts <<ENDTXT
Usage: pb <option>
add <command>>-->-------Add the given command as a job.
addfile|af <file>>------Add the given file as a job.
createconfig>--->-------Write a default configuration file.
run>---->------->-------Start the master scheduler process.
del <jobnr>>---->-------Delete the given job from the queue.
help|-h>>------->-------Show this help.
park>--->------->-------Place all queued jobs in the parkinglot.
unpark>->------->-------Place all jobs from the parkinglot back in the queue.
retry <jobnr>>-->-------Reschedule a failed job. Destroys all output.
prio <h|n|l> <jobnr>>---Change the priority of the given job.>--[unimplemented]
status|st>------>-------Show the status of the current jobs.
stop <jobnr>>--->-------Stop the given job.>----[unimplemented]
stopall>>------->-------Stop all running jobs.>-[unimplemented]
ENDTXT
  exit
end
# Merge the settings from "<basedir>/<CONFIG>" into @config.
#
# The file is optional. Each line has the form "key = value"; everything
# after a "#" is a comment and blank lines are skipped. Only "maxproc" and
# "waittime" are recognised, and their values are stored as Integers so they
# have the same type as the built-in defaults. Unknown keys and non-numeric
# values are reported on standard output and otherwise ignored.
def readconfig
  path = "#{@basedir}/#{CONFIG}"
  return unless File.exist?(path)

  # File.foreach closes the file again once the last line has been read.
  File.foreach(path) do |raw|
    line = raw.sub(/#.*/, "").strip
    next if line.empty?

    key, value = line.split("=", 2).map { |part| part.strip }
    case key
    when "maxproc", "waittime"
      if value.to_s =~ /\A\d+\z/
        @config[key] = value.to_i
      else
        puts "invalid value for #{key}: #{value.inspect}"
      end
    else
      puts "unknown option #{key}"
    end
  end
end
# Print an overview of running, queued, parked, finished and failed jobs.
#
# Re-reads the configuration first so the "maxproc" limit shown is current.
# Every section whose header ends in ":" is followed by a listing of the
# matching directory (including the parked jobs); finished jobs are only
# counted.
def status
  readconfig
  # Dir.entries always reports "." and "..", hence the "- 2".
  count = lambda { |dir| Dir.entries("#{@basedir}/#{dir}").size - 2 }
  # Just call 'ls' instead of trying to format the listing nicely ourselves.
  # Flush first so our own header is written before the child's output.
  list = lambda do |dir|
    $stdout.flush
    system("ls", "#{@basedir}/#{dir}/")
  end

  puts "Running jobs (#{count.call(PROCDIR)}/#{@config["maxproc"]}):"
  list.call(PROCDIR)
  puts "Jobs queued (#{count.call(JOBSDIR)}):"
  list.call(JOBSDIR)
  puts "Jobs parked (#{count.call(PARKDIR)}):"
  list.call(PARKDIR)
  puts "Jobs finished: #{count.call(RESULTDIR)}"
  puts "Jobs failed (#{count.call(FAILDIR)}):"
  list.call(FAILDIR)
end
# Ensure that every working directory exists directly below @basedir.
# Directories that are already present are left alone.
def createdirs
  required = [ JOBSDIR, PARKDIR, PROCDIR, RESULTDIR, FAILDIR, MONITORDIR ]
  required.each do |name|
    target = "#{@basedir}/#{name}"
    Dir.mkdir(target) unless FileTest.directory?(target)
  end
end
# Atomically increment the job counter stored in COUNTERFILE.
#
# The file is created when missing; an empty or missing file counts as 0.
# The exclusive lock is taken on the very handle that is used for reading
# and writing, and is released when the block closes the file, so no stray
# descriptors or stale locks are left behind.
#
# Returns the new counter value as an Integer.
def increasejobscounter
  File.open(COUNTERFILE, File::RDWR | File::CREAT, 0644) do |file|
    file.flock(File::LOCK_EX)
    jobscounter = file.read.to_i + 1
    file.rewind
    file.truncate(0)
    file.puts jobscounter
    jobscounter
  end
end
# Queue the remaining command-line words (ARGV) as a new job.
#
# The words are joined with single spaces and written to
# "<basedir>/<JOBSDIR>/<jobnumber>", where the job number is the next counter
# value zero-padded to six digits. The file is given RUNMODE permissions.
# Exits if a job file with that number already exists.
def addjob
  jobnumber = sprintf("%06d", increasejobscounter)
  jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
  puts "New job: #{jobnumber}"
  if File.exist?(jobfile)
    puts "Job #{jobnumber} already scheduled. This should not happen."
    exit
  end
  File.open(jobfile, "w") { |file| file.puts ARGV.join(" ") }
  File.chmod(RUNMODE, jobfile)
end
# Queue a copy of the file named by ARGV[0] as a new job.
#
# Prints an error followed by the usage text (which exits) when the file name
# is missing or the file does not exist. Otherwise the file is copied to
# "<basedir>/<JOBSDIR>/<jobnumber>" and given RUNMODE permissions.
def addjobfromfile
  filename = ARGV[0]
  if filename.nil?
    puts "Missing filename"
    usage
  elsif !File.exist?(filename)
    puts "File #{filename} doesn't exist"
    usage
  end
  jobnumber = sprintf("%06d", increasejobscounter)
  jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
  puts "New job: #{jobnumber}"
  # File.copy is not part of core Ruby; FileUtils provides the copy.
  FileUtils.cp(filename, jobfile)
  File.chmod(RUNMODE, jobfile)
end
# Write a default configuration file to "<basedir>/<CONFIG>".
#
# An existing file is never overwritten.
def createconfig
  path = "#{@basedir}/#{CONFIG}"
  return if File.exist?(path)

  File.open(path, "w") do |file|
    file.puts "# how many parallel processes?"
    file.puts "maxproc=3"
    file.puts "# take a break for how long between process end and starting a new one"
    file.puts "waittime=3"
  end
end
# Remove the job whose number is given in ARGV[0] from the queue.
#
# The number is normalised to six zero-padded digits. Prints the usage text
# (which exits) when no number was given, and exits with a message when the
# job is not in the queue.
def deljob
  jobnumber = ARGV[0]
  if jobnumber.nil?
    puts "Missing jobnumber"
    usage
  else
    jobnumber = sprintf("%06d", jobnumber.to_i)
    jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
    if File.exist?(jobfile)
      File.unlink(jobfile)
    else
      puts "Job '#{jobnumber}' not in queue"
      exit
    end
  end
end
# Move every entry of the job queue into the parking lot directory.
def parkjobs
  queued = Dir.entries("#{@basedir}/#{JOBSDIR}")
  queued.each do |entry|
    # "." and ".." are directory bookkeeping, not jobs.
    next if ['.', '..'].include?(entry)

    origin = "#{@basedir}/#{JOBSDIR}/#{entry}"
    FileUtils.mv(origin, "#{@basedir}/#{PARKDIR}")
  end
end
# Move every entry of the parking lot directory back into the job queue.
def unparkjobs
  parked = Dir.entries("#{@basedir}/#{PARKDIR}")
  parked.each do |entry|
    # "." and ".." are directory bookkeeping, not jobs.
    next if ['.', '..'].include?(entry)

    origin = "#{@basedir}/#{PARKDIR}/#{entry}"
    FileUtils.mv(origin, "#{@basedir}/#{JOBSDIR}")
  end
end
# Put the failed job whose number is given in ARGV[0] back into the queue.
#
# The job script "<FAILDIR>/<jobnumber>/job" is moved to
# "<JOBSDIR>/<jobnumber>" and the rest of the failed job's directory is
# deleted. Prints the usage text (which exits) when no number was given, and
# exits with a message when there is no such failed job.
def unfail
  jobnumber = ARGV[0]
  if jobnumber.nil?
    puts "Missing jobnumber"
    usage
  else
    jobnumber = sprintf("%06d", jobnumber.to_i)
    faileddir = "#{@basedir}/#{FAILDIR}/#{jobnumber}"
    if File.exist?("#{faileddir}/job")
      FileUtils.mv("#{faileddir}/job", "#{@basedir}/#{JOBSDIR}/#{jobnumber}")
      # Recursive removal without shelling out to rm(1).
      FileUtils.rm_rf(faileddir)
    else
      puts "Job '#{jobnumber}' not in failed dir"
      exit
    end
  end
end
# Placeholder for the "prio" command: only reports that it is not implemented.
def changeprio
  notice = "unimplemented"
  puts notice
end
# Add one to the count of running processors stored in RUNNINGFILE.
#
# A missing or empty file counts as 0. The exclusive lock is held on the same
# handle that performs the read and the write and is released when the file
# is closed at the end of the block.
#
# Note: if threading is used one day this is no longer needed (no running
# file required then).
def incrementrunning
  File.open(RUNNINGFILE, File::RDWR | File::CREAT, 0644) do |file|
    file.flock(File::LOCK_EX)
    running = file.read.to_i + 1
    file.rewind
    # Drop the old digits so a shorter number leaves no trailing bytes.
    file.truncate(0)
    file.puts running
  end
end
# Subtract one from the count of running processors stored in RUNNINGFILE.
#
# The count never drops below 0; a missing or empty file counts as 0. The
# exclusive lock is held on the same handle that performs the read and the
# write and is released when the file is closed at the end of the block.
#
# Note: if threading is used one day this is no longer needed (no running
# file required then).
def decrementrunning
  File.open(RUNNINGFILE, File::RDWR | File::CREAT, 0644) do |file|
    file.flock(File::LOCK_EX)
    running = [file.read.to_i - 1, 0].max
    file.rewind
    # Drop the old digits so "10" -> "9" leaves no trailing bytes.
    file.truncate(0)
    file.puts running
  end
end
# Claim the next job from the queue for processing.
#
# Queue entries are visited in sorted name order, so the lowest job number is
# taken first. The first regular file found is moved to
# "<PROCDIR>/<jobnumber>/job" (the directory is created here).
#
# Returns the job number (the file name) as a String, or nil when the queue
# holds no job file.
def getjob
  queue = "#{@basedir}/#{JOBSDIR}"
  Dir.entries(queue).sort.each do |file|
    next if file == '.' || file == '..'
    next unless File.file?("#{queue}/#{file}")

    Dir.mkdir("#{@basedir}/#{PROCDIR}/#{file}")
    FileUtils.mv("#{queue}/#{file}", "#{@basedir}/#{PROCDIR}/#{file}/job")
    return file
  end
  # Without this the method would return the entries array, which callers
  # checking for nil would mistake for a job.
  nil
end
# Move the processing directory of the given job into the failure directory.
def failjob(jobnumber)
  workdir = "#{@basedir}/#{PROCDIR}/#{jobnumber}"
  graveyard = "#{@basedir}/#{FAILDIR}"
  FileUtils.mv(workdir, graveyard)
end
# Move the processing directory of the given job into the results directory.
def finishjob(jobnumber)
  workdir = "#{@basedir}/#{PROCDIR}/#{jobnumber}"
  archive = "#{@basedir}/#{RESULTDIR}"
  FileUtils.mv(workdir, archive)
end
# Take one job from the queue, run it and file it under results or failure.
#
# The running counter is incremented for the duration of the call and is
# decremented again even when an error is raised. The job script is executed
# from inside its own processing directory; its standard output and standard
# error are both written to "output" in that directory. A zero exit status
# hands the job to finishjob, anything else (including a script that cannot
# be started) hands it to failjob.
def runprocessor
  incrementrunning
  begin
    sleep 1 # XXX not needed once locking really works (or with threading)
    runjob = getjob
    # Only a String names a claimed job; any other value means there was
    # nothing to run.
    if runjob.is_a?(String)
      puts "Starting job: #{runjob}"
      $stdout.flush
      jobdir = "#{@basedir}/#{PROCDIR}/#{runjob}"
      jobfile = "#{jobdir}/job"
      # The block form restores the previous working directory afterwards.
      succeeded = Dir.chdir(jobdir) do
        # [path, argv0] form: run the file directly, never through a shell
        # that would split a path containing spaces.
        system([jobfile, jobfile], :out => "#{jobdir}/output", :err => [:child, :out])
      end
      if succeeded
        finishjob(runjob)
        puts "Finished job: #{runjob}"
      else
        failjob(runjob)
        puts "Failed job: #{runjob}"
      end
    end
  ensure
    decrementrunning
  end
end
# Placeholder for the "stop" command: only reports that it is not implemented.
def stop
  notice = "unimplemented"
  puts notice
end
# Placeholder for the "stopall" command: only reports that it is not implemented.
def stopall
  notice = "unimplemented"
  puts notice
end
# Entry point of the "run" command: prepare the running counter.
#
# Prints a hint when not started as root and resets RUNNINGFILE to "0"
# (creating it when missing) while holding an exclusive lock on the handle
# that does the write. The scheduling loop itself has not been ported from
# the shell prototype yet; it is kept below as a comment.
def master
  if Process.euid != 0
    puts "If your jobs need root privileges you want to run this as root"
  end
  # Note: if threading is used one day this is no longer needed (no running
  # file required then).
  File.open(RUNNINGFILE, File::RDWR | File::CREAT, 0644) do |file|
    file.flock(File::LOCK_EX)
    # Truncate only after the lock is held, not as a side effect of opening.
    file.truncate(0)
    file.puts "0"
  end
  # TODO: port the scheduler loop of the shell prototype:
  # while sleep $waittime; do
  # if [ $(cat "${basedir}/${runningfile}") -lt $maxproc ]; then
  # # go through ls because globbing misbehaves otherwise
  # jl=$(ls ${basedir}/jobs/)
  # if [ -n "$jl" ]; then
  # ${pbcommand} process &
  # fi
  # readconfig
  # fi
  # done
end
# XXX SIGHUP handler that re-reads the config
@pbcommand = $0
command = ARGV.shift
@basedir = Dir.pwd
createdirs

# Command word (first argument) => method that implements it.
# Anything not listed here, including a missing command, shows the usage text.
handlers = {
  "add" => :addjob,
  "addfile" => :addjobfromfile,
  "af" => :addjobfromfile,
  "createconfig" => :createconfig,
  "run" => :master,
  "del" => :deljob,
  "help" => :usage,
  "-h" => :usage,
  "park" => :parkjobs,
  "unpark" => :unparkjobs,
  "retry" => :unfail,
  "prio" => :changeprio,
  "process" => :runprocessor,
  "status" => :status,
  "st" => :status,
  "stop" => :stop,
  "stopall" => :stopall
}
send(handlers.fetch(command, :usage))