038a2b34189f47380ecd4d72d6a185040d54843c
[ais.git] / bin / jobrunner.py
1 #!/usr/bin/env python
2
3 __all__ = [ \
4     'wakeup_daemon',
5     ]
6
7 import sys
8 import os
9 import time
10 import logging
11 import subprocess
12 import socket
13 from ais.db import *
14
15 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
16
17 def wakeup_daemon():
18     client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
19     client.connect(SOCK_FILENAME)
20     client.send('')
21
22 def runjob():
23     """
24     returns 0 if no job was waiting
25     returns 1 if a job was already runing
26     returns 2 when a job has been processed
27     """
28     # remove jobs archived for more than 1 day
29     # TODO: delete kmz/csv files too
30     sqlexec(u"DELETE FROM job WHERE archive_time < now() - '1 day'::interval")
31     dbcommit()
32
33     sqlexec(u'SELECT id, pid FROM job WHERE start_time IS NOT NULL AND finish_time IS NULL')
34     row = get_common_cursor().fetchone()
35     if row is not None:
36         logging.debug('Job %s is running: pid=%s', row[0], row[1])
37         return 1
38
39     sqlexec(u'SELECT id, command, extension, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
40     row = get_common_cursor().fetchone()
41     if row is None:
42         logging.debug('Queue is empty.')
43         return 0
44
45     jobid, command, extension, user_id = row
46     logging.info('Starting job %s: %s', jobid, command)
47
48     sqlexec(u'UPDATE job SET start_time=now() WHERE id=%(jobid)s', {'jobid': jobid})
49     dbcommit()
50
51     output = file('/var/lib/ais/jobs/'+unicode(jobid)+'.'+extension, 'wb')
52     p = subprocess.Popen(command, stdout=output, shell=True)
53     logging.debug('System process id = %s', p.pid)
54     sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
55     dbcommit()
56     
57     returncode = p.wait()
58     sqlexec(u'UPDATE job SET pid=NULL, finish_time=now(), result=' + unicode(returncode) + ' WHERE id=%(jobid)s', {'jobid': jobid})
59     dbcommit()
60     logging.info('Job complete: result=%s', returncode)
61
62     sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
63     dbcommit()
64
65     return 2
66
67
68 def main():
69     from optparse import OptionParser
70
71     parser = OptionParser()
72     parser.add_option('-d', '--debug',
73         action='store_true', dest='debug', default=False,
74         help='debug mode')
75     parser.add_option('--debug-sql',
76         action='store_true', dest='debug_sql', default=False,
77         help='prints sql statements. Implies --debug')
78     parser.add_option('--daemon',
79         action='store_true', dest='daemonize', default=False,
80         help='runs as a daemon')
81     parser.add_option('--loop',
82         action='store_true', dest='loop', default=False,
83         help='keeps running forever')
84     options, args = parser.parse_args()
85
86     if args:
87         print >> sys.stderr, "That program doesn't take any argument"
88         sys.exit(1)
89
90     if options.debug_sql:
91         sql_setdebug(True)
92         options.debug = True
93
94     if options.debug:
95         loglevel = logging.DEBUG
96     else:
97         loglevel = logging.INFO
98     logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
99
100     if options.daemonize:
101         try:
102             from daemon import DaemonContext
103         except ImportError:
104             from ais.daemon import DaemonContext
105
106         stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
107         #ais_gid = grp.getgrnam('ais').gr_gid
108         dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
109         # todo: pidfile= with import lockfile (squeeze)
110         dctx.open()
111
112     if options.loop:
113         if os.path.exists(SOCK_FILENAME):
114             os.remove(SOCK_FILENAME)
115         server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
116         server.bind(SOCK_FILENAME)
117         os.chmod(SOCK_FILENAME, 0777)
118
119         while True:
120             r = runjob()
121             if r == 0:
122                 server.recv(1024) # blocks
123             elif r == 1:
124                 logging.error('Allready running?')
125                 sys.exit(1)
126             # else loop now
127     else:
128         runjob()
129
130 if __name__ == '__main__':
131     main()