from ais.inputs.common import is_id4_active
from ais.inputs.stats import STATS_DIR
from ais.inputs.config import peers_get_config
+from ais import jobrunner
def auth(username, raw_password):
try:
job.extension = u'kmz'
job.save()
request.user.info('Request queued as job %s' % job.id)
+ jobrunner.wakeup_daemon()
else:
value = kml_to_kmz(format_boat_track(nmea_iterator))
response = HttpResponse(value, mimetype="application/vnd.google-earth.kml")
job.extension = u'kmz'
job.save()
request.user.info('Request queued as job %s' % job.id)
+ jobrunner.wakeup_daemon()
else:
value = kml_to_kmz(format_boat_intime(nmea_iterator))
response = HttpResponse(value, mimetype="application/vnd.google-earth.kml")
job.extension = u'csv'
job.save()
request.user.info('Request queued as job %s' % job.id)
+ jobrunner.wakeup_daemon()
else:
value = StringIO()
output = csv.writer(value)
#!/usr/bin/env python
+__all__ = [ \
+ 'wakeup_daemon',
+ ]
+
import sys
+import os
+import time
import logging
import subprocess
+import socket
from ais.db import *
-def main():
- from optparse import OptionParser
-
- parser = OptionParser()
- parser.add_option('-d', '--debug',
- action='store_true', dest='debug', default=False,
- help="debug mode")
- options, args = parser.parse_args()
+SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
- if args:
- print >> sys.stderr, "That program doesn't take any argument"
- sys.exit(1)
+def wakeup_daemon():
+ client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ client.connect(SOCK_FILENAME)
+ client.send('')
- if options.debug:
- loglevel = logging.DEBUG
- sql_setdebug(True)
- else:
- loglevel = logging.INFO
- logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+def runjob():
+ """
+ returns 0 if no job was waiting
+ returns 1 if a job was already runing
+ returns 2 when a job has been processed
+ """
+ # remove jobs archived for more than 1 day
+ # TODO: delete kmz/csv files too
+ sqlexec(u"DELETE FROM job WHERE archive_time < now() - '1 day'::interval")
+ dbcommit()
sqlexec(u'SELECT id, pid FROM job WHERE start_time IS NOT NULL AND finish_time IS NULL')
row = get_common_cursor().fetchone()
if row is not None:
logging.debug('Job %s is running: pid=%s', row[0], row[1])
- sys.exit(0)
+ return 1
sqlexec(u'SELECT id, command, extension, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
row = get_common_cursor().fetchone()
if row is None:
logging.debug('Queue is empty.')
- sys.exit(0)
+ return 0
jobid, command, extension, user_id = row
logging.info('Starting job %s: %s', jobid, command)
sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
dbcommit()
+ return 2
+
+
+def main():
+ from optparse import OptionParser
+
+ parser = OptionParser()
+ parser.add_option('-d', '--debug',
+ action='store_true', dest='debug', default=False,
+ help='debug mode')
+ parser.add_option('--debug-sql',
+ action='store_true', dest='debug_sql', default=False,
+ help='prints sql statements. Implies --debug')
+ parser.add_option('--daemon',
+ action='store_true', dest='daemonize', default=False,
+ help='runs as a daemon')
+ parser.add_option('--loop',
+ action='store_true', dest='loop', default=False,
+ help='keeps running forever')
+ options, args = parser.parse_args()
+
+ if args:
+ print >> sys.stderr, "That program doesn't take any argument"
+ sys.exit(1)
+
+ if options.debug_sql:
+ sql_setdebug(True)
+ options.debug = True
+
+ if options.debug:
+ loglevel = logging.DEBUG
+ else:
+ loglevel = logging.INFO
+ logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+
+ if options.daemonize:
+ try:
+ from daemon import DaemonContext
+ except ImportError:
+ from ais.daemon import DaemonContext
+
+ stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
+ #ais_gid = grp.getgrnam('ais').gr_gid
+ dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
+ # todo: pidfile= with import lockfile (squeeze)
+ dctx.open()
+
+ if options.loop:
+ if os.path.exists(SOCK_FILENAME):
+ os.remove(SOCK_FILENAME)
+ server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ server.bind(SOCK_FILENAME)
+ os.chmod(SOCK_FILENAME, 0777)
+
+ while True:
+ r = runjob()
+ if r == 0:
+ server.recv(1024) # blocks
+ elif r == 1:
+ logging.error('Allready running?')
+ sys.exit(1)
+ # else loop now
+ else:
+ runjob()
+
if __name__ == '__main__':
main()