python

Multiprocessing

장곰부대 2018. 3. 4. 00:02

The multiprocessing package also includes some APIs that are not in the threading module at all. For example, there is a neat Pool class that you can use to parallelize executing a function across multiple inputs.



Demon, join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import multiprocessing
import logging
import time
import sys
 
def daemon():
    print('Starting:'+ multiprocessing.current_process().name)
    time.sleep(2)
    print('Exiting :'+ multiprocessing.current_process().name)
 
def non_daemon():
    print('Starting:'+ multiprocessing.current_process().name)
    print('Exiting :'+ multiprocessing.current_process().name)
 
if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True
 
    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False
 
    d.start()
    time.sleep(1)
    n.start()
 
    #d.join()
 
cs


[INFO/daemon] child process calling self.run()

Starting:daemon

[INFO/MainProcess] process shutting down

[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0

[INFO/MainProcess] calling terminate() for daemon daemon

[INFO/MainProcess] calling join() for process daemon

[INFO/MainProcess] calling join() for process non-daemon

[INFO/non-daemon] child process calling self.run()

Starting:non-daemon

Exiting :non-daemon

[INFO/non-daemon] process shutting down

[DEBUG/non-daemon] running all "atexit" finalizers with priority >= 0

[DEBUG/non-daemon] running the remaining "atexit" finalizers

[INFO/non-daemon] process exiting with exitcode 0

[DEBUG/MainProcess] running the remaining "atexit" finalizers

계속하려면 아무 키나 누르십시오 . . .



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import multiprocessing
import time
 
class Consumer(multiprocessing.Process):
    
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
 
    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return
 
 
class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1# pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)
 
 
if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
 
    print('Creating %d consumers' % num_consumers)
    consumers = [ Consumer(tasks, results)
                  for i in range(num_consumers) ]
    for w in consumers:
        w.start()
    
    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))
    
    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)
 
    # Wait for all of the tasks to finish
    tasks.join()
    
    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1
cs

Creating 8 consumers

Consumer-6: 0 * 0

Consumer-1: 1 * 1

Consumer-4: 2 * 2

Consumer-7: 3 * 3

Consumer-3: 4 * 4

Consumer-2: 5 * 5

Consumer-8: 6 * 6

Consumer-5: 7 * 7

Consumer-6: 8 * 8

Consumer-4: 9 * 9

Consumer-1: Exiting

Consumer-7: Exiting

Consumer-3: Exiting

Consumer-2: Exiting

Consumer-8: Exiting

Consumer-5: Exiting

Consumer-4: Exiting

Consumer-6: Exiting

Result: 2 * 2 = 4

Result: 0 * 0 = 0

Result: 1 * 1 = 1

Result: 3 * 3 = 9

Result: 4 * 4 = 16

Result: 5 * 5 = 25

Result: 6 * 6 = 36

Result: 7 * 7 = 49

Result: 9 * 9 = 81

Result: 8 * 8 = 64




1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import multiprocessing
import time
 
def stage_1(cond):
    """perform first stage of work, then notify stage_2 to continue"""
    name = multiprocessing.current_process().name
    print'Starting', name)
    with cond:
        print'%s done and ready for stage 2' % name)
        cond.notify_all()
 
def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print'Starting', name)
    with cond:
        cond.wait()
        print'%s running' % name)
 
if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1', target=stage_1, args=(condition,))
    s2_clients = [
        multiprocessing.Process(name='stage_2[%d]' % i, target=stage_2, args=(condition,))
        for i in range(13)
        ]
 
    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()
 
    s1.join()
    for c in s2_clients:
        c.join()
cs

Starting stage_2[1]

Starting stage_2[2]

Starting s1

s1 done and ready for stage 2

stage_2[2] running

stage_2[1] running