Python 学习之路(五)————线程、进程
以下所用的是Python 3.6。使用PyCharm IDE。
一、概念及区别
1.1 概念
进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。譬如说QQ、360安全卫士这些程序都是一个进程。简单来说就是资源的集合。
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。 一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行。譬如说360安全卫士可以同时查杀病毒,清理缓存,这就是一个进程中启用了多个线程。简单来说就是资源调度的基本单位。
1.2 关系与区别
进程和线程的关系:
- 一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。
- 资源分配给进程,同一进程的所有线程共享该进程的所有资源。
- 处理机分给线程,即真正在处理机上运行的是线程。
- 线程在执行过程中,需要协作同步。不同进程的线程间要利用消息通信的办法实现同步。线程是指进程内的一个执行单元,也是进程内的可调度实体.
进程与线程的区别:
- 调度:线程作为调度和分配的基本单位,进程作为拥有资源的基本单位
- 并发性:不仅进程之间可以并发执行,同一个进程的多个线程之间也可并发执行
- 拥有资源:进程是拥有资源的一个独立单位,线程不拥有系统资源,但可以访问隶属于进程的资源,有一个共享的程序资源空间
- 系统开销:在创建或撤消进程时,由于系统都要为之分配和回收资源,导致系统的开销明显大于创建或撤消线程时的开销
二、线程部分
2.1 调用的两种方式
直接调用
import threadingimport timedef run(args): print("%s runing"% args) time.sleep(3)t1 = threading.Thread(target=run,args=("task one",))t2 = threading.Thread(target=run,args=("task two",))t1.start()t2.start()
继承调用
import threadingimport timeclass MyThread(threading.Thread): #继承Thread类 def __init__(self,msg): threading.Thread.__init__(self) self.msg = msg def run(self): print("%s is running" % self.msg) time.sleep(3)if __name__ == "__main__": t1 = MyThread("task one") t2 = MyThread("task two") t1.start() t2.start()
2.2 线程模块
threading 模块提供的其他方法
- threading.currentThread(): 返回当前的线程变量。
- threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
- threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
Thread类提供了以下方法
- run(): 用以表示线程活动的方法。
- start():启动线程活动。
- join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
- isAlive(): 返回线程是否活动的。
- getName(): 返回线程名。
- setName(): 设置线程名。
threading 可用对象列表
- Thread 表示执行线程的对象
- Lock 锁原语对象
- RLock 可重入锁对象,使单一进程再次获得已持有的锁(递归锁)
- Condition 条件变量对象,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值
- Semaphore 为线程间共享的有限资源提供一个”计数器”,如果没有可用资源会被阻塞
- Event 条件变量的通用版本,任意数量的线程等待某个时间的发生,在改事件发生后所有线程被激活
- Timer 与 Thread 相识,不过它要在运行前等待一段时间
- Barrier 创建一个”阻碍”,必须达到指定数量的线程后才可以继续
2.3 互斥锁 Mutex
互斥锁,防止多个线程使用同一份资源(数据)产生错误,保证线程同步。 如下例所示,如果没有lock这个锁,可能会导致num结果出错。注:请在2.X上调试,3.0以上可能自动加了锁,没有出现问题。
import threading, timenum = 0def run(args): #线程运行函数 lock.acquire() #上锁 global num #获取全局变量 time.sleep(0.1) num += 1 print("%s runing"% args) lock.release() #开锁lock = threading.Lock() #定义互斥锁thread_list= [] #定义线程列表for i in range(50): #创建50个线程 t = threading.Thread(target=run,args=(i,)) t.start() thread_list.append(t)for thread in thread_list: #等待全部线程结束 thread.join()print(num)
2.4 递归锁(重入锁) RLock
重入锁必须由获取它的线程释放。 一旦线程获得了一个可重入的锁,同一个线程可能会再次获取它没有阻塞; 该线程每次都必须释放一次获得它。
import threadingdef run(): #线程调用函数 对num计数 print("threading %s" % threading.get_ident()) lock.acquire() global num num += 1 lock.release() return numdef run2():#线程运行函数,完成调用两次run函数 lock.acquire() res1= run() res2 = run() lock.release() print("res1:%s res2:%s"%(res1, res2))if __name__ == '__main__': num = 0 lock = threading.RLock() #定义重入锁,不加重入锁程序会导致死锁 for i in range(10): #开启10个线程 t = threading.Thread(target=run2) t.start() while threading.active_count() > 1: #活动线程数小于等于1时,退出循环 pass else: print(num)
2.5 信号量 Semaphore
Semaphore是同时允许一定数量的线程更改数据,有界信号量检查以确保其当前值不超过其初始值。 案例:做多只能有5个线程同时作业
import threading, timedef work(n): #作业线程运行函数 semaphore.acquire() #获取信号量,会在当前信号量总数count的基础上加1 time.sleep(1) print("this is %s work" % n) semaphore.release() #释放信号量,中当前信号量的总数上减1if __name__ == '__main__': semaphore = threading.BoundedSemaphore(5) #信号量最大总数为5,当到达5时, # 其他线程需要继续获取信号量,会等待其他线程释放 for i in range(1,100): t = threading.Thread(target=work, args=(i,)) t.start() while threading.active_count() != 1: pass else: print('----all works done---')
2.6 简单的生产者,消费者案例
生产者,消费者案例是在多线程同步问题中的典型案例
import threading,time,queuedef producer(name,q): #生产者线程运行函数 for i in range(100): if q.qsize() <= 5: lock.acquire() # 对缓存区操作时,需要上锁 q.put("产品%s" % i) print("%s 生产 产品%i"%(name,i)) lock.release() #操作完毕解锁 time.sleep(0.1) else: time.sleep(0.1)def consumer(name,q): #消费者线程运行函数 while True: if q.qsize() > 0: lock.acquire()#对缓存区操作时,需要上锁 print("%s 消费 %s"%(name,q.get())) lock.release() #操作完毕解锁 time.sleep(0.2)if __name__ == "__main__": q = queue.Queue() #定义资源缓存区 lock = threading.Lock() #定义资源互斥锁 pro1 = threading.Thread(target=producer,args=("生产者1",q,)) pro2 = threading.Thread(target=producer,args=("生产者2",q,)) con1 = threading.Thread(target=consumer,args=("消费者1",q,)) con2 = threading.Thread(target=consumer,args=("消费者2",q,)) pro1.start() pro2.start() con1.start() con2.start()
三、进程部分
3.1 进程的简单调用
from multiprocessing import Processimport time,osdef fun(name): time.sleep(2) print("this is new process:", name) print("parent process:", os.getppid()) #获取父进程id print("process id:", os.getpid()) #获取当前进程idif __name__ == "__main__": print("current process:",os.getpid()) #获取当前进程id p = Process(target=fun, args=("test",)) p.start() p.join()
3.2 进程间通信
可以通过Queue、Pipes等多种方式来交换数据
Queue通信案例
from multiprocessing import Process, Queueimport timedef send(q): #发送进程函数 msgs = ["Hello","python","process"] for msg in msgs: q.put(msg) time.sleep(1)def recv(q): #接收进程函数 while True: print("recv msg : %s from another process" % q.get(True))if __name__ == '__main__': q = Queue() send_process = Process(target=send, args=(q,)) # 声明发送进程 recv_process = Process(target=recv, args=(q,)) # 声明接收进程 send_process.start() # 开启发送进程 recv_process.start() # 开启接收进程 send_process.join() # 等待发送进程结束 recv_process.terminate() #强制关闭接收进程
Pipe管道案例
from multiprocessing import Process, Pipedef fun(conn): conn.send("this is child_process") #向父进程发送数据 conn.close()if __name__ == "__main__": parent_conn, child_conn = Pipe() #声明一个pipe管道,拥有两个返回值,分别代表通信的两端 p = Process(target=fun, args=(child_conn,)) p.start() print(parent_conn.recv()) #接收子进程发送的数据 p.join()
3.3 进程同步
from multiprocessing import Process, Lockdef fun(lock, i): lock.acquire() try: print('This is process ', i) finally: lock.release()if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=fun, args=(lock, i)).start()
3.4 进程池 Pool
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
from multiprocessing import Process, Poolimport timedef fun(i): time.sleep(2) return i*idef callback_fun(res): print('return result:', res)if __name__ == "__main__": pool = Pool(5) #定义进程池,最多有5个进程 for i in range(10): pool.apply_async(func=fun, args=(i,), callback=callback_fun) #异步调用进程池,传入启用函数,参数,回调函数 pool.close() pool.join() # 进程池中进程执行完毕后再关闭,一定要加上,不然不会运行