进程和线程

一个进程(Process)至少有一个线程(Thread),一个任务需要一个进程,每个任务又有多个子任务,每个子任务都需要独立的线程。正常来说,你的CPU有几个核心,就可以有几个执行流,但我们任务很多,比如你的电脑是4核CPU,你就只能同时有4个执行流。但事实是,你电脑同时进行的任务非常多,你不可能说只能开4个任务吧,尤其是过去很多都是单核CPU,这是因为操作系统会让各个任务交替执行,比如任务1执行0.01s,任务2接着执行0.01s,因为切换速度非常快,因此你感知不出来。

多进程

fork

类Unix系统提供了一个fork() 调用。这个函数比较特殊,一般的函数调用1次返回1次,但它调用1次返回2次。因为操作系统会自动把当前进程(父进程)复制一份(子进程),然后在父进程和子进程中分别返回,子进程返回0 ,而父进程返回子进程的id。这样的话父进程就可以fork 出很多子进程,因此你自然是要记住子进程的id的,子进程如果想要父进程的id的话,就要调用getppid() 了。

Python的os 模块封装了很多系统调用,其中自然是包括fork 的,请注意,这个调用对Windows系统不生效。

multiprocessing

Windows没有fork() 调用,无伤大雅,Python作为一门跨平台的语言,为我们提供了一个通用的多进程模块,即multiprocessing

这个模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os

# 子进程要执行的代码def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

执行结果如下:

Parent process 928.
Child process will start.
Run child process test (929)...
Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

执行结果如下:

Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

代码解读:

Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

请注意输出的结果,task 0123是立刻执行的,而task 4要等待前面某个task完成后才执行,这说明Pool的默认大小在这个实例的电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:

p = Pool(5)

就可以同时跑5个进程。

Pool() 的默认进程数是CPU的核数,上面的实例的效果自然是4核CPU的效果了。

子进程

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

上述示例创建了一个运行命令nslookup www.python.org 的子进程

进程间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

执行结果:

Process to write: 27532
Put A to queue...
Process to read: 7920
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

代码结构:

┌─────────────────────────────────────────────────────────┐
│                     父进程 (main)                        │
│                                                         │
│   q = Queue()  ← 创建共享队列                            │
│   pw = Process(target=write, args=(q,))  ← 写进程       │
│   pr = Process(target=read, args=(q,))   ← 读进程       │
└─────────────────────────────────────────────────────────┘
              │                           │
              ▼                           ▼
┌──────────────────────┐    ┌──────────────────────┐
│   子进程 pw (PID:27532)  │    │   子进程 pr (PID:7920)   │
│                      │    │                      │
│   write(q) 函数      │    │   read(q) 函数       │
│   q.put(value) 写入  │───▶│   q.get() 读取       │
└──────────────────────┘    └──────────────────────┘

在类Unix系统下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

小结

  1. 类Unix系统提供fork() 调用
  2. Python提供跨平台的multiprocessing 模块
  3. 创建大量进程可以使用进程池Pool()
  4. 子进程使用subprocess 模块
  5. 进程间通信是通过QueuePipes等实现的

多线程

线程是操作系统直接支持的执行单元,高级语言通常都提供多线程的支持,Python也不例外。

Python的标准库提供了两个模块:_threadthreading_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行

任何进程默认就会启动一个线程,这个叫做主线程,主线程又可以启动新的线程。current_thread()函数,永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1Thread-2……

Lock

在多进程中,同一份变量,每个进程中都会拷贝一份,互不影响;但多线程中,并不是如此,变量是共享的,这样就会出现一些无法预料的风险,因此我们要将线程锁起来。创建锁通过threading.lock() 实现,获取锁通过lock.acquire() ,在最后一定要记得使用lock.release() 来释放锁。将线程锁起来的话,其他线程无法执行,苦苦等待,就会变成死线程,用try...finally... 或语法糖with 来确保锁一定会被释放。

死锁:不同线程持有不同锁,如果某个线程试图获得另一个线程的锁,就可能造成死锁,这些线程既不能执行,也不能结束,只能靠操作系统强行终止

GIL

如果你有一个多核CPU的话,你可能会想,一个CPU跑一个死循环会怎么样,于是你写出了下面的代码

import threading, multiprocessing

def loop():
    x = 0
    while True:
        x = x ^ 1

for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()

如果你用C、CPP、Java来实现类似代码的话,假如你是四核,你通过任务管理器之类的你会发现你的CPU占用率来到了400%,但Python就不会如此,这是因为它有着GIL(Global Interpreter Lock),任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码(源代码编译后的中间表示形式,比如.pyc),解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

当然,我们可以通过C扩展来实现,不过这样就失去了Python简单易用的特点。

另外,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

ThreadLocal

在多线程环境下,每个线程都有自己的数据,一个线程使用局部变量的话,只有自己能看见,且不会影响其他线程,使用全局变量的话,则还得加一个锁,所以比起全局我们更倾向于局部变量。

但局部变量的话你怎么调用函数呢?一层层传吗?太麻烦了,使用一个global_dict() 存放怎么样?以thread 自身作为key,获取对应的对象,似乎不错,但真写起来代码的话,好像获取写起来也有点麻烦,ThreadLocal 应运而生。

import threading

# 创建全局ThreadLocal对象:
local_school = threading.local()

def process_student():
    # 获取当前线程关联的student:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    # 绑定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。

进程 VS 线程

多任务

实现多任务,我们最常用的两种方式就是多进程和多线程了。然后我们通常会设计Master-Worker 模式,Master负责分配任务,Worker负责执行任务,通常是一个Master,多个Worker(现在agent越来越火了,你是否会想到subagent呢)

用多进程/多线程实现Master-Worker的话,比较如下:

多进程:稳定性高,一个子进程崩了不影响其他进程,而主进程只负责分配,一般不会崩掉

多线程:所有线程共享一个内存,一个完蛋其他的跟着完蛋,但是,多线程在Windows下效率较高

线程切换

无论是多进程还是多线程,都是多任务模型,切换任务是要花时间的,虽然很短,但数量多的话,叠加起来也是有点吓人的,操作系统可能会主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。

计算密集型 VS IO密集型

是否采用多任务的第二个考虑是任务的类型。我们可以把任务分为计算密集型和IO密集型。

计算密集型的特点是要进行大量计算,主要考验CPU能力,如计算圆周率,对视频高清解码;而IO密集型则是任务时间主要花在IO等待上,如涉及网络、磁盘IO的任务,常见的大部分任务都是IO密集型任务,比如Web应用。前者我们使用C语言这样的语言开发会更好,而后者,反正对CPU完成任务没那么高的要求,还是得等IO,直接Python这类脚本语言提升效率就行了

异步IO

利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。

对应到Python语言,单线程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。我们会在后面讨论如何编写协程。

分布式进程

进程和线程,优先选进程,因为进程可以布置到多台机器上,而线程不行,并且进程更稳定。

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

比如我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上,怎么用分布式进程实现?

原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。

请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。

然后,在另一台机器上启动任务进程(本机上启动也可以)

正则表达式

字符串是编程时用得最多的一种数据结构,对它的操作更是频繁,当我们对字符串有着特定的匹配要求时,比如检查邮件地址是否合法,就要借助正则表达式了。正则表达式的设计思想是用一种描述性的语言来给字符串定义一个规则,凡是符合规则的字符串,我们就认为它“匹配”了,否则,该字符串就是不合法的。

正则表达式中,直接给出字符,就是精准匹配,如果是\d 就是匹配一个数字,\w 就是匹配一个字符或数字,. 可以匹配任意字符。

要匹配变长的字符,在正则表达式中,用*表示任意个字符(包括0个),用+表示至少一个字符,用?表示0个或1个字符,用{n}表示n个字符,用{n,m}表示n-m个字符

如果要匹配'010-12345'这样的号码呢?由于'-'是特殊字符,在正则表达式中,要用'\'转义,所以,上面的正则是\d{3}\-\d{3,8}

但是,仍然无法匹配'010 - 12345',因为带有空格。所以我们需要更复杂的匹配方式。

进阶

[]表示范围,下面举一些实例:

  • [0-9a-zA-Z\_]可以匹配一个数字、字母或者下划线;
  • [0-9a-zA-Z\_]+可以匹配至少由一个数字、字母或者下划线组成的字符串,比如'a100''0_Z''Py3000'等等;
  • [a-zA-Z\_][0-9a-zA-Z\_]*可以匹配由字母或下划线开头,后接任意个由一个数字、字母或者下划线组成的字符串,也就是Python合法的变量;
  • [a-zA-Z\_][0-9a-zA-Z\_]{0, 19}更精确地限制了变量的长度是1-20个字符(前面1个字符+后面最多19个字符)

[]表示一个符合规则的字符,后可以附加其他规则

A|B可以匹配A或B,所以(P|p)ython可以匹配'Python'或者'python'

^表示行的开头,^\d表示必须以数字开头。

$表示行的结束,\d$表示必须以数字结束。

re模块

Python提供re模块,包含所有正则表达式的功能。

由于Python的字符串本身也用\转义,所以要特别注意:

s = 'ABC\\-001' # Python的字符串# 对应的正则表达式字符串变成 'ABC\-001'

因此强烈建议使用Python的r前缀,就不用考虑转义的问题了:

s = r'ABC\-001' # Python的字符串# 对应的正则表达式字符串不变:'ABC\-001'

match()方法判断是否匹配,如果匹配成功,返回一个Match对象,否则返回None

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐