#!/usr/bin/env ruby # $Id$ # $URL$ JOBSDIR = "jobs" PARKDIR = "park" PROCDIR = "processors" RESULTDIR = "results" FAILDIR = "failure" MONITORDIR = "monitor" RUNMODE = 0755 CONFIG = "config" @config = { "waittime" => 3, "maxproc" => 3 } COUNTERFILE="jobscounter" RUNNINGFILE="runningcounter" def usage puts < add >-->-------Add the given command as a job. addfile|af >------Add the given file as a job. createconfing>-->-------Write a default configuration file. run>---->------->-------Start the master scheduler process. del >---->-------Delete the given job from the queue. help|-h>>------->-------Show this help. park>--->------->-------Place all queued jobs in the parkinglot. unpark>->------->-------Place all jobs from the parkinglot back in the queue. retry >-->-------Reschedule a failed job. Destroys all output. prio >---Change the priority of the given job.>--[unimplemented] status|st>------>-------Show the status of the current jobs. stop >--->-------Stop the given job.>----[unimplemented] stopall>>------->-------Stop all running jobs.>-[unimplemented] ENDTXT exit end def readconfig File.open("#{@basedir}/#{CONFIG}").each_line {|line| line.sub!(/#.*/, "") line.sub!(/^\s*/, "") line.sub!(/\s*$/, "") if line.match(/^$/) next end line.match(/([^=]*?)\s*=\s*(.*)/) case $1 when "maxproc" then @config["maxproc"] = $2 when "waittime" then @config["waittime"] = $2 else puts "unknown option #{$1}" end } end def status readconfig # show which jobs are running puts "Running jobs (#{Dir.entries("#{@basedir}/#{PROCDIR}").size - 2}/#{@config["maxproc"]}):" # just call 'ls' instead of trying to figure out how to format it nicely for the screen system("ls" ,"#{@basedir}/#{PROCDIR}/") # show queues puts "Jobs queued (#{Dir.entries("#{@basedir}/#{JOBSDIR}").size - 2}):" system("ls", "#{@basedir}/#{JOBSDIR}/") puts "Jobs parked (#{Dir.entries("#{@basedir}/#{PARKDIR}").size - 2}):" puts "Jobs finished: #{Dir.entries("#{@basedir}/#{RESULTDIR}").size - 2}" puts "Jobs failed (#{Dir.entries("#{@basedir}/#{FAILDIR}").size - 2}):" system("ls", "#{@basedir}/#{FAILDIR}/") end def createdirs [ JOBSDIR, PARKDIR, PROCDIR, RESULTDIR, FAILDIR, MONITORDIR ].each{|dir| if ! FileTest.directory?("#{@basedir}/#{dir}") Dir.mkdir("#{@basedir}/#{dir}") end } end def increasejobscounter jobscounter = 0 if FileTest.exists?(COUNTERFILE) File.new(COUNTERFILE).flock(File::LOCK_EX) File.open(COUNTERFILE, "r+"){|file| jobscounter = file.read if jobscounter.empty? jobscounter = 0 end jobscounter = jobscounter.to_i jobscounter += 1 file.seek(0) file.puts jobscounter } File.new(COUNTERFILE, "r+").flock(File::LOCK_UN) else File.open(COUNTERFILE, "w"){|file| File.new(COUNTERFILE).flock(File::LOCK_EX) jobscounter = 1 file.puts jobscounter } File.new(COUNTERFILE, "r+").flock(File::LOCK_UN) end return jobscounter end def addjob jobnumber=sprintf("%06d", increasejobscounter) puts "New job: #{jobnumber}" if FileTest.exists?("#{@basedir}/#{JOBSDIR}/#{jobnumber}") puts "Job #{jobnumber} already scheduled. This should not happen." exit end File.open("#{@basedir}/#{JOBSDIR}/#{jobnumber}"){|file| file.puts ARGV.join(" ") } File.chmod(RUNMODE, "#{@basedir}/#{JOBSDIR}/#{jobnumber}") end def addjobfromfile filename = ARGV[0] if filename.nil? puts "Missing filename" usage else if ! FileTest.exists?(filename) puts "File #{filename} doesn't exist" usage end end jobnumber = sprintf("%06d", increasejobscounter) puts "New job: #{jobnumber}" File.copy(filename, "#{@basedir}/#{JOBSDIR}/#{jobnumber}") File.chmod(RUNMODE, "#{@basedir}/#{JOBSDIR}/#{jobnumber}") end def createconfig if ! FileTest.exists?("#{@basedir}/#{CONFIG}") File.new("#{@basedir}/#{CONFIG}", "w"){|file| file.puts <"${basedir}/$runningfile" # # while sleep $waittime; do # if [ $(cat "${basedir}/${runningfile}") -lt $maxproc ]; then # # via ls omdat anders de globbing klote is # jl=$(ls ${basedir}/jobs/) # if [ -n "$jl" ]; then # ${pbcommand} process & # fi # readconfig # fi # done end def deljob jobnumber = ARGV[0] if jobnumber.nil? puts "Missing jobnumber" usage else jobnumber = sprintf("%06d", jobnumber.to_i) if FileTest.exists?("#{@basedir}/#{JOBSDIR}/#{jobnumber}") File.unlink("#{@basedir}/#{JOBSDIR}/#{jobnumber}") else puts "Job '#{jobnumber}' not in queue" exit end end end def parkjobs Dir.entries("#{@basedir}/#{JOBSDIR}").each{|file| if file == '.' or file == '..' next end File.move("#{@basedir}/#{JOBSDIR}/#{file}", "#{@basedir}/#{PARKDIR}") } end def unparkjobs Dir.entries("#{@basedir}/#{PARKDIR}").each{|file| if file == '.' or file == '..' next end File.move("#{@basedir}/#{PARKDIR}/#{file}", "#{@basedir}/#{JOBSDIR}") } end def unfail jobnumber = ARGV[0] if jobnumber.nil? puts "Missing jobnumber" usage else jobnumber = sprintf("%06d", jobnumber.to_i) if FileTest.exists?("#{@basedir}/#{FAILDIR}/#{jobnumber}/job") File.move("#{@basedir}/#{FAILDIR}/#{jobnumber}/job", "#{@basedir}/#{JOBSDIR}/#{jobnumber}") # XXX tja, lekker makkelijk even geen recursie implementeren system("rm", "-rf", "#{@basedir}/#{FAILDIR}/#{jobnumber}") else puts "Job '#{jobnumber}' not in failed dir" exit end end end def changeprio puts "unimplemented" end def runprocessor # incrementrunning # sleep 1 # runjob=$(getjob) # if [ -n "${runjob}" ]; then # echo "Starting job: ${runjob}" # owd=$(pwd) # cd "${basedir}/processors/${runjob}" # if "${basedir}/processors/${runjob}/job" 2>&1 > "${basedir}/processors/${runjob}/output" ;then # finishjob ${runjob} # echo "Finished job: ${runjob}" # else # failjob ${runjob} # echo "Failed job: ${runjob}" # fi # cd $owd # fi # decrementrunning end def stop puts "unimplemented" end def stopall puts "unimplemented" end pbcommand = $0 command = ARGV.shift @basedir = Dir.pwd createdirs case command when "add" then addjob when "addfile", "af" then addjobfromfile when "createconfig" then createconfig when "run" then master when "del" then deljob when "help" then usage when "-h" then usage when "park" then parkjobs when "unpark" then unparkjobs when "retry" then unfail when "prio" then changeprio when "process" then runprocessor when "status", "st" then status when "stop" then stop when "stopall" then stopall else usage end