publicscripts/pbatch/pb.rb

346 lines
8.8 KiB
Ruby
Raw Normal View History

2009-01-29 14:32:09 +00:00
#!/usr/bin/env ruby
# $Id$
# $URL$
2009-02-06 09:18:55 +00:00
require 'fileutils'
2009-01-29 14:32:09 +00:00
# Queue directories; createdirs makes each of them below the base directory.
JOBSDIR = "jobs"
PARKDIR = "park"
PROCDIR = "processors"
RESULTDIR = "results"
FAILDIR = "failure"
MONITORDIR = "monitor"
# Permission bits applied to freshly queued job files (rwxr-xr-x).
RUNMODE = 0755
# Name of the configuration file inside the base directory.
CONFIG = "config"
# Built-in defaults; readconfig overrides them from the CONFIG file.
@config = { "waittime" => 3, "maxproc" => 3 }
# Counter files; these are opened relative to the current working directory.
COUNTERFILE = "jobscounter"
RUNNINGFILE = "runningcounter"
# Print the command overview to stdout and terminate the program.
#
# Never returns: callers rely on the trailing +exit+ to stop processing
# after an argument error.
def usage
  puts <<ENDTXT
Usage: pb <option>
add <command>           Add the given command as a job.
addfile|af <file>       Add the given file as a job.
createconfig            Write a default configuration file.
run                     Start the master scheduler process.
del <jobnr>             Delete the given job from the queue.
help|-h                 Show this help.
park                    Place all queued jobs in the parkinglot.
unpark                  Place all jobs from the parkinglot back in the queue.
retry <jobnr>           Reschedule a failed job. Destroys all output.
prio <h|n|l> <jobnr>    Change the priority of the given job. [unimplemented]
status|st               Show the status of the current jobs.
stop <jobnr>            Stop the given job. [unimplemented]
stopall                 Stop all running jobs. [unimplemented]
ENDTXT
  exit
end
# Merge the settings from "<basedir>/config" into @config.
#
# The file is optional. Everything after a '#' is ignored, as are blank
# lines. Each remaining line is expected to look like "key = value"; only
# "maxproc" and "waittime" are recognised and both must be non-negative
# integers. Values are stored as Integers so they have the same type as the
# built-in defaults.
def readconfig
  path = "#{@basedir}/#{CONFIG}"
  return unless File.exist?(path)
  # File.foreach closes the file once the last line has been read.
  File.foreach(path) { |raw|
    line = raw.sub(/#.*/, "").strip
    next if line.empty?
    parsed = line.match(/([^=]*?)\s*=\s*(.*)/)
    key = parsed ? parsed[1] : line
    value = parsed ? parsed[2] : nil
    case key
    when "maxproc", "waittime"
      if value =~ /\A\d+\z/
        @config[key] = value.to_i
      else
        # Keep the previous setting rather than storing something unusable.
        puts "invalid value for #{key}: #{value}"
      end
    else
      puts "unknown option #{key}"
    end
  }
end
# Print an overview of the running, queued, parked, finished and failed jobs.
#
# Re-reads the configuration first so the "maxproc" figure is current.
# Sections whose heading ends in "(N):" are followed by a listing of the
# directory; the parked section now gets one too.
def status
  readconfig
  # Dir.entries always reports "." and "..", hence the "- 2".
  count = lambda { |dir| Dir.entries("#{@basedir}/#{dir}").size - 2 }
  # Just call 'ls' instead of working out how to format columns for the screen.
  list = lambda { |dir| system("ls", "#{@basedir}/#{dir}/") }

  puts "Running jobs (#{count.call(PROCDIR)}/#{@config["maxproc"]}):"
  list.call(PROCDIR)
  puts "Jobs queued (#{count.call(JOBSDIR)}):"
  list.call(JOBSDIR)
  puts "Jobs parked (#{count.call(PARKDIR)}):"
  list.call(PARKDIR)
  puts "Jobs finished: #{count.call(RESULTDIR)}"
  puts "Jobs failed (#{count.call(FAILDIR)}):"
  list.call(FAILDIR)
end
# Make sure every queue directory exists directly below @basedir.
# Directories that are already present are left alone.
def createdirs
  [ JOBSDIR, PARKDIR, PROCDIR, RESULTDIR, FAILDIR, MONITORDIR ].each do |name|
    target = "#{@basedir}/#{name}"
    next if FileTest.directory?(target)
    Dir.mkdir(target)
  end
end
# Atomically bump the job counter stored in COUNTERFILE and return the new
# value as an Integer.
#
# A missing or empty counter file counts as 0, so the first job is number 1.
# The exclusive lock is taken on the very handle that reads and writes the
# counter; closing that handle at the end of the block releases the lock.
def increasejobscounter
  jobscounter = 0
  # RDWR|CREAT creates the file when needed without truncating it, so there
  # is no window between "does it exist?" and "create it".
  File.open(COUNTERFILE, File::RDWR | File::CREAT, 0644) { |file|
    file.flock(File::LOCK_EX)
    jobscounter = file.read.to_i + 1
    file.rewind
    file.truncate(0)
    file.puts jobscounter
  }
  jobscounter
end
# Queue the remaining command-line words (ARGV) as a new job.
#
# The words are joined with single spaces and written to
# "<basedir>/jobs/<6-digit number>", which is then given RUNMODE
# permissions. Exits via usage when no command was given, and exits when the
# job file unexpectedly exists already.
def addjob
  if ARGV.empty?
    puts "Missing command"
    usage
  end
  jobnumber = sprintf("%06d", increasejobscounter)
  puts "New job: #{jobnumber}"
  jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
  if File.exist?(jobfile)
    puts "Job #{jobnumber} already scheduled. This should not happen."
    exit
  end
  File.open(jobfile, "w") { |file|
    file.puts ARGV.join(" ")
  }
  File.chmod(RUNMODE, jobfile)
end
# Queue a copy of the file named by ARGV[0] as a new job.
#
# Prints an error and leaves through usage (which exits) when the filename
# is missing or does not exist. The copy lands in
# "<basedir>/jobs/<6-digit number>" and is given RUNMODE permissions.
def addjobfromfile
  filename = ARGV[0]
  if filename.nil?
    puts "Missing filename"
    usage
  elsif !File.exist?(filename)
    puts "File #{filename} doesn't exist"
    usage
  end
  jobnumber = sprintf("%06d", increasejobscounter)
  puts "New job: #{jobnumber}"
  jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
  # File.copy is not part of core Ruby; FileUtils.cp is the supported call.
  FileUtils.cp(filename, jobfile)
  File.chmod(RUNMODE, jobfile)
end
# Write a default configuration file to "<basedir>/config".
# An existing file is never overwritten.
def createconfig
  path = "#{@basedir}/#{CONFIG}"
  return if File.exist?(path)
  File.open(path, "w") { |file|
    file.puts <<EOT
# how many parallel processes?
maxproc=3
# take a break for how long between process end and starting a new one
waittime=3
EOT
  }
end
# Remove the queued job whose number is given in ARGV[0].
#
# The number is zero-padded to six digits before the lookup. Leaves through
# usage when no number was given, and exits after a message when the job is
# not in the queue.
def deljob
  jobnumber = ARGV[0]
  if jobnumber.nil?
    puts "Missing jobnumber"
    usage
  else
    jobnumber = sprintf("%06d", jobnumber.to_i)
    jobfile = "#{@basedir}/#{JOBSDIR}/#{jobnumber}"
    if File.exist?(jobfile)
      File.unlink(jobfile)
    else
      puts "Job '#{jobnumber}' not in queue"
      exit
    end
  end
end
# Move every entry of the job queue into the parking lot directory.
def parkjobs
  Dir.entries("#{@basedir}/#{JOBSDIR}").each { |file|
    # Skip the directory's own "." and ".." entries.
    next if file == '.' || file == '..'
    FileUtils.mv("#{@basedir}/#{JOBSDIR}/#{file}", "#{@basedir}/#{PARKDIR}")
  }
end
# Move every entry of the parking lot directory back into the job queue.
def unparkjobs
  Dir.entries("#{@basedir}/#{PARKDIR}").each { |file|
    # Skip the directory's own "." and ".." entries.
    next if file == '.' || file == '..'
    FileUtils.mv("#{@basedir}/#{PARKDIR}/#{file}", "#{@basedir}/#{JOBSDIR}")
  }
end
# Put the failed job whose number is given in ARGV[0] back in the queue.
#
# The job file "<basedir>/failure/<number>/job" is moved to
# "<basedir>/jobs/<number>"; the rest of the failure directory, including
# any output it holds, is then deleted. Leaves through usage when no number
# was given, and exits after a message when the job is not a failed one.
def unfail
  jobnumber = ARGV[0]
  if jobnumber.nil?
    puts "Missing jobnumber"
    usage
  else
    jobnumber = sprintf("%06d", jobnumber.to_i)
    faildir = "#{@basedir}/#{FAILDIR}/#{jobnumber}"
    if File.exist?("#{faildir}/job")
      FileUtils.mv("#{faildir}/job", "#{@basedir}/#{JOBSDIR}/#{jobnumber}")
      # Recursive delete without spawning an external rm.
      FileUtils.rm_rf(faildir)
    else
      puts "Job '#{jobnumber}' not in failed dir"
      exit
    end
  end
end
# Placeholder for the "prio" command: changing a job's priority is not
# implemented yet, so this only says so.
def changeprio
  puts "unimplemented"
end
2009-02-11 09:20:35 +00:00
# Add one to the number of running processors stored in RUNNINGFILE.
#
# A missing or empty file counts as 0. The exclusive lock is held on the
# handle that performs the read-modify-write and is released when the block
# closes it.
# (Once threading is used this is no longer needed: no running file required.)
def incrementrunning
  File.open(RUNNINGFILE, File::RDWR | File::CREAT, 0644) { |file|
    file.flock(File::LOCK_EX)
    running = file.read.to_i + 1
    file.rewind
    # Drop the old digits so a shorter number leaves nothing behind.
    file.truncate(0)
    file.puts running
  }
end
# Subtract one from the number of running processors stored in RUNNINGFILE,
# never going below zero.
#
# A missing or empty file counts as 0. The exclusive lock is held on the
# handle that performs the read-modify-write and is released when the block
# closes it.
# (Once threading is used this is no longer needed: no running file required.)
def decrementrunning
  File.open(RUNNINGFILE, File::RDWR | File::CREAT, 0644) { |file|
    file.flock(File::LOCK_EX)
    running = file.read.to_i - 1
    running = 0 if running < 0
    file.rewind
    # Without truncating, "10\n" would turn into "9\n\n".
    file.truncate(0)
    file.puts running
  }
end
# Claim the next queued job for processing.
#
# Entries are examined in sorted name order, so with the zero-padded job
# numbers the lowest number goes first. The first regular file found is
# moved to "<basedir>/processors/<name>/job".
#
# Returns the job's name, or nil when the queue holds no regular file.
def getjob
  Dir.entries("#{@basedir}/#{JOBSDIR}").sort.each { |file|
    next if file == '.' || file == '..'
    next unless File.file?("#{@basedir}/#{JOBSDIR}/#{file}")
    Dir.mkdir("#{@basedir}/#{PROCDIR}/#{file}")
    FileUtils.mv("#{@basedir}/#{JOBSDIR}/#{file}", "#{@basedir}/#{PROCDIR}/#{file}/job")
    return file
  }
  # Without this the method would hand back the entries array from #each.
  nil
end
# Move the processing directory of +jobnumber+ into the failure directory.
def failjob(jobnumber)
  workdir = "#{@basedir}/#{PROCDIR}/#{jobnumber}"
  destination = "#{@basedir}/#{FAILDIR}"
  FileUtils.mv(workdir, destination)
end
# Move the processing directory of +jobnumber+ into the results directory.
def finishjob(jobnumber)
  workdir = "#{@basedir}/#{PROCDIR}/#{jobnumber}"
  destination = "#{@basedir}/#{RESULTDIR}"
  FileUtils.mv(workdir, destination)
end
2009-01-29 14:32:09 +00:00
# Take one job from the queue and run it.
#
# The running counter is incremented for the duration of the call and is
# always decremented again, even when something raises. The job file is
# executed through the shell from inside its own processing directory, with
# stdout and stderr collected in a file called "output" next to it. A zero
# exit status hands the job to finishjob; anything else hands it to failjob.
# Does nothing besides the counter bookkeeping when the queue is empty.
def runprocessor
  incrementrunning
  begin
    sleep 1 # XXX not needed once the locking really works (or with threading)
    runjob = getjob
    # Only a String is a job name; any other value means there was no job.
    if runjob.is_a?(String)
      puts "Starting job: #{runjob}"
      # The block form of chdir restores the previous directory on the way
      # out, also when the block raises.
      # XXX still to investigate: proper IPC (popen3?)
      succeeded = Dir.chdir("#{@basedir}/#{PROCDIR}/#{runjob}") {
        system("./job > output 2>&1")
      }
      if succeeded
        finishjob(runjob)
        puts "Finished job: #{runjob}"
      else
        failjob(runjob)
        puts "Failed job: #{runjob}"
      end
    end
  ensure
    decrementrunning
  end
end
# Placeholder for the "stop" command: stopping a single job is not
# implemented yet, so this only says so.
def stop
  puts "unimplemented"
end
# Placeholder for the "stopall" command: stopping all running jobs is not
# implemented yet, so this only says so.
def stopall
  puts "unimplemented"
end
2009-02-11 09:20:35 +00:00
# Entry point of the "run" command.
#
# Warns when not running as root and resets the running-processor counter in
# RUNNINGFILE to 0. The scheduler loop itself has not been ported from the
# shell version yet; the reference is kept below.
def master
  if Process.euid != 0
    puts "If your jobs need root privileges you want to run this as root"
  end
  # Once threading is used this is no longer needed (no running file required).
  # Open without truncating, so the content is only wiped while the lock is
  # held; closing the handle at the end of the block releases the lock.
  File.open(RUNNINGFILE, File::WRONLY | File::CREAT, 0644) { |file|
    file.flock(File::LOCK_EX)
    file.truncate(0)
    file.puts "0"
  }
  # TODO: port the scheduler loop from the shell version:
  # while sleep $waittime; do
  #   if [ $(cat "${basedir}/${runningfile}") -lt $maxproc ]; then
  #     # via ls, because globbing is a pain otherwise
  #     jl=$(ls ${basedir}/jobs/)
  #     if [ -n "$jl" ]; then
  #       ${pbcommand} process &
  #     fi
  #     readconfig
  #   fi
  # done
end
# XXX SIGHUP handler that re-reads the config
# Path this script was started as, kept for re-invoking itself.
@pbcommand = $0
# The first word selects the command; whatever remains in ARGV belongs to it.
command = ARGV.shift
# All queue directories live below the directory pb is started from.
@basedir = Dir.pwd
createdirs
case command
when "add" then addjob
when "addfile", "af" then addjobfromfile
when "createconfig" then createconfig
when "run" then master
when "del" then deljob
when "help", "-h" then usage
when "park" then parkjobs
when "unpark" then unparkjobs
when "retry" then unfail
when "prio" then changeprio
when "process" then runprocessor
when "status", "st" then status
when "stop" then stop
when "stopall" then stopall
else usage
end