Weibw's World Weibw's World
首页
  • HTML
  • Python

    • Python基础知识
    • Python CookBook第三版
    • Flask
  • MySQL

    • MySQL基础知识
    • MySQL调优
    • MySQL面试题
算法
  • FineReport
  • Kettle
  • Git
  • 微信公众号文章
  • 优秀博客文章
  • 其他
收藏夹
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Weibw

一个没有梦想的咸鱼
首页
  • HTML
  • Python

    • Python基础知识
    • Python CookBook第三版
    • Flask
  • MySQL

    • MySQL基础知识
    • MySQL调优
    • MySQL面试题
算法
  • FineReport
  • Kettle
  • Git
  • 微信公众号文章
  • 优秀博客文章
  • 其他
收藏夹
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 《Flask》

  • 《Python Cookbook》第三版

    • 第一章:数据结构与算法

    • 第二章:字符串和文本

    • 第三章:数字日期和时间

    • 第四章:迭代器与生成器

    • 第五章:文件与IO

    • 第六章:数据编码和处理

    • 第七章:函数

    • 第八章:类与对象

    • 第九章:元编程

    • 第十章:模块与包

    • 第十一章:网络与Web编程

    • 第十二章:并发编程

      • 启动与停止线程
      • 判断线程是否已经启动
      • 线程间通信
      • 给关键部分加锁
      • 防止死锁的加锁机制
      • 保存线程的状态信息
      • 创建一个线程池
      • 简单的并行编程
      • Python的全局锁问题
      • 定义一个Actor任务
      • 实现消息发布-订阅模型
      • 使用生成器代替线程
      • 多个线程队列轮询
        • 问题
        • 解决方案
        • 讨论
      • 在Unix系统上面启动守护进程
    • 第十三章:脚本编程与系统管理

    • 第十四章:测试、调试和异常

    • 第十五章:C语言扩展

  • Python基础

  • Python
  • 《Python Cookbook》第三版
  • 第十二章:并发编程
weibw
2022-01-18

多个线程队列轮询

# 问题

你有一个线程队列集合,想为到来的元素轮询它们, 就跟你为一个客户端请求去轮询一个网络连接集合的方式一样。

# 解决方案

对于轮询问题的一个常见解决方案中有个很少有人知道的技巧,包含了一个隐藏的回路网络连接。 本质上讲其思想就是:对于每个你想要轮询的队列,你创建一对连接的套接字。 然后你在其中一个套接字上面编写代码来标识存在的数据, 另外一个套接字被传给 select() 或类似的一个轮询数据到达的函数。下面的例子演示了这个思想:

import queue
import socket
import os

class PollableQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        # Create a pair of connected sockets
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            # Compatibility on non-POSIX systems
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._putsocket.connect(server.getsockname())
            self._getsocket, _ = server.accept()
            server.close()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

在这个代码中,一个新的 Queue 实例类型被定义,底层是一个被连接套接字对。 在Unix机器上的 socketpair() 函数能轻松的创建这样的套接字。 在Windows上面,你必须使用类似代码来模拟它。 然后定义普通的 get() 和 put() 方法在这些套接字上面来执行I/O操作。 put() 方法再将数据放入队列后会写一个单字节到某个套接字中去。 而 get() 方法在从队列中移除一个元素时会从另外一个套接字中读取到这个单字节数据。

fileno() 方法使用一个函数比如 select() 来让这个队列可以被轮询。 它仅仅只是暴露了底层被 get() 函数使用到的socket的文件描述符而已。

下面是一个例子,定义了一个为到来的元素监控多个队列的消费者:

import select
import threading

def consumer(queues):
    '''
    Consumer that reads data on multiple queues simultaneously
    '''
    while True:
        can_read, _, _ = select.select(queues,[],[])
        for r in can_read:
            item = r.get()
            print('Got:', item)

q1 = PollableQueue()
q2 = PollableQueue()
q3 = PollableQueue()
t = threading.Thread(target=consumer, args=([q1,q2,q3],))
t.daemon = True
t.start()

# Feed data to the queues
q1.put(1)
q2.put(10)
q3.put('hello')
q2.put(15)
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

如果你试着运行它,你会发现这个消费者会接受到所有的被放入的元素,不管元素被放进了哪个队列中。

# 讨论

对于轮询非类文件对象,比如队列通常都是比较棘手的问题。 例如,如果你不使用上面的套接字技术, 你唯一的选择就是编写代码来循环遍历这些队列并使用一个定时器。像下面这样:

import time
def consumer(queues):
    while True:
        for q in queues:
            if not q.empty():
                item = q.get()
                print('Got:', item)

        # Sleep briefly to avoid 100% CPU
        time.sleep(0.01)
1
2
3
4
5
6
7
8
9
10

这样做其实不合理,还会引入其他的性能问题。 例如,如果新的数据被加入到一个队列中,至少要花10毫秒才能被发现。 如果你之前的轮询还要去轮询其他对象,比如网络套接字那还会有更多问题。 例如,如果你想同时轮询套接字和队列,你可能要像下面这样使用:

import select

def event_loop(sockets, queues):
    while True:
        # polling with a timeout
        can_read, _, _ = select.select(sockets, [], [], 0.01)
        for r in can_read:
            handle_read(r)
        for q in queues:
            if not q.empty():
                item = q.get()
                print('Got:', item)
1
2
3
4
5
6
7
8
9
10
11
12

这个方案通过将队列和套接字等同对待来解决了大部分的问题。 一个单独的 select() 调用可被同时用来轮询。 使用超时或其他基于时间的机制来执行周期性检查并没有必要。 甚至,如果数据被加入到一个队列,消费者几乎可以实时的被通知。 尽管会有一点点底层的I/O损耗,使用它通常会获得更好的响应时间并简化编程。

编辑 (opens new window)
上次更新: 2023/10/13, 17:39:25
使用生成器代替线程
在Unix系统上面启动守护进程

← 使用生成器代替线程 在Unix系统上面启动守护进程→

最近更新
01
牛客网非技术快速入门SQL练习题
03-08
02
其他日常SQL题
03-07
03
用户与权限管理
03-05
更多文章>
Theme by Vdoing | Copyright © 2021-2023 | Weibw | 辽ICP备18015889号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式