Fix url where to fetch the job resulst
[ais.git] / bin / jobrunner.py
1 #!/usr/bin/env python
2
3 from __future__ import division
4 import sys
5 import os
6 import os.path
7 import time
8 import logging
9 import subprocess
10 import socket
11 from random import SystemRandom
12 from ais.db import *
13
14 from ais.djais.settings import AIS_BASE_URL, NOTIFICATION_EMAIL
15
16 __all__ = [ \
17     'wakeup_daemon',
18     'DAEMON_WAKEUP_ERROR',
19     'make_unique_job_id',
20     ]
21
22 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
23 RESULT_DIR = '/var/lib/ais/jobs/'
24 ARCHIVE_EXPIRE = '1 day' # postgres interval format
25
26 def wakeup_daemon():
27     client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
28     try:
29         client.connect(SOCK_FILENAME)
30         client.send('')
31         return True
32     except:
33         return False
34
35 DAEMON_WAKEUP_ERROR = """
36 Your job has been queued, but there was an error contacting the job
37 scheduler. Please repport the error.
38 """
39
40 def make_unique_job_id():
41     def make_id():
42         rnd = SystemRandom()
43         source = u'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
44         result = u''
45         for i in range(8):
46             result += source[int(rnd.random()*len(source))]
47         return result
48     return make_id() # TODO check it's unique
49
50
51 def addjob(user_id, command, friendly_filename, notify=None):
52     jobid = make_unique_job_id()
53     sqlexec(u'INSERT INTO job (id, user_id, command, friendly_filename, notify) VALUES (%(id)s, %(user_id)s, %(cmd)s, %(friendly_filename)s, %(notify)s)', {
54         'id':                   jobid,
55         'user_id':              user_id,
56         'cmd':                  command,
57         'friendly_filename':    friendly_filename,
58         'notify':               notify})
59     dbcommit()
60     wakeup_daemon()
61     return jobid
62
63
64 def jobid_ext_to_filename(jobid, friendly_filename):
65     extension = os.path.splitext(friendly_filename)[-1]
66     return RESULT_DIR + jobid + extension
67     
68 def startup_clean():
69     sqlexec(u'SELECT id, pid, start_time, finish_time FROM job WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
70     while True:
71         row = get_common_cursor().fetchone()
72         if not row:
73             break
74         logging.error('Startup: Job %s is supposed to be running: pid=%s, start_time=%s, finish_time=%s', *row)
75         # TODO: kill pid ?
76     
77     sqlexec(u'UPDATE job SET pid=NULL, start_time=NULL WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
78     dbcommit()
79
80
81 def sendmail(fromaddr, toaddr, subject, message):
82     import smtplib
83     server = smtplib.SMTP('localhost')
84     server.set_debuglevel(1)
85     message = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n%s" \
86        % (fromaddr, toaddr, subject, message)
87     try:
88         server.sendmail(fromaddr, toaddr, message)
89     except:
90         logging.error('Error sending notification to %s', toaddr)
91     server.quit()
92
93
94 def runjob():
95     """
96     returns True if a job was waiting and has been processed
97     """
98     # remove jobs archived for more than 1 day
99     deleted_jobs = []
100     sqlexec(u"SELECT id, friendly_filename FROM job WHERE archive_time < now() AT TIME ZONE 'GMT' - '%s'::interval" % ARCHIVE_EXPIRE)
101     while True:
102         row = get_common_cursor().fetchone()
103         if row is None:
104             break
105         filename = jobid_ext_to_filename(*row)
106         try:
107             os.unlink(filename)
108         except OSError, err:
109             logging.error('unlinking %s: %s', filename, err)
110         deleted_jobs.append(row[0])
111
112     for jobid in deleted_jobs:
113         sqlexec(u'DELETE FROM job WHERE id=%(id)s', {'id':jobid})
114     dbcommit()
115
116     sqlexec(u'SELECT id, command, friendly_filename, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
117     row = get_common_cursor().fetchone()
118     if row is None:
119         dbcommit() # Do not leave a transaction open
120         logging.debug('Queue is empty.')
121         return False
122
123     jobid, command, friendly_filename, user_id = row
124     command = 'python -m ais.' + command
125     logging.info('Starting job %s: %s', jobid, command)
126
127     sqlexec(u"UPDATE job SET start_time=now() AT TIME ZONE 'GMT' WHERE id=%(jobid)s", {'jobid': jobid})
128     dbcommit()
129
130     filename = jobid_ext_to_filename(jobid, friendly_filename)
131     output = file(filename, 'wb')
132
133     p = subprocess.Popen(command, stdout=output, shell=True)
134     logging.debug('System process id = %s', p.pid)
135     sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
136     dbcommit()
137     
138     returncode = p.wait()
139     sqlexec(u"UPDATE job SET pid=NULL, finish_time=now() AT TIME ZONE 'GMT', result=" + unicode(returncode) + " WHERE id=%(jobid)s", {'jobid': jobid})
140     dbcommit()
141     logging.info('Job complete: result=%s', returncode)
142
143     sqlexec(u'SELECT notify FROM job where id=%(jobid)s', {'jobid': jobid})
144     row = get_common_cursor().fetchone()
145     if row:
146         notify = row[0]
147         if notify == u'W': # Web
148             sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
149             sqlexec(u'UPDATE job SET notify=NULL WHERE id=%(jobid)s', {'jobid': jobid})
150             dbcommit()
151
152         elif notify == u'M': # Notification
153             sqlexec('SELECT email FROM "user" WHERE id=%(user_id)s', {'user_id':user_id})
154             row = get_common_cursor().fetchone()
155             email = None
156             if row is not None:
157                 email = row[0]
158             if not email:
159                 logging.error("Can't find email of user %s", user_id)
160             else:
161                 sendmail(fromaddr=NOTIFICATION_EMAIL, toaddr=email, subject='Job complete', message='Your job is complete. You can download it at %s/job/' % AIS_BASE_URL)
162         # else SMS...
163
164     return True
165
166
167 def daemon(nice=0, foreground=False):
168     # be nice
169     os.nice(nice)
170
171     if not foreground:
172         try:
173             from daemon import DaemonContext
174         except ImportError:
175             from ais.daemon import DaemonContext
176
177         stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
178         #ais_gid = grp.getgrnam('ais').gr_gid
179         dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
180         # todo: pidfile= with import lockfile (squeeze)
181         dctx.open()
182
183     if os.path.exists(SOCK_FILENAME):
184         os.remove(SOCK_FILENAME)
185     server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
186     server.bind(SOCK_FILENAME)
187     os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
188     #TODO: set receive queue size to 0 or 1 byte
189     
190     #TODO: don't run startup_clean if another daemon is runing, that is using pid file
191     startup_clean()
192
193     while True:
194         if not runjob():
195             server.recv(1024) # blocks
196
197     
198 def main():
199     from optparse import OptionParser
200
201     parser = OptionParser(usage='%prog [options] [userid command friendlyname]')
202     parser.add_option('--foreground',
203         action='store_true', dest='foreground', default=False,
204         help='runs daemon in foreground.')
205     parser.add_option('--nice',
206         action='store', type='int', dest='nice', default=5,
207         help='set scheduling nice value. Default = %default')
208     parser.add_option('-d', '--debug',
209         action='store_true', dest='debug', default=False,
210         help='debug mode')
211     parser.add_option('--debug-sql',
212         action='store_true', dest='debug_sql', default=False,
213         help='prints sql statements. Implies --debug')
214     
215     parser.add_option('--notification-code',
216         action='store', dest='notify', default='W',
217         help='Job complete notification method: W=web M=mail')
218     options, args = parser.parse_args()
219     
220     if options.debug_sql:
221         sql_setdebug(True)
222         options.debug = True
223
224     if options.debug:
225         loglevel = logging.DEBUG
226     else:
227         loglevel = logging.INFO
228     logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
229
230     if args:
231         assert len(args)==3, 'There must be either no arguments (daemon) or 3 of them'
232         user_id = int(args[0])
233         command = args[1]
234         friendly_name = args[2]
235         jobid = addjob(user_id, command, friendly_name, notify=options.notify)
236         print >> sys.stderr, 'Job', jobid, 'queued'
237         sys.exit(0)
238
239
240     daemon(foreground = options.foreground)
241
242 if __name__ == '__main__':
243     main()