您当前的位置:首页 > 计算机 > 编程开发 > Python

使用 Python 分布式进程进行大数据处理

时间:04-01来源:作者:点击数:

一、创建服务端,taskManager.py,代码如下:

import random

import time

import queue

from multiprocessing.managers import BaseManager

# 第一步:建立task_queue 和 result_queue,用来存放任务结果

task_queue = queue.Queue()

result_queue = queue.Queue()

def get_task():

return task_queue

def get_result():

return result_queue

class Queuemanager(BaseManager):

pass

def main(ip,port,kl):

# 第二步:把创建的两个队列注册在网络上,利用 register 方法,

# callable参数关联了 Queue 对象,将Queue对象在网络中暴露

Queuemanager.register('get_task_queue',callable=get_task)

Queuemanager.register('get_result_queue',callable=get_result)

# 第三步:初始化对象:绑定IP、端口、设置验证口令 。

manager = Queuemanager(address=(ip,port),authkey=kl)

# 第四步:启动管理,监听信息通道

manager.start()

# 第五步:通过管理实例的方法获得通过网络访问的Queue对象

task = manager.get_task_queue()

result = manager.get_result_queue()

# 第六步:添加任务

for url in ["ImageUrl_"+str(i) for i in range(10)]:

print('put task %s ...'%url)

task.put(url)

# 获得返回结果

print('try get result...')

for i in range(10):

print('result is %s'%result.get(timeout=10)) # 最大等待10秒

# 关闭管理

manager.shutdown()

if __name__ == '__main__':

ip = '127.0.0.1' # 要绑定的本机IP

port = 8001 # 端口号

passwd = b'distributed' # 口令

main(ip,port,passwd)

二、创建客户端,taskWorker.py,客户端代码可以放入多台电脑运行,以达到分布式需求,代码如下:

import time

from multiprocessing.managers import BaseManager

# 创建类似的 QueueManager

class QueueManager(BaseManager):

pass

def main(server_addr,port,kl):

# 第一步:使用 QueueManager 注册用于获取Queue的方法名称

QueueManager.register('get_task_queue')

QueueManager.register('get_result_queue')

# 第二步:连接到服务器

print('Connect to server %s...'%server_addr)

# 端口和验证口令注意保持与服务进程完全一致

m = QueueManager(address=(server_addr,port),authkey=kl)

# 从网络连接

m.connect()

# 第三步:获取Queue的对象

task = m.get_task_queue()

result = m.get_result_queue()

# 第四步:从task队列获取任务,并把结果写入result队列

while (not task.empty()):

image_url = task.get(True,timeout=5)

print('run task download %s...'%image_url)

time.sleep(1)

result.put('%s--->Computer 1 success'%image_url)

# 处理结束

print('worker exit.')

if __name__ == '__main__':

ip = '127.0.0.1' # 要连接的服务端的IP

port = 8001 # 服务端设定的端口号

passwd = b'distributed' # 口令

main(ip,port,passwd)

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门