加入收藏 | 设为首页 |

电动牙刷怎么用-Python分布式进程中你会遇到的坑

海外新闻 时间: 浏览:174 次

写在前面

小惊大怪

你是不是在用Python3或许在windows体系上编程?最重要的是你对进程和线程不是很清楚?那么祝贺你,在python散布式进程中,会有坑等着你去挖。。。(hahahaha,此处答应我吓唬一下你)恶作剧的啦,不过,假如你知道序列中不支撑匿名函数,那这个坑就和你say byebye了。好了话不大都,直接进入正题。

散布式进程

正如咱们所知道的Process比Thread更安稳,并且Process能够散布到多台机器上,而Thread最多只能散布到同一台机器的多个CPU上。Python的multiprocessing模块不光支撑多进程,其间managers子模块还支撑把多进程散布到多台机器上。一个服务进程能够作为调度者,将使命散布到其他多个进程中,依托网络通讯。因为managers模块封装很好,不用了解网络通讯的细节,就能够很容易地编写散布式多进程程序。

代码记载

举个比方

假如咱们现已有一个经过Queue通讯的多进程程序在同一台机器上运转,现在,因为处理使命的进程使命深重,期望把发送使命的进程和处理使命的进程散布到两台机器上,这应该怎样用散布式进程来完结呢?你现已知道了原有的Queue能够持续运用,并且经过managers模块把Queue经过网络露出出去,就能够让其他机器的进程来拜访Queue了。好,那咱们就这么干!

写个task_master.py

咱们先看服务进程。服务进程担任发动Queue,把Queue注册到网络上,然后往Queue里边写入使命。

#!/user/bin/pytthon
# -*- coding:utf-8 -*-
# @Time: 2018/3/3 16:46
# @Author: lichexo
# @File: task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
# 发送使命的行列:
task_queue = queue.Queue()
# 接纳成果的行列:
result_queue = queue.Queue()
# 从BaseManager承继的QueueManager:
class QueueManager(BaseManager):
pass
# 把两个Queue都注册到网络上, callable参数相关了Queue方针:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 发动Queue:
manager.start()
# 取得经过网络拜访的Queue方针:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个使命进去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从result行列读取成果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
#电动牙刷怎么用-Python分布式进程中你会遇到的坑 封闭:
manager.shutdown()
print('master exit.')

请留意,当咱们在一台机器上写多进程程序时,创立的Queue能够直接拿来用,可是,在散布式多进程环境下,增加使命到Queue不能够直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,有必要经过manager.get_task_queue()取得的Queue接口增加。然后,在另一台机器上发动使命进程(本机上发动也能够)

写个task_worker.py

#!/user/bin/pytthon
# -*- coding:utf-8 -*-
# @Time: 2018/3/3 16:46
# @Author: lichexo
# @File: task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 创立相似的QueueManager:
class QueueManager(BaseManager):
pass
# 因为这个QueueManager只从网络上获取Queue,所以注册时只提供姓名:
QueueManager.register('get_task_que电动牙刷怎么用-Python分布式进程中你会遇到的坑ue')
QueueMa电动牙刷怎么用-Python分布式进程中你会遇到的坑nager.register('get_result_queue')
# 衔接到服务器,也便是运转task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码留意坚持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络衔接:
m.connect()
# 获取Queue的方针:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task行列取使命,并把成果写入result行列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理完毕:
print('worker exit.')

使命进程要经过网络衔接到服务进程,所以要指定服务进程的IP。

运转成果

现在,能够试试散布式进程的作业作用了。先发动task_master.py服务进程:

Traceback (most recent call last):
File "F:/Python/untitled/xianchengjincheng/master.py", line 25, in
manager.start()
File "F:\Python\pystall\lib\multiprocessing\managers.py", line 513, in start
self._process.start()
File "F:\Python\pystall\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "F:\Python\pystall\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "F:\Python\pystall\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "F:\Python\pystall\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle at 0x00000202D1921E18>: attribute lookup on __main__ failed

task_master.py进程发送完使命后,开端等候result行列的成果。现在发动task_worker.py进程:

Connect to server 127.0.0.1...
Traceback (most recent call last):
File "F:/Python/untitled/xianchengjincheng/work.py", line 24, in
m.connect()
File "F:\Python\pystall\lib\multiprocessing\managers.py", line 489, in connect
conn = Client(self._address, authkey=self._authkey)
File "F:\Python\pystall\lib\multiprocessing\connection.py", line 487, in Client
c = SocketClient(address)
File "F:\Python\pystall\lib\multiprocessing\connection.py", line 614, in SocketClient
s.connect(address)
ConnectionRefusedError: [WinError 10061] 因为方针核算机活跃回绝,无法衔接。

看到没,成果都出错了,咱们好好剖析一下究竟哪出错了。。。

过错剖析

在task_master.py的报错提示中,咱们知道它说lambda过错,这是因为序列化不电动牙刷怎么用-Python分布式进程中你会遇到的坑支撑匿名函数,所以咱们得修正代码,从头对queue用QueueManager进行封装放到网络中。

# 把两个Queue都注册到网络上, callable参数相关了Queue方针
QueueManager.register('get_task_queue',callable=return_task_queue)
QueueManager.register('get_result_queue',callable=return_result_queue)

其间task_queue和result_queue是两个行列,别离寄存使命和成果。它们用来进行进程间通讯,交流方针。

因为是散布式的环境,放入queue中的数据需求等候Workers机器运算处理后再进行读取,这样就需求对queue用QueueManager进行封装放到网络中,这是经过上面的2行代码来完结的。咱们给return_task_queue的网络调用接口取了一个名get_task_queue,而return_result_queue的姓名是get_result_queue,便利区分对哪个queue进行操作。前端task.put(n)便是对task_queue进行写入数据,相当于分配使命。而result.get()便是等候workers机器处理后回来的成果。

值得留意 在windows体系中你有必要要写IP地址,而其他操作体系比方linux操作体系则就不要了。

 # windows需求写ip地址
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

修正后的代码

在task_master.py中修正如下:

#!/user/bin/pytthon
# -*- coding:utf-8 -*-
# @Time: 2018/3/3 16:46
# @Author: lichexo
# @File: task_master.py
# task_master.py
import random,time,queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
task_queue = queue.Queue() # 发送使命的行列:
result_queue = queue.Queue() # 接纳成果的行列:
class QueueManager(BaseManager): # 从BaseManager承继的QueueManager:
pass
# windows下运转
def return_task_queue():
global task_queue
return task_queue # 回来发送使命行列
def return_result_queue ():
global result_queue
return result_queue # 回来接纳成果行列
def test():
# 把两个Queue都注册到网络上, callable参数相关了Queue方针,它们用来进行进程间通讯,交流方针
#QueueManager.register('get_task_queue', callable=lambda: task_queue)
#QueueManager.register('get_result_queue', callable=lambda: result_queue)
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)
# 绑定端口5000, 设置验证码'abc':
#manager = QueueManager(address=('', 5000), authkey=b'abc')
# windows需求写ip地址
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
manager.start() # 发动Queue:
# 取得经过网络拜访的Queue方针:
task = manager.get_task_queue()
result = manager.get_result_queue()
for i in range(10): # 放几个使命进去:
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从result行列读取成果:
print('Try get results...')
for i in range(10):
# 这儿加了反常捕获
try:
r = result.get(timeout=5)
print电动牙刷怎么用-Python分布式进程中你会遇到的坑('Result: %s' % r)
except queue.Empty:
print('result queue is empty.')
# 封闭:
manager.shutdown()
print('master exit.')
if __name__=='__main__':
freeze_support()
print('start!')
test()

在task_worker.py中修正如下:

#!/user/bin/pytthon
# -*- coding:utf-8 -*-
# @Time: 2018/3/3 16:46
# @Author: lichexo
# @File: task_worker.py
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 创立相似的QueueManager:
class QueueManager(BaseManager):
pass
# 因为这个QueueManager只从网络上获取Queue,所以注册时只提供姓名:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 衔接到服务器,也便是运转task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码留意坚持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络衔接:
m.connect()
# 获取Queue的方针:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task行列取使命,并把成果写入result行列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('task queue is empty.')
# 处理完毕:
print('worker exit.')

先运转task_master.py,然后再运转task_worker.py

(1)task_master.py运转成果如下

start!
Put task 7872...
Put task 6931...
Put task 1395...
Put task 8477...
Put task 8300...
Put task 1597...
Put task 8738...
Put task 8627...
Put task 1884...
Put task 2561...
Try get results...
Result: 7872 * 7872 = 61968384
Result: 6931 * 6931 = 48038761
Result: 1395 * 1395 = 1946025
Result: 8477 * 8477 = 71859529
Result: 8300 * 8300 = 68890000
Result: 1597 * 1597 = 2550409
Result: 8738 * 8738 = 76352644
Result: 8627 * 8627 = 74425129
Result: 1884 * 1884 = 3549456
Result: 2561 * 25电动牙刷怎么用-Python分布式进程中你会遇到的坑61 = 6558721
master exit.

(2)task_worker.py运转成果如下

Connect to server 127.0.0.1...
run task 8640 * 8640...
run task 7418 * 7418...
run task 9303 * 9303...
run task 568 * 568...
run task 1633 * 1633...
run task 3583 * 3583...
run task 3293 * 3293...
run task 8975 * 8975...
run task 8189 * 8189...
run task 731 * 731...
worker exit.

常识弥补

这个简略的Master/Worker模型有什么用?其实这便是一个简略但真实的散布式核算,把代码稍加改造,发动多个worker,就能够把使命散布到几台乃至几十台机器上,比方把核算n*n的代码换成发送邮件,就完结了邮件行列的异步发送。

Queue方针存储在哪?留意到task_worker.py中底子没有创立Queue的代码,所以,Queue方针存储在task_master.py进程中:


而Queue之所以能经过网络拜访,便是经过QueueManager完结的。因为QueueManager办理的不止一个Queue,所以,要给每个Queue的网络调用接口起个姓名,比方get_task_queue。task_worker这儿的QueueManager注册的姓名有必要和task_manager中的相同。比照上面的比方,能够看出Queue方针从另一个进程经过网络传递了过来。只不过这儿的传递和网络通讯由QueueManager完结。

authkey有什么用?这是为了确保两台机器正常通讯,不被其他机器歹意搅扰。假如task_worker.py的authkey和task_master.py的authkey不一致,必定衔接不上。

今日的共享就到这儿了,假如你有任何不明白的问题,能够发信息或许留言喽。