0%

要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识。

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

1
2
3
4
5
6
7
8
9
import os

print('Process (%s) starts...' % os.getpid())
# Only works on Unix / Linux / Mac:
pid = os.fork()
if pid == 0:
print('I am a child process (%s) and my parent is (%s).' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

运行结果如下:

1
2
3
Process (56351) starts...
I (56351) just created a child process (56352).
I am a child process (56352) and my parent is (56351).

由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的。

有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

multiprocessing

如果你打算编写多进程的服务程序,Unix/Linux无疑是正确的选择。由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?

由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。

multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process
import os
import time


# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)' % (name, os.getpid()))
time.sleep(2)
print('Child process ends')


if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Parent process end.')

执行结果如下:

1
2
3
4
5
Parent process 56782.
Child process will start.
Run child process test (56783)
Child process ends
Parent process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

如果不加p.join()这句代码,子进程也是可以被执行完成的。

Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
Parent process 57316.
Waiting for all subprocesses done...
Run task 0 (57317)...
Run task 1 (57318)...
Run task 2 (57319)...
Run task 3 (57320)...
Task 0 runs 0.59 seconds.
Run task 4 (57317)...
Task 4 runs 1.06 seconds.
Task 3 runs 1.96 seconds.
Task 1 runs 2.02 seconds.
Task 2 runs 2.99 seconds.
All subprocesses done.

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),不然会出现ValueError: Pool is still running错误。调用close()之后就不能继续添加新的Process了。

p.join()这句代码必须存在,不然子进程极有可能不会被执行完成。

请注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在我的电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:

p = Pool(5)
就可以同时跑5个进程。

由于Pool的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。

子进程

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:

1
2
3
4
5
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

运行结果:

1
2
3
4
5
6
7
8
9
10
$ nslookup www.python.org
Server: 192.168.19.4
Address: 192.168.19.4#53

Non-authoritative answer:
www.python.org canonical name = python.map.fastly.net.
Name: python.map.fastly.net
Address: 199.27.79.223

Exit code: 0

如果子进程还需要输入,则可以通过communicate()方法输入:

1
2
3
4
5
6
7
8
9
10
11
12
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
上面的代码相当于在命令行执行命令nslookup,然后手动输入:

set q=mx
python.org
exit

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
$ nslookup
Server: 192.168.19.4
Address: 192.168.19.4#53

Non-authoritative answer:
python.org mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
mail.python.org internet address = 82.94.164.166
mail.python.org has AAAA address 2001:888:2000:d::a6
Exit code: 0

进程间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()

运行结果如下:

1
2
3
4
5
6
7
8
Process to write: 50563
Put A to queue...
Process to read: 50564
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

小结

在Unix/Linux下,可以使用fork()调用实现多进程。

要实现跨平台的多进程,可以使用multiprocessing模块。

进程间通信是通过Queue、Pipes等实现的。

日志

日志是跟踪软件运行时所发生的事件的一种方法。软件开发者在代码中调用日志函数,表明发生了特定的事件。事件由描述性消息描述,该描述性消息可以可选地包含可变数据(即,对于事件的每次出现都潜在地不同的数据)。事件还具有开发者归因于事件的重要性;重要性也可以称为级别或严重性。

什么时候使用Logging

logging提供了一组便利的函数,用来做简单的日志。它们是 debug()、 info()、 warning()、 error() 和 critical()。

logging函数根据它们用来跟踪的事件的级别或严重程度来命名。标准级别及其适用性描述如下(以严重程度递增排序):

级别 何时使用
DEBUG 详细信息,一般只在调试问题时使用。
INFO 证明事情按预期工作。
WARNING 某些没有预料到的事件的提示,或者在将来可能会出现的问题提示。例如:磁盘空间不足。但是软件还是会照常运行。
ERROR 由于更严重的问题,软件已不能执行一些功能了。
CRITICAL 严重错误,表明软件已不能继续运行了。

打印日志到控制台

由于默认设置的等级是warning,所有只有warning的信息会输出到控制台。

1
2
3
4
5
6
import logging

logging.warning('Watch out!') # will print a message to the console
logging.info('I told you so') # will not print anything

WARNING:root:Watch out!

打印日志到文件

1
2
3
4
5
6
7
8
9
10
import logging

logging.basicConfig(filename='example.log',level=logging.DEBUG)
logging.debug('This message should go to the log file')
logging.info('So should this')
logging.warning('And this, too')

DEBUG:root:This message should go to the log file
INFO:root:So should this
WARNING:root:And this, too

我们设置了logging的级别为DEBUG,所以所有信息都将被写入到example.log文件中。

如果想要每次启动时,原来的打印日志都被清空的话,则需要把filemode由默认的a改为w

1
logging.basicConfig(filename='example.log', filemode='w', level=logging.DEBUG)

日志的格式

日志级别

1
2
3
4
5
6
7
8
9
import logging
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
logging.debug('This message should appear on the console')
logging.info('So should this')
logging.warning('And this, too')

DEBUG:This message should appear on the console
INFO:So should this
WARNING:And this, too

日期

1
2
3
4
5
import logging
logging.basicConfig(format='%(asctime)s %(message)s')
logging.warning('is when this event was logged.')

2019-02-07 22:21:20,993 is when this event was logged.

本章节中我们将向大家介绍如何将数据插入到MongoDB的集合中。

文档的数据结构和JSON基本一样。

所有存储在集合中的数据都是BSON格式。

BSON是一种类json的一种二进制形式的存储格式,简称Binary JSON。

插入文档

MongoDB 使用 insert() 或 save() 方法向集合中插入文档,语法如下:

1
db.COLLECTION_NAME.insert(document)
1
2
3
4
5
6
7
8
9
10
11
12
> document = ({"name": "simon", "age": 20, "interests": ["badminton", "swimming", "music"]})
{
"name" : "simon",
"age" : 20,
"interests" : [
"badminton",
"swimming",
"music"
]
}
> db.col.insert(document)
WriteResult({ "nInserted" : 1 })

插入文档你也可以使用 db.col.save(document) 命令。如果不指定 _id 字段 save() 方法类似于 insert() 方法。如果指定 _id 字段,则会更新该 _id 的数据。

指定_id字段是{"_id": ObjectId("5c5be011dd377f0cd58338ae")}

更新文档

update() 方法

update() 方法用于更新已存在的文档。语法格式如下:

1
2
3
4
5
6
7
8
9
db.collection.update(
<query>,
<update>,
{
upsert: <boolean>,
multi: <boolean>,
writeConcern: <document>
}
)

参数说明:

  • query : update的查询条件,类似sql update查询内where后面的。

  • update : update的对象和一些更新的操作符(如$,$inc…)等,也可以理解为sql update查询内set后面的

  • upsert : 可选,这个参数的意思是,如果不存在update的记录,是否插入objNew,true为插入,默认是false,不插入。

  • multi : 可选,mongodb 默认是false,只更新找到的第一条记录,如果这个参数为true,就把按条件查出来多条记录全部更新。

  • writeConcern :可选,抛出异常的级别。

save() 方法

save() 方法通过传入的文档来替换已有文档。语法格式如下:

1
2
3
4
5
6
db.collection.save(
<document>,
{
writeConcern: <document>
}
)

参数说明:

document : 文档数据。

writeConcern :可选,抛出异常的级别。

删除文档

1
2
3
4
5
6
7
db.collection.remove(
<query>,
{
justOne: <boolean>,
writeConcern: <document>
}
)

参数说明:

  • query :(可选)删除的文档的条件。

  • justOne : (可选)如果设为 true 或 1,则只删除一个文档,如果不设置该参数,或使用默认值 false,则删除所有匹配条件的文档。

  • writeConcern :(可选)抛出异常的级别。

remove() 方法已经过时了,现在官方推荐使用 deleteOne() 和 deleteMany() 方法。

如删除集合下全部文档:

1
db.inventory.deleteMany({})

删除 status 等于 A 的全部文档:

1
db.inventory.deleteMany({ status : "A" })

删除 status 等于 D 的一个文档:

1
db.inventory.deleteOne( { status: "D" } )