#!/usr/bin/env python
+__all__ = [ \
+ 'wakeup_daemon',
+ 'DAEMON_WAKEUP_ERROR',
+ ]
+
import sys
+import os
+import time
import logging
import subprocess
+import socket
from ais.db import *
-def main():
- from optparse import OptionParser
+SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
- parser = OptionParser()
- parser.add_option('-d', '--debug',
- action='store_true', dest='debug', default=False,
- help="debug mode")
- options, args = parser.parse_args()
+def wakeup_daemon():
+ client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ try:
+ client.connect(SOCK_FILENAME)
+ client.send('')
+ return True
+ except:
+ return False
- if args:
- print >> sys.stderr, "That program doesn't take any argument"
- sys.exit(1)
+DAEMON_WAKEUP_ERROR = """
+Your job has been queued, but there was an error contacting the task
+scheduler. Please repport the error.
+"""
- if options.debug:
- loglevel = logging.DEBUG
- sql_setdebug(True)
- else:
- loglevel = logging.INFO
- logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+def runjob():
+ """
+ returns 0 if no job was waiting
+ returns 1 if a job was already runing
+ returns 2 when a job has been processed
+ """
+ # remove jobs archived for more than 1 day
+ # TODO: delete kmz/csv files too
+ sqlexec(u"DELETE FROM job WHERE archive_time < now() - '1 day'::interval")
+ dbcommit()
sqlexec(u'SELECT id, pid FROM job WHERE start_time IS NOT NULL AND finish_time IS NULL')
row = get_common_cursor().fetchone()
if row is not None:
logging.debug('Job %s is running: pid=%s', row[0], row[1])
- sys.exit(0)
+ return 1
- sqlexec(u'SELECT id, command, extension FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
+ sqlexec(u'SELECT id, command, extension, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
row = get_common_cursor().fetchone()
if row is None:
logging.debug('Queue is empty.')
- sys.exit(0)
+ return 0
- jobid, command, extension = row
+ jobid, command, extension, user_id = row
logging.info('Starting job %s: %s', jobid, command)
sqlexec(u'UPDATE job SET start_time=now() WHERE id=%(jobid)s', {'jobid': jobid})
dbcommit()
logging.info('Job complete: result=%s', returncode)
+ sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
+ dbcommit()
+
+ return 2
+
+
+def main():
+ from optparse import OptionParser
+
+ parser = OptionParser()
+ parser.add_option('-d', '--debug',
+ action='store_true', dest='debug', default=False,
+ help='debug mode')
+ parser.add_option('--debug-sql',
+ action='store_true', dest='debug_sql', default=False,
+ help='prints sql statements. Implies --debug')
+ parser.add_option('--daemon',
+ action='store_true', dest='daemonize', default=False,
+ help='runs as a daemon')
+ parser.add_option('--loop',
+ action='store_true', dest='loop', default=False,
+ help='keeps running forever')
+ options, args = parser.parse_args()
+
+ if args:
+ print >> sys.stderr, "That program doesn't take any argument"
+ sys.exit(1)
+
+ if options.debug_sql:
+ sql_setdebug(True)
+ options.debug = True
+
+ if options.debug:
+ loglevel = logging.DEBUG
+ else:
+ loglevel = logging.INFO
+ logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+
+ if options.daemonize:
+ try:
+ from daemon import DaemonContext
+ except ImportError:
+ from ais.daemon import DaemonContext
+
+ stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
+ #ais_gid = grp.getgrnam('ais').gr_gid
+ dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
+ # todo: pidfile= with import lockfile (squeeze)
+ dctx.open()
+
+ if options.loop:
+ if os.path.exists(SOCK_FILENAME):
+ os.remove(SOCK_FILENAME)
+ server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ server.bind(SOCK_FILENAME)
+ os.chmod(SOCK_FILENAME, 0777)
+
+ while True:
+ r = runjob()
+ if r == 0:
+ server.recv(1024) # blocks
+ elif r == 1:
+ logging.error('Allready running?')
+ sys.exit(1)
+ # else loop now
+ else:
+ runjob()
+
if __name__ == '__main__':
main()