Added a proper job_detail page that display information, and reloads itself.
[ais.git] / bin / jobrunner.py
index 1bfe97696e8805d3d0020e480fe717744f0ba497..065808beed8f0f802037d88c781754f1c7189d7b 100755 (executable)
@@ -1,49 +1,64 @@
 #!/usr/bin/env python
 
+__all__ = [ \
+    'wakeup_daemon',
+    'DAEMON_WAKEUP_ERROR',
+    ]
+
 import sys
+import os
+import time
 import logging
 import subprocess
+import socket
 from ais.db import *
 
-def main():
-    from optparse import OptionParser
+SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
 
-    parser = OptionParser()
-    parser.add_option('-d', '--debug',
-        action='store_true', dest='debug', default=False,
-        help="debug mode")
-    options, args = parser.parse_args()
+def wakeup_daemon():
+    client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+    try:
+        client.connect(SOCK_FILENAME)
+        client.send('')
+        return True
+    except:
+        return False
 
-    if args:
-        print >> sys.stderr, "That program doesn't take any argument"
-        sys.exit(1)
+DAEMON_WAKEUP_ERROR = """
+Your job has been queued, but there was an error contacting the task
+scheduler. Please repport the error.
+"""
 
-    if options.debug:
-        loglevel = logging.DEBUG
-        sql_setdebug(True)
-    else:
-        loglevel = logging.INFO
-    logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+def runjob():
+    """
+    returns 0 if no job was waiting
+    returns 1 if a job was already runing
+    returns 2 when a job has been processed
+    """
+    # remove jobs archived for more than 1 day
+    # TODO: delete kmz/csv files too
+    sqlexec(u"DELETE FROM job WHERE archive_time < now() - '1 day'::interval")
+    dbcommit()
 
     sqlexec(u'SELECT id, pid FROM job WHERE start_time IS NOT NULL AND finish_time IS NULL')
     row = get_common_cursor().fetchone()
     if row is not None:
         logging.debug('Job %s is running: pid=%s', row[0], row[1])
-        sys.exit(0)
+        return 1
 
-    sqlexec(u'SELECT id, command FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
+    sqlexec(u'SELECT id, command, extension, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
     row = get_common_cursor().fetchone()
     if row is None:
         logging.debug('Queue is empty.')
-        sys.exit(0)
+        return 0
 
-    jobid, command = row
+    jobid, command, extension, user_id = row
     logging.info('Starting job %s: %s', jobid, command)
 
     sqlexec(u'UPDATE job SET start_time=now() WHERE id=%(jobid)s', {'jobid': jobid})
     dbcommit()
 
-    output = file('/var/lib/ais/jobs/'+unicode(jobid), 'wb')
+    output = file('/var/lib/ais/jobs/'+unicode(jobid)+'.'+extension, 'wb')
     p = subprocess.Popen(command, stdout=output, shell=True)
     logging.debug('System process id = %s', p.pid)
     sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
@@ -54,5 +69,73 @@ def main():
     dbcommit()
     logging.info('Job complete: result=%s', returncode)
 
+    sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
+    dbcommit()
+
+    return 2
+
+
+def main():
+    from optparse import OptionParser
+
+    parser = OptionParser()
+    parser.add_option('-d', '--debug',
+        action='store_true', dest='debug', default=False,
+        help='debug mode')
+    parser.add_option('--debug-sql',
+        action='store_true', dest='debug_sql', default=False,
+        help='prints sql statements. Implies --debug')
+    parser.add_option('--daemon',
+        action='store_true', dest='daemonize', default=False,
+        help='runs as a daemon')
+    parser.add_option('--loop',
+        action='store_true', dest='loop', default=False,
+        help='keeps running forever')
+    options, args = parser.parse_args()
+
+    if args:
+        print >> sys.stderr, "That program doesn't take any argument"
+        sys.exit(1)
+
+    if options.debug_sql:
+        sql_setdebug(True)
+        options.debug = True
+
+    if options.debug:
+        loglevel = logging.DEBUG
+    else:
+        loglevel = logging.INFO
+    logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+
+    if options.daemonize:
+        try:
+            from daemon import DaemonContext
+        except ImportError:
+            from ais.daemon import DaemonContext
+
+        stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
+        #ais_gid = grp.getgrnam('ais').gr_gid
+        dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
+        # todo: pidfile= with import lockfile (squeeze)
+        dctx.open()
+
+    if options.loop:
+        if os.path.exists(SOCK_FILENAME):
+            os.remove(SOCK_FILENAME)
+        server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+        server.bind(SOCK_FILENAME)
+        os.chmod(SOCK_FILENAME, 0777)
+
+        while True:
+            r = runjob()
+            if r == 0:
+                server.recv(1024) # blocks
+            elif r == 1:
+                logging.error('Allready running?')
+                sys.exit(1)
+            # else loop now
+    else:
+        runjob()
+
 if __name__ == '__main__':
     main()