#!/usr/bin/env ruby
|
|
|
|
# $Id$
|
|
# $URL$
|
|
|
|
require 'fileutils'
|
|
require 'thread'
|
|
require 'open3'
|
|
|
|
# Names of the working directories that createdirs creates below the base
# directory.
JOBSDIR = "jobs".freeze           # queued jobs waiting to be claimed
PARKDIR = "park".freeze           # jobs moved aside by the "park" command
PROCDIR = "processors".freeze     # one sub-directory per claimed job
RESULTDIR = "results".freeze      # job directories moved here by finishjob
FAILDIR = "failure".freeze        # job directories moved here by failjob
MONITORDIR = "monitor".freeze     # created by createdirs; not referenced elsewhere in the visible code
# Permission bits applied to queued job files so they can be executed.
RUNMODE = 0755
# Name of the configuration file inside the base directory.
CONFIG = "config".freeze
# Scheduler settings; these defaults apply until readconfig overrides them.
@config = { "waittime" => 3, "maxproc" => 3 }
# Worker threads started by runprocessor.
@processor_threads = []
# Held while a worker claims a job, so two workers never pick the same one.
@jobclaimlock = Mutex.new

# File that stores the most recently issued job number.
COUNTERFILE = "jobscounter".freeze
# Not referenced elsewhere in the visible code.
RUNNINGFILE = "runningcounter".freeze
|
|
|
|
# Print the command overview and terminate the program.
#
# Does not return: +exit+ is called after the text has been printed.
def usage
  puts <<ENDTXT
Usage: pb <option>

add <command>          Add the given command as a job.
addfile|af <file>      Add the given file as a job.
createconfig           Write a default configuration file.
run                    Start the master scheduler process.
del <jobnr>            Delete the given job from the queue.
help|-h                Show this help.
park                   Place all queued jobs in the parkinglot.
unpark                 Place all jobs from the parkinglot back in the queue.
retry <jobnr>          Reschedule a failed job. Destroys all output.
prio <h|n|l> <jobnr>   Change the priority of the given job. [unimplemented]
status|st              Show the status of the current jobs.
stop <jobnr>           Stop the given job. [unimplemented]
stopall                Stop all running jobs. [unimplemented]
ENDTXT
  exit
end
|
|
|
|
# Load settings from the configuration file in the base directory into @config.
#
# Does nothing when the file is absent. "#" starts a comment; blank lines are
# skipped. Recognised keys are "maxproc" (stored as Integer) and "waittime"
# (stored as Float). The values must be numeric because master passes them to
# +sleep+ and compares them with a thread count. Lines that are not
# "key = value", unknown keys and non-numeric values are reported on stdout and
# leave the current setting untouched.
def readconfig
  path = "#{@basedir}/#{CONFIG}"
  return unless File.exist?(path)

  # File.foreach closes the file once the last line has been read.
  File.foreach(path) do |raw|
    line = raw.sub(/#.*/, "").strip
    next if line.empty?

    parsed = line.match(/\A([^=]*?)\s*=\s*(.*)\z/)
    if parsed.nil?
      puts "unknown option #{line}"
      next
    end

    key = parsed[1]
    value = parsed[2]
    begin
      case key
      when "maxproc" then @config["maxproc"] = Integer(value, 10)
      when "waittime" then @config["waittime"] = Float(value)
      else puts "unknown option #{key}"
      end
    rescue ArgumentError
      puts "invalid value for #{key}: #{value}"
    end
  end
end
|
|
|
|
# Print an overview of the running, queued, parked, finished and failed jobs.
#
# The configuration is re-read first so the reported process limit is current.
def status
  readconfig

  # Dir.entries also reports "." and "..", hence the "- 2".
  count = lambda { |dir| Dir.entries("#{@basedir}/#{dir}").size - 2 }
  # The listing is left to ls(1), which handles the screen layout.
  list = lambda { |dir| system("ls", "#{@basedir}/#{dir}/") }

  puts "Running jobs (#{count.call(PROCDIR)}/#{@config["maxproc"]}):"
  list.call(PROCDIR)
  puts "Jobs queued (#{count.call(JOBSDIR)}):"
  list.call(JOBSDIR)
  puts "Jobs parked (#{count.call(PARKDIR)}):"
  puts "Jobs finished: #{count.call(RESULTDIR)}"
  puts "Jobs failed (#{count.call(FAILDIR)}):"
  list.call(FAILDIR)
end
|
|
|
|
# Make sure every working directory exists below the base directory; ones that
# are already present are left alone.
def createdirs
  workdirs = [ JOBSDIR, PARKDIR, PROCDIR, RESULTDIR, FAILDIR, MONITORDIR ]
  workdirs.each do |name|
    path = "#{@basedir}/#{name}"
    next if FileTest.directory?(path)
    Dir.mkdir(path)
  end
end
|
|
|
|
# Increment the persistent job counter and return the new value.
#
# The counter lives in COUNTERFILE (resolved against the current working
# directory) and is created on first use. An empty or non-numeric file counts
# as 0, so the first number handed out is 1.
#
# The exclusive lock is taken on the same handle that is read and written, and
# it is released when the block closes the file. This keeps concurrent callers
# from handing out the same number and avoids leaving a lock behind.
#
# Returns the new counter value as an Integer.
def increasejobscounter
  File.open(COUNTERFILE, File::RDWR | File::CREAT, 0666) do |file|
    file.flock(File::LOCK_EX)
    jobscounter = file.read.to_i + 1
    file.rewind
    file.puts jobscounter
    file.flush
    # Drop leftovers in case the previous content was longer than the new one.
    file.truncate(file.pos)
    jobscounter
  end
end
|
|
|
|
# Queue the remaining command-line arguments as a new job.
#
# The arguments are joined with single spaces and written to an executable
# file named after the next job number (six digits, zero padded) in the jobs
# directory. With no arguments, "Missing command" and the usage text are
# printed and no job number is consumed. If a job file with the new number
# already exists, a message is printed and the program exits.
def addjob
  if ARGV.empty?
    puts "Missing command"
    usage
  else
    jobnumber = sprintf("%06d", increasejobscounter)
    puts "New job: #{jobnumber}"
    jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
    if File.exist?(jobfile)
      puts "Job #{jobnumber} already scheduled. This should not happen."
      exit
    end
    File.open(jobfile, "w") { |file| file.puts ARGV.join(" ") }
    File.chmod(RUNMODE, jobfile)
  end
end
|
|
|
|
# Queue a copy of the file named by the first remaining argument as a new job.
#
# Prints a message followed by the usage text (which exits) when the argument
# is missing or the file does not exist. Otherwise the file is copied into the
# jobs directory under the next job number (six digits, zero padded) and made
# executable.
def addjobfromfile
  filename = ARGV[0]
  if filename.nil?
    puts "Missing filename"
    usage
  elsif !File.exist?(filename)
    puts "File #{filename} doesn't exist"
    usage
  end

  jobnumber = sprintf("%06d", increasejobscounter)
  puts "New job: #{jobnumber}"
  jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
  FileUtils.cp(filename, jobfile)
  File.chmod(RUNMODE, jobfile)
end
|
|
|
|
# Write a configuration file with the default settings into the base directory.
#
# An existing configuration file is never overwritten.
def createconfig
  path = "#{@basedir}/#{CONFIG}"
  return if File.exist?(path)

  File.open(path, "w") do |file|
    file.puts <<EOT
# how many parallel processes?
maxproc=3
# take a break for how long between process end and starting a new one
waittime=3
EOT
  end
end
|
|
|
|
# Remove the queued job whose number is the first remaining argument.
#
# The argument is converted with +to_i+ and zero padded to six digits to form
# the job file name. Prints a message followed by the usage text (which exits)
# when the argument is missing, and prints a message and exits when no such
# job file is in the queue.
def deljob
  jobnumber = ARGV[0]
  if jobnumber.nil?
    puts "Missing jobnumber"
    usage
  else
    jobnumber = sprintf("%06d", jobnumber.to_i)
    jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
    if File.exist?(jobfile)
      File.unlink(jobfile)
    else
      puts "Job '#{jobnumber}' not in queue"
      exit
    end
  end
end
|
|
|
|
# Move everything in the jobs directory into the park directory.
def parkjobs
  queue = "#{@basedir}/#{JOBSDIR}"
  lot = "#{@basedir}/#{PARKDIR}"
  Dir.entries(queue).each do |name|
    # "." and ".." are directory bookkeeping, not jobs.
    next if [ ".", ".." ].include?(name)
    FileUtils.mv("#{queue}/#{name}", lot)
  end
end
|
|
|
|
# Move everything in the park directory back into the jobs directory.
def unparkjobs
  lot = "#{@basedir}/#{PARKDIR}"
  queue = "#{@basedir}/#{JOBSDIR}"
  Dir.entries(lot).each do |name|
    # "." and ".." are directory bookkeeping, not jobs.
    next if [ ".", ".." ].include?(name)
    FileUtils.mv("#{lot}/#{name}", queue)
  end
end
|
|
|
|
# Put the failed job whose number is the first remaining argument back in the
# queue.
#
# The job script is moved from failure/<nr>/job to jobs/<nr>, then the rest of
# the failure directory for that job, including its recorded output, is
# removed. Prints a message followed by the usage text (which exits) when the
# argument is missing, and prints a message and exits when the job is not in
# the failure directory.
def unfail
  jobnumber = ARGV[0]
  if jobnumber.nil?
    puts "Missing jobnumber"
    usage
  else
    jobnumber = sprintf("%06d", jobnumber.to_i)
    faileddir = "#{@basedir}/#{FAILDIR}/#{jobnumber}"
    if File.exist?("#{faileddir}/job")
      FileUtils.mv("#{faileddir}/job", "#{@basedir}/#{JOBSDIR}/#{jobnumber}")
      # FileUtils removes the tree itself; no external rm(1) is needed.
      FileUtils.rm_rf(faileddir)
    else
      puts "Job '#{jobnumber}' not in failed dir"
      exit
    end
  end
end
|
|
|
|
# Placeholder for the "prio" command: only reports that it is not implemented.
def changeprio
  $stdout.puts("unimplemented")
end
|
|
|
|
# Claim the next queued job for processing.
#
# Queue entries are visited in sorted name order, so with the zero-padded job
# numbers used as file names the lowest number is claimed first. Dir.entries
# by itself gives no ordering guarantee. The first regular file found is moved
# to processors/<name>/job inside a freshly created directory.
#
# Returns the claimed job's name, or nil when the queue holds no regular file.
# Raises whatever Dir.mkdir raises if processors/<name> already exists.
def getjob
  queue = "#{@basedir}/#{JOBSDIR}"
  Dir.entries(queue).sort.each do |file|
    next if file == '.' || file == '..'
    next unless File.file?("#{queue}/#{file}")

    workdir = "#{@basedir}/#{PROCDIR}/#{file}"
    Dir.mkdir(workdir)
    FileUtils.mv("#{queue}/#{file}", "#{workdir}/job")
    return file
  end
  nil
end
|
|
|
|
# Move a job's working directory from the processors directory into the
# failure directory.
#
# jobnumber - name of the job's directory below the processors directory.
def failjob(jobnumber)
  workdir = "#{@basedir}/#{PROCDIR}/#{jobnumber}"
  destination = "#{@basedir}/#{FAILDIR}"
  FileUtils.mv(workdir, destination)
end
|
|
|
|
# Move a job's working directory from the processors directory into the
# results directory.
#
# jobnumber - name of the job's directory below the processors directory.
def finishjob(jobnumber)
  workdir = "#{@basedir}/#{PROCDIR}/#{jobnumber}"
  destination = "#{@basedir}/#{RESULTDIR}"
  FileUtils.mv(workdir, destination)
end
|
|
|
|
# Start a worker thread that claims one queued job and runs it.
#
# The thread is appended to @processor_threads. Inside the thread:
# * a job is claimed via getjob while holding @jobclaimlock, so two workers
#   never pick the same job; the thread simply ends when nothing is queued;
# * the job script is executed directly (no shell, no extra arguments) with
#   its own processors/<job> directory as working directory. The directory is
#   set for the child only, so concurrent workers and the scheduler keep their
#   own working directory;
# * stdout and stderr of the job are merged into one stream and copied to
#   processors/<job>/output, keeping the order in which the job wrote them;
#   the job's stdin is closed immediately;
# * when the job exits successfully its directory is handed to finishjob; a
#   non-zero exit status, a signal, or any error while starting or recording
#   the job hands it to failjob instead.
#
# Returns @processor_threads.
def runprocessor
  @processor_threads << Thread.new do
    begin
      runjob = @jobclaimlock.synchronize { getjob }

      unless runjob.nil?
        starttime = Time.now
        puts "Starting job: #{runjob}"
        jobdir = "#{@basedir}/#{PROCDIR}/#{runjob}"
        jobscript = "#{jobdir}/job"
        begin
          status = nil
          # Block form closes the output file even when the job cannot start.
          File.open("#{jobdir}/output", "w") do |output|
            # [command, argv0] keeps Ruby from handing the path to a shell.
            Open3.popen2e([ jobscript, jobscript ], :chdir => jobdir) do |stdin, out_and_err, wait_thr|
              stdin.close
              IO.copy_stream(out_and_err, output)
              status = wait_thr.value
            end
          end
          raise "job exited abnormally: #{status}" unless status.success?

          finishjob runjob
          puts "Finished job: #{runjob} [runtime: #{Time.now-starttime}]"
        rescue => error
          failjob runjob
          puts "Failed job: #{runjob} [runtime: #{Time.now-starttime}]"
          puts error
        end
      end
    rescue => error
      puts "failure in thread"
      puts error
    end
  end
end
|
|
|
|
# Placeholder for the "stop" command: only reports that it is not implemented.
def stop
  $stdout.puts("unimplemented")
end
|
|
|
|
# Placeholder for the "stopall" command: only reports that it is not
# implemented.
def stopall
  $stdout.puts("unimplemented")
end
|
|
|
|
# Scheduler loop: keeps starting worker threads until the process is stopped.
#
# Each round sleeps for the configured "waittime", forgets worker threads that
# are no longer alive, starts one new worker via runprocessor when fewer than
# "maxproc" workers are registered, and re-reads the configuration. Because it
# is re-read at the end of a round, a changed file takes effect from the next
# round on.
#
# Both settings are coerced to numbers here, so values that arrive as strings
# cannot break +sleep+ or the comparison. Does not return.
def master
  if Process.euid != 0
    puts "If your jobs need root privileges you want to run this as root"
  end

  loop do
    sleep @config["waittime"].to_f

    # alive? also covers threads that died from an exception; their status is
    # nil rather than false.
    @processor_threads.delete_if { |thread| !thread.alive? }

    # A worker is started even when the queue may be empty; it ends straight
    # away if there is nothing to claim.
    runprocessor if @processor_threads.length < @config["maxproc"].to_i

    readconfig
  end
end
|
|
|
|
# XXX TODO: add a SIGHUP handler that re-reads the config
|
|
|
|
# Name this script was invoked as; not referenced elsewhere in the visible code.
@pbcommand = $0

# The first argument selects the sub-command; the remaining arguments stay in
# ARGV for the command handlers.
command = ARGV.shift
# All working directories live below the directory pb is started from.
@basedir = Dir.pwd

createdirs

case command
when "add" then addjob
when "addfile", "af" then addjobfromfile
when "createconfig" then createconfig
when "run" then master
when "del" then deljob
when "help", "-h" then usage
when "park" then parkjobs
when "unpark" then unparkjobs
when "retry" then unfail
when "prio" then changeprio
when "process"
  runprocessor
  # Wait for the worker: the interpreter would otherwise exit right away and
  # kill the thread before or while it runs the job.
  @processor_threads.each { |thread| thread.join }
when "status", "st" then status
when "stop" then stop
when "stopall" then stopall
else usage
end
|