from myghty.container import * import random, time, weakref, sys, re import myghty.buffer import testbase import unittest, sys # container test - # tests the container's get_value() function mostly, to insure # that items are recreated when expired, and that create function # is called exactly once per expiration try: import thread except: raise "this test requires a thread-enabled python" class item: def __init__(self, id): self.id = id def __str__(self): return "item id %d" % self.id def test_item(self): return True class context(ContainerContext): pass #def __init__(self): # ContainerContext.__init__(self, log_file = myghty.buffer.LogFormatter(myghty.buffer.LinePrinter(sys.stdout), "test", id_threads = True)) # keep running indicator running = False starttime = time.time() # creation func entrance detector to detect non-synchronized access # to the create function baton = None context = context() def create(id, delay = 0): global baton if baton is not None: raise "baton is not none , ident " + repr(baton) + " this thread " + repr(thread.get_ident()) baton = thread.get_ident() try: i = item(id) time.sleep(delay) global totalcreates totalcreates += 1 return i finally: baton = None def test(cclass, id, statusdict, expiretime, delay, params): print "create thread %d starting" % id statusdict[id] = True try: container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime, **params) global running global totalgets try: while running: item = container.get_value() if not item.test_item(): raise "item did not test" item = None totalgets += 1 time.sleep(random.random() * .00001) except: e = sys.exc_info()[0] running = False print e raise finally: print "create thread %d exiting" % id statusdict[id] = False def runtest(cclass, totaltime, expiretime, delay, nthreads=20, **params): statusdict = {} global totalcreates totalcreates = 0 global totalgets totalgets = 0 container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime, **params) container.clear_value() global running running = True for t in range(1, nthreads): thread.start_new_thread(test, (cclass, t, statusdict, expiretime, delay, params)) time.sleep(totaltime) failed = not running running = False pause = True while pause: time.sleep(1) pause = False for v in statusdict.values(): if v: pause = True break if failed: raise "test failed" print "total object creates %d" % totalcreates print "total object gets %d" % totalgets class ContainerTest(testbase.MyghtyTest): def _runtest(self, cclass, totaltime, expiretime, delay, **params): print "\ntesting %s for %d secs with expiretime %s delay %d" % ( cclass, totaltime, expiretime, delay) runtest(cclass, totaltime, expiretime, delay, **params) if expiretime is None: self.assert_(totalcreates == 1) else: self.assert_(abs(totaltime / expiretime - totalcreates) <= 2) def testMemoryContainer(self, totaltime=10, expiretime=None, delay=0): self._runtest(container_registry('memory', 'Container'), totaltime, expiretime, delay) def testMemoryContainer2(self): self.testMemoryContainer(expiretime=2) def testMemoryContainer3(self): self.testMemoryContainer(expiretime=5, delay=2) def testDbmContainer(self, totaltime=10, expiretime=None, delay=0, **kw): self._runtest(container_registry('dbm', 'Container'), totaltime, expiretime, delay, **kw) def testDbmContainer2(self): self.testDbmContainer(expiretime=2, nthreads=8) def testDbmContainer3(self): self.testDbmContainer(expiretime=5, delay=2) if __name__ == "__main__": unittest.main()