Python并发编程(下)功能强大设置简单的分布式多进程生产者与消费者模式

作者:J.sky · 发表于:
2017-10-07T14:09:46.000000Z
· 更新于:
2023-08-13T22:54:33.234776Z
· Tag: Python基础

输入图片说明Python的日常生活中,当你要进行一系列超大型密集的计算任务或是超多未知的数据需要采集时,一台机器或许已经无法满足你的需求了,这时就得考虑使用到分布式操作进行任务处理了。

一个无聊而又变态的需求

我们假设有一个无聊而又变态的任务:有一系列的任务会成生一组随机>=0的整数队列[n,n,n,.....n,n],我们需要立即取得他们的n*n自乘结果。当然你也可以使用单线程进行相关的操作,但是由于计算任务巨大,我们不得采用分布式多进程生产者与消费者模式来进行程序的执行了,好吧,我们先了解一下相关术语。

分布式

在一个分布式系统中,一组独立的计算机展现给用户的是一个统一的整体,就好像是一个系统似的。这种情景下就是分布式。

为什么要用分布式多进程而不是多线程?

由于Python解释器中使用了内部的GIL全局解释器锁,使得Python多线程的并发在任意时刻只允许单个CPU来运行,这样的运行方式会影响程序的并发。所以多线程并不适用于高密度的计算任务,由于计算任务数量庞大,所以采用分布式可以大大提高计算的速度。

关于Python的进程

请点击传送门:Python并发编程(上)进程模块multiprocessing模块和Process类

Managers

管理器提供一种创建可在不同进程之间共享的数据的方法,包括在不同机器上运行的进程之间通过网络共享。管理员对象控制管理共享对象的服务器进程。其他进程可以通过使用代理访问共享对象。这里我们使用multiprocessing.managers.BaseManager来创建一个多进程管理器,对网络的任务进行注册和管理。BaseManager下边封装了很多相关的方法,大大简化了网络上进程之间的通信,可以方便的组建分布式的进程任务。

Queue

Queue实现多生产者,多消费者队列。它特别适用于信息必须在多个线程间安全地交换的多线程程序中。在本次分布式多进程的任务Queue起到了非常重要的作用。

开始编码前

我们先构思一下程序需要的模块:

  • 上帝(God也是服务器,服务器负责注册队列queue,并接收并查看到最终结果。)
  • 生产者(Producer,负责生产出需要进行计算的数据并发送给消费者。)
  • 消费者(Consumer,负责接收生产者发来的数据并进行计算,最后把结果发送给上帝看.)

相关思维导图:如果下 输入图片说明

程序构建要点:

class GodManager(BaseManager):
    pass

注册一个管理器,负责注册或获取网上的队列Queue

服务器上需要创建程序中必需的queue,并注册到网上。

GodManager(address=('192.168.0.88',5678),authkey=b'www.17python.com')

注册服务器,通过start()启动,通过connect()连接。通过manager.pq()类似的方法获取网上的队列。

注意,分布式多进程中一定要获取网上注册的queue,否则会造成数据错乱。

另外,测试时,请在终端中启动程序,网络连接错误的,建议查看一下防火墙是不是禁止了网络的连通,不要在IDE中使用调试来测试本程序。

God源码

import random, time, queue
from multiprocessing.managers import BaseManager
# 创建任务需要的两个队列
pq = queue.Queue()
cq = queue.Queue()
# 注册一个管理器,注册Queue队列到网,供其它终端使用。
class GodManager(BaseManager):
    pass

#把任务队列通过管理器注册到网上,这样就可以在多台机器间访问通信,做到分布式通信。
GodManager.register('pq',callable=lambda:pq)
GodManager.register('cq',callable=lambda:cq)
#设置服务器的ip、端口及密码
manager = GodManager(address=('192.168.0.88',5678),authkey=b'www.17python.com')
manager.start()#启动服务器
print('服务器已经启动!')
#重新获取已经在网上注册的队列,使用队列名()方法来获得网上注册的队列名。
p_q = manager.pq()
c_q = manager.cq()

while True:
    if c_q.empty():#如果结果集队列空
        time.sleep(1)
        print("静静的等待计算结果中。。。。。。")
    else:
        r = c_q.get()#获得传来的计算结果
        print("收到任务计算结果%s" % r)

Producer 生产者

import random, time
from multiprocessing.managers import BaseManager

# 注册一个管理器,负责管理调度网上注册的Queue队列
class ProducerMagager(BaseManager):
    pass
#获取网络上的Queue 生产者,只关心生产需要计算的数据即可。
ProducerMagager.register('pq')
# 注册生产者服务器,address 真写IP及端口,authkey是一个密码,如果需要访问此处必须与服务器一致。
pm = ProducerMagager(address=('192.168.0.88',5678), authkey=b'www.17python.com')
pm.connect()#连接服务器
print('生产者服务器已经准备就绪!')
task = pm.pq()#获取生产者的队列
k = 1
#
while True:
    for i in range(10):
        r = random.randint(0,999)
        task.put(r)
    print("第{0}轮任务完毕!稍后继续!".format(k))
    k += 1
    time.sleep(3)

Consumer 消费者

import time
from multiprocessing.managers import BaseManager

# 注册一个管理器,负责管理调度网上注册的Queue队列
class ConsumerMagager(BaseManager):
    pass

#获取网络上的Queue 消费者,需要获取任务,计算后发送任务。
ConsumerMagager.register('pq')
ConsumerMagager.register('cq')

m = ConsumerMagager(address=('192.168.0.88',5678),authkey=b'www.17python.com')
m.connect()#连接服务器
pq = m.pq()
cq = m.cq()
#开始计算任务
while True:
    if not pq.empty():#如果任务队列不为空
        n = pq.get(timeout=1)#如果超。
        print('收到计算任务{0}*{1}={2}'.format(n,n,n*n))
        cq.put('%d * %d = %d' %(n,n,n*n))
    else:
        time.sleep(1)
        print("好无聊,我在等待任务安排中")

在终端分别启动python3 God.py python3 producer.py python3 Consumer.py

输入图片说明 输入图片说明 输入图片说明

然后就可以观察终端打印的结果,服务器终端显示着计算回来的结果,除了服务器以外,生产者和消费者都可以分布式在各个终端上启动多个进行任务的生成和计算。 我这里只在本机上进行测试了,如果只是在一台机器上进行多进程的计算,可以不必网上注册,使用唯一的Queue进行通信即可。

别小睢这个小小的分布式,稍加修改任务即可变成强大的分布式采集程序,看你怎么用了。

参考文档

本文源码下载: