本文從參考了網(wǎng)絡(luò)上的許多內(nèi)容,主要為分布式進(jìn)程及其的評(píng)論內(nèi)容。
所謂分布式運(yùn)算,既可以指在同一臺(tái)機(jī)器上利用多進(jìn)程(線程)進(jìn)行運(yùn)算,又可以指將計(jì)算任務(wù)進(jìn)行分解,利用多臺(tái)機(jī)器進(jìn)行運(yùn)算。本文中的分布式單指最后一種。
python的標(biāo)準(zhǔn)庫(kù)
multiprocessing
中存在一個(gè)
managers
的子模塊,該模塊支持將多進(jìn)程分布到多臺(tái)機(jī)器上。選擇一個(gè)進(jìn)程來(lái)調(diào)度任務(wù),其他進(jìn)程進(jìn)行計(jì)算,從而實(shí)現(xiàn)分布式運(yùn)算。而調(diào)度進(jìn)程和計(jì)算進(jìn)程之間的通信,是通過(guò)網(wǎng)絡(luò)來(lái)進(jìn)行的,用到了python的
socket
模塊。
1. linux中的分布式運(yùn)算
以參考文章中的例子來(lái)進(jìn)行說(shuō)明,首先寫一個(gè)調(diào)度程序:
# task_manager.py
import
random
,
queue
from
multiprocessing
.
managers
import
BaseManager
# 發(fā)送任務(wù)的隊(duì)列
task_queue
=
queue
.
Queue
(
)
# 接收結(jié)果的隊(duì)列
result_queue
=
queue
.
Queue
(
)
# 將兩個(gè)隊(duì)列注冊(cè)到網(wǎng)絡(luò)上,可以被其他機(jī)器訪問(wèn)
BaseManager
.
register
(
'get_task_queue'
,
callable
=
lambda
:
task_queue
)
BaseManager
.
register
(
'get_result_queue'
,
callable
=
lambda
:
result_queue
)
# 實(shí)例化一個(gè)manager,綁定端口5000,ip為空表名為本地ip,設(shè)置驗(yàn)證碼
manager
=
BaseManager
(
address
=
(
''
,
5000
)
,
authkey
=
b
'abc'
)
# 啟動(dòng)manager
manager
.
start
(
)
# 獲取網(wǎng)絡(luò)中的queue對(duì)象
task
=
manager
.
get_task_queue
(
)
result
=
manager
.
get_result_queue
(
)
# 放幾個(gè)任務(wù)進(jìn)去
for
i
in
range
(
10
)
:
n
=
random
.
randint
(
0
,
10000
)
print
(
'Put task %d...'
%
n
)
task
.
put
(
n
)
# 監(jiān)聽(tīng)是否有結(jié)果傳回
print
(
'Try get results...'
)
for
i
in
range
(
10
)
:
r
=
result
.
get
(
timeout
=
10
)
print
(
'Result: %s'
%
r
)
# 關(guān)閉
manager
.
shutdown
(
)
再寫一個(gè)執(zhí)行任務(wù)的程序:
# task_worker.py
import
time
,
queue
from
multiprocessing
.
managers
import
BaseManager
# 獲取網(wǎng)絡(luò)上被master注冊(cè)的queue
BaseManager
.
register
(
'get_task_queue'
)
BaseManager
.
register
(
'get_result_queue'
)
# 連接到服務(wù)器,即任務(wù)分配進(jìn)程的地址
server_address
=
'191.168.6.67'
print
(
f
'Connecting to server {server_address}...'
)
# 注意驗(yàn)證碼要保持一致
m
=
BaseManager
(
address
=
(
server_address
,
5000
)
,
authkey
=
b
'abc'
)
m
.
connect
(
)
# 獲取queue對(duì)象
task
=
m
.
get_task_queue
(
)
result
=
m
.
get_result_queue
(
)
# 開(kāi)始進(jìn)行計(jì)算
for
i
in
range
(
10
)
:
try
:
n
=
task
.
get
(
timeout
=
1
)
# 獲取manager進(jìn)程放入task中的值
print
(
f
'run task {n}*{n}'
)
r
=
f
'{n} * {n} = {n*n}'
time
.
sleep
(
1
)
result
.
put
(
r
)
# 將計(jì)算的結(jié)果放入result隊(duì)列中
except
queue
.
Empty
:
print
(
'task queue is empty.'
)
print
(
'worker exit.'
)
將上面兩個(gè)程序分別拷貝到局域網(wǎng)中的兩臺(tái)linux服務(wù)器上,先運(yùn)行
task_manager.py
,隨即運(yùn)行
task_worker.py
,就可以看到計(jì)算的結(jié)果了。
注意,在運(yùn)行
task_worker.py
時(shí),可能會(huì)提示一下錯(cuò)誤:
OSError: [Errno 113] No route to host
這是由于系統(tǒng)設(shè)置了防火墻,屏蔽了通過(guò)設(shè)置的端口進(jìn)行通信導(dǎo)致的。
我使用的linux是centos,要查看哪些端口開(kāi)放,可以使用:
firewall-cmd --list-ports
開(kāi)放5000端口,可以使用:
firewall-cmd --zone=public --add-port=5000/tcp --permanent
設(shè)置完成后運(yùn)行:
firewall-cmd --reload
重啟防火墻進(jìn)行生效。
執(zhí)行完上述操作后,在linux(centos)下運(yùn)行應(yīng)該沒(méi)有問(wèn)題。
2. windows中的分布式運(yùn)算
注意,以上腳本在windows系統(tǒng)中是無(wú)法運(yùn)行的,經(jīng)過(guò)我的試驗(yàn),主要發(fā)現(xiàn)了兩個(gè)問(wèn)題:
一、
lambda
定義函數(shù)的問(wèn)題
在
task_manager.py
中,將隊(duì)列注冊(cè)到網(wǎng)絡(luò)上時(shí),對(duì)于其中的
callable
參數(shù),使用的是以
lambda
關(guān)鍵字定義的函數(shù),在windows下這是行不通的。因此,需要顯示地定義兩個(gè)函數(shù),然后傳遞給
callable
:
_task
=
queue
.
Queue
(
)
_result
=
queue
.
Queue
(
)
def
task_queue
(
)
:
return
_task
def
result_queue
(
)
:
return
_result
然后,修改
callable=lambda: task_queue
和
callable=lambda: result_queue
分別為
callable=task_queue
和
callable=result_queue
。
二、__name__問(wèn)題
在windows下,
manager.start()
及其之后的命令需要放在
if __name__ == '__main__':
語(yǔ)句塊中執(zhí)行,否則會(huì)報(bào)錯(cuò)。
三、綁定地址問(wèn)題
在linux下,實(shí)例化一個(gè)manager時(shí),ip可以留空,默認(rèn)為本機(jī)ip,但在windows下面行不通。可以指定為
localhost
或
127.0.0.1
,這樣,在同一臺(tái)機(jī)器上分別運(yùn)行兩個(gè)腳本是沒(méi)有問(wèn)題的,但將腳本分別放到兩臺(tái)機(jī)器上運(yùn)行會(huì)出問(wèn)題。
解決方法是,將ip指定為本機(jī)在局域網(wǎng)中的ip地址,如
191.168.1.123
,然后兩臺(tái)機(jī)器就可以正常工作了。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
