使用Python抓取网页内容并保存到es

在今天的文章里,我们来介绍如何使用Python来访问Elasticsearch。如果大家对Elasicsearch的安装及使用还不是很熟的话,建议看我之前的博客文章:如何在Linux,MacOS及Windows上进行安装Elasticsearch,并熟悉Elasticsearch的最基本的使用:开始使用Elasticsearch (1)/(2)/(3)。

在今天的文章中,我们来介绍如何使用Python来把我们需要的数据存入到一个Elasticsearch的索引中,并使用它进行搜索数据及分析数据。

准备工作

安装Python及Elasticsearch python包

首先我们需要安装Python及Elasticsearch相关的Python包。我们可以通过如下的方法来安装:

$ pip install elasticsearch

针对Python3,我们可能需要如下的方法:

$ pip3 install elasticsearch

使用Python创建index及访问index

使用Python创建一个index及访问其index非常直接:

from datetime import datetime
from elasticsearch import Elasticsearch

es = Elasticsearch()

doc = {
    'author': 'kimchy',
    'text': 'Elasticsearch: cool. bonsai cool.',
    'timestamp': datetime.now(),
}

res = es.index(index="test-index", doc_type='_doc', id=1, body=doc)
print(res['result'])

res = es.get(index="test-index", doc_type='_doc', id=1)
print(res['_source'])

es.indices.refresh(index="test-index")

res = es.search(index="test-index", body={"query": {"match_all": {}}})
print("Got %d Hits:" % res['hits']['total']['value'])
for hit in res['hits']['hits']:
    print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])

在这里,首先建立一个连接到Elasticsearch的实例es。然后通过es来创建index,并访问这个新建立的index。我们运行的结果是:

updated
{'author': 'kimchy', 'text': 'Elasticsearch: cool. bonsai cool.', 'timestamp': '2019-08-27T05:18:12.375857'}
Got 1 Hits:
2019-08-27T05:18:12.375857 kimchy: Elasticsearch: cool. bonsai cool.

这里显示是“updated”,这是因为我之前已经创建一个id为1的文档。再次创建时返回updated,并且它的version会自动加1。

在默认的情况下,它使用默认的地址localhost:9200。如果我们想为Elasticsearch链接定义一个新的地址,我们可以使用如下的办法:

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

在上面,我们可以把我们的host及port信息输入到Elasticsearch中,这样我们可以连接到任何我们想要的Elasticsearch安装的实例中。

SSL和身份验证

如果我们的Elasticsearch有安全的认证,您可以将客户端配置为使用SSL连接到elasticsearch集群,包括证书验证和HTTP身份验证:

那么我需要使用如下的方法:

from elasticsearch import Elasticsearch

# you can use RFC-1738 to specify the url

es = Elasticsearch(['https://user:secret@localhost:443'])

# ... or specify common parameters as kwargs

es = Elasticsearch(
    ['localhost', 'otherhost'],
    http_auth=('user', 'secret'),
    scheme="https",
    port=443,
)

# SSL client authentication using client_cert and client_key

from ssl import create_default_context

context = create_default_context(cafile="path/to/cert.pem")
es = Elasticsearch(
    ['localhost', 'otherhost'],
    http_auth=('user', 'secret'),
    scheme="https",
    port=443,
    ssl_context=context,
)
Web scraper及Elasticsearch

下面介绍一个简单的使用Elasticsearch来实现从网路抓取数据的Web Scraper。我们的主要目的是从一个在线的recipe(食谱)抓取数据并存放于Elasticsearch中提供搜索并进行分析。这个网站的内容在https://www.allrecipes.com/recipes/96/salad/。 从网站上我们可以看到有很多的菜谱在那里。我们的分析应用从这个网站抓取数据。

获取并导入数据

抓取数据

首先,我们创建一个叫做get_recipes.py的文件。它的内容是:

import json
from time import sleep
import requests
from bs4 import BeautifulSoup
def parse(u):
    title = '-'
    submit_by = '-'
    description = '-'
    calories = 0
    ingredients = []
    rec = {}
    try:
    r = requests.get(u, headers=headers)
    if r.status_code == 200:
        html = r.text
        soup = BeautifulSoup(html, 'lxml')
        # title
        title_section = soup.select('.recipe-summary__h1')
        # submitter
        submitter_section = soup.select('.submitter__name')
        # description
        description_section = soup.select('.submitter__description')
        # ingredients
        ingredients_section = soup.select('.recipe-ingred_txt')
        # calories
        calories_section = soup.select('.calorie-count')
 
        if calories_section:
            calories = calories_section[0].text.replace('cals', '').strip()
 
        if ingredients_section:
            for ingredient in ingredients_section:
                ingredient_text = ingredient.text.strip()
                if 'Add all ingredients to list' not in ingredient_text and ingredient_text != '':
                    ingredients.append({'step': ingredient.text.strip()})
 
        if description_section:
            description = description_section[0].text.strip().replace('"', '')
 
        if submitter_section:
            submit_by = submitter_section[0].text.strip()
 
        if title_section:
            title = title_section[0].text
 
        rec = {'title': title, 'submitter': submit_by, 'description': description, 'calories': calories,
               'ingredients': ingredients}
except Exception as ex:
    print('Exception while parsing')
    print(str(ex))
finally:
    return json.dumps(rec)
  
  
  if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
        'Pragma': 'no-cache'
    }
    url = 'https://www.allrecipes.com/recipes/96/salad/'
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        html = r.text
        soup = BeautifulSoup(html, 'lxml')
        links = soup.select('.fixed-recipe-card__h3 a')
        for link in links:
            sleep(2)
            result = parse(link['href'])
            print(result)
            print('=================================')
这是一个一个最基本的python应用框架。在主程序里,我们对网址https://www.allrecipes.com/recipes/96/salad/进行访问。如果访问成功,我们BeautifulSoup对返回的html内容进行分析。我们可以得到所有以'.fixed-recipe-card__

h3 a’标识的内容。这个非常类似于jQuery对html进行的查询。这样我们可以的到像如下内容的一个links:

<a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/14469/jamies-cranberry-spinach-salad/">
<span class="fixed-recipe-card__title-link">Jamie's Cranberry Spinach Salad</span>
</a>, <a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/142027/sweet-restaurant-slaw/">
<span class="fixed-recipe-card__title-link">Sweet Restaurant Slaw</span>
</a>, <a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/14276/strawberry-spinach-salad-i/">
<span class="fixed-recipe-card__title-link">Strawberry Spinach Salad I</span>

img

上面的内容是一个数组,它里面含有一个叫做href的项。它是一个链接指向另外一个页面描述这个菜的的食谱,比如https://www.allrecipes.com/recipe/14469/jamies-cranberry-spinach-salad/

parse是一个用来解析一个食谱链接的数据。通过BeautifulSoup的使用,如法炮制,解析其中的数据项,比如title_section, submitter_section等,并最终得到我们所需要的title, submitter等数据。最终这个数据以json的形式返回。返回的结果就像如下的数据:

{
  "calories": "253",
  "description": "This is a great salad for a buffet, with interesting textures and southwest flavors combined in one delicious salad.  Leftovers store well refrigerated for several days.",
  "ingredients": [
    {
      "step": "1 cup uncooked couscous"
    },
    {
      "step": "1 1/4 cups chicken broth"
    },
    {
      "step": "3 tablespoons extra virgin olive oil"
    },
    {
      "step": "2 tablespoons fresh lime juice"
    },
    {
      "step": "1 teaspoon red wine vinegar"
    },
    {
      "step": "1/2 teaspoon ground cumin"
    },
    {
      "step": "8 green onions, chopped"
    },
    {
      "step": "1 red bell pepper, seeded and chopped"
    },
    {
      "step": "1/4 cup chopped fresh cilantro"
    },
    {
      "step": "1 cup frozen corn kernels, thawed"
    },
    {
      "step": "2 (15 ounce) cans black beans, drained"
    },
    {
      "step": "salt and pepper to taste"
    }
  ],
  "submitter": "Paula",
  "title": "Black Bean and Couscous Salad"
}

创建Index

我们从上面parse的数据最终我们想存储于一个Elasticsearch的Index里,并供以后的搜索及分析。为了达到这个目的,我们必须创建一个index。我们命名这个Index的名字为recipes。我们把type的名字叫做salad。另外我们也必须创建一个mapping。

为了能够创建一个index,我们必须连接Elasticsearch服务器。

def connect_elasticsearch():
    """
    :rtype: object
    """
    _es = None
    _es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    if _es.ping():
        print('Yay Connected')
    else:
        print('Awww it could not connect!')
    return _es

为了能够是的上面的代码工作,我们必须加入使用Elasticsearch库:

from elasticsearch import Elasticsearch

我们可以修改上面的localhost来连接到我们自己的Elasticsearch服务器。如果连接成功,它将返回”Yay Connected”,并最终返回一个可以被使用的Elasticsearch实例。这里的_es.ping()可以用来ping一下服务器。如果连接成功将返回True。

下面,我们用上面返回的Elasticsearch实例来创建一个index:

def create_index(es_object, index_name):
    created = False
    # index settings
    settings = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
        "mappings": {
            "salads": {
                "dynamic": "strict",
                "properties": {
                    "title": {
                        "type": "text"
                    },
                    "submitter": {
                        "type": "text"
                    },
                    "description": {
                        "type": "text"
                    },
                    "calories": {
                        "type": "integer"
                    },
                    "ingredients": {
                        "type": "nested",
                        "properties": {
                            "step": {"type": "text"}
                        }
                    },
                }
            }
        }
    }
    try:
    if not es_object.indices.exists(index_name):
        # Ignore 400 means to ignore "Index Already Exist" error.
        es_object.indices.create(index=index_name, ignore=400, body=settings)
        print('Created Index')
    created = True
except Exception as ex:
    print(str(ex))
finally:
    return created

这里,我们通过一个settings变量把Elasticsearch所需要的settings及mappings一并放入这个字典中,并通过上面通过连接到Elasticsearch服务器返回的es_object来创建这个index。如果成功将返回True,否则返回False。我们可以看看我们这里定义的数据类型,和我上面显示的返回结果。这里我们定义了nested数据类型,这是因为ingredients是一个1对多的关系。如果大家对这个还不是很熟的话,可以参阅我之前写的文章 “Elasticsearch: nested对象”。

接下来,我确保索引根本不存在然后创建它。 检查后不再需要参数ignore = 400,但如果不检查是否存在,则可以抑制错误并覆盖现有索引。 但这有风险。 这就像覆盖数据库一样。

我们可以在浏览器中地址栏输入地址:http://localhost:9200/recipes/_mappings?pretty。 如果我们看到如下的结果,表名,我们的mapping已经创建成功:

{
  "recipes" : {
    "mappings" : {
      "properties" : {
        "calories" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "description" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "ingredients" : {
          "properties" : {
            "step" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        },
        "submitter" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "title" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

通过设置dynamic:strict,我们强制Elasticsearch对我们任何新的文档进行严格的检查。注意这里salads是我们的文档的type。在新的Elasticsearch中,我们针对一个index有且只有一个type。我们也可以通过_doc来访问。

存储数据

下一步我们来存储文档

def store_record(elastic_object, index_name, record):
    is_stored = True
    try:
        outcome = elastic_object.index(index=index_name, doc_type='salads', body=record)
        print(outcome)
    except Exception as ex:
        print('Error in indexing data')
        print(str(ex))
        is_stored = False
    finally:
        return is_stored

我们通过传入是的record来把我们需要的数据进行存储。为了能够我们能够存储数据,我们可以必须修改我们之前的main部分代码:

if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
        'Pragma': 'no-cache'
    }
    logging.basicConfig(level=logging.ERROR)
    print("starting ...")
 
url = 'https://www.allrecipes.com/recipes/96/salad/'
r = requests.get(url, headers=headers)
if r.status_code == 200:
    html = r.text
    soup = BeautifulSoup(html, 'lxml')
    # print(soup)
    links = soup.select('.fixed-recipe-card__h3 a')
    # print(links)
 
    if len(links) > 0:
        es = connect_elasticsearch()
 
    for link in links:
        # print(link)
 
        sleep(2)
        result = parse(link['href'])
        # print(result)
        if es is not None:
            if create_index(es, 'recipes'):
                out = store_record(es, 'recipes', result)
                print('Data indexed successfully')

搜索数据

现在数据都已经被建立为索引,并存于一个叫做recipies的index里。我们可以Elasticsearch来进行搜索,并分析数据。

def search(es_object, index_name, search):
    res = es_object.search(index=index_name, body=search)
    return res

我们可以通过如下的main来调用:

if __name__ == '__main__':
    es = connect_elasticsearch()
    if es is not None:
        # search_object = {'query': {'match': {'calories': '102'}}}
        # search_object = {'_source': ['title'], 'query': {'match': {'calories': '102'}}}
        search_object = {'query': {'range': {'calories': {'gte': 20}}}}
        result = search(es, 'recipes', json.dumps(search_object))
        print(result)

你可能看到如下的结果:

{'took': 0, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 37, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'recipes', '_type': 'salads', '_id' ... }}

为了完成这个应用的运行,我们必须安装如下的python包:

beautifulsoup4==4.8.0
bs4==0.0.1
certifi==2019.6.16
chardet==3.0.4
elasticsearch==7.0.4
idna==2.8
lxml==4.4.1
requests==2.22.0
soupsieve==1.9.3
urllib3==1.25.3

至此,我们已经完成了整个应用的构造。你可以找到最终的代码:https://github.com/liu-xiao-guo/recipies

参考:


版权声明:本文为CSDN博主「Elastic 中国社区官方博客」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/UbuntuTouch/article/details/100088821

这些信息有用吗?
Do you have any suggestions for improvement?

Thanks for your feedback!