
Python: A Small Distributed Crawler


1. Writing the URL Manager and the Data Store: URLManager.py

import pickle
import hashlib
import codecs
import time

class UrlManager(object):
	""" URL manager """
	def __init__(self):
		# set of URLs not yet crawled
		self.new_urls = self.load_progress('new_urls.txt')
		# set of md5 digests of URLs already crawled
		self.old_urls = self.load_progress('old_urls.txt')
		
	def has_new_url(self):
		""" Check whether there are URLs left to crawl """
		return self.new_url_size() != 0

	def get_new_url(self):
		""" Fetch one URL that has not been crawled yet """
		new_url = self.new_urls.pop()
		m = hashlib.md5()
		m.update(new_url.encode())
		# the URL set is small, so md5 collisions are unlikely;
		# keep only the middle 16 characters of the digest
		self.old_urls.add(m.hexdigest()[8:-8])
		return new_url

	def add_new_url(self,url):
		""" Add a single new URL to the not-yet-crawled set """
		if url is None:
			return
		m = hashlib.md5()
		m.update(url.encode())
		url_md5 = m.hexdigest()[8:-8]
		if url not in self.new_urls and url_md5 not in self.old_urls:
			self.new_urls.add(url)

	def add_new_urls(self,urls):
		""" Add a batch of new URLs to the not-yet-crawled set """
		if urls is None or len(urls)==0:
			return
		for url in urls:
			self.add_new_url(url)

	def new_url_size(self):
		""" Size of the not-yet-crawled URL set """
		return len(self.new_urls)

	def old_url_size(self):
		""" Size of the already-crawled URL set """
		return len(self.old_urls)

	def save_progress(self,path,data):
		""" Persist a set to a local file """
		with open(path,'wb') as f:
			pickle.dump(data, f)

	def load_progress(self,path):
		""" Load previously saved progress from a local file """
		print('[+] Loading progress from local file: %s'%path)
		try:
			with open(path,'rb') as f:
				return pickle.load(f)
		except (OSError, EOFError, pickle.PickleError):
			print('[!] No progress file, it will be created: %s'%path)
		return set()


class DataOutput(object):
	""" Data store: writes crawled records into an HTML table """
	def __init__(self):
		self.filepath = 'baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S",time.localtime()))
		self.output_head(self.filepath)
		self.datas = []

	def store_data(self,data):
		""" Buffer a record and flush to disk once more than 10 are buffered """
		if data is None:
			return
		self.datas.append(data)
		if len(self.datas) > 10:
			self.output_html(self.filepath)

	def output_head(self,path):
		""" Write the HTML header """
		fout = codecs.open(path,'w',encoding='utf-8')
		fout.write("<html>")
		fout.write("<head><meta charset='utf-8'/></head>")
		fout.write("<body>")
		fout.write("<table>")
		fout.close()

	def output_html(self,path):
		""" Append the buffered records to the HTML file """
		fout = codecs.open(path,'a',encoding='utf-8')
		for data in self.datas:
			fout.write("<tr>")
			wurl = "<td><a href='%s'>%s</a></td>"%(data['url'],data['url']) if data.get('url') else "<td></td>"
			wtitle = "<td>%s</td>"%data['title'] if data.get('title') else "<td></td>"
			wsum = "<td>%s</td>"%data['summary'] if data.get('summary') else "<td></td>"
			fout.write(wurl)
			fout.write(wtitle)
			fout.write(wsum)
			fout.write("</tr>")
		# clear the buffer after writing; removing items while iterating
		# over the list would silently skip every other record
		self.datas = []
		fout.close()

	def output_end(self,path):
		""" Write the closing HTML tags """
		fout = codecs.open(path,'a',encoding='utf-8')
		fout.write("</table>")
		fout.write("</body>")
		fout.write("</html>")
		fout.close()
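
The two classes above can be smoke-tested on their own before any network code is involved. The sketch below is illustrative only: the import path assumes the file is saved as URLManager.py, and the seed URL and stub record are placeholders.

# Hypothetical local test of UrlManager and DataOutput (placeholders only).
from URLManager import UrlManager, DataOutput

manager = UrlManager()
manager.add_new_urls(['https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB'])
output = DataOutput()

while manager.has_new_url():
	url = manager.get_new_url()
	# a real run would download and parse here; store a stub record instead
	output.store_data({'url': url, 'title': 'placeholder title', 'summary': 'placeholder summary'})

output.output_html(output.filepath)
output.output_end(output.filepath)
# persist progress so that a later run can resume where this one stopped
manager.save_progress('new_urls.txt', manager.new_urls)
manager.save_progress('old_urls.txt', manager.old_urls)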
		

2. Writing the Spider Node: SpiderNode.py

import re
import requests
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from multiprocessing.managers import BaseManager

"""
爬虫节点
"""

class HtmlDownloader(object):
	""" HTML 下载器 """
	
	def download(self,url):
		if url is None:
			return None
		user_agent = "Mozilla/5.0 (Windows NT 5.1; rv:52.0) Gecko/20100101 Firefox/52.0"
		headers = {'User-Agent':user_agent}
		r = requests.get(url,headers=headers)
		if r.status_code==200:
			r.encoding = 'utf-8'
			return r.text
		return None




class HtmlParser(object):
	""" HTML parser """

	def parser(self,page_url,html_cont):
		''' Parse the downloaded page, extracting new URLs and data '''
		if page_url is None or html_cont is None:
			return
		soup = BeautifulSoup(html_cont,'html.parser')
		new_urls = self._get_new_urls(page_url,soup)
		new_data = self._get_new_data(page_url,soup)
		return new_urls,new_data

	def _get_new_urls(self,page_url,soup):
		''' Extract the set of new URLs from the page '''
		new_urls = set()
		# collect the <a> tags; a stricter filter could be re-enabled:
		# soup.find_all('a', href=re.compile(r'/view/\d+\.htm'))
		links = soup.find_all('a')
		for link in links:
			try:
				# read the href attribute
				new_url = link['href']
				# resolve it against the page URL to get an absolute address
				new_full_url = urljoin(page_url,new_url)
				new_urls.add(new_full_url)
			except Exception as exc:
				print(exc)
		return new_urls
		
	def _get_new_data(self,page_url,soup):
		''' Extract the useful data (title and summary) from the page '''
		data = {}
		data['url'] = page_url
		try:
			title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
			data['title'] = title.get_text()
			summary = soup.find('div',class_='lemma-summary')
			# get_text() returns all text contained in the tag,
			# including the text of its descendants, as a single string
			data['summary'] = summary.get_text()
		except Exception as exc:
			print(exc)
		return data



class SpiderWork(object):
	""" Worker-side scheduler running on the spider node """

	def __init__(self):
		# Set up the connection from this worker to the control node.
		# Step 1: register the names of the methods that expose the queues
		BaseManager.register('get_task_queue')
		BaseManager.register('get_result_queue')
		# Step 2: connect to the server (the control node's address)
		server_addr = '192.168.1.105'
		print('Connect to server %s...'%server_addr)
		# the port and authkey must match the server process exactly
		self.m = BaseManager(address=(server_addr,8001),authkey=b'isspider')
		# connect over the network
		self.m.connect()
		# Step 3: obtain the Queue proxies
		self.task = self.m.get_task_queue()
		self.result = self.m.get_result_queue()
		# initialize the downloader and parser
		self.downloader = HtmlDownloader()
		self.parser = HtmlParser()
		print('init finish')

	def crawl(self):
		while True:
			try:
				if not self.task.empty():
					url = self.task.get()
					if url == 'end':
						print('Control node told this spider node to stop...')
						# pass the stop signal on so the other processes stop as well
						self.result.put({'new_urls':'end','data':'end'})
						return
					print('Spider node is processing: %s'%url)
					content = self.downloader.download(url)
					new_urls,data = self.parser.parser(url,content)
					self.result.put({'new_urls':new_urls,'data':data})
			except EOFError:
				print('Lost connection to the control node!')
				return
			except Exception as exc:
				print(exc)
				print('Crawl failed')

if __name__ == '__main__':
	spider = SpiderWork()
	spider.crawl()
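
Before pointing a spider node at the control server, HtmlDownloader and HtmlParser can be checked in isolation on a single page. A minimal sketch, assuming the node code is saved as SpiderNode.py and using an example Baidu Baike entry URL (the URL and the page's CSS classes are assumptions that may change):

# Hypothetical standalone check; importing SpiderNode does not connect to the
# server because SpiderWork is only instantiated under the __main__ guard.
from SpiderNode import HtmlDownloader, HtmlParser

downloader = HtmlDownloader()
parser = HtmlParser()
page_url = 'https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB'  # example entry
html = downloader.download(page_url)
if html:
	new_urls, data = parser.parser(page_url, html)
	print('extracted %d links' % len(new_urls))
	print('title: %s' % data.get('title'))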

3. Writing the Control Scheduler: NodeManager.py
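
The control scheduler is the counterpart of the spider nodes: it exposes the task and result queues over the network, feeds not-yet-crawled URLs from UrlManager into the task queue, and writes the returned data through DataOutput. A minimal sketch of such a NodeManager is given below; it reuses the UrlManager and DataOutput classes from URLManager.py and the same port 8001 and authkey b'isspider' as the spider node, but the process layout, helper names, seed URL, and the 2000-page stop condition are illustrative assumptions rather than the author's exact code.

# NodeManager.py - illustrative control-node sketch (see the note above).
from multiprocessing import Process, Queue
from multiprocessing.managers import BaseManager
from URLManager import UrlManager, DataOutput


class NodeManager(object):
	""" Control-node scheduler """

	def start_manager(self, url_q, result_q):
		""" Expose the two queues on the network; names must match the spider nodes. """
		BaseManager.register('get_task_queue', callable=lambda: url_q)
		BaseManager.register('get_result_queue', callable=lambda: result_q)
		# bind on all interfaces; port and authkey must match SpiderNode.py
		return BaseManager(address=('', 8001), authkey=b'isspider')

	def url_manager_proc(self, url_q, conn_q, root_url):
		""" Hand URLs to the spider nodes and absorb the URLs they discover. """
		url_manager = UrlManager()
		url_manager.add_new_url(root_url)
		while True:
			while url_manager.has_new_url():
				url_q.put(url_manager.get_new_url())
				# stop after a fixed number of pages for this small demo
				if url_manager.old_url_size() > 2000:
					url_q.put('end')
					print('Control node: notifying spider nodes to stop')
					url_manager.save_progress('new_urls.txt', url_manager.new_urls)
					url_manager.save_progress('old_urls.txt', url_manager.old_urls)
					return
			try:
				# collect URLs extracted by the spider nodes
				url_manager.add_new_urls(conn_q.get(timeout=1))
			except Exception:
				pass

	def result_solve_proc(self, result_q, conn_q, store_q):
		""" Split worker results into new URLs and data to be stored. """
		while True:
			content = result_q.get()  # blocks until a worker sends a result
			if content['new_urls'] == 'end':
				print('Control node: result process received the stop signal')
				store_q.put('end')
				return
			conn_q.put(content['new_urls'])
			store_q.put(content['data'])

	def store_proc(self, store_q):
		""" Persist parsed data through DataOutput. """
		output = DataOutput()
		while True:
			data = store_q.get()
			if data == 'end':
				print('Control node: store process received the stop signal')
				output.output_end(output.filepath)
				return
			output.store_data(data)


if __name__ == '__main__':
	# url_q/result_q are shared with the spider nodes over the network,
	# conn_q/store_q only connect the local helper processes
	url_q, result_q = Queue(), Queue()
	conn_q, store_q = Queue(), Queue()
	node = NodeManager()
	manager = node.start_manager(url_q, result_q)
	root_url = 'https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB'  # example seed
	Process(target=node.url_manager_proc, args=(url_q, conn_q, root_url)).start()
	Process(target=node.result_solve_proc, args=(result_q, conn_q, store_q)).start()
	Process(target=node.store_proc, args=(store_q,)).start()
	manager.get_server().serve_forever()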




4. Running the Crawler

1. Run the control scheduler NodeManager.py on the control node first:

python NodeManager.py

2. Run the spider node SpiderNode.py (it can be started on several machines, as long as server_addr, the port, and the authkey match the control node):

python SpiderNode.py

After execution, a baike_<timestamp>.html file is generated containing a table with the URL, title, and summary of each crawled entry.

