基于elsearch比价引擎

来自CloudWiki
Heart讨论 | 贡献2020年9月5日 (六) 10:17的版本 基于elsearch比价引擎
跳转至: 导航搜索

基于elsearch比价引擎

csv文件上传到Elsearch
1.安装Logstash
解压软件包:tar -zxvf logstash-7.3.2.tar.gz
将安装包移动到 mv logstash-7.3.2 /usr/local/
cd /usr/local/
重命名: mv logstash-7.3.2 logstash
进入配置文件目录:/usr/local/logstash/config
原始文件格式:sj.txt

1,1,Apparel_Textiles & Accessories,Apparel,Apparel Design Services,//s.alicdn.com/@sc01/kf/H50cfdfc8a6c4445ab6f3a926a2542c00t.jpg_300x300.jpg,embroidery 
digitizing service 24h on line,1,$1.00-$2.00,Unit,Shen Zhen Happitoo Textile Co._ Ltd.,//happitoo.en.alibaba.com/company_profile.html#top-nav-bar,China,1,Apparel Design Apparel_Textiles Accessories Services

在/usr/local/logstash/config目录下创建abc-es.conf

input {
 file {
   path => ["/root/sj.txt"]
   start_position => "beginning"
 }
}
filter {
 csv {
   separator => ","
   columns => 
["productId","productPage","categoryOne","categoryTwo","categoryThr","productImg","productName","productPrice","productPrices","productUnit","companyName","companyUrl","address","year","tags"]
 } 
}
output {
 elasticsearch {
       hosts => ["master:9200"]
       index => "flight"
 }
}

执行命令:./bin/logstash -f config/abc-es.conf --path.data=/root/abc
2020-09-05 173900.png


将mysql数据上传到Elsearch

首先在数据库中创建一个表:

create table data(
   productId int,
   productPage int,
   categoryOne varchar(300),
   categoryTwo varchar(300),
   categoryThr varchar(300),
   productImg varchar(300),	
   productName varchar(300),
   productPrice float,
   productPrices varchar(300),
   productUnit varchar(300),
   companyName varchar(300),
   companyUrl varchar(300),
   address varchar(300),
   year int,
   tags varchar(300) );

接下来将数据导入到表: load data local infile "/root/sj.txt" into table data FIELDS TERMINATED BY "," LINES TERMINATED BY "\n"

查询发现数据正常:
2020-09-05 180142.png

创建Logstash配置文件将mysql数据导入到elsearch
需要注意驱动文件要写绝对路径mysql-connector-java-5.1.38.jar


input {

 # 多张表的同步只需要设置多个jdbc的模块就行了
 jdbc {
     # mysql 数据库链接,shop为数据库名
     jdbc_connection_string => "jdbc:mysql://master:3306/price?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
     # 用户名和密码
     jdbc_user => "root"
     jdbc_password => "root"
     # 驱动
     jdbc_driver_library => "/usr/local/logstash/config/mysql-connector-java-5.1.38.jar"
     # 驱动类名
     jdbc_driver_class => "com.mysql.jdbc.Driver"
     jdbc_validate_connection => "true"
     #是否分页
     jdbc_paging_enabled => "true"
     jdbc_page_size => "10000"
     #时区
     jdbc_default_timezone => "Asia/Shanghai"
     #直接执行sql语句
     statement => "select * from data"
     # 执行的sql 文件路径+名称
     # statement_filepath => "/hw/elasticsearch/logstash-6.2.4/bin/test.sql"
     #设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
     #schedule => "* * * * *"
     #每隔10分钟执行一次
     #schedule => "*/10 * * * *"
     #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到last_run_metadata_path
     record_last_run => true
     #记录最新的同步的offset信息
     last_run_metadata_path => "/root/last_id.txt"
     use_column_value => true
     #递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型
     tracking_column_type => "numeric"
     tracking_column => "id"  
     clean_run => false
     # 索引类型
     #type => "jdbc"
   }

} output {

 elasticsearch {
       #es的ip和端口
       hosts => ["http://master:9200"]
       #ES索引名称(自己定义的)
       index => "spider"
       #文档类型
       document_type => "_doc"
       #设置数据的id为数据库中的字段
       #document_id => "%{id}"
   }
   stdout {
       codec => json_lines
   }

}