查看“丝路通:分布式爬虫任务分配”的源代码
←
丝路通:分布式爬虫任务分配
跳转至:
导航
,
搜索
因为以下原因,您没有权限编辑本页:
您所请求的操作仅限于该用户组的用户使用:
用户
您可以查看与复制此页面的源代码。
==任务切割==
将原始的待爬目录表 分割成许多小份,当作许多小任务去完成。
===敦煌网===
<nowiki>
task_header = '../../task/dh_task/dh_task_'  # prefix of the chunk files


def assign_task(chunk_size=100):
    """Split dh_sub_category.csv into task files of chunk_size lines each.

    Every full chunk is written to task_header + <n> + '.csv'.  Leftover
    lines (fewer than chunk_size) go into a NEW file of their own instead
    of being appended onto the last full chunk.
    """
    buffer = []  # lines of the chunk currently being collected
    num = 0      # index of the last chunk file written
    # source csv columns: category name, category level, parent level
    with open('dh_sub_category.csv', 'rt', encoding='utf-8') as fp:
        for line in fp:
            buffer.append(line)
            if len(buffer) == chunk_size:
                num += 1
                with open(task_header + str(num) + '.csv', 'w',
                          encoding='utf-8') as fw:
                    fw.write(''.join(buffer))
                buffer = []
    if buffer:
        # Remaining partial chunk: write it to its own, fresh file.
        with open(task_header + str(num + 1) + '.csv', 'w',
                  encoding='utf-8') as fw:
            fw.write(''.join(buffer))


if __name__ == '__main__':
    assign_task()
</nowiki>
===阿里巴巴===
<nowiki>
task_header = '../../task/ali_task/ali_task_'  # prefix of the chunk files


def assign_task(chunk_size=100):
    """Split alibaba_categary.csv into task files of chunk_size lines each.

    Full chunks go to task_header + <n> + '.csv'; leftover lines go into a
    new file (the original routed them by a hard-coded `num <= 50` test,
    which could append onto an existing chunk).
    """
    buffer = []  # lines of the chunk currently being collected
    num = 0      # index of the last chunk file written
    # source csv columns: category name, category level, parent level
    with open('alibaba_categary.csv', 'rt', encoding='utf-8') as fp:
        for line in fp:
            buffer.append(line)
            if len(buffer) == chunk_size:
                num += 1
                with open(task_header + str(num) + '.csv', 'w',
                          encoding='utf-8') as fw:
                    fw.write(''.join(buffer))
                buffer = []
    if buffer:
        # Remaining partial chunk: write it to its own, fresh file.
        with open(task_header + str(num + 1) + '.csv', 'w',
                  encoding='utf-8') as fw:
            fw.write(''.join(buffer))


if __name__ == '__main__':
    assign_task()
</nowiki>
===中国制造网===
<nowiki>
task_header = '../../task/mc_task/mc_task_'  # prefix of the chunk files


def assign_task(chunk_size=200):
    """Split made_in_china_sub_cat.csv into task files of chunk_size lines.

    Full chunks go to task_header + <n> + '.csv'; leftover lines go into a
    new file (the original routed them by a hard-coded `num <= 100` test,
    which could append onto an existing chunk).
    """
    buffer = []  # lines of the chunk currently being collected
    num = 0      # index of the last chunk file written
    with open('made_in_china_sub_cat.csv', 'rt', encoding='utf-8') as fp:
        for line in fp:
            buffer.append(line)
            if len(buffer) == chunk_size:
                num += 1
                with open(task_header + str(num) + '.csv', 'w',
                          encoding='utf-8') as fw:
                    fw.write(''.join(buffer))
                buffer = []
    if buffer:
        # Remaining partial chunk: write it to its own, fresh file.
        with open(task_header + str(num + 1) + '.csv', 'w',
                  encoding='utf-8') as fw:
            fw.write(''.join(buffer))


if __name__ == '__main__':
    assign_task()
</nowiki>
==任务表建立==
===安装mysql===
*[[Centos7 
安装python3]],本项目安装python3.6
*[[Centos7 安装MySQL]]
===建立数据表===
<nowiki>
CREATE DATABASE crawler;

MariaDB [crawler]> CREATE TABLE IF NOT EXISTS `task`(
    -> `id` INT UNSIGNED AUTO_INCREMENT,
    -> `site_title` VARCHAR(100) NOT NULL,
    -> `task_name` VARCHAR(40) NOT NULL,
    -> `task_status` INT UNSIGNED NOT NULL,
    -> `availability_zones` VARCHAR(60) NOT NULL,
    -> `start_date` DATE,
    -> PRIMARY KEY ( `id` ))ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.01 sec)

MariaDB [crawler]> INSERT INTO task(site_title,task_name,task_status,availability_zones,start_date) VALUES('alibaba','ali_task_1','0','A','20200609');
Query OK, 1 row affected (0.00 sec)

MariaDB [crawler]> select * from task;
+----+------------+------------+-------------+--------------------+------------+
| id | site_title | task_name  | task_status | availability_zones | start_date |
+----+------------+------------+-------------+--------------------+------------+
|  1 | alibaba    | ali_task_1 | 0           | A                  | 2020-06-09 |
+----+------------+------------+-------------+--------------------+------------+
</nowiki>
===关闭防火墙===
 systemctl disable firewalld
 systemctl stop firewalld
在公网上须设置防火墙,不能一关了之
 setenforce 0
===数据库连接===
<nowiki>
import pymysql

# Open the connection with keyword arguments (host, user, password,
# database) -- positional arguments were removed in newer PyMySQL.
db = pymysql.connect(host="10.0.0.30", user="root",
                     password="000000", database="crawler")
try:
    with db.cursor() as cursor:
        # Smoke test: ask the server for its version string.
        cursor.execute("SELECT VERSION()")
        data = cursor.fetchone()  # single row, e.g. ('5.5.64-MariaDB',)
        print("Database version : %s " % data)
finally:
    db.close()  # always release the connection
</nowiki>
===查询数据库===
<nowiki>
import pymysql

db = pymysql.connect(host="10.0.0.30", user="root",
                     password="000000", database="crawler")
try:
    with db.cursor() as cursor:
        # Dump every row of the task table.
        cursor.execute("SELECT * FROM task")
        for row in cursor.fetchall():
            print(row)
except pymysql.Error as e:
    # Report the actual failure instead of swallowing it silently.
    print("Error: unable to fetch data:", e)
finally:
    db.close()
</nowiki>
==交付和分配任务==
<nowiki>
import pymysql

# Task status codes: 0 = pending, 1 = in progress, 2 = finished.
db = pymysql.connect(host="10.0.0.30", user="root",
                     password="000000", database="crawler")
cursor = db.cursor()


def search_task(site, status, limit):
    """Return the ids of up to `limit` tasks of `site` having `status`.

    Uses parameterized queries, so the inputs cannot inject SQL.
    Returns an empty list when the query fails.
    """
    ids = []
    sql = ("SELECT id FROM task WHERE site_title = %s "
           "AND task_status = %s LIMIT %s")
    try:
        cursor.execute(sql, (site, status, limit))
        ids = [row[0] for row in cursor.fetchall()]
    except pymysql.Error as e:
        print("Error: unable to fetch data:", e)
    return ids


def get_task(task_id):
    """Return the full row(s) of the task with id `task_id`.

    Result is a list of tuples (empty on failure or unknown id).
    """
    rows = []
    try:
        cursor.execute("SELECT * FROM task WHERE id = %s", (task_id,))
        rows = list(cursor.fetchall())
    except pymysql.Error as e:
        print("Error: unable to fetch data:", e)
    return rows


def update_task(task_id, status):
    """Set the status of task `task_id` and commit the change."""
    try:
        cursor.execute("UPDATE task SET task_status = %s WHERE id = %s",
                       (status, task_id))
        db.commit()
        print(str(task_id) + " 更新成功!")
    except pymysql.Error as e:
        db.rollback()  # undo the partial change on failure
        print("Error: unable to update task:", e)


def clear_all():
    """Reset every task back to status 0 (pending) and commit."""
    try:
        cursor.execute("UPDATE task SET task_status = 0")
        db.commit()
        print(" 清空成功!")
    except pymysql.Error as e:
        db.rollback()
        print("Error: unable to reset tasks:", e)


if __name__ == '__main__':
    try:
        # Claim two pending alibaba tasks and mark them in progress (1).
        pending = search_task("alibaba", 0, 2)
        detail_list = []
        for task_id in pending:
            this_task = get_task(task_id)
            print(this_task)
            detail_list += this_task
            update_task(task_id, 1)
        print("序号为", pending, "的任务开始执行...")

        # Mark a batch of tasks as finished (2).
        done_list = [1, 2, 3, 4]
        for task_id in done_list:
            update_task(task_id, 2)
        print("序号为", done_list, "的任务执行完毕...")

        # Reset the task pool so the next run starts fresh.
        clear_all()
    finally:
        # Close cursor and connection even if any step above fails.
        cursor.close()
        db.close()
</nowiki>
返回至
丝路通:分布式爬虫任务分配
。
导航菜单
个人工具
登录
命名空间
页面
讨论
变种
视图
阅读
查看源代码
查看历史
更多
搜索
导航
首页
最近更改
随机页面
帮助
工具
链入页面
相关更改
特殊页面
页面信息