“丝路通:分布式爬虫任务分配”的版本间的差异

来自CloudWiki
跳转至: 导航搜索
 
(未显示同一用户的6个中间版本)
第120行: 第120行:
  
 
==任务表建立==
 
==任务表建立==
===安装mysql和django===
+
===安装mysql===
 
*[[Centos7 安装python3]],本项目安装python3.6
 
*[[Centos7 安装python3]],本项目安装python3.6
 
*[[Centos7 安装MySQL]]
 
*[[Centos7 安装MySQL]]
*[[Django安装与启动]]
 
  
==model设计和资源导入 ==
+
===建立数据表===
*[[项目初始化]]、补充:[[MySQL中的character set和collation]]
+
 
*[[user models设计]]
+
<nowiki>
*[[goods modeles设计]]、补充:[[django中choice的使用]]
+
CREATE DATABASE crawler;
*[[trade交易的model设计]]
+
 
*[[用户操作的model设计]]、补充:[[django中unique together使用]]
+
MariaDB [crawler]> CREATE TABLE IF NOT EXISTS `task`(
 +
    ->    `id` INT UNSIGNED AUTO_INCREMENT,
 +
    ->    `site_title` VARCHAR(100) NOT NULL,
 +
    ->    `task_name` VARCHAR(40) NOT NULL,
 +
    ->    `task_status` INT UNSIGNED NOT NULL,
 +
    ->    `availability_zones` VARCHAR(60) NOT NULL,
 +
    ->    `start_date` DATE,
 +
    ->    PRIMARY KEY ( `id` ))ENGINE=InnoDB DEFAULT CHARSET=utf8;
 +
Query OK, 0 rows affected (0.01 sec)
 +
 
 +
MariaDB [crawler]> INSERT INTO task(site_title,task_name,task_status,availability_zones,start_date) VALUES('alibaba','ali_task_1','0','A','20200609');
 +
Query OK, 1 row affected (0.00 sec)
 +
 
 +
MariaDB [crawler]> select * from task;
 +
+----+------------+------------+-------------+--------------------+------------+
 +
| id | site_title | task_name  | task_status | availability_zones | start_date |
 +
+----+------------+------------+-------------+--------------------+------------+
 +
|  1 | alibaba    | ali_task_1 |          0 | A                  | 2020-06-09 |
 +
+----+------------+------------+-------------+--------------------+------------+</nowiki>
 +
 
 +
===关闭防火墙===
 +
 
 +
systemctl disable firewalld
 +
systemctl stop firewalld
 +
 
 +
在公网上须设置防火墙,不能一关了之
 +
 
 +
setenforce 0
 +
 
 +
===数据库连接===
 +
 
 +
<nowiki>import pymysql
 +
 +
# 打开数据库连接
 +
db = pymysql.connect("10.0.0.30","root","000000","crawler" )
 +
#这4个参数依次是:主机名,用户名,密码和数据库名
 +
# 使用 cursor() 方法创建一个游标对象 cursor
 +
cursor = db.cursor()
 +
 +
# 使用 execute()  方法执行 SQL 查询
 +
cursor.execute("SELECT VERSION()")
 +
 +
# 使用 fetchone() 方法获取单条数据.
 +
data = cursor.fetchone()
 +
 +
print ("Database version : %s " % data)
 +
 +
# 关闭数据库连接
 +
db.close()</nowiki>
 +
 
 +
===查询数据库===
 +
<nowiki>import pymysql
 +
 +
 
 +
# 打开数据库连接
 +
db = pymysql.connect("10.0.0.30","root","000000","crawler" )# 使用cursor()方法获取操作游标
 +
cursor = db.cursor()
 +
 +
# SQL 查询语句
 +
sql = "SELECT * FROM task \
 +
      WHERE 1"
 +
try:
 +
  # 执行SQL语句
 +
  cursor.execute(sql)
 +
  # 获取所有记录列表
 +
  results = cursor.fetchall()
 +
  for row in results:
 +
      print(row)
 +
except:
 +
  print ("Error: unable to fetch data")
 +
 +
# 关闭数据库连接
 +
db.close()</nowiki>
 +
 
 +
==交付和分配任务==
 +
<nowiki>import pymysql
 +
 
 +
# 打开数据库连接
 +
db = pymysql.connect("10.0.0.30","root","000000","crawler" )# 使用cursor()方法获取操作游标
 +
cursor = db.cursor()# 获取cursor对象
 +
 +
# SQL 查询语句
 +
#找寻alibaba还没做的任务
 +
 
 +
#0,待完成;1:进行;2:完毕
 +
 
 +
def search_task(site,status,num1):#根据条件 搜索符合条件的任务
 +
 
 +
  a_list = []
 +
  sql = "SELECT * FROM task \
 +
      WHERE site_title = '%s' \
 +
      AND task_status ='%d' LIMIT %d" % (site,status,num1)
 +
 
 +
  try:
 +
      # 执行SQL语句
 +
      cursor.execute(sql)
 +
      # 获取所有记录列表
 +
      results = cursor.fetchall()
 +
      for row in results:
 +
        a_list.append(row[0])
 +
 
 +
     
 +
  except:
 +
      print ("Error: unable to fetch data")
 +
 
 +
  finally:
 +
      return a_list
 +
 
 +
def get_task(id):#获得当前任务的详细信息
 +
 
 +
  a_list = []
 +
  sql = "SELECT * FROM task \
 +
      WHERE id = '%d' " % (id)
 +
 
 +
  try:
 +
      # 执行SQL语句
 +
      cursor.execute(sql)
 +
      # 获取所有记录列表
 +
      results = cursor.fetchall()
 +
      for row in results:
 +
        #print(row)
 +
        a_list.append(row)
 +
 
 +
      #id号 ,
 +
      #print(a_list)
 +
  except:
 +
      print ("Error: unable to fetch data")
 +
 
 +
  finally:
 +
      return a_list
 +
 
 +
def update_task(id,status):
 +
  sql2 = "update task set task_status ='%d' where id = %d" % (status,id)
 +
  try:
 +
      # 执行SQL语句
 +
      cursor.execute(sql2)
 +
      # 提交之前的操作,如果之前已经执行多次的execute,那么就都进行提交
 +
      db.commit()
 +
      print (str(id)+ " 更新成功!")
 +
  except:
 +
      print ("Error: unable to fetch data")
 +
 
 +
def clear_all( ):
 +
  sql2 = "update task set task_status ='%d' where 1=1" % (0)
 +
  try:
 +
      # 执行SQL语句
 +
      cursor.execute(sql2)
 +
      # 提交之前的操作,如果之前已经执行多次的execute,那么就都进行提交
 +
      db.commit()
 +
      print (" 清空成功!")
 +
     
 +
  except:
 +
      print ("Error: unable to fetch data")
 +
 
 +
if __name__ == '__main__':
 +
  #搜寻待完成任务,并把他们设为1
 +
  a_list = search_task("alibaba",0,2)#搜寻alibaba待完成的任务
 +
  detail_list =[]
 +
  for i in a_list:
 +
      this_task = get_task(i)
 +
      print(this_task)
 +
      detail_list +=this_task
 +
      update_task(i,1)
 +
  print("序号为",a_list,"的任务开始执行...")
 +
 
 +
  #完成任务标记的,设为2
 +
  b_list=[1,2,3,4]
 +
  for i in b_list:
 +
      update_task(i,2)
 +
  print("序号为",b_list,"的任务执行完毕...")
 +
 
 +
  #清空任务池,重新开始
 +
  clear_all( )
 +
  # 关闭数据库连接
 +
  cursor.close()
 +
  db.close()
 +
</nowiki>

2020年9月19日 (六) 07:41的最新版本

任务切割

将原始的待爬目录表 分割成许多小份,当作许多小任务去完成。

敦煌网

import time  # NOTE(review): unused in this script; kept in case other paste-siblings rely on it

# Output filename prefix (directory + stem) for the generated task files.
task_header = '../../task/dh_task/dh_task_'  # header


def assign_task(src='dh_sub_category.csv', dest_header=None, chunk=100):
    """Split the category CSV *src* into task files of *chunk* lines each.

    Every full group of *chunk* lines is written to ``<dest_header><n>.csv``
    (n = 1, 2, ...). A trailing partial group is written to its own final
    file instead of being appended to the previous, already-full chunk —
    the original version appended leftovers to file *num* (and wrote to
    ``..._0.csv`` when the input had fewer than *chunk* lines).

    All parameters default to the original hard-coded values, so the
    existing call ``assign_task()`` behaves the same apart from the fix.
    """
    if dest_header is None:
        dest_header = task_header

    pending = []  # lines accumulated for the chunk currently being built
    num = 0       # index of the last task file written

    with open(src, 'rt') as fp:
        for count, line in enumerate(fp, start=1):
            pending.append(line)
            if count % chunk == 0:
                num += 1
                with open(dest_header + str(num) + '.csv', 'w', encoding='utf-8') as fw:
                    fw.write(''.join(pending))
                pending = []

    if pending:
        # Leftover (< chunk) lines become their own file rather than being
        # appended onto the previous full chunk.
        num += 1
        with open(dest_header + str(num) + '.csv', 'w', encoding='utf-8') as fw:
            fw.write(''.join(pending))


if __name__ == '__main__':
    assign_task()


阿里巴巴

import time  # NOTE(review): unused in this script; kept in case other paste-siblings rely on it

# Output filename prefix (directory + stem) for the generated task files.
task_header = '../../task/ali_task/ali_task_'  # header


def assign_task(src='alibaba_categary.csv', dest_header=None, chunk=100):
    """Split the category CSV *src* into task files of *chunk* lines each.

    Every full group of *chunk* lines goes to ``<dest_header><n>.csv``
    (n = 1, 2, ...); a trailing partial group is written to its own new
    file. The original version appended leftovers to the last full chunk
    when ``num <= 50`` and only used a fresh file beyond that — an
    inconsistency that silently produced an oversized last chunk.

    Parameters default to the original hard-coded values, so the existing
    call ``assign_task()`` is unaffected apart from the fix.
    """
    if dest_header is None:
        dest_header = task_header

    pending = []  # lines accumulated for the chunk currently being built
    num = 0       # index of the last task file written

    with open(src, 'rt') as fp:
        for count, line in enumerate(fp, start=1):
            pending.append(line)
            if count % chunk == 0:
                num += 1
                with open(dest_header + str(num) + '.csv', 'w', encoding='utf-8') as fw:
                    fw.write(''.join(pending))
                pending = []

    if pending:
        # Leftover (< chunk) lines always get their own final file.
        num += 1
        with open(dest_header + str(num) + '.csv', 'w', encoding='utf-8') as fw:
            fw.write(''.join(pending))


if __name__ == '__main__':
    assign_task()
    


中国制造网

import time  # NOTE(review): unused in this script; kept in case other paste-siblings rely on it

# Output filename prefix (directory + stem) for the generated task files.
task_header = '../../task/mc_task/mc_task_'  # header


def assign_task(src='made_in_china_sub_cat.csv', dest_header=None, chunk=200):
    """Split the category CSV *src* into task files of *chunk* lines each.

    Every full group of *chunk* lines goes to ``<dest_header><n>.csv``
    (n = 1, 2, ...); a trailing partial group is written to its own new
    file. The original version appended leftovers to the last full chunk
    when ``num <= 100`` and only used a fresh file beyond that — an
    inconsistency that silently produced an oversized last chunk.

    Parameters default to the original hard-coded values, so the existing
    call ``assign_task()`` is unaffected apart from the fix.
    """
    if dest_header is None:
        dest_header = task_header

    pending = []  # lines accumulated for the chunk currently being built
    num = 0       # index of the last task file written

    with open(src, 'rt') as fp:
        for count, line in enumerate(fp, start=1):
            pending.append(line)
            if count % chunk == 0:
                num += 1
                with open(dest_header + str(num) + '.csv', 'w', encoding='utf-8') as fw:
                    fw.write(''.join(pending))
                pending = []

    if pending:
        # Leftover (< chunk) lines always get their own final file.
        num += 1
        with open(dest_header + str(num) + '.csv', 'w', encoding='utf-8') as fw:
            fw.write(''.join(pending))


if __name__ == '__main__':
    assign_task()
    


任务表建立

安装mysql

建立数据表

CREATE DATABASE crawler;

MariaDB [crawler]> CREATE TABLE IF NOT EXISTS `task`(
    ->    `id` INT UNSIGNED AUTO_INCREMENT,
    ->    `site_title` VARCHAR(100) NOT NULL,
    ->    `task_name` VARCHAR(40) NOT NULL,
    ->    `task_status` INT UNSIGNED NOT NULL,
    ->    `availability_zones` VARCHAR(60) NOT NULL,
    ->    `start_date` DATE,
    ->    PRIMARY KEY ( `id` ))ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.01 sec)

MariaDB [crawler]> INSERT INTO task(site_title,task_name,task_status,availability_zones,start_date) VALUES('alibaba','ali_task_1','0','A','20200609');
Query OK, 1 row affected (0.00 sec)

MariaDB [crawler]> select * from task;
+----+------------+------------+-------------+--------------------+------------+
| id | site_title | task_name  | task_status | availability_zones | start_date |
+----+------------+------------+-------------+--------------------+------------+
|  1 | alibaba    | ali_task_1 |           0 | A                  | 2020-06-09 |
+----+------------+------------+-------------+--------------------+------------+

关闭防火墙

systemctl disable firewalld
systemctl stop firewalld

在公网上须设置防火墙,不能一关了之

setenforce 0

数据库连接

import pymysql

# Open the database connection. PyMySQL 1.0 removed positional arguments
# from connect(), so host/user/password/database must be keywords.
# NOTE(review): credentials are hard-coded; move to config/env in production.
db = pymysql.connect(host="10.0.0.30", user="root",
                     password="000000", database="crawler")

# Create a cursor object with the cursor() method.
cursor = db.cursor()

try:
    # Run a trivial query with execute() to verify the connection works.
    cursor.execute("SELECT VERSION()")

    # fetchone() returns a single row (here a 1-tuple with the version string).
    data = cursor.fetchone()

    print ("Database version : %s " % data)
finally:
    # Close the database connection even if the query failed.
    db.close()

查询数据库

import pymysql

# Open the database connection. PyMySQL 1.0 removed positional arguments
# from connect(), so the parameters are passed by keyword.
db = pymysql.connect(host="10.0.0.30", user="root",
                     password="000000", database="crawler")
# Obtain an operation cursor with cursor().
cursor = db.cursor()

# SQL query: fetch every row of the task table.
sql = "SELECT * FROM task \
       WHERE 1"
try:
   # Execute the SQL statement.
   cursor.execute(sql)
   # Fetch all result rows and print them one by one.
   results = cursor.fetchall()
   for row in results:
      print(row)
except pymysql.Error:
   # Narrowed from a bare `except:` so programming errors still surface.
   print ("Error: unable to fetch data")
finally:
   # Close the database connection even when the query fails.
   db.close()

交付和分配任务

import pymysql

# Open the database connection. PyMySQL 1.0 removed positional arguments
# from connect(), so host/user/password/database must be keywords.
db = pymysql.connect(host="10.0.0.30", user="root",
                     password="000000", database="crawler")
cursor = db.cursor()  # shared cursor used by all the task helpers below

# This script looks up alibaba tasks that have not been done yet.

# task_status values: 0 = pending; 1 = in progress; 2 = done

def search_task(site,status,num1):
   """Return the ids of up to *num1* tasks for *site* whose status is *status*.

   Builds the query with driver-side parameter binding instead of the
   original ``%``-string interpolation, which was SQL-injection-prone and
   quoted the integer status as a string. On any database error, reports
   it and returns whatever ids were collected so far (original best-effort
   behavior). Uses the module-level ``cursor``.
   """
   a_list = []
   sql = ("SELECT * FROM task "
          "WHERE site_title = %s "
          "AND task_status = %s LIMIT %s")

   try:
      # Execute with bound parameters; the driver handles escaping.
      cursor.execute(sql, (site, status, num1))
      for row in cursor.fetchall():
         a_list.append(row[0])  # column 0 is the task id
   except Exception:
      # Narrowed from a bare `except:`; keep the original message.
      print ("Error: unable to fetch data")

   # Plain return instead of the original `finally: return`, which also
   # silently swallowed exceptions such as KeyboardInterrupt.
   return a_list

def get_task(id):
   """Return a list containing the full row(s) of the task with this *id*.

   Uses driver-side parameter binding instead of the original ``%d``
   string interpolation (which also wrapped the integer in quotes). On a
   database error, reports it and returns whatever was collected so far.
   Uses the module-level ``cursor``.
   """
   a_list = []
   sql = "SELECT * FROM task WHERE id = %s"

   try:
      # Execute with a bound parameter; the driver handles escaping.
      cursor.execute(sql, (id,))
      for row in cursor.fetchall():
         a_list.append(row)
   except Exception:
      # Narrowed from a bare `except:`; keep the original message.
      print ("Error: unable to fetch data")

   # Plain return instead of the original exception-swallowing
   # `finally: return` pattern.
   return a_list

def update_task(id,status):
   """Set the task_status of task *id* to *status* and commit.

   Uses driver-side parameter binding instead of the original ``%d``
   string interpolation. Uses the module-level ``cursor`` and ``db``.
   """
   sql2 = "update task set task_status = %s where id = %s"
   try:
      # Execute with bound parameters; the driver handles escaping.
      cursor.execute(sql2, (status, id))
      # Commit everything executed so far on this connection.
      db.commit()
      print (str(id)+ " 更新成功!")
   except Exception:
      # Narrowed from a bare `except:`; keep the original message.
      print ("Error: unable to fetch data")

def clear_all( ):
   """Reset every task's task_status back to 0 (pending) and commit.

   Uses driver-side parameter binding instead of the original ``%d``
   string interpolation. Uses the module-level ``cursor`` and ``db``.
   """
   sql2 = "update task set task_status = %s where 1=1"
   try:
      # Execute with a bound parameter; affects every row.
      cursor.execute(sql2, (0,))
      # Commit everything executed so far on this connection.
      db.commit()
      print (" 清空成功!")
   except Exception:
      # Narrowed from a bare `except:`; keep the original message.
      print ("Error: unable to fetch data")

if __name__ == '__main__':
   # Phase 1: find pending alibaba tasks and flip them to "in progress" (1).
   a_list = search_task("alibaba",0,2)
   detail_list =[]
   for task_id in a_list:
      info = get_task(task_id)
      print(info)
      detail_list.extend(info)
      update_task(task_id,1)
   print("序号为",a_list,"的任务开始执行...")

   # Phase 2: mark a fixed demo set of task ids as finished (2).
   b_list=[1,2,3,4]
   for task_id in b_list:
      update_task(task_id,2)
   print("序号为",b_list,"的任务执行完毕...")

   # Phase 3: reset the whole task pool, then release the connection.
   clear_all( )
   cursor.close()
   db.close()