- 首页
- 正文
极简的端到端 Scrapy 教程(第三部分)【翻译】
在第二部分,你已经从网页里提取了所必需的数据并把它们存储在条目里。在第三部分,我将会介绍条目管道,以使用 ORM(SQLAlchemy)将提取的数据保存到数据库和处理重复的数据问题。
爬虫返回的每个条目都会按顺序被发送到条目管道作额外的处理,例如存储条目到数据库,数据校验,移除重复项等等。条目管道被作为类定义在 pipelines.py 文件中,打开这个自动产生的文件,你会看到一个名叫 “TutorialPipeline” 的空管道:
你需要在 settings.py 文件指定启用了哪个管道以及管道的顺序,默认,管道是不开启的。要启用上面的空管道,请在 settings.py 注释掉以下部分:
整数的值(通常范围为0到1000),例如上面所示的300,用来管道的执行顺序(值较低的管道先运行)。
接下来,让我们开发一个管道将条目存储到数据库。在这里,我使用对象关系映射(ORM)来查询和修改数据库中使用面向对象范式的数据。特别是,我使用 SQLAlchemy。我不会涵盖 ORM 的细节和请参阅一些 Pros 和 Cons 的文章。
首先,让我们设计数据库模式。请注意,这个条目里有6个字段,例如,引用内容,标签,作者姓名,作者生日,作者出生地和简历。我将使用三个表来存储这些数据,比如,引用,标签和作者。在引用和标签表里有多对多的关系(一个引用能有一个或者多个标签和一个标签能关联一个或者多个引用),在作者和引用表里有一对多的关系(一个作者能有一个或者多个引用,但是一个引用只能属于一个作者)
通过 SQLAlchemy 使用 ORM 来定义模式,你需要:
在 requirements.txt 添加 SQLAlchemy>=1.3.6 和 在虚拟的环境里,通过运行 pip install -r requirements.txt 来安装这个包
用下面的内容创建 models.py 文件:
from sqlalchemy import create_engine, Column, Table, ForeignKey, MetaData
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import (
Integer, String, Date, DateTime, Float, Boolean, Text)
from scrapy.utils.project import get_project_settings
Base = declarative_base()
def db_connect():
"""
Performs database connection using database settings from settings.py.
Returns sqlalchemy engine instance
"""
return create_engine(get_project_settings().get("CONNECTION_STRING"))
def create_table(engine):
Base.metadata.create_all(engine)
# Association Table for Many-to-Many relationship between Quote and Tag
# https://docs.sqlalchemy.org/en/13/orm/basic_relationships.html#many-to-many
quote_tag = Table('quote_tag', Base.metadata,
Column('quote_id', Integer, ForeignKey('quote.id')),
Column('tag_id', Integer, ForeignKey('tag.id'))
)
class Quote(Base):
__tablename__ = "quote"
id = Column(Integer, primary_key=True)
quote_content = Column('quote_content', Text())
author_id = Column(Integer, ForeignKey('author.id')) # Many quotes to one author
tags = relationship('Tag', secondary='quote_tag',
lazy='dynamic', backref="quote") # M-to-M for quote and tag
class Author(Base):
__tablename__ = "author"
id = Column(Integer, primary_key=True)
name = Column('name', String(50), unique=True)
birthday = Column('birthday', DateTime)
bornlocation = Column('bornlocation', String(150))
bio = Column('bio', Text())
quotes = relationship('Quote', backref='author') # One author to many Quotes
class Tag(Base):
__tablename__ = "tag"
id = Column(Integer, primary_key=True)
name = Column('name', String(30), unique=True)
quotes = relationship('Quote', secondary='quote_tag',
lazy='dynamic', backref="tag") # M-to-M for quote and tag
db_connect()
函数使用 create_engine(get_project_settings().get(“CONNECTION_STRING”))
来连接数据库。CONNECTION_STRING
是在 settings.py 文件里指定的。你可以更改这个连接字符来连接不同的数据库系统,例如 SQLite,MySQL,Postgres,而不需要更改代码。在本教程中,我使用 SQLite,它本质上是一个名为 scrapy_quotes.db
的本地文件,当第一次爬虫运行的时候,它会在根目录里创建这个db文件。
CONNECTION_STRING = 'sqlite:///scrapy_quotes.db'
我还提供一个连接 MySQL 的例子(已注释):
# MySQL
CONNECTION_STRING = "{drivername}://{user}:{passwd}@{host}:{port}/{db_name}?charset=utf8".format(
drivername="mysql",
user="harrywang",
passwd="tutorial",
host="localhost",
port="3306",
db_name="scrapy_quotes",
)
现在,让我们创建一个管道来存储条目到数据库。打开 pipelines.py 和 添加下面的类(管道):
class SaveQuotesPipeline(object):
def __init__(self):
"""
Initializes database connection and sessionmaker
Creates tables
"""
engine = db_connect()
create_table(engine)
self.Session = sessionmaker(bind=engine)
def process_item(self, item, spider):
"""Save quotes in the database
This method is called for every item pipeline component
"""
session = self.Session()
quote = Quote()
author = Author()
tag = Tag()
author.name = item["author_name"]
author.birthday = item["author_birthday"]
author.bornlocation = item["author_bornlocation"]
author.bio = item["author_bio"]
quote.quote_content = item["quote_content"]
# check whether the author exists
exist_author = session.query(Author).filter_by(name = author.name).first()
if exist_author is not None: # the current author exists
quote.author = exist_author
else:
quote.author = author
# check whether the current quote has tags or not
if "tags" in item:
for tag_name in item["tags"]:
tag = Tag(name=tag_name)
# check whether the current tag already exists in the database
exist_tag = session.query(Tag).filter_by(name = tag.name).first()
if exist_tag is not None: # the current tag exists
tag = exist_tag
quote.tags.append(tag)
try:
session.add(quote)
session.commit()
except:
session.rollback()
raise
finally:
session.close()
return item
确保你也引用了需要的包和函数:
在 models.py 文件中使用下面的 init 函数用来连接到数据库(db_connect
)和 创建表(create_table
)如果还不存在的话(否则忽略 )
在 process_item 函数,我首先创建一个数据库会话实例和三张表。然后,我分配了作者的信息和引用文本的值到相应的表行里。
下一步,我们需要检查当前条目里的作者和标签是否已经存在数据库了,如果它们目前不存在,就可以创建新的作者/标签。
最后,我添加 quote 到数据库:
注意,由于ORM中指定的关系(quote.author和quote.tags),您不需要显式添加作者和标记-新的作者/标记(如果有)将由SQLAlchemy自动创建和插入。
现在,运行爬虫 scrapy crawl quotes,你应该能看到一个名叫 scrapy_quotes.db 的 SQLite 文件被创建。你能使用 SQLite 命令行来打开这个文件以查看提取的内容:
$ sqlite3 scrapy_quotes.db
...
sqlite> .tables
author quote quote_tag tag
sqlite> select * from quote limit 3;
1|The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.|1
2|Imperfection is beauty, madness is genius and it's better to be absolutely ridiculous than absolutely boring.|2
3|The person, be it gentleman or lady, who has not pleasure in a good novel, must be intolerably stupid.|3
sqlite> .quit
或者用 DB 来浏览 SQLite:
注意,我们提取了50条的引用。假设网站可能添加额外的引用,你希望每周运行一次爬虫来收集新的条目如果有的话。因此,让我们再一次运行爬虫 scrapy crawl quotes,你可能会注意到一个问题:现在我们在数据库里有100条引用 - 同样的50个引用被再次提取和存储!
下一步,让我们添加另外一个管道来检查这个条目是否重复,如果有,删除这个条目,使得这个条目不会通过其余的管道。
打开 pipelines.py 文件和添加下面的类(pipeline):
class DuplicatesPipeline(object):
def __init__(self):
"""
Initializes database connection and sessionmaker.
Creates tables.
"""
engine = db_connect()
create_table(engine)
self.Session = sessionmaker(bind=engine)
logging.info("****DuplicatesPipeline: database connected****")
def process_item(self, item, spider):
session = self.Session()
exist_quote = session.query(Quote).filter_by(quote_content = item["quote_content"]).first()
session.close()
if exist_quote is not None: # the current quote exists
raise DropItem("Duplicate item found: %s" % item["quote_content"])
else:
return item
确保导入 DropItem 的异常: from scrapy.exceptions import DropItem。逻辑非常简单:在数据库里查询是否已经存在当前条目引用文本,如果是,就删除这个条目。现在,你需要在 settings.py 里开启这个管道并确保在保存到数据库管道之前执行重复的管道:
你可以先删除 SQLite 文件并运行这个爬虫几次,你将会看到在数据库里有只有开始时的50条引用。之后,你能看到警告的信息提示重复的条目被删除了。
2019-09-12 11:16:04 [scrapy.core.scraper] WARNING: Dropped: Duplicate item found
...
2019-09-12 11:16:04 [scrapy.core.engine] INFO: Closing spider (finished)
2019-09-12 11:16:04 [scrapy.statscollectors] INFO: Dumping Scrapy stats:
...
'item_dropped_count': 50,
'item_dropped_reasons_count/DropItem': 50,
...
你已经完成了第三部分了!!干杯,在第四部分,我将会给你展示如何部署爬虫来定期爬取和监视,例如,每10分钟自动运行爬虫。