Save job logs, repport the size
[ais.git] / bin / jobrunner.py
1 #!/usr/bin/env python
2
3 from __future__ import division
4 import sys
5 import os
6 import os.path
7 import time
8 import logging
9 import subprocess
10 import socket
11 from random import SystemRandom
12 from ais.db import *
13
14 from ais.djais.settings import AIS_BASE_URL, NOTIFICATION_EMAIL
15
16 __all__ = [ \
17     'wakeup_daemon',
18     'DAEMON_WAKEUP_ERROR',
19     'make_unique_job_id',
20     ]
21
22 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
23 RESULT_DIR = '/var/lib/ais/jobs/'
24 ARCHIVE_EXPIRE = '1 day' # postgres interval format
25
26 def wakeup_daemon():
27     client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
28     try:
29         client.connect(SOCK_FILENAME)
30         client.send('')
31         return True
32     except:
33         return False
34
35 DAEMON_WAKEUP_ERROR = """
36 Your job has been queued, but there was an error contacting the job
37 scheduler. Please repport the error.
38 """
39
40 def make_unique_job_id():
41     def make_id():
42         rnd = SystemRandom()
43         source = u'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
44         result = u''
45         for i in range(8):
46             result += source[int(rnd.random()*len(source))]
47         return result
48     return make_id() # TODO check it's unique
49
50
51 def addjob(user_id, command, friendly_filename, notify=None):
52     jobid = make_unique_job_id()
53     sqlexec(u'INSERT INTO job (id, user_id, command, friendly_filename, notify) VALUES (%(id)s, %(user_id)s, %(cmd)s, %(friendly_filename)s, %(notify)s)', {
54         'id':                   jobid,
55         'user_id':              user_id,
56         'cmd':                  command,
57         'friendly_filename':    friendly_filename,
58         'notify':               notify})
59     dbcommit()
60     wakeup_daemon()
61     return jobid
62
63
64 def jobid_ext_to_filenames(jobid, friendly_filename):
65     extension = os.path.splitext(friendly_filename)[-1]
66     return RESULT_DIR + jobid + extension, RESULT_DIR + jobid + '.log'
67     
68 def startup_clean():
69     sqlexec(u'SELECT id, pid, start_time, finish_time FROM job WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
70     while True:
71         row = get_common_cursor().fetchone()
72         if not row:
73             break
74         logging.error('Startup: Job %s is supposed to be running: pid=%s, start_time=%s, finish_time=%s', *row)
75         # TODO: kill pid ?
76     
77     sqlexec(u'UPDATE job SET pid=NULL, start_time=NULL WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
78     dbcommit()
79
80
81 def sendmail(fromaddr, toaddr, subject, message):
82     import smtplib
83     server = smtplib.SMTP('localhost')
84     server.set_debuglevel(1)
85     message = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n%s" \
86        % (fromaddr, toaddr, subject, message)
87     try:
88         server.sendmail(fromaddr, toaddr, message)
89     except:
90         logging.error('Error sending notification to %s', toaddr)
91     server.quit()
92
93
94 def runjob():
95     """
96     returns True if a job was waiting and has been processed
97     """
98     # remove jobs archived for more than 1 day
99     deleted_jobs = []
100     sqlexec(u"SELECT id, friendly_filename FROM job WHERE archive_time < now() AT TIME ZONE 'GMT' - '%s'::interval" % ARCHIVE_EXPIRE)
101     while True:
102         row = get_common_cursor().fetchone()
103         if row is None:
104             break
105         filename, logfilename = jobid_ext_to_filenames(*row)
106         try:
107             os.unlink(filename)
108         except OSError, err:
109             logging.error('unlinking %s: %s', filename, err)
110         try:
111             os.unlink(logfilename)
112         except OSError, err:
113             logging.error('unlinking %s: %s', logfilename, err)
114         deleted_jobs.append(row[0])
115
116     for jobid in deleted_jobs:
117         sqlexec(u'DELETE FROM job WHERE id=%(id)s', {'id':jobid})
118     dbcommit()
119
120     sqlexec(u'SELECT id, command, friendly_filename, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
121     row = get_common_cursor().fetchone()
122     if row is None:
123         dbcommit() # Do not leave a transaction open
124         logging.debug('Queue is empty.')
125         return False
126
127     jobid, command, friendly_filename, user_id = row
128     command = 'python -m ais.' + command
129     logging.info('Starting job %s: %s', jobid, command)
130
131     sqlexec(u"UPDATE job SET start_time=now() AT TIME ZONE 'GMT' WHERE id=%(jobid)s", {'jobid': jobid})
132     dbcommit()
133
134     filename, logfilename = jobid_ext_to_filenames(jobid, friendly_filename)
135     output = file(filename, 'wb')
136     logfile = file(logfilename, 'wb')
137
138     p = subprocess.Popen(command, stdout=output, stderr=logfile, shell=True)
139     logging.debug('System process id = %s', p.pid)
140     sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
141     dbcommit()
142     
143     returncode = p.wait()
144     sqlexec(u"UPDATE job SET pid=NULL, finish_time=now() AT TIME ZONE 'GMT', result=" + unicode(returncode) + " WHERE id=%(jobid)s", {'jobid': jobid})
145     dbcommit()
146     logging.info('Job complete: result=%s', returncode)
147
148     sqlexec(u'SELECT notify FROM job where id=%(jobid)s', {'jobid': jobid})
149     row = get_common_cursor().fetchone()
150     if row:
151         notify = row[0]
152         if notify == u'W': # Web
153             sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
154             sqlexec(u'UPDATE job SET notify=NULL WHERE id=%(jobid)s', {'jobid': jobid})
155             dbcommit()
156
157         elif notify == u'M': # Notification
158             sqlexec('SELECT email FROM "user" WHERE id=%(user_id)s', {'user_id':user_id})
159             row = get_common_cursor().fetchone()
160             email = None
161             if row is not None:
162                 email = row[0]
163             if not email:
164                 logging.error("Can't find email of user %s", user_id)
165             else:
166                 sendmail(fromaddr=NOTIFICATION_EMAIL, toaddr=email, subject='Job complete', message='Your job is complete. You can download it at %s/job/' % AIS_BASE_URL)
167         # else SMS...
168
169     return True
170
171
172 def daemon(nice=0, foreground=False):
173     # be nice
174     os.nice(nice)
175
176     if not foreground:
177         try:
178             from daemon import DaemonContext
179         except ImportError:
180             from ais.daemon import DaemonContext
181
182         stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
183         #ais_gid = grp.getgrnam('ais').gr_gid
184         dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
185         # todo: pidfile= with import lockfile (squeeze)
186         dctx.open()
187
188     if os.path.exists(SOCK_FILENAME):
189         os.remove(SOCK_FILENAME)
190     server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
191     server.bind(SOCK_FILENAME)
192     os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
193     #TODO: set receive queue size to 0 or 1 byte
194     
195     #TODO: don't run startup_clean if another daemon is runing, that is using pid file
196     startup_clean()
197
198     while True:
199         if not runjob():
200             server.recv(1024) # blocks
201
202     
203 def main():
204     from optparse import OptionParser
205
206     parser = OptionParser(usage='%prog [options] [userid command friendlyname]')
207     parser.add_option('--foreground',
208         action='store_true', dest='foreground', default=False,
209         help='runs daemon in foreground.')
210     parser.add_option('--nice',
211         action='store', type='int', dest='nice', default=5,
212         help='set scheduling nice value. Default = %default')
213     parser.add_option('-d', '--debug',
214         action='store_true', dest='debug', default=False,
215         help='debug mode')
216     parser.add_option('--debug-sql',
217         action='store_true', dest='debug_sql', default=False,
218         help='prints sql statements. Implies --debug')
219     
220     parser.add_option('--notification-code',
221         action='store', dest='notify', default='W',
222         help='Job complete notification method: W=web M=mail')
223     options, args = parser.parse_args()
224     
225     if options.debug_sql:
226         sql_setdebug(True)
227         options.debug = True
228
229     if options.debug:
230         loglevel = logging.DEBUG
231     else:
232         loglevel = logging.INFO
233     logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
234
235     if args:
236         assert len(args)==3, 'There must be either no arguments (daemon) or 3 of them'
237         user_id = int(args[0])
238         command = args[1]
239         friendly_name = args[2]
240         jobid = addjob(user_id, command, friendly_name, notify=options.notify)
241         print >> sys.stderr, 'Job', jobid, 'queued'
242         sys.exit(0)
243
244
245     daemon(foreground = options.foreground)
246
247 if __name__ == '__main__':
248     main()