15 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
18 client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
19 client.connect(SOCK_FILENAME)
24 returns 0 if no job was waiting
25 returns 1 if a job was already runing
26 returns 2 when a job has been processed
28 # remove jobs archived for more than 1 day
29 # TODO: delete kmz/csv files too
30 sqlexec(u"DELETE FROM job WHERE archive_time < now() - '1 day'::interval")
33 sqlexec(u'SELECT id, pid FROM job WHERE start_time IS NOT NULL AND finish_time IS NULL')
34 row = get_common_cursor().fetchone()
36 logging.debug('Job %s is running: pid=%s', row[0], row[1])
39 sqlexec(u'SELECT id, command, extension, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
40 row = get_common_cursor().fetchone()
42 logging.debug('Queue is empty.')
45 jobid, command, extension, user_id = row
46 logging.info('Starting job %s: %s', jobid, command)
48 sqlexec(u'UPDATE job SET start_time=now() WHERE id=%(jobid)s', {'jobid': jobid})
51 output = file('/var/lib/ais/jobs/'+unicode(jobid)+'.'+extension, 'wb')
52 p = subprocess.Popen(command, stdout=output, shell=True)
53 logging.debug('System process id = %s', p.pid)
54 sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
58 sqlexec(u'UPDATE job SET pid=NULL, finish_time=now(), result=' + unicode(returncode) + ' WHERE id=%(jobid)s', {'jobid': jobid})
60 logging.info('Job complete: result=%s', returncode)
62 sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
69 from optparse import OptionParser
71 parser = OptionParser()
72 parser.add_option('-d', '--debug',
73 action='store_true', dest='debug', default=False,
75 parser.add_option('--debug-sql',
76 action='store_true', dest='debug_sql', default=False,
77 help='prints sql statements. Implies --debug')
78 parser.add_option('--daemon',
79 action='store_true', dest='daemonize', default=False,
80 help='runs as a daemon')
81 parser.add_option('--loop',
82 action='store_true', dest='loop', default=False,
83 help='keeps running forever')
84 options, args = parser.parse_args()
87 print >> sys.stderr, "That program doesn't take any argument"
95 loglevel = logging.DEBUG
97 loglevel = logging.INFO
98 logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
100 if options.daemonize:
102 from daemon import DaemonContext
104 from ais.daemon import DaemonContext
106 stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
107 #ais_gid = grp.getgrnam('ais').gr_gid
108 dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
109 # todo: pidfile= with import lockfile (squeeze)
113 if os.path.exists(SOCK_FILENAME):
114 os.remove(SOCK_FILENAME)
115 server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
116 server.bind(SOCK_FILENAME)
117 os.chmod(SOCK_FILENAME, 0777)
122 server.recv(1024) # blocks
124 logging.error('Allready running?')
130 if __name__ == '__main__':