Added a proper job_detail page that display information, and reloads itself.
[ais.git] / bin / jobrunner.py
1 #!/usr/bin/env python
2
3 __all__ = [ \
4     'wakeup_daemon',
5     'DAEMON_WAKEUP_ERROR',
6     ]
7
8 import sys
9 import os
10 import time
11 import logging
12 import subprocess
13 import socket
14 from ais.db import *
15
16 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
17
18 def wakeup_daemon():
19     client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
20     try:
21         client.connect(SOCK_FILENAME)
22         client.send('')
23         return True
24     except:
25         return False
26
27 DAEMON_WAKEUP_ERROR = """
28 Your job has been queued, but there was an error contacting the task
29 scheduler. Please repport the error.
30 """
31
32 def runjob():
33     """
34     returns 0 if no job was waiting
35     returns 1 if a job was already runing
36     returns 2 when a job has been processed
37     """
38     # remove jobs archived for more than 1 day
39     # TODO: delete kmz/csv files too
40     sqlexec(u"DELETE FROM job WHERE archive_time < now() - '1 day'::interval")
41     dbcommit()
42
43     sqlexec(u'SELECT id, pid FROM job WHERE start_time IS NOT NULL AND finish_time IS NULL')
44     row = get_common_cursor().fetchone()
45     if row is not None:
46         logging.debug('Job %s is running: pid=%s', row[0], row[1])
47         return 1
48
49     sqlexec(u'SELECT id, command, extension, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
50     row = get_common_cursor().fetchone()
51     if row is None:
52         logging.debug('Queue is empty.')
53         return 0
54
55     jobid, command, extension, user_id = row
56     logging.info('Starting job %s: %s', jobid, command)
57
58     sqlexec(u'UPDATE job SET start_time=now() WHERE id=%(jobid)s', {'jobid': jobid})
59     dbcommit()
60
61     output = file('/var/lib/ais/jobs/'+unicode(jobid)+'.'+extension, 'wb')
62     p = subprocess.Popen(command, stdout=output, shell=True)
63     logging.debug('System process id = %s', p.pid)
64     sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
65     dbcommit()
66     
67     returncode = p.wait()
68     sqlexec(u'UPDATE job SET pid=NULL, finish_time=now(), result=' + unicode(returncode) + ' WHERE id=%(jobid)s', {'jobid': jobid})
69     dbcommit()
70     logging.info('Job complete: result=%s', returncode)
71
72     sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
73     dbcommit()
74
75     return 2
76
77
78 def main():
79     from optparse import OptionParser
80
81     parser = OptionParser()
82     parser.add_option('-d', '--debug',
83         action='store_true', dest='debug', default=False,
84         help='debug mode')
85     parser.add_option('--debug-sql',
86         action='store_true', dest='debug_sql', default=False,
87         help='prints sql statements. Implies --debug')
88     parser.add_option('--daemon',
89         action='store_true', dest='daemonize', default=False,
90         help='runs as a daemon')
91     parser.add_option('--loop',
92         action='store_true', dest='loop', default=False,
93         help='keeps running forever')
94     options, args = parser.parse_args()
95
96     if args:
97         print >> sys.stderr, "That program doesn't take any argument"
98         sys.exit(1)
99
100     if options.debug_sql:
101         sql_setdebug(True)
102         options.debug = True
103
104     if options.debug:
105         loglevel = logging.DEBUG
106     else:
107         loglevel = logging.INFO
108     logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
109
110     if options.daemonize:
111         try:
112             from daemon import DaemonContext
113         except ImportError:
114             from ais.daemon import DaemonContext
115
116         stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
117         #ais_gid = grp.getgrnam('ais').gr_gid
118         dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
119         # todo: pidfile= with import lockfile (squeeze)
120         dctx.open()
121
122     if options.loop:
123         if os.path.exists(SOCK_FILENAME):
124             os.remove(SOCK_FILENAME)
125         server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
126         server.bind(SOCK_FILENAME)
127         os.chmod(SOCK_FILENAME, 0777)
128
129         while True:
130             r = runjob()
131             if r == 0:
132                 server.recv(1024) # blocks
133             elif r == 1:
134                 logging.error('Allready running?')
135                 sys.exit(1)
136             # else loop now
137     else:
138         runjob()
139
140 if __name__ == '__main__':
141     main()