python多进程

python多进程

python跨平台实现多进程需要使用multiprocessing这个模块,在模块中常用的两个类是process和pool

process类

process类用来描述一个进程对象。创建子进程的时候只需要传入一个执行函数和函数的参数就可以完成process实例的创建。

  • start() 方法启动进程

  • join() 方法实现进程之间的同步,等待所有进程退出

  • close()方法阻止多余的进程涌入进程池pool造成进程阻塞

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
     __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *,mon=None)
    | 初始化类,target指向函数名,name参数可以为该进程设置名称,args是为target指向的函数传入的参数,且为tuple类型的
    |
    | is_alive(self)
    | 返回这个进程是否alive
    |
    | join(self, timeout=None)
    | 等待,直到子进程执行完毕
    |
    | run(self)
    | Method to be run in sub-process; can be overridden in sub-class
    |
    | start(self)
    | 开启子进程
    |
    | terminate(self)
    | 结束进程
    | ----------------------------------------------------------------------
    | Data descriptors inherited from multiprocessing.process.BaseProcess:
    |
    | __dict__
    | dictionary for instance variables (if defined)
    |
    | __weakref__
    | list of weak references to the object (if defined)
    |
    | authkey
    |
    | daemon
    | 返回该进程是否为守护进程
    |
    | exitcode
    | 返回进程的退出代码
    |
    | ident
    | Return identifier (PID) of process or `None` if it has yet to start
    |
    | name
    |
    | pid
    | Return identifier (PID) of process or `None` if it has yet to start
  • process类使用实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import multiprocessing,os

    def run_proc(name):
    print('Child process {0} {1} Running'.format(name,os.getpid()))

    if __name__ == '__main__':
    print('Parent process {0} is Running'.format(os.getpid()))
    for i in range(5):
    p = multiprocessing.Process(target=run_proc,args=(str(i),))
    print('process start')
    p.start()
    p.join()
    print('process close')

    #这里创建多个进程可以使用for循环,也可以使用进程池,在使用for循环的时候注意join方法在for循环之外

Pool类

  • Pool类可以提供指定数量的进程供用户使用,默认是CPU核数。当有新的请求提交到Pool的时候,如果进程池没有满,则会创建一个进程执行,如果进程池满了,则会让该请求等待。

  • Pool对象调用join方法会等待所有的子进程执行完毕

  • 调用join方法前必须调用close()

  • 调用close()以后就不能继续添加新的Process

  • pool.apply_async() 直接启动了进程,不用使用start来启动(同步执行进程,允许多个进程同时进入进程池)

  • pool类使用实例(使用apply_async()来执行进程)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import multiprocessing
    import os
    import time

    def run_task(name):
    print('Task {0} pid {1} is Running, Parent id is {2}'.format(name,os.getpid(),os.getppid()))
    time.sleep(1)
    print('Task {0} end'.format(name))

    if __name__ == '__main__':
    print('current process {0}'.format(os.getpid()))
    p = multiprocessing.Pool(processes=3)
    for i in range(6):
    p.apply_async(run_task,args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All process done!')

  • pool使用实例(pool.apply(func[,args[,kwds]]) 只允许一个进程进入进程池,在一个进程结束以后,另外一个进程才可以进入执行)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import multiprocessing
    import os
    import time

    def run_task(name):
    print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
    time.sleep(1)
    print('Task {0} end.'.format(name))

    if __name__ == '__main__':
    print('current process {0}'.format(os.getpid()))
    p = multiprocessing.Pool(processes=3)
    for i in range(6):
    p.apply(run_task,args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All process done!')

进程间的通信

  1. 使用python标准库中的subprocess包来fork一个子进程,并运行一个外部的程序

    • subprocess.call(command,shell=True) fork一个子进程,并运行一个外部程序(命令)在本进程中显示结果

    • subprocess.check_output(command) 将外部命令执行结果放入变量,不在控制台输出

    • 可以看出subprocess.call和check_output的返回结果,call执行成功返回一个0,否则返回非零数,check_output返回命令执行的结果

  2. multiprocessing提供的Queue(Queue用来在多个进程间的通信,Queue有两个方法,get和put)

    • put方法用来插入数据到队列中。有两个可选参数,blocked和timeout

      • blocked = True(默认值),timeout 为正

      该方法会阻塞 timeout 指定的时间,直到该队列有剩余空间。如果超时,抛出 Queue.Full 异常

      • blocked = False

      如果 Queue 已满,立刻抛出 Queue.Full 异常

    • get方法用来从队列中读取并删除一个元素。有两个参数可选,blocked 和 timeout

      • blocked = False (默认),timeout 正值

      等待时间内,没有取到任何元素,会抛出 Queue.Empty 异常

      • blocked = True

      Queue 有一个值可用,立刻返回改值;Queue 没有任何元素

    • Queue实例

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      from multiprocessing import Process, Queue
      import os, time, random

      #写数据进程
      def proc_write(q,urls):
      print('Process {0} is writing...'.format(os.getpid()))
      for url in urls:
      q.put(url)
      print('Put %s to queue..' % url)
      time.sleep(random.random())

      #读数据进程
      def proc_read(q):
      print('Process {0} is reading...'.format(os.getpid()))
      while True:
      url = q.get(True)
      print('Get %s from queue...' % url)

      if __name__ == '__main__':
      q = Queue()
      proc_write1 = Process(target=proc_write,args=(q,['url_1','url_2','url_3','url_4']))
      proc_write2 = Process(target=proc_write,args=(q,['url_4','url_5','url_6']))
      proc_reader = Process(target=proc_read,args=(q,))
      #启动子进程写入
      proc_write1.start()
      proc_write2.start()
      #启动子进程读取
      proc_reader.start()
      #等待写入进程结束
      proc_write1.join()
      proc_write2.join()
      #proc_reader进程是死循环,无法等待结束,只能强行终止
      proc_reader.terminate()

  3. multiprocessing提供的Pipe(常用来在两个进行间的通信,两个进程分别位于管道的两端)

    • multiprocessing.Pipe([duplex])

    • pipe实例一(发送列表)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      from multiprocessing import Process, Pipe

      def send(pipe):
      pipe.send(['spm']+[545,'wdd']) #send传输一个列表
      pipe.close()

      if __name__ == '__main__':
      #实例化两个pipe对象,因为pipe进程通信实在管道的两边
      (c1,c2) = Pipe()
      sender = Process(target=send,args=(c1,))
      sender.start()
      print("c2 got: {0} ".format(c2.recv()))
      c2.close()

    • pipe实例二(发送字典文件)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      from multiprocessing import Process, Pipe

      def talk(pipe):
      pipe.send(dict(name='ss',age=22))
      reply = pipe.recv()
      print('talk got:',reply)

      if __name__ == '__main__':
      (a1, a2) = Pipe()
      c = Process(target=talk, args=(a2,)) #创建一个进程
      c.start()
      print('parent got:',a1.recv())
      a1.send({x * 2 for x in 'spam'})
      c.join() #传输的数据被 talk 函数内的 pip 管道接收,并赋值给 reply
      print('parent exit')

  4. 上述的queue和pipe都是进程间的通信,实在一个进行池中,如果不在一个进程池中,用multiprocessing.Manager().Queue() 通信;同时:子进程与父进程的通信试用subprocess比较好

    • 不同进程池之间的通信实例如下

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      from multiprocessing import Process,Pool,Queue,Manager
      import os,time,random

      def write(q,list,i):

      print("[+] This is Process_"+str(i)+" and pid is %s START"%os.getpid())
      for item in list:
      q.put(item)
      print("[+] Pid %s Put : %s"%(os.getpid(),item))
      print("[+] This is Process_"+i+" and pid is %s CLOSE"%os.getpid())
      def test():
      print(os.getpid())

      def read(q):
      print('[====] Process to Read , pid = %s'%os.getpid())
      while True:
      data = q.get(True)
      print("[=] data from Queue : %s"%str(data))

      def main():
      manager = Manager()
      q = manager.Queue()
      p = Pool(5)
      listx = []
      listx.append([x for x in range(10)])
      listx.append([x for x in range(100,110)])
      listx.append([x for x in range(200,210)])
      listx.append([x for x in range(300,310)])
      listx.append([x for x in range(400,410)])
      for i in range(5):
      if i==4:
      pass
      p.apply_async(read,args=(q,))
      else:
      list = listx[i]
      #print(list)
      p.apply_async(write,args=(q,list,i))
      p.close()
      p.join()

      if __name__ == '__main__':
      main()

总结

通过一些实例学习python3的多进程,包含了主要的两个类及其适用方法;同一进程池中进程间通信,不同进程池中进程间的通信以及父类与子类的进程间通信