First version of the job queue
authorJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Mon, 8 Nov 2010 00:00:20 +0000 (00:00 +0000)
committerJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Mon, 8 Nov 2010 00:00:20 +0000 (00:00 +0000)
bin/common.py
bin/djais/models.py
bin/djais/urls.py
bin/djais/views.py
bin/jobrunner.py [new file with mode: 0755]
html_templates/base.html
html_templates/jobs.html [new file with mode: 0644]
html_templates/vessel.html

index e7fba43e1cf34370b42dd5d4e246e7c7349f6267..3e07f4da4241a3649f1d9bd1ae202403f70f062e 100755 (executable)
@@ -30,6 +30,7 @@ __all__ = [
     'BankNmea5',
     'Nmea5Feeder',
     'NmeaFeeder',
+    'nice_timedelta_str',
     'all_mmsi_generator',
     'load_fleet_to_uset',
     'fleetname_to_fleetid',
@@ -1218,51 +1219,7 @@ class Nmea(Nmea1, Nmea5):
             return u'Never'
         dt_lastupdate = datetime.utcfromtimestamp(lastupdate)
         delta = datetime.utcnow() - dt_lastupdate
-        def nice_timedelta_str(delta):
-            strdelta = ''
-            disprank = None # first item type displayed
-            if delta.days:
-                strdelta += str(delta.days)
-                if delta.days > 1:
-                    strdelta += ' days '
-                else:
-                    strdelta += ' day '
-                disprank = 0
-            delta_s = delta.seconds
-            delta_m = delta_s / 60
-            delta_s -= delta_m * 60
-            delta_h = delta_m / 60
-            delta_m -= delta_h * 60
-
-            if delta_h:
-                strdelta += str(delta_h)
-                if delta_h > 1:
-                    strdelta += ' hours '
-                else:
-                    strdelta += ' hour '
-                if disprank is None:
-                    disprank = 1
-            if delta_m and (disprank is None or disprank >= 1):
-                strdelta += str(delta_m)
-                if delta_m > 1:
-                    strdelta += ' minutes '
-                else:
-                    strdelta += ' minute '
-                if disprank is None:
-                    disprank = 2
-            if delta_s and (disprank is None or disprank >= 2):
-                strdelta += str(delta_s)
-                if delta_s > 1:
-                    strdelta += ' seconds '
-                else:
-                    strdelta += ' second '
-                if disprank is None:
-                    disprank = 3
-            if not strdelta:
-                strdelta = 'less than a second '
-            strdelta += ' ago'
-            return strdelta
-        return dt_lastupdate.strftime('%Y-%m-%d %H:%M:%S GMT') + ' (' +  nice_timedelta_str(delta) + ')'
+        return dt_lastupdate.strftime('%Y-%m-%d %H:%M:%S GMT') + ' (' +  nice_timedelta_str(delta) + ' ago)'
 
     @staticmethod
     def format_source(infosrc):
@@ -1703,6 +1660,50 @@ class NmeaFeeder:
                 lasttimestamp = nmea.timestamp_1
 
 
+def nice_timedelta_str(delta):
+    strdelta = ''
+    disprank = None # first item type displayed
+    if delta.days:
+        strdelta += str(delta.days)
+        if delta.days > 1:
+            strdelta += ' days '
+        else:
+            strdelta += ' day '
+        disprank = 0
+    delta_s = delta.seconds
+    delta_m = delta_s / 60
+    delta_s -= delta_m * 60
+    delta_h = delta_m / 60
+    delta_m -= delta_h * 60
+
+    if delta_h:
+        strdelta += str(delta_h)
+        if delta_h > 1:
+            strdelta += ' hours '
+        else:
+            strdelta += ' hour '
+        if disprank is None:
+            disprank = 1
+    if delta_m and (disprank is None or disprank >= 1):
+        strdelta += str(delta_m)
+        if delta_m > 1:
+            strdelta += ' minutes '
+        else:
+            strdelta += ' minute '
+        if disprank is None:
+            disprank = 2
+    if delta_s and (disprank is None or disprank >= 2):
+        strdelta += str(delta_s)
+        if delta_s > 1:
+            strdelta += ' seconds '
+        else:
+            strdelta += ' second '
+        if disprank is None:
+            disprank = 3
+    if not strdelta:
+        strdelta = 'less than a second '
+    return strdelta
+
 def all_mmsi_generator():
     """
     Returns an array of all known strmmsi.
index 1f5409939c882af2fabea85a36b06f5c38b79fa4..310f4fff45470643a8db5601ea44e186682d3c38 100644 (file)
@@ -2,10 +2,11 @@
 
 import os, os.path
 from datetime import datetime
+from random import SystemRandom
 from django.db import models
 from django.contrib.auth.models import get_hexdigest
 
-from ais.common import Nmea, mmsi_to_strmmsi
+from ais.common import Nmea, mmsi_to_strmmsi, nice_timedelta_str
 
 class UserMessageCategory(models.Model):
     id = models.CharField(max_length=10, primary_key=True)
@@ -192,3 +193,34 @@ class News(models.Model):
     txt = models.TextField()
     class Meta:
         db_table = u'news'
+
+
+class Job(models.Model):
+    def make_unique_job_id():
+        def make_id():
+            rnd = SystemRandom()
+            source = u'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
+            result = u''
+            for i in range(8):
+                result += source[int(rnd.random()*len(source))]
+            return result
+        return make_id() # TODO check it's unique
+    
+    id = models.CharField(primary_key=True, max_length=8, default=make_unique_job_id)
+    user = models.ForeignKey(User)
+    queue_time = models.DateTimeField(auto_now_add=True)
+    start_time = models.DateTimeField(blank=True, null=True)
+    finish_time = models.DateTimeField(blank=True, null=True)
+    command = models.TextField()
+    pid = models.IntegerField(blank=True, null=True)
+    result = models.IntegerField(blank=True, null=True)
+
+    def queue_rank(self):
+        return Job.objects.filter(queue_time__lt=self.queue_time).filter(start_time__isnull=True).count()
+    def process_time(self):
+        dt = self.finish_time - self.start_time
+        return nice_timedelta_str(dt)
+
+    class Meta:
+        db_table = u'job'
+        ordering = ('queue_time',)
index 2e9a0072476fca448a9a871b7a122f1be1ba1be0..92cf239470b3917b6cee2d9bb18b95f4ecad80be 100644 (file)
@@ -29,6 +29,7 @@ urlpatterns = patterns('',
     (r'^user/add$', 'ais.djais.views.user_edit', {'login':None} ),
     (r'^user/(?P<login>[a-zA-Z0-9_]+)/change_password$', 'ais.djais.views.user_change_password'),
     (r'^user/(?P<login>[a-zA-Z0-9_]+)/delete$', 'ais.djais.views.user_delete'),
+    (r'^job/$', 'ais.djais.views.jobs_index'),
     (r'^source/$', 'ais.djais.views.sources_index'),
     (r'^source/stats$', 'ais.djais.views.sources_stats'),
     (r'^news/(?P<page>\d*)$', 'ais.djais.views.news'),
index cb5302a89f210245fd8b9dea958d43790c0bf621..62817cc8e284a072079d949dfe234f6b32ee33bd 100644 (file)
@@ -400,35 +400,61 @@ def vessel_history(request, strmmsi, format=None):
 
     date_end = datetime.utcnow()
     date_start = date_end - timedelta(0,period*period_type)
-    print date_start
     nmea_iterator = NmeaFeeder(strmmsi, date_end, date_start, granularity=grain*grain_type)
     
+    queue = request.REQUEST.get('queue', None)
+
     if format is None:
         format = request.REQUEST.get('format', u'track')
+
     if format == u'track':
-        value = kml_to_kmz(format_boat_track(nmea_iterator))
-        response = HttpResponse(value, mimetype="application/vnd.google-earth.kml")
-        response['Content-Disposition'] = 'attachment; filename=%s.kmz' % strmmsi
-        return response
+        if queue:
+            command = u'python -m ais.show_targets_ships --start=\'' + date_start.strftime('%Y%m%d %H%M%S') + u'\' --granularity=' + unicode(grain*grain_type) + ' --format=track '+ strmmsi
+            job = Job()
+            job.user = request.user
+            job.command = command
+            job.save()
+            request.user.info('Request queued as job %s' % job.id)
+        else:
+            value = kml_to_kmz(format_boat_track(nmea_iterator))
+            response = HttpResponse(value, mimetype="application/vnd.google-earth.kml")
+            response['Content-Disposition'] = 'attachment; filename=%s.kmz' % strmmsi
+            return response
 
     elif format == u'animation':
-        value = kml_to_kmz(format_boat_intime(nmea_iterator))
-        response = HttpResponse(value, mimetype="application/vnd.google-earth.kml")
-        response['Content-Disposition'] = 'attachment; filename=%s.kmz' % strmmsi
-        return response
+        if queue:
+            command = u'python -m ais.show_targets_ships --start=\'' + date_start.strftime('%Y%m%d %H%M%S') + u'\' --granularity=' + unicode(grain*grain_type) + ' --format=animation '+ strmmsi
+            job = Job()
+            job.user = request.user
+            job.command = command
+            job.save()
+            request.user.info('Request queued as job %s' % job.id)
+        else:
+            value = kml_to_kmz(format_boat_intime(nmea_iterator))
+            response = HttpResponse(value, mimetype="application/vnd.google-earth.kml")
+            response['Content-Disposition'] = 'attachment; filename=%s.kmz' % strmmsi
+            return response
 
     elif format == u'csv':
-        value = StringIO()
-        output = csv.writer(value)
-        output.writerow(Nmea.csv_headers)
-        for nmea in nmea_iterator:
-            output.writerow(nmea.get_dump_row())
-        response = HttpResponse(value.getvalue(), mimetype='text/csv; charset="UTF-8"')
-        response['Content-Disposition'] = 'attachment; filename=%s.csv' % strmmsi
-        return response
+        if queue:
+            command = u'python -m ais.common --start=\'' + date_start.strftime('%Y%m%d %H%M%S') + u'\' --granularity=' + unicode(grain*grain_type) + ' ' + strmmsi
+            job = Job()
+            job.user = request.user
+            job.command = command
+            job.save()
+            request.user.info('Request queued as job %s' % job.id)
+        else:
+            value = StringIO()
+            output = csv.writer(value)
+            output.writerow(Nmea.csv_headers)
+            for nmea in nmea_iterator:
+                output.writerow(nmea.get_dump_row())
+            response = HttpResponse(value.getvalue(), mimetype='text/csv; charset="UTF-8"')
+            response['Content-Disposition'] = 'attachment; filename=%s.csv' % strmmsi
+            return response
     else:
         request.user.error(u'Invalid archive format')
-        return HttpResponseRedirect('/vessel/%s/' % strmmsi)
+    return HttpResponseRedirect('/vessel/%s/' % strmmsi)
 
 
 @http_authenticate(auth, 'ais')
@@ -629,6 +655,10 @@ def fleet_lastpos(request, fleetname):
     return response
 
 
+@http_authenticate(auth, 'ais')
+def jobs_index(request):
+    return render_to_response('jobs.html', {}, RequestContext(request))
+
 @http_authenticate(auth, 'ais')
 def users(request):
     users = User.objects.order_by('name')
diff --git a/bin/jobrunner.py b/bin/jobrunner.py
new file mode 100755 (executable)
index 0000000..1bfe976
--- /dev/null
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+
+import sys
+import logging
+import subprocess
+from ais.db import *
+
+def main():
+    from optparse import OptionParser
+
+    parser = OptionParser()
+    parser.add_option('-d', '--debug',
+        action='store_true', dest='debug', default=False,
+        help="debug mode")
+    options, args = parser.parse_args()
+
+    if args:
+        print >> sys.stderr, "That program doesn't take any argument"
+        sys.exit(1)
+
+    if options.debug:
+        loglevel = logging.DEBUG
+        sql_setdebug(True)
+    else:
+        loglevel = logging.INFO
+    logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
+
+    sqlexec(u'SELECT id, pid FROM job WHERE start_time IS NOT NULL AND finish_time IS NULL')
+    row = get_common_cursor().fetchone()
+    if row is not None:
+        logging.debug('Job %s is running: pid=%s', row[0], row[1])
+        sys.exit(0)
+
+    sqlexec(u'SELECT id, command FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
+    row = get_common_cursor().fetchone()
+    if row is None:
+        logging.debug('Queue is empty.')
+        sys.exit(0)
+
+    jobid, command = row
+    logging.info('Starting job %s: %s', jobid, command)
+
+    sqlexec(u'UPDATE job SET start_time=now() WHERE id=%(jobid)s', {'jobid': jobid})
+    dbcommit()
+
+    output = file('/var/lib/ais/jobs/'+unicode(jobid), 'wb')
+    p = subprocess.Popen(command, stdout=output, shell=True)
+    logging.debug('System process id = %s', p.pid)
+    sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
+    dbcommit()
+    
+    returncode = p.wait()
+    sqlexec(u'UPDATE job SET pid=NULL, finish_time=now(), result=' + unicode(returncode) + ' WHERE id=%(jobid)s', {'jobid': jobid})
+    dbcommit()
+    logging.info('Job complete: result=%s', returncode)
+
+if __name__ == '__main__':
+    main()
index 5499ec4afb26465fe8d88847a4e8634e6a7ecbc2..caa6ee32f089f28e788fec8edd96dee380d4a63c 100644 (file)
@@ -17,6 +17,9 @@
         <a href='/vessel/'{% block tab_active_vessel %}{% endblock %} title="Look for some specific ships">vessels</a>
         <a href='/fleet/'{% block tab_active_fleet %}{% endblock %} title="These are groups of ships shared amongst small groups of users">fleets</a>
         <a href='/user/'{% block tab_active_user %}{% endblock %} title="Accounts to access that web site">users</a>
+        {% if user.job_set.count %}
+        <a href='/job/'{% block tab_active_job %}{% endblock %} title="Acynchronous jobs">jobs</a>
+        {% endif %}
         <a href='/source/'{% block tab_active_source %}{% endblock %} title="Real time statistics">sources</a>
     </div>
 </div>
diff --git a/html_templates/jobs.html b/html_templates/jobs.html
new file mode 100644 (file)
index 0000000..683671f
--- /dev/null
@@ -0,0 +1,36 @@
+{% extends "base.html" %}
+
+{% block tab_active_job %} id=tabactive{% endblock %}
+
+{% block breadcrumbs %}
+{{ block.super }}
+/ <a href="/job/">job</a>
+{% endblock %}
+
+{% block content %}
+
+{% if not user.job_set.all %}
+You don't have any job.
+{% endif %}
+<ul>
+{% for job in user.job_set.all %}
+<li>Job {{ job.id }}<br>
+Command:<br>
+<tt>{{ job.command }}</tt><br>
+{% if job.finish_time %}
+Status: <b>Complete</b> in {{ job.process_time }} <br>
+Result: {% if job.result %}Error {{ job.result }}{% else %}Success<br><a href="/job_result/{{ job.id }}" class=button>download</a>{% endif %}<br>
+{% else %}
+    {% if job.start_time %}
+    Status:<b>Running</b> since {{ job.start_time }}<br>
+    Pid: {{ job.pid }}<br>
+    {% else %}
+    Status: <b>Queued</b> since {{ job.queue_time }}<br>
+    Position in jobs queue: {{ job.queue_rank }}<br>
+    {% endif %}
+{% endif %}
+
+{% endfor %}
+</ul>
+
+{% endblock %}
index 94c97fbb096968f87dd192c7ea256de30e72b2bf..df76d7775f05e3a22f0fe499ad2824c8bf3c64c2 100644 (file)
@@ -58,6 +58,8 @@ One position every <input name=grain size=3 value=1><select name=grain_type>
 <option value=86400>day(s)</option>
 </select>
 <br>
+<input type=checkbox name=queue>Queue request (experimental/work in progress...)
+<br>
 <span id=csvhint style="display:none;">Make sure you select "Charset: UTF-8" and "Separated by: Coma" when you <a href="/oocalc_howto.png">choose import options</a>.<br></span>
 <input type=submit value=Get>
 </form>