Improved jobruner daemon:
authorJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Tue, 9 Nov 2010 23:28:19 +0000 (23:28 +0000)
committerJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Tue, 9 Nov 2010 23:28:19 +0000 (23:28 +0000)
. wakes immeditaly when a job a submited
. added job pg index
. updated INSTALL doc

INSTALL
bin/djais/views.py
bin/jobrunner.py
structure.sql

diff --git a/INSTALL b/INSTALL
index c1725af62cfb2463fc448b7a5f98d37ab75c06cc..130121a7f48a6e9befbdaf6996ba41f4d99b97e2 100644 (file)
--- a/INSTALL
+++ b/INSTALL
@@ -32,9 +32,12 @@ mkdir --mode 2775 /var/lib/ais/nmea/
 mkdir --mode 2775 /var/lib/ais/areas/
 mkdir --mode 2775 /var/lib/ais/cron/
 mkdir --mode 2775 /var/lib/ais/cron/fleets/
+mkdir --mode 2775 /var/lib/ais/jobs/
 mkdir /etc/ais/
 mkdir --mode 2775 /var/log/ais
 chown :ais /var/log/ais
+mkdir --mode 2775 /var/run/ais/
+chown :ais /var/run/ais
 
 = Postgres setup
 ================
index 20b4a7d54972c4d4dda92cd918117a93bc881801..33363ffe6f0510454e4b7f04907d832de0f7a5f2 100644 (file)
@@ -29,6 +29,7 @@ from ais.ntools import datetime_to_timestamp, clean_ais_charset
 from ais.inputs.common import is_id4_active
 from ais.inputs.stats import STATS_DIR
 from ais.inputs.config import peers_get_config
+from ais import jobrunner
 
 def auth(username, raw_password):
     try:
@@ -416,6 +417,7 @@ def vessel_history(request, strmmsi, format=None):
             job.extension = u'kmz'
             job.save()
             request.user.info('Request queued as job %s' % job.id)
+            jobrunner.wakeup_daemon()
         else:
             value = kml_to_kmz(format_boat_track(nmea_iterator))
             response = HttpResponse(value, mimetype="application/vnd.google-earth.kml")
@@ -431,6 +433,7 @@ def vessel_history(request, strmmsi, format=None):
             job.extension = u'kmz'
             job.save()
             request.user.info('Request queued as job %s' % job.id)
+            jobrunner.wakeup_daemon()
         else:
             value = kml_to_kmz(format_boat_intime(nmea_iterator))
             response = HttpResponse(value, mimetype="application/vnd.google-earth.kml")
@@ -446,6 +449,7 @@ def vessel_history(request, strmmsi, format=None):
             job.extension = u'csv'
             job.save()
             request.user.info('Request queued as job %s' % job.id)
+            jobrunner.wakeup_daemon()
         else:
             value = StringIO()
             output = csv.writer(value)
index 0d0b13ba53f44f8c6eee545d178f12b2c9be40b8..038a2b34189f47380ecd4d72d6a185040d54843c 100755 (executable)
@@ -1,41 +1,46 @@
 #!/usr/bin/env python
 
+__all__ = [ \
+    'wakeup_daemon',
+    ]
+
 import sys
+import os
+import time
 import logging
 import subprocess
+import socket
 from ais.db import *
 
-def main():
-    from optparse import OptionParser
-
-    parser = OptionParser()
-    parser.add_option('-d', '--debug',
-        action='store_true', dest='debug', default=False,
-        help="debug mode")
-    options, args = parser.parse_args()
+SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
 
-    if args:
-        print >> sys.stderr, "That program doesn't take any argument"
-        sys.exit(1)
+def wakeup_daemon():
+    client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+    client.connect(SOCK_FILENAME)
+    client.send('')
 
-    if options.debug:
-        loglevel = logging.DEBUG
-        sql_setdebug(True)
-    else:
-        loglevel = logging.INFO
-    logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+def runjob():
+    """
+    returns 0 if no job was waiting
+    returns 1 if a job was already runing
+    returns 2 when a job has been processed
+    """
+    # remove jobs archived for more than 1 day
+    # TODO: delete kmz/csv files too
+    sqlexec(u"DELETE FROM job WHERE archive_time < now() - '1 day'::interval")
+    dbcommit()
 
     sqlexec(u'SELECT id, pid FROM job WHERE start_time IS NOT NULL AND finish_time IS NULL')
     row = get_common_cursor().fetchone()
     if row is not None:
         logging.debug('Job %s is running: pid=%s', row[0], row[1])
-        sys.exit(0)
+        return 1
 
     sqlexec(u'SELECT id, command, extension, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
     row = get_common_cursor().fetchone()
     if row is None:
         logging.debug('Queue is empty.')
-        sys.exit(0)
+        return 0
 
     jobid, command, extension, user_id = row
     logging.info('Starting job %s: %s', jobid, command)
@@ -57,5 +62,70 @@ def main():
     sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
     dbcommit()
 
+    return 2
+
+
+def main():
+    from optparse import OptionParser
+
+    parser = OptionParser()
+    parser.add_option('-d', '--debug',
+        action='store_true', dest='debug', default=False,
+        help='debug mode')
+    parser.add_option('--debug-sql',
+        action='store_true', dest='debug_sql', default=False,
+        help='prints sql statements. Implies --debug')
+    parser.add_option('--daemon',
+        action='store_true', dest='daemonize', default=False,
+        help='runs as a daemon')
+    parser.add_option('--loop',
+        action='store_true', dest='loop', default=False,
+        help='keeps running forever')
+    options, args = parser.parse_args()
+
+    if args:
+        print >> sys.stderr, "That program doesn't take any argument"
+        sys.exit(1)
+
+    if options.debug_sql:
+        sql_setdebug(True)
+        options.debug = True
+
+    if options.debug:
+        loglevel = logging.DEBUG
+    else:
+        loglevel = logging.INFO
+    logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+
+    if options.daemonize:
+        try:
+            from daemon import DaemonContext
+        except ImportError:
+            from ais.daemon import DaemonContext
+
+        stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
+        #ais_gid = grp.getgrnam('ais').gr_gid
+        dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
+        # todo: pidfile= with import lockfile (squeeze)
+        dctx.open()
+
+    if options.loop:
+        if os.path.exists(SOCK_FILENAME):
+            os.remove(SOCK_FILENAME)
+        server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+        server.bind(SOCK_FILENAME)
+        os.chmod(SOCK_FILENAME, 0777)
+
+        while True:
+            r = runjob()
+            if r == 0:
+                server.recv(1024) # blocks
+            elif r == 1:
+                logging.error('Allready running?')
+                sys.exit(1)
+            # else loop now
+    else:
+        runjob()
+
 if __name__ == '__main__':
     main()
index 8801105c7d9f4da01c38056587059ca77ad775c7..edcead7da24f5933736a4f603d03a5de4bb8dcb3 100644 (file)
@@ -497,6 +497,13 @@ ALTER TABLE ONLY vessel
     ADD CONSTRAINT vessel_pkey PRIMARY KEY (mmsi);
 
 
+--
+-- Name: job_user_id; Type: INDEX; Schema: public; Owner: -; Tablespace: 
+--
+
+CREATE INDEX job_user_id ON job USING btree (user_id);
+
+
 --
 -- Name: user_message_user_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --