3 from __future__ import division
15 'DAEMON_WAKEUP_ERROR',
18 SOCK_FILENAME = '/var/run/ais/jobrunner.wakeup'
19 RESULT_DIR = '/var/lib/ais/jobs/'
20 ARCHIVE_EXPIRE = '1 day' # postgres format
23 client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
25 client.connect(SOCK_FILENAME)
31 DAEMON_WAKEUP_ERROR = """
32 Your job has been queued, but there was an error contacting the task
33 scheduler. Please repport the error.
def jobid_ext_to_filename(jobid, friendly_filename):
    """Return the path under RESULT_DIR of the file holding a job's output.

    The stored file is named after ``jobid`` and keeps the extension
    (leading dot included, empty string if there is none) of the
    user-visible ``friendly_filename``.
    """
    _, suffix = os.path.splitext(friendly_filename)
    stored_name = jobid + suffix
    return RESULT_DIR + stored_name
41 sqlexec(u'SELECT id, pid, start_time, finish_time FROM job WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
43 row = get_common_cursor().fetchone()
46 logging.error('Startup: Job %s is supposed to be running: pid=%s, start_time=%s, finish_time=%s', *row)
49 sqlexec(u'UPDATE job SET pid=NULL, start_time=NULL WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
55 returns True if a job was waiting and has been processed
57 # remove jobs archived for longer than ARCHIVE_EXPIRE (currently 1 day)
59 sqlexec(u"SELECT id, friendly_filename FROM job WHERE archive_time < now() - '%s'::interval" % ARCHIVE_EXPIRE)
61 row = get_common_cursor().fetchone()
64 filename = jobid_ext_to_filename(*row)
68 logging.error('unlinking %s: %s', filename, err)
69 deleted_jobs.append(row[0])
71 for jobid in deleted_jobs:
72 sqlexec(u'DELETE FROM job WHERE id=%(id)s', {'id':jobid})
75 sqlexec(u'SELECT id, command, friendly_filename, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
76 row = get_common_cursor().fetchone()
78 logging.debug('Queue is empty.')
81 jobid, command, friendly_filename, user_id = row
82 command = 'python -m ais.' + command
83 logging.info('Starting job %s: %s', jobid, command)
85 sqlexec(u'UPDATE job SET start_time=now() WHERE id=%(jobid)s', {'jobid': jobid})
88 filename = jobid_ext_to_filename(jobid, friendly_filename)
89 output = file(filename, 'wb')
91 p = subprocess.Popen(command, stdout=output, shell=True)
92 logging.debug('System process id = %s', p.pid)
93 sqlexec(u'UPDATE job SET pid=' + unicode(p.pid) + ' WHERE id=%(jobid)s', {'jobid': jobid})
97 sqlexec(u'UPDATE job SET pid=NULL, finish_time=now(), result=' + unicode(returncode) + ' WHERE id=%(jobid)s', {'jobid': jobid})
99 logging.info('Job complete: result=%s', returncode)
101 sqlexec(u'SELECT notify FROM job where id=%(jobid)s', {'jobid': jobid})
102 row = get_common_cursor().fetchone()
106 sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
107 sqlexec(u'UPDATE job SET notify=NULL WHERE id=%(jobid)s', {'jobid': jobid})
115 from optparse import OptionParser
117 parser = OptionParser()
118 parser.add_option('-d', '--debug',
119 action='store_true', dest='debug', default=False,
121 parser.add_option('--debug-sql',
122 action='store_true', dest='debug_sql', default=False,
123 help='prints sql statements. Implies --debug')
124 parser.add_option('--daemon',
125 action='store_true', dest='daemonize', default=False,
126 help='runs as a daemon')
127 options, args = parser.parse_args()
130 print >> sys.stderr, "That program doesn't take any argument"
133 if options.debug_sql:
138 loglevel = logging.DEBUG
140 loglevel = logging.INFO
141 logging.basicConfig(level=loglevel, format='%(asctime)s %(levelname)s %(message)s')
143 if options.daemonize:
145 from daemon import DaemonContext
147 from ais.daemon import DaemonContext
149 stderr = file('/var/log/ais/jobrunner', 'w+', 0664)
150 #ais_gid = grp.getgrnam('ais').gr_gid
151 dctx = DaemonContext(stdout=stderr, stderr=stderr, umask=002)
152 # todo: pidfile= with import lockfile (squeeze)
155 if os.path.exists(SOCK_FILENAME):
156 os.remove(SOCK_FILENAME)
157 server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
158 server.bind(SOCK_FILENAME)
159 os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
161 #TODO: don't run startup_clean if another daemon is running; detect that using a pid file
165 server.recv(1024) # blocks
167 if __name__ == '__main__':