Added support for email notification of finished jobs
authorJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Sat, 15 Jan 2011 10:56:38 +0000 (10:56 +0000)
committerJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Sat, 15 Jan 2011 10:56:38 +0000 (10:56 +0000)
Jobrunner default behaviour is now background
Jobrunner can now queue new jobs

bin/djais/models.py
bin/jobrunner.py
bin/proc.py
bin/rt.sh

index b8404460498064c405b195b010cf8737b61b3950..b26c1947d5cd554b69065dfc29995bd00fe7673d 100644 (file)
@@ -3,13 +3,13 @@
 from __future__ import division
 import os, os.path
 from datetime import datetime
-from random import SystemRandom
 from django.db import models
 from django.contrib.auth.models import get_hexdigest
 from django.utils import html
 
 from ais.common import Nmea, mmsi_to_strmmsi, nice_timedelta_str
 from ais import proc
+from ais.jobrunner import make_unique_job_id
 
 class UserMessageCategory(models.Model):
     id = models.CharField(max_length=10, primary_key=True)
@@ -199,16 +199,6 @@ class News(models.Model):
 
 
 class Job(models.Model):
-    def make_unique_job_id():
-        def make_id():
-            rnd = SystemRandom()
-            source = u'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
-            result = u''
-            for i in range(8):
-                result += source[int(rnd.random()*len(source))]
-            return result
-        return make_id() # TODO check it's unique
-    
     id = models.CharField(primary_key=True, max_length=8, default=make_unique_job_id)
     user = models.ForeignKey(User)
     queue_time = models.DateTimeField() #auto_now_add=True) is buggy, not GMT
index d9ed8096a2693cdf8daa96cd06fcbd5a68778099..5bc78d9285b88df0d3d3d70da5e043c11fe6388d 100755 (executable)
@@ -8,16 +8,20 @@ import time
 import logging
 import subprocess
 import socket
+from random import SystemRandom
 from ais.db import *
 
+from ais.djais.settings import AIS_BASE_URL, NOTIFICATION_EMAIL
+
 __all__ = [ \
     'wakeup_daemon',
     'DAEMON_WAKEUP_ERROR',
+    'make_unique_job_id',
     ]
 
 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
 RESULT_DIR = '/var/lib/ais/jobs/'
-ARCHIVE_EXPIRE = '1 day' # postgres format
+ARCHIVE_EXPIRE = '1 day' # postgres interval format
 
 def wakeup_daemon():
     client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
@@ -29,10 +33,34 @@ def wakeup_daemon():
         return False
 
 DAEMON_WAKEUP_ERROR = """
-Your job has been queued, but there was an error contacting the task
+Your job has been queued, but there was an error contacting the job
 scheduler. Please repport the error.
 """
 
+def make_unique_job_id():
+    def make_id():
+        rnd = SystemRandom()
+        source = u'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
+        result = u''
+        for i in range(8):
+            result += source[int(rnd.random()*len(source))]
+        return result
+    return make_id() # TODO check it's unique
+
+
+def addjob(user_id, command, friendly_filename, notify=None):
+    jobid = make_unique_job_id()
+    sqlexec(u'INSERT INTO job (id, user_id, command, friendly_filename, notify) VALUES (%(id)s, %(user_id)s, %(cmd)s, %(friendly_filename)s, %(notify)s)', {
+        'id':                   jobid,
+        'user_id':              user_id,
+        'cmd':                  command,
+        'friendly_filename':    friendly_filename,
+        'notify':               notify})
+    dbcommit()
+    wakeup_daemon()
+    return jobid
+
+
 def jobid_ext_to_filename(jobid, friendly_filename):
     extension = os.path.splitext(friendly_filename)[-1]
     return RESULT_DIR + jobid + extension
@@ -50,9 +78,22 @@ def startup_clean():
     dbcommit()
 
 
+def sendmail(fromaddr, toaddr, subject, message):
+    import smtplib
+    server = smtplib.SMTP('localhost')
+    server.set_debuglevel(1)
+    message = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n%s" \
+       % (fromaddr, toaddr, subject, message)
+    try:
+        server.sendmail(fromaddr, toaddr, message)
+    except:
+        logging.error('Error sending notification to %s', toaddr)
+    server.quit()
+
+
 def runjob():
     """
-    returns True is a job was waiting and has been processed
+    returns True if a job was waiting and has been processed
     """
     # remove jobs archived for more than 1 day
     deleted_jobs = []
@@ -75,6 +116,7 @@ def runjob():
     sqlexec(u'SELECT id, command, friendly_filename, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
     row = get_common_cursor().fetchone()
     if row is None:
+        dbcommit() # Do not leave a transaction open
         logging.debug('Queue is empty.')
         return False
 
@@ -102,22 +144,64 @@ def runjob():
     row = get_common_cursor().fetchone()
     if row:
         notify = row[0]
-        if notify == u'W':
+        if notify == u'W': # Web
             sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
             sqlexec(u'UPDATE job SET notify=NULL WHERE id=%(jobid)s', {'jobid': jobid})
             dbcommit()
-        # else SMS, Mail ...
+
+        elif notify == u'M': # Notification
+            sqlexec('SELECT email FROM "user" WHERE id=%(user_id)s', {'user_id':user_id})
+            row = get_common_cursor().fetchone()
+            email = None
+            if row is not None:
+                email = row[0]
+            if not email:
+                logging.error("Can't find email of user %s", user_id)
+            else:
+                sendmail(fromaddr=NOTIFICATION_EMAIL, toaddr=email, subject='Job complete', message='Your job is complete. You can download it at %sjob/' % AIS_BASE_URL)
+        # else SMS...
 
     return True
 
 
+def daemon(nice=0, foreground=False):
+    # be nice
+    os.nice(nice)
+
+    if not foreground:
+        try:
+            from daemon import DaemonContext
+        except ImportError:
+            from ais.daemon import DaemonContext
+
+        stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
+        #ais_gid = grp.getgrnam('ais').gr_gid
+        dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
+        # todo: pidfile= with import lockfile (squeeze)
+        dctx.open()
+
+    if os.path.exists(SOCK_FILENAME):
+        os.remove(SOCK_FILENAME)
+    server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+    server.bind(SOCK_FILENAME)
+    os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
+    #TODO: set receive queue size to 0 or 1 byte
+    
+    #TODO: don't run startup_clean if another daemon is runing, that is using pid file
+    startup_clean()
+
+    while True:
+        if not runjob():
+            server.recv(1024) # blocks
+
+    
 def main():
     from optparse import OptionParser
 
-    parser = OptionParser()
-    parser.add_option('--daemon',
-        action='store_true', dest='daemonize', default=False,
-        help='runs as a daemon')
+    parser = OptionParser(usage='%prog [options] [userid command friendlyname]')
+    parser.add_option('--foreground',
+        action='store_true', dest='foreground', default=False,
+        help='runs daemon in foreground.')
     parser.add_option('--nice',
         action='store', type='int', dest='nice', default=5,
         help='set scheduling nice value. Default = %default')
@@ -127,12 +211,12 @@ def main():
     parser.add_option('--debug-sql',
         action='store_true', dest='debug_sql', default=False,
         help='prints sql statements. Implies --debug')
+    
+    parser.add_option('--notification-code',
+        action='store', dest='notify', default='W',
+        help='Job complete notification method: W=web M=mail')
     options, args = parser.parse_args()
-
-    if args:
-        print >> sys.stderr, "That program doesn't take any argument"
-        sys.exit(1)
-
+    
     if options.debug_sql:
         sql_setdebug(True)
         options.debug = True
@@ -143,32 +227,17 @@ def main():
         loglevel = logging.INFO
     logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
 
-    # be nice
-    os.nice(options.nice)
-
-    if options.daemonize:
-        try:
-            from daemon import DaemonContext
-        except ImportError:
-            from ais.daemon import DaemonContext
+    if args:
+        assert len(args)==3, 'There must be either no arguments (daemon) or 3 of them'
+        user_id = int(args[0])
+        command = args[1]
+        friendly_name = args[2]
+        jobid = addjob(user_id, command, friendly_name, notify=options.notify)
+        print >> sys.stderr, 'Job', jobid, 'queued'
+        sys.exit(0)
 
-        stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
-        #ais_gid = grp.getgrnam('ais').gr_gid
-        dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
-        # todo: pidfile= with import lockfile (squeeze)
-        dctx.open()
 
-    if os.path.exists(SOCK_FILENAME):
-        os.remove(SOCK_FILENAME)
-    server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
-    server.bind(SOCK_FILENAME)
-    os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
-    
-    #TODO: don't run startup_clean if another daemon is runing, that is using pid file
-    startup_clean()
-    while True:
-        if not runjob():
-            server.recv(1024) # blocks
+    daemon(foreground = options.foreground)
 
 if __name__ == '__main__':
     main()
index 6a51302a00d48e600ca86c3b80afc1193a704e56..4c4121b82c1a7be4e211d93578a7aaff67e2faf6 100644 (file)
@@ -61,9 +61,7 @@ __states__ = {
 class Stat(dict):
     def __init__(self, processid):
         dict.__init__(self)
-        strstats = file('/proc/%s/stat' % processid).read()
-        # TODO:
-        # "getconf CLK_TCK" = 100 -> 1 tick = 1/100 seconds
+        strstats = open('/proc/%s/stat' % processid).read()
         strstats = strstats.rstrip('\n').split(' ')
         for i, keycls in enumerate(__proc_keys__):
             key, cls = keycls
@@ -77,7 +75,7 @@ class Stat(dict):
         return "%s (%s)" % (state, nice_state)
     
     # TODO: 
-    # "getconf CLK_TCK" = 100 -> 1 tick = 1/100 seconds
+    # subprocess.call(['getconf', 'CLK_TCK']) = 100 -> 1 tick = 1/100 seconds
     def nice_utime(self):
         return self['utime'] / 100
 
index d804b176c33bda2f4913b4a75b6fadae2abe0259..f805247168df70c5dd8240a02e8b2f52f00c2077 100755 (executable)
--- a/bin/rt.sh
+++ b/bin/rt.sh
@@ -1,4 +1,4 @@
 #!/bin/bash
 set -x
 python -m ais.inputs.run --db --background
-python -m ais.jobrunner -d --daemon
+python -m ais.jobrunner --debug