Python编程——进程与线程

进程与线程

进程:操作系统分配资源的最小单位,每个进程在执行过程中拥有独立的内存单元。

线程:操作系统调度的最小单位,同一个进程的多个线程在执行过程中共享内存。

单个CPU一次只能运行一个任务(进程)

一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存

互斥锁 Mutex 防止多个线程同时读写某一块内存区域

信号量 Semaphore 互斥锁是信号量为1时的特殊情况

进程三态模型

  • 运行态:进程占有处理器
  • 就绪态:进程具备运行条件(分配到除CPU以外的所有必要资源),等待分配处理器。
  • 阻塞态:进程不具备运行条件(等待某一事件发生)

七态模型

挂起就绪态、挂起阻塞态:在外存中

同步/异步,阻塞/非阻塞

【怎样理解阻塞非阻塞与同步异步的区别? – 卢毅luis的回答 – 知乎 https://www.zhihu.com/question/19732473/answer/20851256

同步和异步关注的是消息通信机制

  • 同步:发出一个调用时,在没有得到结果之前,该调用就不会返回,调用返回返回值(调用者主动等待调用结果)
  • 异步:发出一个调用时,调用直接返回(无结果),即调用者不能立刻得到结果,当该异步功能完成后,被调用者通过状态、通知或回调来通知调用者。

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态

  • 阻塞:调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会返回。(没得到结果挂起)
  • 非阻塞:在不能立刻得到结果之前,该调用不会阻塞当前线程。(没得到结果依然继续运行)

进程的创建

  • 系统初始化
    • 前台进程:与用户交互
    • 后台进程:与用户无关
    • 守护进程:运行在后台,需要时才唤醒
  • 进程创建子进程
  • 用户交互请求,创建新进程
  • 批处理作业初始化

进程的中止

  • 正常退出
  • 出错退出(自愿)
  • 严重错误(非自愿)
  • 被其他进程杀死

关于Unix和Windows的进程

相同

  • 进程创建后,父进程和子进程有各自不同的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。
  • 进程只有一个父进程。

不同

  • 在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是可以有只读的共享内存区的。但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的。
  • 在UNIX中所有的进程,都是以init进程为根,组成树形结构,父子进程共同组成一个进程组。
  • 在Windows中,所有的进程都是地位相同的,父进程得到一个特别的令牌(称为句柄),该句柄可以用来控制子进程。

进程并发的实现

操作系统维护进程表Process Table,表项为进程控制块,保存进程的运行状态。

僵尸进程和孤儿进程

  • 僵尸进程(有害)一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵尸进程(ps下状态为“Z”)。在每个进程退出的时候,内核释放该进程所有的资源,但是仍然为其保留一定的状态信息,直到父进程通过wait / waitpid来取时才释放,否则就不会释放,进程号一直被占用。如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。(对开启的子进程应该记得使用join,join会回收僵尸进程)
  • 孤儿进程(无害)一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。

守护进程

  • 守护进程会在主进程代码执行结束后就终止
  • 守护进程内无法再开启子进程,否则抛出异常

多进程

多进程的主进程一定要写在程序入口 if __name__ =='__main__': 内部才能正常运行

避免自己调用自己时重复执行主进程

不想在被调用时执行的代码放在程序入口的if内部

__name__是当前模块名,模块被直接运行时为__main__,被导入时,模块名为文件名。

os.fork()

os.fork() 子进程返回0,父进程返回子进程的ID (无法用于Windows)

getpid() 获取当前进程ID

getppid() 获取父进程的ID

Process 进程

from multiprocessing import Process
def run_proc(name):
    ...
    
p = Process(target=run_proc, args=('name',)) # 创建子进程
p.start() # 启动
p.join() # 阻塞主进程(等待子进程结束再继续运行主进程)
  • 新创建的进程与进程的切换都是要耗资源的(独立分配资源和拷贝访问的数据),所以平时工作中进程数不能开太大。
  • 同时可以运行的进程数一般受制于CPU的核数。

Pool 进程池

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果进程池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

from multiprocessing import Pool
​
p = Pool(num) # num为进程数,默认为CPU的核数
for i in range(5):
    p.apply_async(long_time_task, args=(i,)) # 添加进程
p.close() # close之后不能添加进程
p.join()  
multiprocessing.cpu_count() # 获取CPU核心数
# Pool类中的方法
p = Pool(num) # 创建进程池
p.apply_async(func,args=(),callback) # 向进程池提交需要执行的函数和参数 各个进程采用非阻塞(异步)的调用方式(每个进程只管运行自己)
p.map(func,iterable) # 使用多进程执行map函数 https://www.zhangshengrong.com/p/ArXGrRPBNj/
p.map_async() # 非阻塞(异步)执行map 
p.close() # 关闭进程池,之后不能添加进程
p.terminate() # 结束工作进程,不再处理未处理的任务(用于终止无限循环的进程)
p.is_alive() 
p.join() # 主进程阻塞等待子进程退出,要在close或terminate之后使用,使不再接受新的进程
​
# AsyncResul的实例obj[apply_async()和map_async()的返回值]
obj.get() # 返回结果,如果有必要则等待结果到达
obj.ready() # 如果调用完成,返回True
obj.successful() # 如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]) # 等待结果变为可用。
obj.terminate() # 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

回调函数callback

进程池中任何一个任务一旦处理完了,就立即告知主进程,主进程则调用回调函数去处理该结果

继承类重写run创建进程

class MyProcess(Process):
    def __init__(self,func,args):
        Process.__init__(self) # 不要忘了继承父类的__init__方法
        # super().__init__()
        self.func = func
        self.args = args
    def run(self):
        print('开始子进程')
        self.func(self.args[0])
        print('子进程结束')
​
def a(i):
    print(i*i)
​
if __name__ == '__main__':
    M = MyProcess(a,(10,))
    M.start()
    M.join()

进程间通信(数据交互)

管道Pipe

Pipe比队列快

from multiprocessing import Process, Pipe
conn1, conn2 = Pipe(duplex=True) # 管道有两端,管道的两端可以同时给多个进程 duplex=True,则管道是双向的,False则conn1接收数据,conn2发送数据
conn1.send() # 放入
conn1.recv() # 取出从另一端放入的数据
conn1.poll([timeout]) # 是否还有数据可读取

队列Queue

from multiprocessing import Process, Queue
q = Queue(maxsize)
q.put(obj, block=True, timeout=None) # 放入(FIFO) block=True写满阻塞timeout时间,如果没有空闲空间,抛出queue.Full
q.get() # 取出
q.empty() # 判断队列是否为空
q.qsize() # 队列的长度

数据共享 Managers

from multiprocessing import Process, Manager
mgr = Manager()
d = mgr.dict()
l = mgr.list()
​
with Manager() as mgr:
    d = mgr.dict()
    l = mgr.list()

multiprocessing.Lock() #最简单的锁(非递归锁)
​
multiprocessing.RLock() #可复用的锁(递归锁)
​
multiprocessing.Semaphore(value=1) #计数器锁(信号量锁),value为初始计数
​
multiprocessing.BoundedSemaphore(value=1) #带上限的计数器锁(信号量锁),value即是初始计数,同时也是允许的计数上限
​
# 以上锁即可通过acquire/release方法获得/释放,也可采用with上下文方式来使用(with lock: …, 这样可以省去acquire/release语句)
​
multiprocessing.Event() #事件锁,当事件触发时释放。其通过set/clear方法获得/释放。
​
multiprocessing.Condition(lock = None) #条件锁,当条件触发时释放。其通过wait_for来条件阻塞,当条件满足时自动释放;也可用作类事件锁,通过wait阻塞,notify或notify_all释放。
​
multiprocessing.Barrier(parties, action=None, timeout=None) #障碍锁,等待进程数达到parties要求数目后释放,可用于进程同步。其通过wait阻塞,等待进程数达标后自动释放;也可通过abort强行释放。

多线程

import threading
​
t = threading.Thread(target=func, name='LoopThread') # 主线程名为MainThread,子线程再创建时指定,默认为Thread-1等
t.setDaemon(True) # 默认为False 必须在start之前调用,设置为True,则该线程为守护线程(后台线程),进程退出时无需等待这个线程完成(避免子线程无限循环,无法退出程序,避免孤儿进程出现)
t.start()
t.join()
​
threading.current_thread().name # current_thread() 返回当前线程的实例

数据共享

多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享。

lock = threading.Lock() 
​
lock.acquire() # 获得锁
# 使用try..finally确保锁一定会释放
try:
    ... # 修改数据
finally:
    lock.release() # 释放锁
 
with Lock():
    ...
import threading
​
class Account:
    
    def __init__(self):
        self.balance = 0
        
    def add(self, lock):
        # 获得锁
        lock.acquire()
        for i in range(0, 100000):
            self.balance += 1
        # 释放锁
        lock.release()
        
    def delete(self, lock):
        # 获得锁
        lock.acquire()
        for i in range(0, 100000):
            self.balance -= 1
            # 释放锁
        lock.release()
        
if __name__ == "__main__":
    account = Account()
    lock = threading.Lock()
    # 创建线程
    thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
    thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')
​
    # 启动线程
    thread_add.start()
    thread_delete.start()
​
    # 等待线程结束
    thread_add.join()
    thread_delete.join()
​
    print('The final balance is: {}'.format(account.balance))
from queue import Queue
Queue.qsize() # 返回队列的大小
Queue.empty() # 如果队列为空,返回True,反之False
Queue.full() # 如果队列满了,返回True,反之False,Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]]) # 获取队列,timeout等待时间
Queue.get_nowait() # 相当于Queue.get(False),非阻塞方法
Queue.put(item) # 写入队列,timeout等待时间
​
# 一起使用
Queue.task_done() # 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号。每个get()调用得到一个任务,接下来task_done()调用告诉队列该任务已经处理完毕。
Queue.join() # 阻塞调用线程,直到队列中的所有任务被处理掉
​
# 生产者 - 消费者模式
from queue import Queue
from threading import Thread
import time
def Producer(q,i):
    for i in range(5):
        q.join() # 任务为空才能生产,否则阻塞
        print('生产:{}'.format(i))
        q.put(i)
​
def Consumer(q):
    for i in range(5):
        print('消费:{}'.format(q.get()))
        time.sleep(2)
        q.task_done() # 任务数 - 1
​
if __name__ == '__main__':
    q = Queue()
    t_consumer = Thread(target=Consumer,args=(q,))
    t_producer = Thread(target=Producer,args=(q,10))
    t_producer.start()
    t_consumer.start()
# 不使用join和task_done,put不受限制,get只有当队列为空时才阻塞

使用队列Queue

Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。

ThreadLocal

全局变量ThreadLocal 在每一个变量中都会创建一个副本,每个线程都可以访问自己内部的副本变量。

import threading
    
# 创建全局ThreadLocal对象:
local_school = threading.local()
​
def process_student():
    # 获取当前线程关联的student:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))
​
def process_thread(name):
    # 绑定ThreadLocal的student:
    local_school.student = name
    process_student()
​
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
# Hello, Alice (in Thread-A)
# Hello, Bob (in Thread-B)

守护线程

等待主线程运行完毕后销毁

主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

多进程 VS 多线程

  • 对CPU密集型代码(比如循环计算) – 多进程效率更高
  • 对IO密集型代码(比如文件操作,网络爬虫) – 多线程效率更高。大部分消耗时间其实是等待时间,在等待时间中CPU是不需要工作的

GIL

Global Interpreter Lock 全局解释器锁

在python2.x里,GIL的释放逻辑是当前线程遇见IO操作或者ticks计数达到100(可粗略看作Python虚拟机的指令)

在python3.x中,GIL不使用ticks计数,改为使用计时器(执行时间达到阈值后,当前线程释放GIL),这样对CPU密集型程序更加友好

由于GIL锁存在,python里一个进程永远只能同时执行一个线程

多核多线程比单核多线程更差

原因是单核下多线程,每次释放GIL,唤醒的那个线程都能获取到GIL锁,所以能够无缝执行,但多核下,多核的其他CPU中的参与这个核竞争成功线程也会被唤醒,进行锁的竞争,如果不成功又会进入睡眠,导致线程颠簸,效率更低。

多进程:每个进程有各自独立的GIL,互不干扰

Python GIL不能绝对保证线程安全

http://c.biancheng.net/view/5537.html

即便 GIL 仅允许一个 Python 线程执行,但别忘了 Python 还有 check interval 这样的抢占机制

https://www.zhihu.com/question/23030421

GIL控制的是字节码, 锁控制的是python代码,粒度不一样

GIL 的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。

一条汇编语言的原子性由硬件保证;一条python-bytecode的原子性由GIL保证;一条python语句的线程安全性由线程锁保证。

GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理

python语句不是直接编译成汇编语言的,有cpython这个虚拟机夹在中间,因此硬件无法保证python-bytecode的原子性,因此要由GIL来保证。

只要线程之间没有共享资源,那么就是线程安全的,有共享资源,为了保证线程安全,需要引进锁的机制

GIL锁与互斥锁综合分析

#1. 100个线程去抢GIL锁,即抢执行权限
#2. 肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()
#3. 极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL
#4. 直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程

死锁与递归锁

递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock

RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该<线程内>又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

发表评论