publicscripts/pbatch/pb.rb
2009-11-30 15:59:57 +00:00

352 lines
8.8 KiB
Ruby
Executable file

#!/usr/bin/env ruby
# $Id$
# $URL$
require 'fileutils'
require 'thread'
require 'open3'
# Queue layout: each job lives in one of these subdirectories of @basedir.
JOBSDIR = "jobs"          # queued jobs, picked up by getjob
PARKDIR = "park"          # jobs set aside with `pb park`
PROCDIR = "processors"    # claimed/running jobs, one directory per job
RESULTDIR = "results"     # job directories that completed via finishjob
FAILDIR = "failure"       # job directories moved here by failjob
MONITORDIR = "monitor"    # created by createdirs; not otherwise referenced here

# Permission bits given to job files so they can be executed directly.
RUNMODE = 0755

# Name of the optional key=value configuration file inside @basedir.
CONFIG = "config"

# Counter file names (used as given, i.e. relative to the current directory).
COUNTERFILE = "jobscounter"
RUNNINGFILE = "runningcounter"   # currently unused by the Ruby code

# Shared runtime state: config defaults, live worker threads and the lock
# that serialises job claiming between workers.
@config = { "waittime" => 3, "maxproc" => 3 }
@processor_threads = []
@jobclaimlock = Mutex.new
# Print the command-line help and terminate the program.
# Used for `pb help`/`pb -h` and whenever a command gets bad arguments.
def usage
  puts <<ENDTXT
Usage: pb <option>
add <command> Add the given command as a job.
addfile|af <file> Add the given file as a job.
createconfig Write a default configuration file.
run Start the master scheduler process.
del <jobnr> Delete the given job from the queue.
help|-h Show this help.
park Place all queued jobs in the parkinglot.
unpark Place all jobs from the parkinglot back in the queue.
retry <jobnr> Reschedule a failed job. Destroys all output.
prio <h|n|l> <jobnr> Change the priority of the given job. [unimplemented]
status|st Show the status of the current jobs.
stop <jobnr> Stop the given job. [unimplemented]
stopall Stop all running jobs. [unimplemented]
ENDTXT
  exit
end
# Load settings from the CONFIG file in @basedir into @config.
#
# The file holds one `key = value` pair per line; `#` starts a comment and
# blank lines are ignored. Recognised keys are "maxproc" and "waittime".
# Their values must be non-negative integers and are stored as Integers,
# because master compares against maxproc and sleeps for waittime.
# Unknown keys, malformed lines and non-numeric values are reported on stdout
# and otherwise ignored, so the previous value in @config stays in effect.
# Does nothing when the config file does not exist.
def readconfig
  path = "#{@basedir}/#{CONFIG}"
  return unless File.exist?(path)
  # File.foreach closes the file when done
  File.foreach(path) do |rawline|
    line = rawline.sub(/#.*/, "").strip
    next if line.empty?
    match = line.match(/\A([^=]*?)\s*=\s*(.*)\z/)
    if match.nil?
      puts "ignoring malformed config line: #{line}"
      next
    end
    key = match[1]
    value = match[2]
    case key
    when "maxproc", "waittime"
      if value =~ /\A\d+\z/
        @config[key] = value.to_i
      else
        puts "invalid value '#{value}' for option #{key}"
      end
    else
      puts "unknown option #{key}"
    end
  end
end
# Print an overview of the job queue for `pb status`.
#
# Shows how many jobs are running (against the configured maxproc), queued,
# parked, finished and failed. Running, queued, parked and failed jobs are
# also listed by running `ls` on their directory; finished jobs are only counted.
def status
  readconfig
  # every queue directory also contains "." and "..", hence the -2
  count = lambda { |dir| Dir.entries("#{@basedir}/#{dir}").size - 2 }
  # just call 'ls' instead of formatting the listing for the screen ourselves
  list = lambda { |dir| system("ls", "#{@basedir}/#{dir}/") }
  puts "Running jobs (#{count.call(PROCDIR)}/#{@config["maxproc"]}):"
  list.call(PROCDIR)
  puts "Jobs queued (#{count.call(JOBSDIR)}):"
  list.call(JOBSDIR)
  puts "Jobs parked (#{count.call(PARKDIR)}):"
  # list parked jobs too, as the header's trailing colon promises
  list.call(PARKDIR)
  puts "Jobs finished: #{count.call(RESULTDIR)}"
  puts "Jobs failed (#{count.call(FAILDIR)}):"
  list.call(FAILDIR)
end
# Ensure every queue directory exists below @basedir, creating any that are missing.
def createdirs
  wanted = [JOBSDIR, PARKDIR, PROCDIR, RESULTDIR, FAILDIR, MONITORDIR]
  wanted.each do |name|
    path = "#{@basedir}/#{name}"
    Dir.mkdir(path) unless File.directory?(path)
  end
end
# Increment the persistent job counter in COUNTERFILE and return the new value.
#
# COUNTERFILE is used as given, i.e. relative to the current directory. The
# file is created if missing; a missing or empty file counts as 0, so the
# first job number handed out is 1.
# The exclusive flock is taken on the same handle that reads and rewrites the
# counter. It is released when the block closes the file, so concurrent
# `pb add` invocations are serialised and get distinct numbers.
def increasejobscounter
  File.open(COUNTERFILE, File::RDWR | File::CREAT, 0666) do |file|
    file.flock(File::LOCK_EX)
    jobscounter = file.read.to_i + 1   # "".to_i == 0
    file.rewind
    file.truncate(0)
    file.puts jobscounter
    file.flush
    jobscounter
  end
end
# Queue the remaining command-line arguments as a new job (`pb add <command>`).
#
# The words are joined with single spaces and written to a new job file in
# JOBSDIR. The file is named after the next job number (zero-padded to six
# digits) and made executable with RUNMODE.
# Prints usage (which exits) when no command is given, and exits with
# status 1 if the job file unexpectedly already exists.
def addjob
  if ARGV.empty?
    puts "Missing command"
    usage
  end
  jobnumber = sprintf("%06d", increasejobscounter)
  jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
  puts "New job: #{jobnumber}"
  if File.exist?(jobfile)
    puts "Job #{jobnumber} already scheduled. This should not happen."
    exit 1
  end
  File.open(jobfile, "w") { |file| file.puts ARGV.join(" ") }
  File.chmod(RUNMODE, jobfile)
end
# Queue a copy of an existing script (`pb addfile <file>`, file in ARGV[0]).
#
# The copy is placed in JOBSDIR under the next job number (zero-padded to six
# digits) and made executable with RUNMODE. Prints usage (which exits) when
# the filename is missing or does not exist, and exits with status 1 if the
# job file unexpectedly already exists.
def addjobfromfile
  filename = ARGV[0]
  if filename.nil?
    puts "Missing filename"
    usage
  elsif !File.exist?(filename)
    puts "File #{filename} doesn't exist"
    usage
  end
  jobnumber = sprintf("%06d", increasejobscounter)
  jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
  puts "New job: #{jobnumber}"
  if File.exist?(jobfile)
    puts "Job #{jobnumber} already scheduled. This should not happen."
    exit 1
  end
  FileUtils.cp(filename, jobfile)
  File.chmod(RUNMODE, jobfile)
end
# Write a default configuration file to @basedir (`pb createconfig`).
# An existing config file is never overwritten; a notice is printed instead.
def createconfig
  path = "#{@basedir}/#{CONFIG}"
  if File.exist?(path)
    puts "Config file #{path} already exists, leaving it untouched"
    return
  end
  File.open(path, "w") do |file|
    file.puts <<EOT
# how many parallel processes?
maxproc=3
# take a break for how long between process end and starting a new one
waittime=3
EOT
  end
end
# Remove a job that is still waiting in the queue (`pb del <jobnr>`).
#
# ARGV[0] is the job number, with or without leading zeros. Prints usage
# (which exits) for a missing or non-numeric job number, and exits with
# status 1 if the job is not in JOBSDIR.
def deljob
  jobnumber = ARGV[0]
  if jobnumber.nil?
    puts "Missing jobnumber"
    usage
  elsif jobnumber !~ /\A\d+\z/
    puts "Invalid jobnumber '#{jobnumber}'"
    usage
  end
  jobnumber = sprintf("%06d", jobnumber.to_i)
  jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
  unless File.exist?(jobfile)
    puts "Job '#{jobnumber}' not in queue"
    exit 1
  end
  File.unlink(jobfile)
end
# Move every entry of the queue (JOBSDIR) into the parking lot (PARKDIR).
def parkjobs
  queue = "#{@basedir}/#{JOBSDIR}"
  entries = Dir.entries(queue)
  entries.each do |name|
    next if %w[. ..].include?(name)
    FileUtils.mv("#{queue}/#{name}", "#{@basedir}/#{PARKDIR}")
  end
end
# Move every entry of the parking lot (PARKDIR) back into the queue (JOBSDIR).
def unparkjobs
  lot = "#{@basedir}/#{PARKDIR}"
  Dir.entries(lot).each { |name|
    FileUtils.mv("#{lot}/#{name}", "#{@basedir}/#{JOBSDIR}") unless name == '.' || name == '..'
  }
end
# Put a failed job back into the queue (`pb retry <jobnr>`).
#
# The job script FAILDIR/<nr>/job is moved to JOBSDIR/<nr>. The rest of the
# failed job's directory, including its output, is then deleted.
# Prints usage (which exits) for a missing or non-numeric job number, and
# exits with status 1 if the job is not in FAILDIR.
def unfail
  jobnumber = ARGV[0]
  if jobnumber.nil?
    puts "Missing jobnumber"
    usage
  elsif jobnumber !~ /\A\d+\z/
    puts "Invalid jobnumber '#{jobnumber}'"
    usage
  end
  jobnumber = sprintf("%06d", jobnumber.to_i)
  faileddir = "#{@basedir}/#{FAILDIR}/#{jobnumber}"
  unless File.exist?("#{faileddir}/job")
    puts "Job '#{jobnumber}' not in failed dir"
    exit 1
  end
  FileUtils.mv("#{faileddir}/job", "#{@basedir}/#{JOBSDIR}/#{jobnumber}")
  # remove the remaining output recursively without shelling out to rm
  FileUtils.rm_rf(faileddir)
end
# Placeholder for `pb prio`; job priorities are not implemented.
def changeprio
  print "unimplemented\n"
end
# Claim the oldest queued job.
#
# Takes the lowest-named regular file in JOBSDIR, creates PROCDIR/<name>/
# and moves the job file there as "job". Returns the job's name (its job
# number), or nil when no job is queued. runprocessor calls this while
# holding @jobclaimlock.
def getjob
  # Dir.entries returns names in filesystem order; job numbers are written
  # zero-padded by addjob, so sorting makes the queue first-in, first-out.
  Dir.entries("#{@basedir}/#{JOBSDIR}").sort.each do |file|
    next if file == '.' || file == '..'
    source = "#{@basedir}/#{JOBSDIR}/#{file}"
    next unless File.file?(source)
    Dir.mkdir("#{@basedir}/#{PROCDIR}/#{file}")
    FileUtils.mv(source, "#{@basedir}/#{PROCDIR}/#{file}/job")
    return file
  end
  nil
end
# Move a job's working directory from PROCDIR into FAILDIR.
def failjob(jobnumber)
  source = "#{@basedir}/#{PROCDIR}/#{jobnumber}"
  target = "#{@basedir}/#{FAILDIR}"
  FileUtils.mv(source, target)
end
# Move a completed job's working directory from PROCDIR into RESULTDIR.
def finishjob(jobnumber)
  jobdir = [@basedir, PROCDIR, jobnumber].join("/")
  FileUtils.mv(jobdir, [@basedir, RESULTDIR].join("/"))
end
# Start a worker thread that claims one queued job (if any) and runs it.
# The thread is appended to @processor_threads so master can count and reap it.
def runprocessor
  @processor_threads << Thread.new do
    begin
      # claim under the mutex so two workers never grab the same job
      runjob = @jobclaimlock.synchronize { getjob }
      executejob(runjob) unless runjob.nil?
    rescue
      puts "failure in thread"
      puts $!
    end
  end
end

# Run the claimed job PROCDIR/<runjob>/job and file it under RESULTDIR or FAILDIR.
#
# The job runs with PROCDIR/<runjob> as its working directory. The directory
# is passed to the child process via spawn's chdir option, because
# Dir.chdir would change the cwd of the whole process and race with other
# workers. Stdout and stderr are merged into PROCDIR/<runjob>/output. The
# job's stdin is closed right away, so a job that reads stdin sees EOF.
# A zero exit status finishes the job; a non-zero exit, a failed spawn or any
# other error fails it.
def executejob(runjob)
  starttime = Time.now
  puts "Starting job: #{runjob}"
  jobdir = "#{@basedir}/#{PROCDIR}/#{runjob}"
  jobfile = "#{jobdir}/job"
  begin
    status = nil
    File.open("#{jobdir}/output", "w") do |output|
      # [path, argv0] form: exec the path verbatim, no shell word-splitting
      Open3.popen2e([jobfile, jobfile], chdir: jobdir) do |stdin, out_err, wait_thr|
        stdin.close
        # one merged stream, drained fully: no ordering swaps and no pipe deadlock
        IO.copy_stream(out_err, output)
        status = wait_thr.value
      end
    end
    if status.success?
      finishjob runjob
      puts "Finished job: #{runjob} [runtime: #{Time.now-starttime}]"
    else
      failjob runjob
      puts "Failed job: #{runjob} [runtime: #{Time.now-starttime}] (#{status})"
    end
  rescue
    failjob runjob
    puts "Failed job: #{runjob} [runtime: #{Time.now-starttime}]"
    puts $!
  end
end
# Placeholder for `pb stop <jobnr>`; stopping a single job is not implemented.
def stop
  $stdout.puts("unimplemented")
end
# Placeholder for `pb stopall`; stopping running jobs is not implemented.
def stopall
  $stdout.print("unimplemented", "\n")
end
# Scheduler main loop for `pb run`; it does not return on its own.
#
# Each iteration sleeps `waittime` seconds and drops terminated worker threads
# from @processor_threads. While fewer than `maxproc` workers are alive, it
# starts one new worker with runprocessor. The configuration is loaded before
# the first iteration and re-read at the end of every iteration, so config
# file edits take effect without a restart.
def master
  if Process.euid != 0
    puts "If your jobs need root privileges you want to run this as root"
  end
  readconfig
  loop do
    # coerce: config values may arrive as Strings
    sleep @config["waittime"].to_f
    # reap workers that ended normally (status false) or by an exception (status nil)
    @processor_threads.delete_if { |thread| !thread.alive? }
    # TODO: only start a worker when the queue actually holds a job
    if @processor_threads.length < @config["maxproc"].to_i
      runprocessor
    end
    readconfig
  end
end
# TODO: install a SIGHUP handler that re-reads the configuration
# (master already re-reads it on every scheduler iteration).
@pbcommand = $0
command = ARGV.shift
# all queue directories live below the directory pb is started from
@basedir = Dir.pwd
createdirs
case command
when "add" then addjob
when "addfile", "af" then addjobfromfile
when "createconfig" then createconfig
when "run" then master
when "del" then deljob
when "help", "-h" then usage
when "park" then parkjobs
when "unpark" then unparkjobs
when "retry" then unfail
when "prio" then changeprio
when "process"
  # runprocessor only spawns a worker thread; wait for it, otherwise the
  # script exits and kills the worker before the job has been run and filed
  runprocessor
  @processor_threads.each(&:join)
when "status", "st" then status
when "stop" then stop
when "stopall" then stopall
else usage
end