Weibw's World Weibw's World
首页
  • HTML
  • Python

    • Python基础知识
    • Python CookBook第三版
    • Flask
  • MySQL

    • MySQL基础知识
    • MySQL调优
    • MySQL面试题
算法
  • FineReport
  • Kettle
  • Git
  • 微信公众号文章
  • 优秀博客文章
  • 其他
收藏夹
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Weibw

一个没有梦想的咸鱼
首页
  • HTML
  • Python

    • Python基础知识
    • Python CookBook第三版
    • Flask
  • MySQL

    • MySQL基础知识
    • MySQL调优
    • MySQL面试题
算法
  • FineReport
  • Kettle
  • Git
  • 微信公众号文章
  • 优秀博客文章
  • 其他
收藏夹
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 《Flask》

  • 《Python Cookbook》第三版

    • 第一章:数据结构与算法

    • 第二章:字符串和文本

    • 第三章:数字日期和时间

    • 第四章:迭代器与生成器

    • 第五章:文件与IO

    • 第六章:数据编码和处理

    • 第七章:函数

    • 第八章:类与对象

    • 第九章:元编程

    • 第十章:模块与包

    • 第十一章:网络与Web编程

    • 第十二章:并发编程

      • 启动与停止线程
      • 判断线程是否已经启动
      • 线程间通信
      • 给关键部分加锁
      • 防止死锁的加锁机制
      • 保存线程的状态信息
      • 创建一个线程池
      • 简单的并行编程
      • Python的全局锁问题
      • 定义一个Actor任务
      • 实现消息发布-订阅模型
        • 问题
        • 解决方案
        • 讨论
      • 使用生成器代替线程
      • 多个线程队列轮询
      • 在Unix系统上面启动守护进程
    • 第十三章:脚本编程与系统管理

    • 第十四章:测试、调试和异常

    • 第十五章:C语言扩展

  • Python基础

  • Python
  • 《Python Cookbook》第三版
  • 第十二章:并发编程
weibw
2022-01-18

实现消息发布-订阅模型

# 问题

你有一个基于线程通信的程序,想让它们实现发布/订阅模式的消息通信。

# 解决方案

要实现发布/订阅的消息通信模式, 你通常要引入一个单独的“交换机”或“网关”对象作为所有消息的中介。 也就是说,不直接将消息从一个任务发送到另一个,而是将其发送给交换机, 然后由交换机将它发送给一个或多个被关联任务。下面是一个非常简单的交换机实现例子:

from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

一个交换机就是一个普通对象,负责维护一个活跃的订阅者集合,并为绑定、解绑和发送消息提供相应的方法。 每个交换机通过一个名称定位,get_exchange() 通过给定一个名称返回相应的 Exchange 实例。

下面是一个简单例子,演示了如何使用一个交换机:

# Example of a task.  Any object with a send() method

class Task:
    ...
    def send(self, msg):
        ...

task_a = Task()
task_b = Task()

# Example of getting an exchange
exc = get_exchange('name')

# Examples of subscribing tasks to it
exc.attach(task_a)
exc.attach(task_b)

# Example of sending messages
exc.send('msg1')
exc.send('msg2')

# Example of unsubscribing
exc.detach(task_a)
exc.detach(task_b)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

尽管对于这个问题有很多的变种,不过万变不离其宗。 消息会被发送给一个交换机,然后交换机会将它们发送给被绑定的订阅者。

# 讨论

通过队列发送消息的任务或线程的模式很容易被实现并且也非常普遍。 不过,使用发布/订阅模式的好处更加明显。

首先,使用一个交换机可以简化大部分涉及到线程通信的工作。 无需去写通过多进程模块来操作多个线程,你只需要使用这个交换机来连接它们。 某种程度上,这个就跟日志模块的工作原理类似。 实际上,它可以轻松的解耦程序中多个任务。

其次,交换机广播消息给多个订阅者的能力带来了一个全新的通信模式。 例如,你可以使用多任务系统、广播或扇出。 你还可以通过以普通订阅者身份绑定来构建调试和诊断工具。 例如,下面是一个简单的诊断类,可以显示被发送的消息:

class DisplayMessages:
    def __init__(self):
        self.count = 0
    def send(self, msg):
        self.count += 1
        print('msg[{}]: {!r}'.format(self.count, msg))

exc = get_exchange('name')
d = DisplayMessages()
exc.attach(d)
1
2
3
4
5
6
7
8
9
10

最后,该实现的一个重要特点是它能兼容多个“task-like”对象。 例如,消息接受者可以是actor(12.10小节介绍)、协程、网络连接或任何实现了正确的 send() 方法的东西。

关于交换机的一个可能问题是对于订阅者的正确绑定和解绑。 为了正确的管理资源,每一个绑定的订阅者必须最终要解绑。 在代码中通常会是像下面这样的模式:

exc = get_exchange('name')
exc.attach(some_task)
try:
    ...
finally:
    exc.detach(some_task)
1
2
3
4
5
6

某种意义上,这个和使用文件、锁和类似对象很像。 通常很容易会忘记最后的 detach() 步骤。 为了简化这个,你可以考虑使用上下文管理器协议。 例如,在交换机对象上增加一个 subscribe() 方法,如下:

from contextlib import contextmanager
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    @contextmanager
    def subscribe(self, *tasks):
        for task in tasks:
            self.attach(task)
        try:
            yield
        finally:
            for task in tasks:
                self.detach(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]

# Example of using the subscribe() method
exc = get_exchange('name')
with exc.subscribe(task_a, task_b):
     ...
     exc.send('msg1')
     exc.send('msg2')
     ...

# task_a and task_b detached here
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

最后还应该注意的是关于交换机的思想有很多种的扩展实现。 例如,交换机可以实现一整个消息通道集合或提供交换机名称的模式匹配规则。 交换机还可以被扩展到分布式计算程序中(比如,将消息路由到不同机器上面的任务中去)。

编辑 (opens new window)
上次更新: 2023/10/13, 17:39:25
定义一个Actor任务
使用生成器代替线程

← 定义一个Actor任务 使用生成器代替线程→

最近更新
01
牛客网非技术快速入门SQL练习题
03-08
02
其他日常SQL题
03-07
03
用户与权限管理
03-05
更多文章>
Theme by Vdoing | Copyright © 2021-2023 | Weibw | 辽ICP备18015889号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式