577fabc7b27bc85f350edc0971d709e94a5feca7
[ais.git] / bin / jobrunner.py
1 #!/usr/bin/env python
2
3 __all__ = [ \
4     'wakeup_daemon',
5     'DAEMON_WAKEUP_ERROR',
6     ]
7
8 import sys
9 import os
10 import os.path
11 import time
12 import logging
13 import subprocess
14 import socket
15 from ais.db import *
16
17 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
18
19 def wakeup_daemon():
20     client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
21     try:
22         client.connect(SOCK_FILENAME)
23         client.send('')
24         return True
25     except:
26         return False
27
28 DAEMON_WAKEUP_ERROR = """
29 Your job has been queued, but there was an error contacting the task
30 scheduler. Please repport the error.
31 """
32
33 def runjob():
34     """
35     returns 0 if no job was waiting
36     returns 1 if a job was already runing
37     returns 2 when a job has been processed
38     """
39     # remove jobs archived for more than 1 day
40     # TODO: delete kmz/csv files too
41     sqlexec(u"DELETE FROM job WHERE archive_time < now() - '1 day'::interval")
42     dbcommit()
43
44     sqlexec(u'SELECT id, pid FROM job WHERE start_time IS NOT NULL AND finish_time IS NULL')
45     row = get_common_cursor().fetchone()
46     if row is not None:
47         logging.debug('Job %s is running: pid=%s', row[0], row[1])
48         return 1
49
50     sqlexec(u'SELECT id, command, friendly_filename, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
51     row = get_common_cursor().fetchone()
52     if row is None:
53         logging.debug('Queue is empty.')
54         return 0
55
56     jobid, command, friendly_filename, user_id = row
57     logging.info('Starting job %s: %s', jobid, command)
58
59     sqlexec(u'UPDATE job SET start_time=now() WHERE id=%(jobid)s', {'jobid': jobid})
60     dbcommit()
61
62     extension = os.path.splitext(friendly_filename)[-1]
63     output = file('/var/lib/ais/jobs/'+unicode(jobid)+extension, 'wb')
64     p = subprocess.Popen(command, stdout=output, shell=True)
65     logging.debug('System process id = %s', p.pid)
66     sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
67     dbcommit()
68     
69     returncode = p.wait()
70     sqlexec(u'UPDATE job SET pid=NULL, finish_time=now(), result=' + unicode(returncode) + ' WHERE id=%(jobid)s', {'jobid': jobid})
71     dbcommit()
72     logging.info('Job complete: result=%s', returncode)
73
74     sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
75     dbcommit()
76
77     return 2
78
79
80 def main():
81     from optparse import OptionParser
82
83     parser = OptionParser()
84     parser.add_option('-d', '--debug',
85         action='store_true', dest='debug', default=False,
86         help='debug mode')
87     parser.add_option('--debug-sql',
88         action='store_true', dest='debug_sql', default=False,
89         help='prints sql statements. Implies --debug')
90     parser.add_option('--daemon',
91         action='store_true', dest='daemonize', default=False,
92         help='runs as a daemon')
93     parser.add_option('--loop',
94         action='store_true', dest='loop', default=False,
95         help='keeps running forever')
96     options, args = parser.parse_args()
97
98     if args:
99         print >> sys.stderr, "That program doesn't take any argument"
100         sys.exit(1)
101
102     if options.debug_sql:
103         sql_setdebug(True)
104         options.debug = True
105
106     if options.debug:
107         loglevel = logging.DEBUG
108     else:
109         loglevel = logging.INFO
110     logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
111
112     if options.daemonize:
113         try:
114             from daemon import DaemonContext
115         except ImportError:
116             from ais.daemon import DaemonContext
117
118         stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
119         #ais_gid = grp.getgrnam('ais').gr_gid
120         dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
121         # todo: pidfile= with import lockfile (squeeze)
122         dctx.open()
123
124     if options.loop:
125         if os.path.exists(SOCK_FILENAME):
126             os.remove(SOCK_FILENAME)
127         server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
128         server.bind(SOCK_FILENAME)
129         os.chmod(SOCK_FILENAME, 0777)
130
131         while True:
132             r = runjob()
133             if r == 0:
134                 server.recv(1024) # blocks
135             elif r == 1:
136                 logging.error('Allready running?')
137                 sys.exit(1)
138             # else loop now
139     else:
140         runjob()
141
142 if __name__ == '__main__':
143     main()