Python学习(五)

学习网址:廖雪峰的Python教程

多进程和多线程

线程是最小的执行单元,真正的多进程是需要多核CPU支持才行,单核CPU之所以能执行多进程,是因为CPU的执行速度非常之快,在多个进程中交替执行,看起来像是多进程,实际上还是单进程.进程中包含有许多线程,像word之类的编辑文档,既需要记录用户输入的内容,又需要随时保存,打印等服务.

多进程

Python中os模块封装了常见的系统调用,包括fork()方法,可以在Python中创建子进程,但是在Windows下,调用fork() 方法时,却没有…

1
2
3
4
5
6
7
>>> import os
>>> print('Process (%s) start...' % os.getpid())
Process (7088) start...
>>> pid = os.fork()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: module 'os' has no attribute 'fork'

Windows上就没有办法编写多进程的程序了嘛?当然不是.

因为Python是跨平台的,multiprocessing模块就是跨平台的多进程模块.

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process
import os
# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if name=='main':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

执行结果:

1
2
3
4
Parent process 6288.
Child process will start.
Run child process test (9012)...
Child process end.

创建子进程时,先创建Process实例,传入函数和函数的参数,挑用start方法启动,join方法是让子进程执行完毕后再继续向下执行,通常用于进程间同步.

Pool

如果需要大量的子进程,可以用进程池批量创建子进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
Parent process 5524.
Waiting for all subprocesses done...
Run task 0 (2856)...
Run task 1 (7840)...
Run task 2 (8688)...
Run task 3 (9180)...
Task 1 runs 0.75 seconds.
Run task 4 (7840)...
Task 4 runs 0.08 seconds.
Task 0 runs 1.36 seconds.
Task 3 runs 1.59 seconds.
Task 2 runs 2.82 seconds.
All subprocesses done.

对Pool对象调用join()方法会等待所有的子进程,调用之前必须先调用close()方法,调用close()之后就不能继续添加新的Process了.

子进程

subprocess,可以启动一个子进程,然后控制其输入和输出.

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> import subprocess
>>> print('$nslookup www.python.org')
$nslookup www.python.org
>>> r = subprocess.call(['nslookup','www.python.org'])
服务器: UnKnown
Address: 192.168.0.1
非权威应答:
名称: python.map.fastly.net
Addresses: 2a04:4e42:12::223
151.101.76.223
Aliases: www.python.org
>>> print('Exit code:',r)
Exit code: 0
进程间通信

进程间需要通信,操作系统提供了很多机制实现进程间通信,Python的multiprocessing模块包装了底层的机制,提供了Queue,Pipes等方式交换数据.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if name=='main':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()

结果有报错:

1
2
3
4
5
6
7
8
Traceback (most recent call last):
File "queue.py", line 21, in <module>
q = Queue()
File"C:\Users\snow\AppData\Local\Programs\Python\Python35\lib\multiprocessing\context.py",line 100, in Queue
from .queues import Queue
File"C:\Users\snow\AppData\Local\Programs\Python\Python35\lib\multiprocessing\queues.py", line 20, in <module>
from queue import Empty,
FullImportError: cannot import name 'Empty'

明白了错误的原因,在命名的时候queue.py文件名会和要导入的模块有冲突,改成没有冲突的就可以了,比如:qu.py.结果:

1
2
3
4
5
6
7
8
9
D:\Python>python qu.py
Process to write: 6156
Put A to queue...
Process to read: 7052
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了.

实现跨平台多进程,使用multiprocessing模块,进程间通信,用Queu,Pipes.

多线程

多任务可以由多进程来完成,也可以用多线程来完成.一个进程是由多条线程组成的,一个进程至少有一个线程.

Python中提供了两个模块,_threadthreading,_thread是低级模块,threading是高级模块,对_thread进行了封装,一般情况下,用threading.

启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time, threading
# 新线程执行的代码:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

执行结果:

1
2
3
4
5
6
7
8
9
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

进程启动都会默认启动一个线程,称之为主线程,主线程的实例叫MainThread,在Python的threading模块中的 current_thread()函数,它永远返回当前线程的实例,子线程的名字在创建时指定,如果没有为子线程命名,Python会自动给线程命名Thread1,Thread2…

Lock

就是锁的意思.多线程和多进程最大的区别,就是在多进程中,同一变量,每个进程都有拷贝一份,互不影响.在多线程中,所有变量都由所有线程共享,任何 一个变量都可以被任何一个线程修改.所以,线程间共享数据最大的危险在于多个线程同时修改一个变量,导致变量内容错误.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time, threading
# 假定这是你的银行存款:
balance = 0
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

执行结果可能每次都不一样:

1
2
3
4
5
6
7
8
9
10
D:\Python>python lock.py
0
D:\Python>python lock.py
5
D:\Python>python lock.py
0
D:\Python>python lock.py
-8
D:\Python>python lock.py
0

线程的调度是由操作系统决定的,当t1,t2交替执行时候,结果可能会变掉

x为局部变量,当每个线程都有自己的变量x时:

1
2
3
4
5
6
7
8
9
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2 # balance = 0
结果 balance = 0

当t1和t2交替执行时候:

1
2
3
4
5
6
7
8
9
10
初始值 balance = 0
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance - 8 # x2 = 0 - 8 = -8
t2: balance = x2 # balance = -8
结果 balance = -8

为了防止变量的内容被多个线程修改错乱,既可以为线程加上锁(Lock),在change_it()方法加上锁,当前线程就获得了锁,其他线程就不能调用change_it()方法,必须等待锁的释放之后,才能对变量进行修改:

1
2
3
4
5
6
7
8
9
10
11
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
#先要获取锁
lock.acquire()
try:
change_it(n)
finally:
#改完了一定要释放锁
lock.release()

通过threading.Lock()来创建一个锁,加上锁之后,一定要记得通过lock.release()释放锁,否则,会导致线程为死线程.

Python解释器在执行代码时,都会为每个线程加上一个GIL锁(Global Interpreter Lock),线程在执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就会自动释放GIL锁,让其他线程有机会执行,GIL全局锁,实际上把所有线程的执行代码都给加上了锁.所以,多线程在Python中只能交替执行,即使100个线程泡在100核CPU上,也只能用1核.

所以,在Python中,可以使用多线程,但不要期望有效利用多核,在Python中虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务,多个Python进程有各自独立的GIL锁,互不影响.

多线程并发在Python中,不会发生,

ThreadLocal

在多线程环境下,每个线程都有自己的数据,一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看到,不会影响其他线程,而全局变量的修改必须加锁.

当你在函数调用局部变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
def process_student(name):
std = Student(name)
# std是局部变量,但是每个函数都要用它,因此必须传进去:
do_task1(std)
do_task2(std)
def do_task1(std):
do_subtask1(std)
do_subtask2(std)
def do_task2(std):
do_subtask2(std)
do_subtask2(std)

这样的层级传递很麻烦,使用ThreadLocal时,可以简化这样的调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread,
args=('Alice',),name='Thread-A')
t2 = threading.Thread(target= process_thread,
args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

输出结果:

1
2
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

使用threading.local()方法来创建一个全局的ThreadLocal对象–>local_school

每个Thread对它都可以读写student属性,互不影响.

可以把local_school看成是全局变量,local_school.student属性都是线程的局部变量,可以任意读写而且互不干扰,也不用管锁的问题,都由ThreadLocal处理.

ThreadLocal最常用的地方是,为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以方便的访问这些资源.

ThreadLocal解决了参数在一个线程中各个函数间互相传递的问题.


真诚地希望能帮到你!