Weibw's World Weibw's World
首页
  • HTML
  • Python

    • Python基础知识
    • Python CookBook第三版
    • Flask
  • MySQL

    • MySQL基础知识
    • MySQL调优
    • MySQL面试题
算法
  • FineReport
  • Kettle
  • Git
  • 微信公众号文章
  • 优秀博客文章
  • 其他
收藏夹
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Weibw

一个没有梦想的咸鱼
首页
  • HTML
  • Python

    • Python基础知识
    • Python CookBook第三版
    • Flask
  • MySQL

    • MySQL基础知识
    • MySQL调优
    • MySQL面试题
算法
  • FineReport
  • Kettle
  • Git
  • 微信公众号文章
  • 优秀博客文章
  • 其他
收藏夹
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 《Flask》

  • 《Python Cookbook》第三版

    • 第一章:数据结构与算法

    • 第二章:字符串和文本

    • 第三章:数字日期和时间

    • 第四章:迭代器与生成器

    • 第五章:文件与IO

    • 第六章:数据编码和处理

    • 第七章:函数

    • 第八章:类与对象

    • 第九章:元编程

    • 第十章:模块与包

    • 第十一章:网络与Web编程

    • 第十二章:并发编程

      • 启动与停止线程
      • 判断线程是否已经启动
      • 线程间通信
      • 给关键部分加锁
      • 防止死锁的加锁机制
      • 保存线程的状态信息
      • 创建一个线程池
      • 简单的并行编程
        • 问题
        • 解决方案
        • 讨论
      • Python的全局锁问题
      • 定义一个Actor任务
      • 实现消息发布-订阅模型
      • 使用生成器代替线程
      • 多个线程队列轮询
      • 在Unix系统上面启动守护进程
    • 第十三章:脚本编程与系统管理

    • 第十四章:测试、调试和异常

    • 第十五章:C语言扩展

  • Python基础

  • Python
  • 《Python Cookbook》第三版
  • 第十二章:并发编程
weibw
2022-01-18

简单的并行编程

# 问题

你有个程序要执行CPU密集型工作,你想让他利用多核CPU的优势来运行的快一点。

# 解决方案

concurrent.futures 库提供了一个 ProcessPoolExecutor 类, 可被用来在一个单独的Python解释器中执行计算密集型函数。 不过,要使用它,你首先要有一些计算密集型的任务。 我们通过一个简单而实际的例子来演示它。假定你有个Apache web服务器日志目录的gzip压缩包:

logs/
   20120701.log.gz
   20120702.log.gz
   20120703.log.gz
   20120704.log.gz
   20120705.log.gz
   20120706.log.gz
   ...
1
2
3
4
5
6
7
8

进一步假设每个日志文件内容类似下面这样:

124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...
1
2
3
4
5

下面是一个脚本,在这些日志文件中查找出所有访问过robots.txt文件的主机:

# findrobots.py

import gzip
import io
import glob

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

前面的程序使用了通常的map-reduce风格来编写。 函数 find_robots() 在一个文件名集合上做map操作,并将结果汇总为一个单独的结果, 也就是 find_all_robots() 函数中的 all_robots 集合。 现在,假设你想要修改这个程序让它使用多核CPU。 很简单——只需要将map()操作替换为一个 concurrent.futures 库中生成的类似操作即可。 下面是一个简单修改版本:

# findrobots.py

import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file

    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

通过这个修改后,运行这个脚本产生同样的结果,但是在四核机器上面比之前快了3.5倍。 实际的性能优化效果根据你的机器CPU数量的不同而不同。

# 讨论

ProcessPoolExecutor 的典型用法如下:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
    ...
    do work in parallel using pool
    ...
1
2
3
4
5
6

其原理是,一个 ProcessPoolExecutor 创建N个独立的Python解释器, N是系统上面可用CPU的个数。你可以通过提供可选参数给 ProcessPoolExecutor(N) 来修改 处理器数量。这个处理池会一直运行到with块中最后一个语句执行完成, 然后处理池被关闭。不过,程序会一直等待直到所有提交的工作被处理完成。

被提交到池中的工作必须被定义为一个函数。有两种方法去提交。 如果你想让一个列表推导或一个 map() 操作并行执行的话,可使用 pool.map() :

# A function that performs a lot of work
def work(x):
    ...
    return result

# Nonparallel code
results = map(work, data)

# Parallel implementation
with ProcessPoolExecutor() as pool:
    results = pool.map(work, data)
1
2
3
4
5
6
7
8
9
10
11

另外,你可以使用 pool.submit() 来手动的提交单个任务:

# Some function
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)

    # Obtaining the result (blocks until done)
    r = future_result.result()
    ...
1
2
3
4
5
6
7
8
9
10
11
12
13

如果你手动提交一个任务,结果是一个 Future 实例。 要获取最终结果,你需要调用它的 result() 方法。 它会阻塞进程直到结果被返回来。

如果不想阻塞,你还可以使用一个回调函数,例如:

def when_done(r):
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
     future_result = pool.submit(work, arg)
     future_result.add_done_callback(when_done)
1
2
3
4
5
6

回调函数接受一个 Future 实例,被用来获取最终的结果(比如通过调用它的result()方法)。 尽管处理池很容易使用,在设计大程序的时候还是有很多需要注意的地方,如下几点:

  • 这种并行处理技术只适用于那些可以被分解为互相独立部分的问题。
  • 被提交的任务必须是简单函数形式。对于方法、闭包和其他类型的并行执行还不支持。
  • 函数参数和返回值必须兼容pickle,因为要使用到进程间的通信,所有解释器之间的交换数据必须被序列化
  • 被提交的任务函数不应保留状态或有副作用。除了打印日志之类简单的事情,

一旦启动你不能控制子进程的任何行为,因此最好保持简单和纯洁——函数不要去修改环境。

  • 在Unix上进程池通过调用 fork() 系统调用被创建,

它会克隆Python解释器,包括fork时的所有程序状态。 而在Windows上,克隆解释器时不会克隆状态。 实际的fork操作会在第一次调用 pool.map() 或 pool.submit() 后发生。

  • 当你混合使用进程池和多线程的时候要特别小心。

你应该在创建任何线程之前先创建并激活进程池(比如在程序启动的main线程中创建进程池)。

编辑 (opens new window)
上次更新: 2023/10/13, 17:39:25
创建一个线程池
Python的全局锁问题

← 创建一个线程池 Python的全局锁问题→

最近更新
01
牛客网非技术快速入门SQL练习题
03-08
02
其他日常SQL题
03-07
03
用户与权限管理
03-05
更多文章>
Theme by Vdoing | Copyright © 2021-2023 | Weibw | 辽ICP备18015889号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式