multi.py 3.96 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
#!/usr/bin/env python

import time
import Queue
import traceback
from multiprocessing import Process, Manager, Pipe, RLock

from ansible.playbook.play import Play
from ansible.playbook.task import Task
from ansible.utils.debug import debug

NUM_WORKERS = 50
NUM_HOSTS   = 2500
NUM_TASKS   = 1

class Foo:
   """Small picklable object identified by a label built from two indices.

   The pickle hooks report their activity through debug().
   """

   def __init__(self, i, j):
      # Both indices are zero-padded to five digits in the label.
      label = "FOO_%05d_%05d" % (i, j)
      self._foo = label

   def __repr__(self):
      """Return the label itself."""
      return self._foo

   def __getstate__(self):
      """Return the state to pickle: a dict with the label under 'foo'."""
      debug("pickling %s" % self._foo)
      state = {'foo': self._foo}
      return state

   def __setstate__(self, data):
      """Restore the label from ``data``; a missing key yields a marker string."""
      debug("unpickling...")
      restored = data.get('foo', "BAD PICKLE!")
      self._foo = restored
      debug("unpickled %s" % self._foo)

def results(pipe, workers):
   """Collect results from the workers and forward them on ``pipe``.

   Intended to run as the target of its own process. Each worker's result
   pipe is polled in round-robin order and every object received is sent on
   ``pipe``.

   pipe    -- connection on which each collected result is sent.
   workers -- sequence of (process, main_pipe_ends, result_pipe_ends) tuples;
              only index 1 of each result pipe pair is read here.

   The loop ends on IOError, EOFError or KeyboardInterrupt, and also on any
   other exception (which is logged together with a traceback).
   """
   cur_worker = 0

   def _read_worker_result(cur_worker):
      """Poll workers round-robin, starting at index ``cur_worker``.

      Returns (result, next_index). ``result`` is None when a full pass over
      the workers found nothing to read.
      """
      result = None
      starting_point = cur_worker
      while True:
         # Remember which worker is being polled before the cursor advances,
         # so the debug output names the worker the data actually came from.
         polled = cur_worker
         res_pipe = workers[polled][2]
         cur_worker += 1
         if cur_worker >= len(workers):
            cur_worker = 0

         if res_pipe[1].poll(0.01):
            debug("worker %d has data to read" % polled)
            result = res_pipe[1].recv()
            debug("got a result from worker %d: %s" % (polled, result))
            break

         # Back at the starting index: every worker was polled once.
         if cur_worker == starting_point:
            break

      return (result, cur_worker)

   while True:
      try:
         (result, cur_worker) = _read_worker_result(cur_worker)
         if result is None:
            # Nothing available from any worker; back off briefly.
            time.sleep(0.01)
            continue
         pipe.send(result)
      except (IOError, EOFError, KeyboardInterrupt) as e:
         debug("got a breaking error: %s" % e)
         break
      except Exception as e:
         debug("EXCEPTION DURING RESULTS PROCESSING: %s" % e)
         traceback.print_exc()
         break

def worker(main_pipe, res_pipe):
   """Loop run by each worker process: receive an object and send it back.

   main_pipe -- connection polled for incoming objects.
   res_pipe  -- connection on which each received object is sent back.

   The loop ends on IOError, EOFError or KeyboardInterrupt, and also on any
   other exception (which is logged together with a traceback).
   """
   while True:
      try:
         if main_pipe.poll(0.01):
            foo = main_pipe.recv()
            # Fixed delay before replying (presumably stands in for real work).
            time.sleep(0.07)
            res_pipe.send(foo)
         else:
            time.sleep(0.01)
      # "as e" matches the handlers in results() and parses on Python 2.6+
      # as well as Python 3, unlike the older "except X, e" spelling.
      except (IOError, EOFError, KeyboardInterrupt) as e:
         debug("got a breaking error: %s" % e)
         break
      except Exception as e:
         debug("EXCEPTION DURING WORKER PROCESSING: %s" % e)
         traceback.print_exc()
         break

# Spawn NUM_WORKERS worker processes. Each entry appended to ``workers`` is
# (process, (main_p1, main_p2), (res_p1, res_p2)); both ends of every pipe
# are kept so later code can index into them and close them.
workers = []
for i in range(NUM_WORKERS):
   # Job pipe: the worker is handed main_p2; send_data() writes to main_p1.
   (main_p1, main_p2) = Pipe()
   # Result pipe: the worker is handed res_p1; results() reads from res_p2.
   (res_p1, res_p2)   = Pipe()
   worker_p = Process(target=worker, args=(main_p2, res_p1))
   worker_p.start()
   workers.append((worker_p, (main_p1, main_p2), (res_p1, res_p2)))

# Start the collector process. It is given in_p and sends each result on it;
# the main process reads those results from out_p.
in_p, out_p = Pipe()
res_p = Process(target=results, args=(in_p, workers))
res_p.start()

def send_data(obj):
   """Send ``obj`` to the next worker in round-robin order.

   Advances the module-level ``cur_worker`` index (wrapping to 0 at the end
   of ``workers``) and increments ``pending_results`` before sending.
   """
   global cur_worker
   global pending_results

   # Entry layout: (process, main pipe ends, result pipe ends).
   target = workers[cur_worker]
   job_pipe = target[1]

   following = cur_worker + 1
   cur_worker = 0 if following >= len(workers) else following

   pending_results = pending_results + 1
   job_pipe[0].send(obj)
 
def _process_pending_results():
   """Read every result currently available on ``out_p``.

   Each result read is logged through debug() and decrements the
   module-level ``pending_results`` counter. Returns once a 0.01s poll
   finds nothing more to read.
   """
   global pending_results

   # NOTE: no locking is performed around this drain loop.
   while True:
      if not out_p.poll(0.01):
         break
      final = out_p.recv()
      debug("got final result: %s" % (final,))
      pending_results = pending_results - 1

def _wait_on_pending_results():
   """Block until the module-level ``pending_results`` counter is not positive.

   While results are outstanding, repeatedly drains whatever is available
   and then sleeps for 0.01s.
   """
   while True:
      if pending_results <= 0:
         return
      debug("waiting for pending results (%d left)" % pending_results)
      _process_pending_results()
      time.sleep(0.01)


debug("starting")
# Module-level state used by send_data() and the pending-result helpers:
# the index of the next worker to receive a job, and the number of jobs
# sent whose results have not been read back yet.
cur_worker      = 0
pending_results = 0

sample_play = Play()
for i in range(NUM_TASKS):
   for j in range(NUM_HOSTS):
      debug("queuing %d, %d" % (i, j))
      # One Task per (i, j) pair is dispatched to the next worker.
      send_data(Task().load(dict(name="task %d %d" % (i,j), ping=""), sample_play))
      debug("done queuing %d, %d" % (i, j))
      # Read back any results that are already available before queuing more.
      _process_pending_results()
   debug("waiting for the results to drain...")
   _wait_on_pending_results()

# Shutdown: close the collector's pipe ends and terminate the collector.
in_p.close()
out_p.close()
res_p.terminate()

# Then close both ends of each worker's pipes and terminate the worker.
for (w_p, main_pipe, res_pipe) in workers:
   res_pipe[1].close()
   res_pipe[0].close()
   main_pipe[1].close()
   main_pipe[0].close()
   w_p.terminate()

debug("done")