Source code for galaxy.util.heartbeat

import threading
import time
import traceback
import os
import sys

def get_current_thread_object_dict():
    """
    Get a dictionary of all 'Thread' objects created via the threading
    module, keyed by thread_id.

    Note that not all interpreter threads have a Thread object, only the
    main thread and any created via the 'threading' module. Threads created
    via the low level 'thread' module will not be in the returned
    dictionary.

    HACK: This mucks with the internals of the threading module since that
    module does not expose any way to match 'Thread' objects with
    interpreter thread identifiers (though it should). Entries are copied
    verbatim from the module's private ``_active`` and ``_limbo``
    registries.

    Returns:
        A new dict (a snapshot); mutating it does not affect the threading
        module's own registries.
    """
    rval = dict()
    # Hold the registry lock while unioning the 'active' and 'limbo' threads
    # into the return value. Using ``with`` guarantees the lock is released
    # even if one of the updates raises.
    with threading._active_limbo_lock:
        rval.update(threading._active)
        rval.update(threading._limbo)
    return rval
class Heartbeat(threading.Thread):
    """
    Thread that periodically dumps the state of all threads to a file.

    Two logs are appended to on every heartbeat:

    * ``fname`` receives a full traceback of every interpreter thread.
    * ``fname + ".nonsleeping"`` receives a one-entry summary for each thread
      that does not appear to be blocked in a known sleeping call, together
      with the number of consecutive heartbeats it has been seen that way.
    """

    def __init__(self, name="Heartbeat Thread", period=20, fname="heartbeat.log"):
        """
        Args:
            name: Name given to the underlying ``threading.Thread``.
            period: Timeout passed to ``Event.wait`` between two dumps.
            fname: Path of the full traceback log; the non-sleeping log is
                written next to it with a ``.nonsleeping`` suffix.
        """
        threading.Thread.__init__(self, name=name)
        self.should_stop = False
        self.period = period
        self.fname = fname
        self.file = None
        self.fname_nonsleeping = fname + ".nonsleeping"
        self.file_nonsleeping = None
        # Maps thread id -> number of consecutive heartbeats the thread was
        # observed outside a known sleeping call.
        self.nonsleeping_heartbeats = {}
        # Save process id
        self.pid = os.getpid()
        # Event to wait on when sleeping, allows us to interrupt for shutdown
        self.wait_event = threading.Event()

    @staticmethod
    def _writeln(stream, text=""):
        """Write ``text`` followed by a newline to ``stream``."""
        stream.write(text + "\n")

    @staticmethod
    def _describe_thread(threads, thread_id):
        """Return the repr of the Thread object for ``thread_id``, if known."""
        if thread_id in threads:
            return repr(threads[thread_id])
        return "<No Thread object>"

    def _dump_all_threads(self, threads):
        """Append a traceback of every interpreter thread to ``self.file``."""
        out = self.file
        # Print separator with timestamp
        self._writeln(out, "Traceback dump for all threads at %s:" % time.asctime())
        self._writeln(out)
        # Print the thread states
        for thread_id, frame in sys._current_frames().items():
            description = self._describe_thread(threads, thread_id)
            self._writeln(out, "Thread %s, %s:" % (thread_id, description))
            self._writeln(out)
            traceback.print_stack(frame, file=out)
            self._writeln(out)
        self._writeln(out, "End dump")
        self._writeln(out)
        out.flush()

    def run(self):
        """
        Thread body: open both logs, then dump thread state until
        ``should_stop`` is set, waiting on ``wait_event`` between dumps.

        Both files are closed on exit, including when opening the second
        file or a dump raises.
        """
        try:
            self.file = open(self.fname, "a")
            self._writeln(self.file, "Heartbeat for pid %d thread started at %s" % (self.pid, time.asctime()))
            self._writeln(self.file)
            self.file_nonsleeping = open(self.fname_nonsleeping, "a")
            self._writeln(self.file_nonsleeping, "Non-Sleeping-threads for pid %d thread started at %s" % (self.pid, time.asctime()))
            self._writeln(self.file_nonsleeping)
            while not self.should_stop:
                threads = get_current_thread_object_dict()
                self._dump_all_threads(threads)
                self.print_nonsleeping(threads)
                # Sleep for a bit; shutdown() sets the event to wake us early
                self.wait_event.wait(self.period)
        finally:
            try:
                if self.file is not None:
                    self._writeln(self.file, "Heartbeat for pid %d thread stopped at %s" % (self.pid, time.asctime()))
                    self._writeln(self.file)
            finally:
                # Cleanup: close whichever files were actually opened
                for stream in (self.file, self.file_nonsleeping):
                    if stream is not None:
                        stream.close()

    def shutdown(self):
        """Ask the thread to stop, wake it if it is waiting, and join it."""
        self.should_stop = True
        self.wait_event.set()
        self.join()

    def thread_is_sleeping(self, last_stack_frame):
        """
        Returns True if the given stack-frame represents a known sleeper
        function (at least in python 2.5).

        Args:
            last_stack_frame: A ``(filename, line, function-name, text)``
                entry as produced by ``traceback.extract_stack``. ``text``
                may be None when the source line is unavailable.
        """
        _filename = last_stack_frame[0]
        _funcname = last_stack_frame[2]
        # The source text can be None; normalise it so the string tests
        # below cannot raise and kill the heartbeat thread.
        _text = last_stack_frame[3] or ""
        # Ugly hack to tell if a thread is supposedly sleeping or not.
        # These are the most common sleeping functions I've found.
        # Is there a better way? (python interpreter internals?)
        # Tested only with python 2.5
        if _funcname == "wait" and _text == "waiter.acquire()":
            return True
        if _funcname == "wait" and _text == "_sleep(delay)":
            return True
        if _funcname == "accept" and _text.endswith("_sock.accept()"):
            return True
        if _funcname == "monitor" and _text.startswith("time.sleep( ") and _text.endswith(" )"):
            return True
        if _funcname == "drain_events" and _text == "sleep(polling_interval)":
            return True
        # Ugly hack: always skip the heartbeat thread
        # TODO: get the current thread-id in python
        #   skip heartbeat thread by thread-id, not by filename
        if _filename.find("/lib/galaxy/util/heartbeat.py") != -1:
            return True
        # By default, assume the thread is not sleeping
        return False

    def get_interesting_stack_frame(self, stack_frames):
        """
        Scans the given backtrace stack frames and returns a single
        quadruple of [filename, line, function-name, text] for the single,
        deepest, most interesting frame.

        Interesting being::

            inside the galaxy source code ("/lib/galaxy"),
            preferably not an egg.

        When a galaxy frame is found its filename is shortened to start at
        "/lib/galaxy/". Otherwise the innermost frame is returned unchanged.

        Raises:
            IndexError: if ``stack_frames`` is empty.
        """
        for _filename, _line, _funcname, _text in reversed(stack_frames):
            idx = _filename.find("/lib/galaxy/")
            if idx != -1:
                relative_filename = _filename[idx:]
                return (relative_filename, _line, _funcname, _text)
        # no "/lib/galaxy" code found, return the innermost frame
        return stack_frames[-1]

    def print_nonsleeping(self, threads_object_dict):
        """
        Append a summary of every non-sleeping thread to the non-sleeping
        log and update the per-thread consecutive-heartbeat counters.

        Args:
            threads_object_dict: Mapping of thread id to Thread object, as
                returned by ``get_current_thread_object_dict``. When None,
                a fresh mapping is fetched.
        """
        out = self.file_nonsleeping
        self._writeln(out, "Non-Sleeping threads at %s:" % time.asctime())
        self._writeln(out)
        if threads_object_dict is None:
            threads_object_dict = get_current_thread_object_dict()
        frames = sys._current_frames()
        # Forget counters of threads that no longer exist, so a reused
        # thread id does not inherit a stale heartbeat count.
        for stale_id in list(self.nonsleeping_heartbeats):
            if stale_id not in frames:
                del self.nonsleeping_heartbeats[stale_id]
        all_threads_are_sleeping = True
        for thread_id, frame in frames.items():
            description = self._describe_thread(threads_object_dict, thread_id)
            tb = traceback.extract_stack(frame)
            if not tb:
                continue
            if self.thread_is_sleeping(tb[-1]):
                # A sleeping thread resets its consecutive counter
                self.nonsleeping_heartbeats.pop(thread_id, None)
                continue
            # Count non-sleeping thread heartbeats
            beats = self.nonsleeping_heartbeats.get(thread_id, 0) + 1
            self.nonsleeping_heartbeats[thread_id] = beats
            good_frame = self.get_interesting_stack_frame(tb)
            self._writeln(
                out,
                "Thread %s\t%s\tnon-sleeping for %d heartbeat(s)\n File %s:%d\n Function \"%s\"\n %s"
                % (thread_id, description, beats, good_frame[0], good_frame[1], good_frame[2], good_frame[3]),
            )
            all_threads_are_sleeping = False
        if all_threads_are_sleeping:
            self._writeln(out, "All threads are sleeping.")
        self._writeln(out)
        out.flush()