Python的多进程

date
Mar 26, 2021
slug
22
status
Published
tags
Python
summary
type
Post

Process 类

API和threading差不多, 进程id通过os模块获取
from multiprocessing import Process

import os

def info(title):

    print(title)

    print('module name:', __name__)

    print('parent process:', os.getppid())

    print('process id:', os.getpid())

def f(name):

    info('function f')

    print('hello', name)

if __name__ == '__main__':

    info('main line')

    p = Process(target=f, args=('bob',))

    p.start()

    p.join()


启动进程的三种方式

  • spawn -- 如无必要, 资源不继承
  • fork -- 写时复制, 同Linux fork
  • forkserver -- 启动一个服务器进程, 由这个服务器进程fork出新进程.
在 Unix 上通过 spawn 和 forkserver 方式启动多进程会同时启动一个 资源追踪 进程,负责追踪当前程序的进程产生的、并且不再被使用的命名系统资源(如命名信号量以及 SharedMemory 对象)。当所有进程退出后,资源追踪会负责释放这些仍被追踪的的对象。通常情况下是不会有这种对象的,但是假如一个子进程被某个信号杀死,就可能存在这一类资源的“泄露”情况。(泄露的信号量以及共享内存不会被释放,直到下一次系统重启,对于这两类资源来说,这是一个比较大的问题,因为操作系统允许的命名信号量的数量是有限的,而共享内存也会占据主内存的一片空间)
简单来说,unix系统使用spawn 和 forkserver 可能导致内存泄漏.

通过set_start_method选择更换启动方法

但是不允许调用多次.
import multiprocessing as mp

def foo(q):

    q.put('hello')

if __name__ == '__main__':

    mp.set_start_method('spawn')
 # set_start_method
    q = mp.Queue()

    p = mp.Process(target=foo, args=(q,))

    p.start()

    print(q.get())

    p.join()


通过get_context() 选择更换启动方法

允许使用多次.
但是关联到不同上下文的对象和进程之前可能不兼容. 譬如,使用 fork 上下文创建的锁不能传递给使用 spawnforkserver 启动方法启动的进程
推荐使用此方法,因为不影响库的使用.
import multiprocessing as mp

def foo(q):

    q.put('hello')

if __name__ == '__main__':

    ctx = mp.get_context('spawn')

    q = ctx.Queue()

    p = ctx.Process(target=foo, args=(q,))

    p.start()

    print(q.get())

    p.join()


进程通信

队列

from multiprocessing import Process, Queue

def f(q):

    q.put([42, None, 'hello'])

if __name__ == '__main__':

    q = Queue()

    p = Process(target=f, args=(q,))

    p.start()

    print(q.get())    # prints "[42, None, 'hello']"

    p.join()


管道

虽然是双工的,但是同时读写同一端,是危险的.
from multiprocessing import Process, Pipe

def f(conn):

    conn.send([42, None, 'hello'])

    conn.close()

if __name__ == '__main__':

    receive, send = Pipe()

    p = Process(target=f, args=(send,))

    p.start()

    print(receive.recv())   # prints "[42, None, 'hello']"

    p.join()


共享内存


from multiprocessing import Process, Value, Array


def f(n, a):

    n.value = 3.1415927

    for i in range(len(a)):

        a[i] = -a[i]

if __name__ == '__main__':

    num = Value('d', 0.0)

    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))

    p.start()

    p.join()

    print(num.value)

    print(arr[:])


© chaleaoch 2021