用python对ElasticSeaarch增删查改

来自CloudWiki
跳转至: 导航搜索

建立ES连接

#coding:utf8
import os
import time
from os import walk
import CSVOP
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class ElasticObj:
    def __init__(self, index_name,index_type,ip ="127.0.0.1"):
        '''

        :param index_name: 索引名称
        :param index_type: 索引类型
        '''
        self.index_name =index_name
        self.index_type = index_type
        # 无用户名密码状态
        self.es = Elasticsearch([ip])
        #用户名密码状态
        #self.es = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)

#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127")
obj = ElasticObj("ott1", "ott_type1")


创建索引

假如创建索引名称为ott,类型为ott_type的索引,该索引中有五个字段:

  • title:存储中文标题,
  • date:存储日期格式(2017-09-08),
  • keyword:存储中文关键字,
  • source:存储中文来源,
  • link:存储链接,
def create_index(self,index_name="ott",index_type="ott_type"):
        '''
        创建索引,创建索引名称为ott,类型为ott_type的索引
        :param ex: Elasticsearch对象
        :return:
        '''
        #创建映射
        _index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "title": {
                            "type": "text",
                            "index": True,
                            "analyzer": "ik_max_word",
                            "search_analyzer": "ik_max_word"
                        },
                        "date": {
                            "type": "text",
                            "index": True
                        },
                        "keyword": {
                            "type": "string",
                            "index": "not_analyzed"
                        },
                        "source": {
                            "type": "string",
                            "index": "not_analyzed"
                        },
                        "link": {
                            "type": "string",
                            "index": "not_analyzed"
                        }
                    }
                }

            }
        }
        if self.es.indices.exists(index=self.index_name) is not True:
            res = self.es.indices.create(index=self.index_name, body=_index_mappings)
            print res


在代码末尾obj = ElasticObj("ott1", "ott_type1")后面添加:

obj.create_index()

运行结果:

[root@localhost py_test]# python3 ElasticObj.py

 {'acknowledged': True, 'shards_acknowledged': True}

索引数据

def Index_Data(self):
        '''
        数据存储到es
        :return:
        '''
        list = [
            {   "date": "2017-09-13",
                "source": "慧聪网",
                "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
                "keyword": "电视",
                "title": "付费 电视 行业面临的转型和挑战"
             },
            {   "date": "2017-09-13",
                "source": "中国文明网",
                "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
                "keyword": "电视",
                "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"
             }
              ]
        for item in list:
            res = self.es.index(index=self.index_name, doc_type=self.index_type, body=item)
            print(res['created'])

在代码末尾添加调用它的语句(同时把上一行注释掉):

#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127")
obj = ElasticObj("ott1", "ott_type1")
#obj.create_index()
obj.Index_Data()

运行此程序:python3 ElasticObj.py

运行后的验证:

[root@localhost py_test]# curl 'localhost:9200/ott1/_search?pretty'

就会看到刚才插入的两条数据

批量索引数据

代码:

  def bulk_Index_Data(self):
        '''
        用bulk将批量数据存储到es
        :return:
        '''
        list = [
            {"date": "2017-09-13",
             "source": "慧聪网",
             "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
             "keyword": "电视",
             "title": "付费 电视 行业面临的转型和挑战"
             },
            {"date": "2017-09-13",
             "source": "中国文明网",
             "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
             "keyword": "电视",
             "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"
             },
            {"date": "2017-09-13",
             "source": "人民电视",
             "link": "http://tv.people.com.cn/BIG5/n1/2017/0913/c67816-29533981.html",
             "keyword": "电视",
             "title": "中国第21批赴刚果(金)维和部隊启程--人民 电视 --人民网"
             },
            {"date": "2017-09-13",
             "source": "站长之家",
             "link": "http://www.chinaz.com/news/2017/0913/804263.shtml",
             "keyword": "电视",
             "title": "电视 盒子 哪个牌子好? 吐血奉献三大选购秘笈"
             }
        ]
        ACTIONS = []
        i = 1
        for line in list:
            action = {
                "_index": self.index_name,
                "_type": self.index_type,
                "_id": i, #_id 也可以默认生成,不赋值
                "_source": {
                    "date": line['date'],
                    "source": line['source'],
                    "link": line['link'],
                    "keyword": line['keyword'],
                    "title": line['title']}
            }
            i += 1
            ACTIONS.append(action)
            # 批量处理
        success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)



文末调用语句:

#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127")
obj = ElasticObj("ott1", "ott_type1")
#obj.create_index()
#obj.Index_Data()
obj.Get_Data_By_Body("电视")
#obj.Delete_Index_Data("AWxhMSrJp5xr6Yl4nhAx")
obj.bulk_Index_Data()

运行此程序:python3 ElasticObj.py

运行后的验证:

[root@localhost py_test]# curl 'localhost:9200/ott1/_search?pretty'

就会看到刚才批量插入的四条数据

从CSV文件中导入数据

代码:

import csv
def Index_Data_FromCSV(self,csvfile):
        '''
        从CSV文件中读取数据,并存储到es中
        :param csvfile: csv文件,包括完整路径
        :return:
        '''
        data = []
        with open(csvfile,'r',encoding='utf-8') as f:
            reader = csv.reader(f,dialect='excel')
            for row in reader:
                data.append(row)

        index = 0
        doc = {}
        for item in data:
            if index > 0:#第一行是标题
                doc['title'] = item[0]
                doc['link'] = item[1]
                doc['date'] = item[2]
                doc['source'] = item[3]
                doc['keyword'] = item[4]
                res = self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
                print(res['created'])
            index += 1
            print(index)

文末调用代码:

#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127")
obj = ElasticObj("ott1", "ott_type1")
#obj.create_index()
#obj.Index_Data()
#obj.Get_Data_By_Body("电视")
#obj.Delete_Index_Data("AWxhMSrJp5xr6Yl4nhAx")
#obj.bulk_Index_Data()
obj.Index_Data_FromCSV('news2.csv')

注:CSV文件可以有爬虫程序获取,也可以自己用Excek软件新建一个(另存为csv格式即可)

可通过ES的命令行操作 查询CSV文件是否导入成功。

查询索引

   def Get_Data_By_Body(self,word):
        # doc = {'query': {'match_all': {}}}
        doc = {
            "query": {
                "match": {
                    "keyword": word
                }
            }
        }

        _searched = self.es.search(index=self.index_name, body=doc)

        for hit in _searched['hits']['hits']:
            # print(hit['_source'])
            print(hit['_source']['date'], hit['_source']['source'], hit['_source']['link'], hit['_source']['keyword'], \
            hit['_source']['title'])

文末的调用代码修改如下:

#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127")
obj = ElasticObj("ott1", "ott_type1")
#obj.create_index()
#obj.Index_Data()
obj.Get_Data_By_Body("电视")

运行程序;

[root@localhost py_test]# python3 ElasticObj.py

2017-09-13 中国文明网 http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml 电视 电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心
2017-09-13 慧聪网 http://info.broadcast.hc360.com/2017/09/130859749974.shtml 电视 付费 电视 行业面临的转型和挑战

删除数据

删除之前先查一下该条数据的id:

[root@localhost py_test]# curl 'localhost:9200/ott1/_search?pretty'
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "ott1",
        "_type" : "ott_type1",
        "_id" : "AWxhMSrJp5xr6Yl4nhAx",
        "_score" : 1.0,
        "_source" : {
          "date" : "2017-09-13",
          "source" : "中国文明网",
          "link" : "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
          "keyword" : "电视",
          "title" : "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"
        }
      },
   。。。
}

函数代码:

    def Delete_Index_Data(self,id):
        '''
        删除索引中的一条
        :param id:
        :return:
        '''
        res = self.es.delete(index=self.index_name, doc_type=self.index_type, id=id)
        print(res)

调用代码:


#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127")
obj = ElasticObj("ott1", "ott_type1")
#obj.create_index()
#obj.Index_Data()
obj.Get_Data_By_Body("电视")
obj.Delete_Index_Data("AWxhMSrJp5xr6Yl4nhAx")#待删除的id号

执行程序:

[root@localhost py_test]# python3 ElasticObj.py

AWxhMSrJp5xr6Yl4nhAx 2017-09-13 中国文明网 http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml 电视 电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心
AWxhMSpVp5xr6Yl4nhAw 2017-09-13 慧聪网 http://info.broadcast.hc360.com/2017/09/130859749974.shtml 电视 付费 电视 行业面临的转型和挑战
{'found': True, '_index': 'ott1', '_type': 'ott_type1', '_id': 'AWxhMSrJp5xr6Yl4nhAx', '_version': 2, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}}

完整代码

#coding:utf8
import os
import time
from os import walk
import csv
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class ElasticObj:
    def __init__(self, index_name,index_type,ip ="127.0.0.1"):
        '''

        :param index_name: 索引名称
        :param index_type: 索引类型
        '''
        self.index_name =index_name
        self.index_type = index_type
        # 无用户名密码状态
        self.es = Elasticsearch( )
        #用户名密码状态
        #self.es = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)

    def create_index(self,index_name="ott",index_type="ott_type"):
        '''
        创建索引,创建索引名称为ott,类型为ott_type的索引
        :param ex: Elasticsearch对象
        :return:
        '''
        #创建映射
        _index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "title": {
                            "type": "text",
                            "index": True,
                            "analyzer": "ik_max_word",
                            "search_analyzer": "ik_max_word"
                        },
                        "date": {
                            "type": "text",
                            "index": True
                        },
                        "keyword": {
                            "type": "string",
                            "index": "not_analyzed"
                        },
                        "source": {
                            "type": "string",
                            "index": "not_analyzed"
                        },
                        "link": {
                            "type": "string",
                            "index": "not_analyzed"
                        }
                    }
                }

            }
        }
        if self.es.indices.exists(index=self.index_name) is not True:
            res = self.es.indices.create(index=self.index_name, body=_index_mappings)
            print(res)
    def Index_Data(self):
        '''
        数据存储到es
        :return:
        '''
        list = [
            {   "date": "2017-09-13",
                "source": "慧聪网",
                "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
                "keyword": "电视",
                "title": "付费 电视 行业面临的转型和挑战"
             },
            {   "date": "2017-09-13",
                "source": "中国文明网",
                "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
                "keyword": "电视",
                "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"
             }
              ]
        for item in list:
            res = self.es.index(index=self.index_name, doc_type=self.index_type, body=item)
            print(res['created'])

    def Get_Data_By_Body(self,word):
        # doc = {'query': {'match_all': {}}}
        doc = {
            "query": {
                "match": {
                    "keyword": word
                }
            }
        }

        _searched = self.es.search(index=self.index_name, body=doc)

        for hit in _searched['hits']['hits']:
            # print(hit['_source'])
            print(hit['_id'],hit['_source']['date'], hit['_source']['source'], hit['_source']['link'], hit['_source']['keyword'], \
            hit['_source']['title'])
    def Delete_Index_Data(self,id):
        '''
        删除索引中的一条
        :param id:
        :return:
        '''
        res = self.es.delete(index=self.index_name, doc_type=self.index_type, id=id)
        print(res)

    def bulk_Index_Data(self):
        '''
        用bulk将批量数据存储到es
        :return:
        '''
        list = [
            {"date": "2017-09-13",
             "source": "慧聪网",
             "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
             "keyword": "电视",
             "title": "付费 电视 行业面临的转型和挑战"
             },
            {"date": "2017-09-13",
             "source": "中国文明网",
             "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
             "keyword": "电视",
             "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"
             },
            {"date": "2017-09-13",
             "source": "人民电视",
             "link": "http://tv.people.com.cn/BIG5/n1/2017/0913/c67816-29533981.html",
             "keyword": "电视",
             "title": "中国第21批赴刚果(金)维和部隊启程--人民 电视 --人民网"
             },
            {"date": "2017-09-13",
             "source": "站长之家",
             "link": "http://www.chinaz.com/news/2017/0913/804263.shtml",
             "keyword": "电视",
             "title": "电视 盒子 哪个牌子好? 吐血奉献三大选购秘笈"
             }
        ]
        ACTIONS = []
        i = 1
        for line in list:
            action = {
                "_index": self.index_name,
                "_type": self.index_type,
                "_id": i, #_id 也可以默认生成,不赋值
                "_source": {
                    "date": line['date'],
                    "source": line['source'],
                    "link": line['link'],
                    "keyword": line['keyword'],
                    "title": line['title']}
            }
            i += 1
            ACTIONS.append(action)
            # 批量处理
        success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)

    def Index_Data_FromCSV(self,csvfile):
        '''
        从CSV文件中读取数据,并存储到es中
        :param csvfile: csv文件,包括完整路径
        :return:
        '''
        data = []
        with open(csvfile,'r',encoding='utf-8') as f:
            reader = csv.reader(f,dialect='excel')
            for row in reader:
                data.append(row)

        index = 0
        doc = {}
        for item in data:
            if index > 0:#第一行是标题
                doc['title'] = item[0]
                doc['link'] = item[1]
                doc['date'] = item[2]
                doc['source'] = item[3]
                doc['keyword'] = item[4]
                res = self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
                print(res['created'])
            index += 1
            print(index)

#obj =ElasticObj("ott","ott_type",ip ="47.93.117.127")
obj = ElasticObj("ott1", "ott_type1")
#obj.create_index()
#obj.Index_Data()
#obj.Get_Data_By_Body("电视")
#obj.Delete_Index_Data("AWxhMSrJp5xr6Yl4nhAx")
#obj.bulk_Index_Data()
obj.Index_Data_FromCSV('news2.csv')

参考文档:[1] https://www.cnblogs.com/shaosks/p/7592229.html