Clean up jobs recorded as "running" at startup.
authorJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Thu, 11 Nov 2010 01:57:23 +0000 (01:57 +0000)
committerJean-Michel Nirgal Vourgère <jmv@nirgal.com>
Thu, 11 Nov 2010 01:57:23 +0000 (01:57 +0000)
bin/jobrunner.py

index bb5aab39db7684f893f9a37b51658a998500ff99..c0e7ccd717c59f3469265ca19f13e40ba2110c0b 100755 (executable)
@@ -30,28 +30,32 @@ Your job has been queued, but there was an error contacting the task
 scheduler. Please repport the error.
 """
 
+def startup_clean():
+    sqlexec(u'SELECT id, pid, start_time, finish_time FROM job WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
+    while True:
+        row = get_common_cursor().fetchone()
+        if not row:
+            break
+        logging.error('Startup: Job %s is supposed to be running: pid=%s, start_time=%s, finish_time=%s', *row)
+        # TODO: kill pid ?
+    
+    sqlexec(u'UPDATE job SET pid=NULL, start_time=NULL WHERE pid IS NOT NULL OR (start_time IS NOT NULL AND finish_time IS NULL)')
+    dbcommit()
+
 def runjob():
     """
-    returns 0 if no job was waiting
-    returns 1 if a job was already runing
-    returns 2 when a job has been processed
+    returns True is a job was waiting and has been processed
     """
     # remove jobs archived for more than 1 day
     # TODO: delete kmz/csv files too
     sqlexec(u"DELETE FROM job WHERE archive_time < now() - '1 day'::interval")
     dbcommit()
 
-    sqlexec(u'SELECT id, pid FROM job WHERE start_time IS NOT NULL AND finish_time IS NULL')
-    row = get_common_cursor().fetchone()
-    if row is not None:
-        logging.debug('Job %s is running: pid=%s', row[0], row[1])
-        return 1
-
     sqlexec(u'SELECT id, command, friendly_filename, user_id FROM job WHERE start_time IS NULL ORDER BY queue_time LIMIT 1')
     row = get_common_cursor().fetchone()
     if row is None:
         logging.debug('Queue is empty.')
-        return 0
+        return False
 
     jobid, command, friendly_filename, user_id = row
     logging.info('Starting job %s: %s', jobid, command)
@@ -74,7 +78,7 @@ def runjob():
     sqlexec(u"INSERT INTO user_message (user_id, user_message_category_id, txt) VALUES(%(user_id)s, 'info', %(msg)s)", {'user_id':user_id, 'msg':('Your <a href="/job/%(jobid)s/download">job %(jobid)s</a> is complete.' % {'jobid': jobid}) })
     dbcommit()
 
-    return 2
+    return True
 
 
 def main():
@@ -122,16 +126,13 @@ def main():
         os.remove(SOCK_FILENAME)
     server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
     server.bind(SOCK_FILENAME)
-    os.chmod(SOCK_FILENAME, 0777)
-
+    os.chmod(SOCK_FILENAME, 0777) # anyone can wake up the daemon
+    
+    #TODO: don't run startup_clean if another daemon is runing, that is using pid file
+    startup_clean()
     while True:
-        r = runjob()
-        if r == 0:
+        if not runjob():
             server.recv(1024) # blocks
-        elif r == 1:
-            logging.error('Allready running?')
-            sys.exit(1)
-        # else loop now
 
 if __name__ == '__main__':
     main()