import logging
import subprocess
import socket
+from random import SystemRandom
from ais.db import *
+from ais.djais.settings import AIS_BASE_URL, NOTIFICATION_EMAIL
+
__all__ = [ \
'wakeup_daemon',
'DAEMON_WAKEUP_ERROR',
+ 'make_unique_job_id',
]
SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
RESULT_DIR = '/var/lib/ais/jobs/'
-ARCHIVE_EXPIRE = '1 day' # postgres format
+ARCHIVE_EXPIRE = '1 day' # postgres interval format
def wakeup_daemon():
client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
return False
DAEMON_WAKEUP_ERROR = """
-Your job has been queued, but there was an error contacting the task
+Your job has been queued, but there was an error contacting the job
scheduler. Please repport the error.
"""
+def make_unique_job_id():
+ def make_id():
+ rnd = SystemRandom()
+ source = u'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
+ result = u''
+ for i in range(8):
+ result += source[int(rnd.random()*len(source))]
+ return result
+ return make_id() # TODO check it's unique
+
+
+def addjob(user_id, command, friendly_filename, notify=None):
+ jobid = make_unique_job_id()
+ sqlexec(u'INSERT INTO job (id, user_id, command, friendly_filename, notify) VALUES (%(id)s, %(user_id)s, %(cmd)s, %(friendly_filename)s, %(notify)s)', {
+ 'id': jobid,
+ 'user_id': user_id,
+ 'cmd': command,
+ 'friendly_filename': friendly_filename,
+ 'notify': notify})
+ dbcommit()
+ wakeup_daemon()
+ return jobid
+
+
def jobid_ext_to_filename(jobid, friendly_filename):
extension = os.path.splitext(friendly_filename)[-1]
return RESULT_DIR + jobid + extension
dbcommit()
+def sendmail(fromaddr, toaddr, subject, message):
+ import smtplib
+ server = smtplib.SMTP('localhost')
+ server.set_debuglevel(1)
+ message = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n%s" \
+ % (fromaddr, toaddr, subject, message)
+ try:
+ server.sendmail(fromaddr, toaddr, message)
+ except:
+ logging.error('Error sending notification to %s', toaddr)
+ server.quit()
+
+
def runjob():
"""
- returns True is a job was waiting and has been processed
+ returns True if a job was waiting and has been processed
"""
# remove jobs archived for more than 1 day
deleted_jobs = []
sqlexec(u'SELECT id, command, friendly_filename, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
row = get_common_cursor().fetchone()
if row is None:
+ dbcommit() # Do not leave a transaction open
logging.debug('Queue is empty.')
return False
row = get_common_cursor().fetchone()
if row:
notify = row[0]
- if notify == u'W':
+ if notify == u'W': # Web
sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
sqlexec(u'UPDATE job SET notify=NULL WHERE id=%(jobid)s', {'jobid': jobid})
dbcommit()
- # else SMS, Mail ...
+
+ elif notify == u'M': # Notification
+ sqlexec('SELECT email FROM "user" WHERE id=%(user_id)s', {'user_id':user_id})
+ row = get_common_cursor().fetchone()
+ email = None
+ if row is not None:
+ email = row[0]
+ if not email:
+ logging.error("Can't find email of user %s", user_id)
+ else:
+ sendmail(fromaddr=NOTIFICATION_EMAIL, toaddr=email, subject='Job complete', message='Your job is complete. You can download it at %sjob/' % AIS_BASE_URL)
+ # else SMS...
return True
+def daemon(nice=0, foreground=False):
+ # be nice
+ os.nice(nice)
+
+ if not foreground:
+ try:
+ from daemon import DaemonContext
+ except ImportError:
+ from ais.daemon import DaemonContext
+
+ stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
+ #ais_gid = grp.getgrnam('ais').gr_gid
+ dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
+ # todo: pidfile= with import lockfile (squeeze)
+ dctx.open()
+
+ if os.path.exists(SOCK_FILENAME):
+ os.remove(SOCK_FILENAME)
+ server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ server.bind(SOCK_FILENAME)
+ os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
+ #TODO: set receive queue size to 0 or 1 byte
+
+ #TODO: don't run startup_clean if another daemon is runing, that is using pid file
+ startup_clean()
+
+ while True:
+ if not runjob():
+ server.recv(1024) # blocks
+
+
def main():
from optparse import OptionParser
- parser = OptionParser()
- parser.add_option('--daemon',
- action='store_true', dest='daemonize', default=False,
- help='runs as a daemon')
+ parser = OptionParser(usage='%prog [options] [userid command friendlyname]')
+ parser.add_option('--foreground',
+ action='store_true', dest='foreground', default=False,
+ help='runs daemon in foreground.')
parser.add_option('--nice',
action='store', type='int', dest='nice', default=5,
help='set scheduling nice value. Default = %default')
parser.add_option('--debug-sql',
action='store_true', dest='debug_sql', default=False,
help='prints sql statements. Implies --debug')
+
+ parser.add_option('--notification-code',
+ action='store', dest='notify', default='W',
+ help='Job complete notification method: W=web M=mail')
options, args = parser.parse_args()
-
- if args:
- print >> sys.stderr, "That program doesn't take any argument"
- sys.exit(1)
-
+
if options.debug_sql:
sql_setdebug(True)
options.debug = True
loglevel = logging.INFO
logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
- # be nice
- os.nice(options.nice)
-
- if options.daemonize:
- try:
- from daemon import DaemonContext
- except ImportError:
- from ais.daemon import DaemonContext
+ if args:
+ assert len(args)==3, 'There must be either no arguments (daemon) or 3 of them'
+ user_id = int(args[0])
+ command = args[1]
+ friendly_name = args[2]
+ jobid = addjob(user_id, command, friendly_name, notify=options.notify)
+ print >> sys.stderr, 'Job', jobid, 'queued'
+ sys.exit(0)
- stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
- #ais_gid = grp.getgrnam('ais').gr_gid
- dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
- # todo: pidfile= with import lockfile (squeeze)
- dctx.open()
- if os.path.exists(SOCK_FILENAME):
- os.remove(SOCK_FILENAME)
- server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
- server.bind(SOCK_FILENAME)
- os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
-
- #TODO: don't run startup_clean if another daemon is runing, that is using pid file
- startup_clean()
- while True:
- if not runjob():
- server.recv(1024) # blocks
+ daemon(foreground = options.foreground)
if __name__ == '__main__':
main()