No longer storing the 'python -m ais.' prefix in job commands
[ais.git] / bin / jobrunner.py
1 #!/usr/bin/env python
2
3 __all__ = [ \
4     'wakeup_daemon',
5     'DAEMON_WAKEUP_ERROR',
6     ]
7
8 import sys
9 import os
10 import os.path
11 import time
12 import logging
13 import subprocess
14 import socket
15 from ais.db import *
16
17 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
18
19 def wakeup_daemon():
20     client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
21     try:
22         client.connect(SOCK_FILENAME)
23         client.send('')
24         return True
25     except:
26         return False
27
28 DAEMON_WAKEUP_ERROR = """
29 Your job has been queued, but there was an error contacting the task
30 scheduler. Please repport the error.
31 """
32
33 def startup_clean():
34     sqlexec(u'SELECT id, pid, start_time, finish_time FROM job WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
35     while True:
36         row = get_common_cursor().fetchone()
37         if not row:
38             break
39         logging.error('Startup: Job %s is supposed to be running: pid=%s, start_time=%s, finish_time=%s', *row)
40         # TODO: kill pid ?
41     
42     sqlexec(u'UPDATE job SET pid=NULL, start_time=NULL WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
43     dbcommit()
44
45 def runjob():
46     """
47     returns True is a job was waiting and has been processed
48     """
49     # remove jobs archived for more than 1 day
50     # TODO: delete kmz/csv files too
51     sqlexec(u"DELETE FROM job WHERE archive_time < now() - '1 day'::interval")
52     dbcommit()
53
54     sqlexec(u'SELECT id, command, friendly_filename, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
55     row = get_common_cursor().fetchone()
56     if row is None:
57         logging.debug('Queue is empty.')
58         return False
59
60     jobid, command, friendly_filename, user_id = row
61     command = 'python -m ais.' + command
62     logging.info('Starting job %s: %s', jobid, command)
63
64     sqlexec(u'UPDATE job SET start_time=now() WHERE id=%(jobid)s', {'jobid': jobid})
65     dbcommit()
66
67     extension = os.path.splitext(friendly_filename)[-1]
68     output = file('/var/lib/ais/jobs/'+unicode(jobid)+extension, 'wb')
69     p = subprocess.Popen(command, stdout=output, shell=True)
70     logging.debug('System process id = %s', p.pid)
71     sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
72     dbcommit()
73     
74     returncode = p.wait()
75     sqlexec(u'UPDATE job SET pid=NULL, finish_time=now(), result=' + unicode(returncode) + ' WHERE id=%(jobid)s', {'jobid': jobid})
76     dbcommit()
77     logging.info('Job complete: result=%s', returncode)
78
79     sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
80     dbcommit()
81
82     return True
83
84
85 def main():
86     from optparse import OptionParser
87
88     parser = OptionParser()
89     parser.add_option('-d', '--debug',
90         action='store_true', dest='debug', default=False,
91         help='debug mode')
92     parser.add_option('--debug-sql',
93         action='store_true', dest='debug_sql', default=False,
94         help='prints sql statements. Implies --debug')
95     parser.add_option('--daemon',
96         action='store_true', dest='daemonize', default=False,
97         help='runs as a daemon')
98     options, args = parser.parse_args()
99
100     if args:
101         print >> sys.stderr, "That program doesn't take any argument"
102         sys.exit(1)
103
104     if options.debug_sql:
105         sql_setdebug(True)
106         options.debug = True
107
108     if options.debug:
109         loglevel = logging.DEBUG
110     else:
111         loglevel = logging.INFO
112     logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
113
114     if options.daemonize:
115         try:
116             from daemon import DaemonContext
117         except ImportError:
118             from ais.daemon import DaemonContext
119
120         stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
121         #ais_gid = grp.getgrnam('ais').gr_gid
122         dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
123         # todo: pidfile= with import lockfile (squeeze)
124         dctx.open()
125
126     if os.path.exists(SOCK_FILENAME):
127         os.remove(SOCK_FILENAME)
128     server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
129     server.bind(SOCK_FILENAME)
130     os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
131     
132     #TODO: don't run startup_clean if another daemon is runing, that is using pid file
133     startup_clean()
134     while True:
135         if not runjob():
136             server.recv(1024) # blocks
137
138 if __name__ == '__main__':
139     main()