3 from __future__ import division
11 from random import SystemRandom
14 from ais.djais.settings import AIS_BASE_URL, NOTIFICATION_EMAIL
18 'DAEMON_WAKEUP_ERROR',
22 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
23 RESULT_DIR = '/var/lib/ais/jobs/'
24 ARCHIVE_EXPIRE = '1 day' # postgres interval format
27 client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
29 client.connect(SOCK_FILENAME)
35 DAEMON_WAKEUP_ERROR = """
36 Your job has been queued, but there was an error contacting the job
37 scheduler. Please repport the error.
40 def make_unique_job_id():
43 source = u'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
46 result += source[int(rnd.random()*len(source))]
48 return make_id() # TODO check it's unique
51 def addjob(user_id, command, friendly_filename, notify=None):
52 jobid = make_unique_job_id()
53 sqlexec(u'INSERT INTO job (id, user_id, command, friendly_filename, notify) VALUES (%(id)s, %(user_id)s, %(cmd)s, %(friendly_filename)s, %(notify)s)', {
57 'friendly_filename': friendly_filename,
def jobid_ext_to_filename(jobid, friendly_filename):
    """Return the path of the result file belonging to a job.

    The path is RESULT_DIR, then ``jobid``, then the extension (leading
    dot included) taken from ``friendly_filename``.  The pieces are
    concatenated as-is, so RESULT_DIR has to end with a path separator.
    """
    # splitext() always yields a (root, ext) pair; only the ext part is used.
    _root, suffix = os.path.splitext(friendly_filename)
    return RESULT_DIR + jobid + suffix
69 sqlexec(u'SELECT id, pid, start_time, finish_time FROM job WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
71 row = get_common_cursor().fetchone()
74 logging.error('Startup: Job %s is supposed to be running: pid=%s, start_time=%s, finish_time=%s', *row)
77 sqlexec(u'UPDATE job SET pid=NULL, start_time=NULL WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
81 def sendmail(fromaddr, toaddr, subject, message):
83 server = smtplib.SMTP('localhost')
84 server.set_debuglevel(1)
85 message = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n%s" \
86 % (fromaddr, toaddr, subject, message)
88 server.sendmail(fromaddr, toaddr, message)
90 logging.error('Error sending notification to %s', toaddr)
96 returns True if a job was waiting and has been processed
98 # remove jobs archived for more than 1 day
100 sqlexec(u"SELECT id, friendly_filename FROM job WHERE archive_time < now() AT TIME ZONE 'GMT' - '%s'::interval" % ARCHIVE_EXPIRE)
102 row = get_common_cursor().fetchone()
105 filename = jobid_ext_to_filename(*row)
109 logging.error('unlinking %s: %s', filename, err)
110 deleted_jobs.append(row[0])
112 for jobid in deleted_jobs:
113 sqlexec(u'DELETE FROM job WHERE id=%(id)s', {'id':jobid})
116 sqlexec(u'SELECT id, command, friendly_filename, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
117 row = get_common_cursor().fetchone()
119 dbcommit() # Do not leave a transaction open
120 logging.debug('Queue is empty.')
123 jobid, command, friendly_filename, user_id = row
124 command = 'python -m ais.' + command
125 logging.info('Starting job %s: %s', jobid, command)
127 sqlexec(u"UPDATE job SET start_time=now() AT TIME ZONE 'GMT' WHERE id=%(jobid)s", {'jobid': jobid})
130 filename = jobid_ext_to_filename(jobid, friendly_filename)
131 output = file(filename, 'wb')
133 p = subprocess.Popen(command, stdout=output, shell=True)
134 logging.debug('System process id = %s', p.pid)
135 sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
138 returncode = p.wait()
139 sqlexec(u"UPDATE job SET pid=NULL, finish_time=now() AT TIME ZONE 'GMT', result=" + unicode(returncode) + " WHERE id=%(jobid)s", {'jobid': jobid})
141 logging.info('Job complete: result=%s', returncode)
143 sqlexec(u'SELECT notify FROM job where id=%(jobid)s', {'jobid': jobid})
144 row = get_common_cursor().fetchone()
147 if notify == u'W': # Web
148 sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
149 sqlexec(u'UPDATE job SET notify=NULL WHERE id=%(jobid)s', {'jobid': jobid})
152 elif notify == u'M': # Notification
153 sqlexec('SELECT email FROM "user" WHERE id=%(user_id)s', {'user_id':user_id})
154 row = get_common_cursor().fetchone()
159 logging.error("Can't find email of user %s", user_id)
161 sendmail(fromaddr=NOTIFICATION_EMAIL, toaddr=email, subject='Job complete', message='Your job is complete. You can download it at %s/job/' % AIS_BASE_URL)
167 def daemon(nice=0, foreground=False):
173 from daemon import DaemonContext
175 from ais.daemon import DaemonContext
177 stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
178 #ais_gid = grp.getgrnam('ais').gr_gid
179 dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
180 # todo: pidfile= with import lockfile (squeeze)
183 if os.path.exists(SOCK_FILENAME):
184 os.remove(SOCK_FILENAME)
185 server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
186 server.bind(SOCK_FILENAME)
187 os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
188 #TODO: set receive queue size to 0 or 1 byte
#TODO: don't run startup_clean if another daemon is running, that is, use a pid file
195 server.recv(1024) # blocks
199 from optparse import OptionParser
201 parser = OptionParser(usage='%prog [options] [userid command friendlyname]')
202 parser.add_option('--foreground',
203 action='store_true', dest='foreground', default=False,
204 help='runs daemon in foreground.')
205 parser.add_option('--nice',
206 action='store', type='int', dest='nice', default=5,
207 help='set scheduling nice value. Default = %default')
208 parser.add_option('-d', '--debug',
209 action='store_true', dest='debug', default=False,
211 parser.add_option('--debug-sql',
212 action='store_true', dest='debug_sql', default=False,
213 help='prints sql statements. Implies --debug')
215 parser.add_option('--notification-code',
216 action='store', dest='notify', default='W',
217 help='Job complete notification method: W=web M=mail')
218 options, args = parser.parse_args()
220 if options.debug_sql:
225 loglevel = logging.DEBUG
227 loglevel = logging.INFO
228 logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
231 assert len(args)==3, 'There must be either no arguments (daemon) or 3 of them'
232 user_id = int(args[0])
234 friendly_name = args[2]
235 jobid = addjob(user_id, command, friendly_name, notify=options.notify)
236 print >> sys.stderr, 'Job', jobid, 'queued'
240 daemon(foreground = options.foreground)
242 if __name__ == '__main__':