The multiprocessing package also includes some APIs that are not in the threading module at all. For example, there is a neat Pool class that you can use to parallelize executing a function across multiple inputs.
Demon, join
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | import multiprocessing import logging import time import sys def daemon(): print('Starting:'+ multiprocessing.current_process().name) time.sleep(2) print('Exiting :'+ multiprocessing.current_process().name) def non_daemon(): print('Starting:'+ multiprocessing.current_process().name) print('Exiting :'+ multiprocessing.current_process().name) if __name__ == '__main__': multiprocessing.log_to_stderr(logging.DEBUG) d = multiprocessing.Process(name='daemon', target=daemon) d.daemon = True n = multiprocessing.Process(name='non-daemon', target=non_daemon) n.daemon = False d.start() time.sleep(1) n.start() #d.join() | cs |
[INFO/daemon] child process calling self.run()
Starting:daemon
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[INFO/MainProcess] calling terminate() for daemon daemon
[INFO/MainProcess] calling join() for process daemon
[INFO/MainProcess] calling join() for process non-daemon
[INFO/non-daemon] child process calling self.run()
Starting:non-daemon
Exiting :non-daemon
[INFO/non-daemon] process shutting down
[DEBUG/non-daemon] running all "atexit" finalizers with priority >= 0
[DEBUG/non-daemon] running the remaining "atexit" finalizers
[INFO/non-daemon] process exiting with exitcode 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
계속하려면 아무 키나 누르십시오 . . .
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | import multiprocessing import time class Consumer(multiprocessing.Process): def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue def run(self): proc_name = self.name while True: next_task = self.task_queue.get() if next_task is None: # Poison pill means shutdown print('%s: Exiting' % proc_name) self.task_queue.task_done() break print('%s: %s' % (proc_name, next_task)) answer = next_task() self.task_queue.task_done() self.result_queue.put(answer) return class Task(object): def __init__(self, a, b): self.a = a self.b = b def __call__(self): time.sleep(0.1) # pretend to take some time to do the work return '%s * %s = %s' % (self.a, self.b, self.a * self.b) def __str__(self): return '%s * %s' % (self.a, self.b) if __name__ == '__main__': # Establish communication queues tasks = multiprocessing.JoinableQueue() results = multiprocessing.Queue() # Start consumers num_consumers = multiprocessing.cpu_count() * 2 print('Creating %d consumers' % num_consumers) consumers = [ Consumer(tasks, results) for i in range(num_consumers) ] for w in consumers: w.start() # Enqueue jobs num_jobs = 10 for i in range(num_jobs): tasks.put(Task(i, i)) # Add a poison pill for each consumer for i in range(num_consumers): tasks.put(None) # Wait for all of the tasks to finish tasks.join() # Start printing results while num_jobs: result = results.get() print('Result:', result) num_jobs -= 1 | cs |
Creating 8 consumers
Consumer-6: 0 * 0
Consumer-1: 1 * 1
Consumer-4: 2 * 2
Consumer-7: 3 * 3
Consumer-3: 4 * 4
Consumer-2: 5 * 5
Consumer-8: 6 * 6
Consumer-5: 7 * 7
Consumer-6: 8 * 8
Consumer-4: 9 * 9
Consumer-1: Exiting
Consumer-7: Exiting
Consumer-3: Exiting
Consumer-2: Exiting
Consumer-8: Exiting
Consumer-5: Exiting
Consumer-4: Exiting
Consumer-6: Exiting
Result: 2 * 2 = 4
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 3 * 3 = 9
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 7 * 7 = 49
Result: 9 * 9 = 81
Result: 8 * 8 = 64
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | import multiprocessing import time def stage_1(cond): """perform first stage of work, then notify stage_2 to continue""" name = multiprocessing.current_process().name print( 'Starting', name) with cond: print( '%s done and ready for stage 2' % name) cond.notify_all() def stage_2(cond): """wait for the condition telling us stage_1 is done""" name = multiprocessing.current_process().name print( 'Starting', name) with cond: cond.wait() print( '%s running' % name) if __name__ == '__main__': condition = multiprocessing.Condition() s1 = multiprocessing.Process(name='s1', target=stage_1, args=(condition,)) s2_clients = [ multiprocessing.Process(name='stage_2[%d]' % i, target=stage_2, args=(condition,)) for i in range(1, 3) ] for c in s2_clients: c.start() time.sleep(1) s1.start() s1.join() for c in s2_clients: c.join() | cs |
Starting stage_2[1]
Starting stage_2[2]
Starting s1
s1 done and ready for stage 2
stage_2[2] running
stage_2[1] running
'python' 카테고리의 다른 글
Share counter in processes (0) | 2018.03.05 |
---|---|
Mutiprocess Downloader (0) | 2018.03.05 |
파일 이름 가져오기 (0) | 2018.03.02 |
file download (0) | 2018.03.01 |
Blogspot 블로그 RSS (0) | 2018.02.28 |