用Python对Elasticsearch增删查改
来自CloudWiki
建立ES连接
# coding: utf8
import os
import time
from os import walk
import CSVOP
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


class ElasticObj:
    """Bind an Elasticsearch client to a single index name and document type."""

    def __init__(self, index_name, index_type, ip="127.0.0.1"):
        """Store the target index/type and build the client for ``ip``.

        :param index_name: name of the index
        :param index_type: document type used inside the index
        :param ip: host handed to the Elasticsearch client
        """
        # Client without credentials.
        self.es = Elasticsearch([ip])
        # When the cluster requires a user name and password, use instead:
        # self.es = Elasticsearch([ip], http_auth=('elastic', 'password'), port=9200)
        self.index_name = index_name
        self.index_type = index_type


# obj = ElasticObj("ott", "ott_type", ip="47.93.117.127")
obj = ElasticObj("ott1", "ott_type1")
创建索引
假如创建索引名称为ott,类型为ott_type的索引,该索引中有五个字段:
- title:存储中文标题,
- date:存储日期格式(2017-09-08),
- keyword:存储中文关键字,
- source:存储中文来源,
- link:存储链接,
def create_index(self, index_name="ott", index_type="ott_type"):
    """Create the index with its field mapping unless it already exists.

    The index name and document type come from ``self.index_name`` and
    ``self.index_type``; the creation response is printed.

    :param index_name: unused; kept so existing callers keep working
    :param index_type: unused; kept so existing callers keep working
    :return: None
    """
    # Mapping: ``title`` is full-text analyzed with ik_max_word (presumably
    # supplied by the IK analysis plugin -- confirm it is installed), while
    # ``keyword``/``source``/``link`` are exact-match fields.  The legacy
    # ``string`` + ``not_analyzed`` spelling is replaced by ``keyword`` so the
    # mapping is consistent with the ``text`` type already used here.
    _index_mappings = {
        "mappings": {
            self.index_type: {
                "properties": {
                    "title": {
                        "type": "text",
                        "index": True,
                        "analyzer": "ik_max_word",
                        "search_analyzer": "ik_max_word",
                    },
                    "date": {
                        "type": "text",
                        "index": True,
                    },
                    "keyword": {"type": "keyword"},
                    "source": {"type": "keyword"},
                    "link": {"type": "keyword"},
                }
            }
        }
    }
    if self.es.indices.exists(index=self.index_name) is not True:
        res = self.es.indices.create(index=self.index_name, body=_index_mappings)
        # print() call form: the script is run with python3.
        print(res)
在代码末尾obj = ElasticObj("ott1", "ott_type1")后面添加:
# Create the index on the connected cluster.
obj.create_index()
运行结果:
[root@localhost py_test]# python3 ElasticObj.py
{'acknowledged': True, 'shards_acknowledged': True}
索引数据
def Index_Data(self):
    """Index two hard-coded sample news documents, one request per document.

    The ``created`` field of every response is printed.

    :return: None
    """
    sample_docs = (
        {
            "date": "2017-09-13",
            "source": "慧聪网",
            "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
            "keyword": "电视",
            "title": "付费 电视 行业面临的转型和挑战",
        },
        {
            "date": "2017-09-13",
            "source": "中国文明网",
            "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
            "keyword": "电视",
            "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心",
        },
    )
    for document in sample_docs:
        response = self.es.index(
            index=self.index_name,
            doc_type=self.index_type,
            body=document,
        )
        print(response['created'])
在代码末尾添加调用它的语句(同时把上一行注释掉):
#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127") obj = ElasticObj("ott1", "ott_type1") #obj.create_index() obj.Index_Data()
运行此程序:python3 ElasticObj.py
运行后的验证:
[root@localhost py_test]# curl 'localhost:9200/ott1/_search?pretty'
就会看到刚才插入的两条数据
批量索引数据
代码:
def bulk_Index_Data(self):
    """Index four hard-coded sample news documents with one bulk request.

    Documents receive the explicit ids 1..4; the number of successful
    actions reported by ``bulk`` is printed.

    :return: None
    """
    field_names = ("date", "source", "link", "keyword", "title")
    news_items = [
        {"date": "2017-09-13",
         "source": "慧聪网",
         "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
         "keyword": "电视",
         "title": "付费 电视 行业面临的转型和挑战"},
        {"date": "2017-09-13",
         "source": "中国文明网",
         "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
         "keyword": "电视",
         "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"},
        {"date": "2017-09-13",
         "source": "人民电视",
         "link": "http://tv.people.com.cn/BIG5/n1/2017/0913/c67816-29533981.html",
         "keyword": "电视",
         "title": "中国第21批赴刚果(金)维和部隊启程--人民 电视 --人民网"},
        {"date": "2017-09-13",
         "source": "站长之家",
         "link": "http://www.chinaz.com/news/2017/0913/804263.shtml",
         "keyword": "电视",
         "title": "电视 盒子 哪个牌子好? 吐血奉献三大选购秘笈"},
    ]
    actions = [
        {
            "_index": self.index_name,
            "_type": self.index_type,
            # _id may also be left out so that Elasticsearch generates one.
            "_id": doc_id,
            "_source": {name: item[name] for name in field_names},
        }
        for doc_id, item in enumerate(news_items, start=1)
    ]
    # Send every action in a single bulk call.
    success, _ = bulk(self.es, actions, index=self.index_name, raise_on_error=True)
    print('Performed %d actions' % success)
文末调用语句:
#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127") obj = ElasticObj("ott1", "ott_type1") #obj.create_index() #obj.Index_Data() obj.Get_Data_By_Body("电视") #obj.Delete_Index_Data("AWxhMSrJp5xr6Yl4nhAx") obj.bulk_Index_Data()
运行此程序:python3 ElasticObj.py
运行后的验证:
[root@localhost py_test]# curl 'localhost:9200/ott1/_search?pretty'
就会看到刚才批量插入的四条数据
从CSV文件中导入数据
代码:
import csv


def Index_Data_FromCSV(self, csvfile):
    """Read news rows from a CSV file and index each one into Elasticsearch.

    The first row is treated as the header and is not indexed.  Columns are
    expected in the order title, link, date, source, keyword.  Blank rows are
    skipped instead of raising ``IndexError``.  The ``created`` field of each
    response is printed, followed by the total number of rows read
    (header and blank rows included).

    :param csvfile: path of the CSV file (UTF-8 encoded)
    :return: None
    """
    total_rows = 0
    # newline='' lets the csv module do its own newline handling, as its
    # documentation requires for files passed to csv.reader.
    with open(csvfile, 'r', encoding='utf-8', newline='') as f:
        reader = csv.reader(f, dialect='excel')
        for total_rows, row in enumerate(reader, start=1):
            if total_rows == 1 or not row:
                # Header row, or an empty line (csv.reader yields []).
                continue
            # A fresh dict per row so no state leaks between documents.
            doc = {
                'title': row[0],
                'link': row[1],
                'date': row[2],
                'source': row[3],
                'keyword': row[4],
            }
            res = self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
            print(res['created'])
    print(total_rows)
文末调用代码:
#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127") obj = ElasticObj("ott1", "ott_type1") #obj.create_index() #obj.Index_Data() #obj.Get_Data_By_Body("电视") #obj.Delete_Index_Data("AWxhMSrJp5xr6Yl4nhAx") #obj.bulk_Index_Data() obj.Index_Data_FromCSV('news2.csv')
注:CSV文件可以由爬虫程序获取,也可以自己用Excel软件新建一个(另存为csv格式即可)
可通过ES的命令行操作 查询CSV文件是否导入成功。
查询索引
def Get_Data_By_Body(self, word):
    """Search for documents whose ``keyword`` field matches ``word``.

    For every hit, the date, source, link, keyword and title are printed on
    one line.

    :param word: text matched against the ``keyword`` field
    :return: None
    """
    # To fetch every document instead: {'query': {'match_all': {}}}
    query = {"query": {"match": {"keyword": word}}}
    result = self.es.search(index=self.index_name, body=query)
    columns = ('date', 'source', 'link', 'keyword', 'title')
    for hit in result['hits']['hits']:
        source = hit['_source']
        print(*[source[name] for name in columns])
文末的调用代码修改如下:
#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127") obj = ElasticObj("ott1", "ott_type1") #obj.create_index() #obj.Index_Data() obj.Get_Data_By_Body("电视")
运行程序:
[root@localhost py_test]# python3 ElasticObj.py
2017-09-13 中国文明网 http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml 电视 电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心 2017-09-13 慧聪网 http://info.broadcast.hc360.com/2017/09/130859749974.shtml 电视 付费 电视 行业面临的转型和挑战
删除数据
删除之前先查一下该条数据的id:
[root@localhost py_test]# curl 'localhost:9200/ott1/_search?pretty' { "took" : 1, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "ott1", "_type" : "ott_type1", "_id" : "AWxhMSrJp5xr6Yl4nhAx", "_score" : 1.0, "_source" : { "date" : "2017-09-13", "source" : "中国文明网", "link" : "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml", "keyword" : "电视", "title" : "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心" } }, 。。。 }
函数代码:
def Delete_Index_Data(self, id):
    """Delete one document from the index and print the response.

    :param id: ``_id`` of the document to delete
    :return: None
    """
    print(
        self.es.delete(
            index=self.index_name,
            doc_type=self.index_type,
            id=id,
        )
    )
调用代码:
#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127") obj = ElasticObj("ott1", "ott_type1") #obj.create_index() #obj.Index_Data() obj.Get_Data_By_Body("电视") obj.Delete_Index_Data("AWxhMSrJp5xr6Yl4nhAx")#待删除的id号
执行程序:
[root@localhost py_test]# python3 ElasticObj.py
AWxhMSrJp5xr6Yl4nhAx 2017-09-13 中国文明网 http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml 电视 电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心 AWxhMSpVp5xr6Yl4nhAw 2017-09-13 慧聪网 http://info.broadcast.hc360.com/2017/09/130859749974.shtml 电视 付费 电视 行业面临的转型和挑战 {'found': True, '_index': 'ott1', '_type': 'ott_type1', '_id': 'AWxhMSrJp5xr6Yl4nhAx', '_version': 2, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}}
完整代码
# coding: utf8
import os
import time
from os import walk
import csv
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


class ElasticObj:
    """Create, index, query and delete documents of one Elasticsearch index."""

    def __init__(self, index_name, index_type, ip="127.0.0.1"):
        """Store the target index/type and build the client for ``ip``.

        :param index_name: name of the index
        :param index_type: document type used inside the index
        :param ip: host handed to the Elasticsearch client
        """
        self.index_name = index_name
        self.index_type = index_type
        # Client without credentials.  The host is passed explicitly so the
        # ``ip`` argument is actually honoured.
        self.es = Elasticsearch([ip])
        # When the cluster requires a user name and password, use instead:
        # self.es = Elasticsearch([ip], http_auth=('elastic', 'password'), port=9200)

    def create_index(self, index_name="ott", index_type="ott_type"):
        """Create the index with its field mapping unless it already exists.

        The index name and document type come from ``self.index_name`` and
        ``self.index_type``; the creation response is printed.

        :param index_name: unused; kept so existing callers keep working
        :param index_type: unused; kept so existing callers keep working
        :return: None
        """
        # ``title`` is full-text analyzed with ik_max_word (presumably
        # supplied by the IK analysis plugin -- confirm it is installed);
        # ``keyword``/``source``/``link`` are exact-match fields.  The legacy
        # ``string`` + ``not_analyzed`` spelling is replaced by ``keyword``
        # to stay consistent with the ``text`` type already used here.
        _index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "title": {
                            "type": "text",
                            "index": True,
                            "analyzer": "ik_max_word",
                            "search_analyzer": "ik_max_word",
                        },
                        "date": {
                            "type": "text",
                            "index": True,
                        },
                        "keyword": {"type": "keyword"},
                        "source": {"type": "keyword"},
                        "link": {"type": "keyword"},
                    }
                }
            }
        }
        if self.es.indices.exists(index=self.index_name) is not True:
            res = self.es.indices.create(index=self.index_name, body=_index_mappings)
            print(res)

    def Index_Data(self):
        """Index two hard-coded sample news documents, one request each.

        The ``created`` field of every response is printed.

        :return: None
        """
        sample_docs = [
            {
                "date": "2017-09-13",
                "source": "慧聪网",
                "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
                "keyword": "电视",
                "title": "付费 电视 行业面临的转型和挑战",
            },
            {
                "date": "2017-09-13",
                "source": "中国文明网",
                "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
                "keyword": "电视",
                "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心",
            },
        ]
        for item in sample_docs:
            res = self.es.index(index=self.index_name, doc_type=self.index_type, body=item)
            print(res['created'])

    def Get_Data_By_Body(self, word):
        """Search for documents whose ``keyword`` field matches ``word``.

        For every hit, the id, date, source, link, keyword and title are
        printed on one line.

        :param word: text matched against the ``keyword`` field
        :return: None
        """
        # To fetch every document instead: {'query': {'match_all': {}}}
        doc = {
            "query": {
                "match": {
                    "keyword": word
                }
            }
        }
        _searched = self.es.search(index=self.index_name, body=doc)
        for hit in _searched['hits']['hits']:
            source = hit['_source']
            print(hit['_id'], source['date'], source['source'],
                  source['link'], source['keyword'], source['title'])

    def Delete_Index_Data(self, id):
        """Delete one document from the index and print the response.

        :param id: ``_id`` of the document to delete
        :return: None
        """
        res = self.es.delete(index=self.index_name, doc_type=self.index_type, id=id)
        print(res)

    def bulk_Index_Data(self):
        """Index four hard-coded sample news documents with one bulk request.

        Documents receive the explicit ids 1..4; the number of successful
        actions reported by ``bulk`` is printed.

        :return: None
        """
        news_items = [
            {"date": "2017-09-13",
             "source": "慧聪网",
             "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
             "keyword": "电视",
             "title": "付费 电视 行业面临的转型和挑战"},
            {"date": "2017-09-13",
             "source": "中国文明网",
             "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
             "keyword": "电视",
             "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"},
            {"date": "2017-09-13",
             "source": "人民电视",
             "link": "http://tv.people.com.cn/BIG5/n1/2017/0913/c67816-29533981.html",
             "keyword": "电视",
             "title": "中国第21批赴刚果(金)维和部隊启程--人民 电视 --人民网"},
            {"date": "2017-09-13",
             "source": "站长之家",
             "link": "http://www.chinaz.com/news/2017/0913/804263.shtml",
             "keyword": "电视",
             "title": "电视 盒子 哪个牌子好? 吐血奉献三大选购秘笈"},
        ]
        actions = []
        for doc_id, line in enumerate(news_items, start=1):
            actions.append({
                "_index": self.index_name,
                "_type": self.index_type,
                # _id may also be left out so that Elasticsearch generates one.
                "_id": doc_id,
                "_source": {
                    "date": line['date'],
                    "source": line['source'],
                    "link": line['link'],
                    "keyword": line['keyword'],
                    "title": line['title'],
                },
            })
        # Send every action in a single bulk call.
        success, _ = bulk(self.es, actions, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)

    def Index_Data_FromCSV(self, csvfile):
        """Read news rows from a CSV file and index each one.

        The first row is treated as the header and is not indexed.  Columns
        are expected in the order title, link, date, source, keyword.  Blank
        rows are skipped instead of raising ``IndexError``.  The ``created``
        field of each response is printed, followed by the total number of
        rows read (header and blank rows included).

        :param csvfile: path of the CSV file (UTF-8 encoded)
        :return: None
        """
        total_rows = 0
        # newline='' lets the csv module do its own newline handling, as its
        # documentation requires for files passed to csv.reader.
        with open(csvfile, 'r', encoding='utf-8', newline='') as f:
            reader = csv.reader(f, dialect='excel')
            for total_rows, row in enumerate(reader, start=1):
                if total_rows == 1 or not row:
                    # Header row, or an empty line (csv.reader yields []).
                    continue
                # A fresh dict per row so no state leaks between documents.
                doc = {
                    'title': row[0],
                    'link': row[1],
                    'date': row[2],
                    'source': row[3],
                    'keyword': row[4],
                }
                res = self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
                print(res['created'])
        print(total_rows)


# Alternative target: a remote node.
# obj = ElasticObj("ott", "ott_type", ip="47.93.117.127")
obj = ElasticObj("ott1", "ott_type1")
# Enable the step to run:
# obj.create_index()
# obj.Index_Data()
# obj.Get_Data_By_Body("电视")
# obj.Delete_Index_Data("AWxhMSrJp5xr6Yl4nhAx")
# obj.bulk_Index_Data()
obj.Index_Data_FromCSV('news2.csv')