Fixed job detail crash when running job is killed
[ais.git] / bin / jobrunner.py
1 #!/usr/bin/env python
2
3 from __future__ import division
4 import sys
5 import os
6 import os.path
7 import time
8 import logging
9 import subprocess
10 import socket
11 from ais.db import *
12
13 __all__ = [ \
14     'wakeup_daemon',
15     'DAEMON_WAKEUP_ERROR',
16     ]
17
18 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
19 RESULT_DIR = '/var/lib/ais/jobs/'
20 ARCHIVE_EXPIRE = '1 day' # postgres format
21
22 def wakeup_daemon():
23     client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
24     try:
25         client.connect(SOCK_FILENAME)
26         client.send('')
27         return True
28     except:
29         return False
30
31 DAEMON_WAKEUP_ERROR = """
32 Your job has been queued, but there was an error contacting the task
33 scheduler. Please repport the error.
34 """
35
36 def jobid_ext_to_filename(jobid, friendly_filename):
37     extension = os.path.splitext(friendly_filename)[-1]
38     return RESULT_DIR + jobid + extension
39     
40 def startup_clean():
41     sqlexec(u'SELECT id, pid, start_time, finish_time FROM job WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
42     while True:
43         row = get_common_cursor().fetchone()
44         if not row:
45             break
46         logging.error('Startup: Job %s is supposed to be running: pid=%s, start_time=%s, finish_time=%s', *row)
47         # TODO: kill pid ?
48     
49     sqlexec(u'UPDATE job SET pid=NULL, start_time=NULL WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
50     dbcommit()
51
52
53 def runjob():
54     """
55     returns True is a job was waiting and has been processed
56     """
57     # remove jobs archived for more than 1 day
58     deleted_jobs = []
59     sqlexec(u"SELECT id, friendly_filename FROM job WHERE archive_time < now() - '%s'::interval" % ARCHIVE_EXPIRE)
60     while True:
61         row = get_common_cursor().fetchone()
62         if row is None:
63             break
64         filename = jobid_ext_to_filename(*row)
65         try:
66             os.unlink(filename)
67         except OSError, err:
68             logging.error('unlinking %s: %s', filename, err)
69         deleted_jobs.append(row[0])
70
71     for jobid in deleted_jobs:
72         sqlexec(u'DELETE FROM job WHERE id=%(id)s', {'id':jobid})
73     dbcommit()
74
75     sqlexec(u'SELECT id, command, friendly_filename, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
76     row = get_common_cursor().fetchone()
77     if row is None:
78         logging.debug('Queue is empty.')
79         return False
80
81     jobid, command, friendly_filename, user_id = row
82     command = 'python -m ais.' + command
83     logging.info('Starting job %s: %s', jobid, command)
84
85     sqlexec(u'UPDATE job SET start_time=now() WHERE id=%(jobid)s', {'jobid': jobid})
86     dbcommit()
87
88     filename = jobid_ext_to_filename(jobid, friendly_filename)
89     output = file(filename, 'wb')
90
91     p = subprocess.Popen(command, stdout=output, shell=True)
92     logging.debug('System process id = %s', p.pid)
93     sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
94     dbcommit()
95     
96     returncode = p.wait()
97     sqlexec(u'UPDATE job SET pid=NULL, finish_time=now(), result=' + unicode(returncode) + ' WHERE id=%(jobid)s', {'jobid': jobid})
98     dbcommit()
99     logging.info('Job complete: result=%s', returncode)
100
101     sqlexec(u'SELECT notify FROM job where id=%(jobid)s', {'jobid': jobid})
102     row = get_common_cursor().fetchone()
103     if row:
104         notify = row[0]
105         if notify == u'W':
106             sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
107             sqlexec(u'UPDATE job SET notify=NULL WHERE id=%(jobid)s', {'jobid': jobid})
108             dbcommit()
109         # else SMS, Mail ...
110
111     return True
112
113
114 def main():
115     from optparse import OptionParser
116
117     parser = OptionParser()
118     parser.add_option('-d', '--debug',
119         action='store_true', dest='debug', default=False,
120         help='debug mode')
121     parser.add_option('--debug-sql',
122         action='store_true', dest='debug_sql', default=False,
123         help='prints sql statements. Implies --debug')
124     parser.add_option('--daemon',
125         action='store_true', dest='daemonize', default=False,
126         help='runs as a daemon')
127     options, args = parser.parse_args()
128
129     if args:
130         print >> sys.stderr, "That program doesn't take any argument"
131         sys.exit(1)
132
133     if options.debug_sql:
134         sql_setdebug(True)
135         options.debug = True
136
137     if options.debug:
138         loglevel = logging.DEBUG
139     else:
140         loglevel = logging.INFO
141     logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
142
143     if options.daemonize:
144         try:
145             from daemon import DaemonContext
146         except ImportError:
147             from ais.daemon import DaemonContext
148
149         stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
150         #ais_gid = grp.getgrnam('ais').gr_gid
151         dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
152         # todo: pidfile= with import lockfile (squeeze)
153         dctx.open()
154
155     if os.path.exists(SOCK_FILENAME):
156         os.remove(SOCK_FILENAME)
157     server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
158     server.bind(SOCK_FILENAME)
159     os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
160     
161     #TODO: don't run startup_clean if another daemon is runing, that is using pid file
162     startup_clean()
163     while True:
164         if not runjob():
165             server.recv(1024) # blocks
166
167 if __name__ == '__main__':
168     main()