无意中发现 Liuli 这个项目,项目 Github:https://github.com/liuli-io/liuli
看了其文章,发现 Liuli 是 Python 实现的,便打算简单看看其实现细节,老规矩,看项目,先将好奇点写下来:
对,我就对这两点感兴趣,经过一番阅读后,关于好奇 1,其实人家没有实现漂亮的 PC 软件界面,Liuli 只是采集,然后将内容推送过去,所以本文的重点,就是看一下它是怎么采集公众号文章的,此外在阅读过程中,发现 LiuLi 还使用了简单的方法来识别文章是否为广告文章,这点也挺有意思的,也记录一下。
Liuli 基于搜狗微信(https://weixin.sogou.com/)对公众号文章进行采集,而且实现了 2 种方式:
我们可以通过相应的配置文件控制 Liuli 使用其中哪种方式来进行文章采集,其默认使用 ruia 的方式进行采集。
Liuli 将功能分为多个模块,然后通过调度器去调度不同的模块,调度器启动方法代码如下:
# src/liuli_schedule.py
def start(ll_config_name: str = ""):
"""调度启动函数
Args:
task_config (dict): 调度任务配置
"""
if not ll_config_name:
freeze_support()
# 默认启动 liuli_config 目录下所有配置
ll_config_name_list = []
for each_file in os.listdir(Config.LL_CONFIG_DIR):
if each_file.endswith("json"):
# 加入启动列表
ll_config_name_list.append(each_file.replace(".json", ""))
# 进程池
p = Pool(len(ll_config_name_list))
for each_ll_config_name in ll_config_name_list:
LOGGER.info(f"Task {each_ll_config_name} register successfully!")
p.apply_async(run_liuli_schedule, args=(each_ll_config_name,))
p.close()
p.join()
else:
run_liuli_schedule(ll_config_name)
从代码可知,调度器会启动 Python 的进程池,然后向其中添加 run_liuli_schedule 异步任务,该异步任务中,会执行 run_liuli_task 方法,该方法才是一次完整的任务流程,代码如下:
def run_liuli_task(ll_config: dict):
"""执行调度任务
Args:
ll_config (dict): Liuli 任务配置
"""
# 文章源, 用于基础查询条件
doc_source: str = ll_config["doc_source"]
basic_filter = {"basic_filter": {"doc_source": doc_source}}
# 采集器配置
collector_conf: dict = ll_config["collector"]
# 处理器配置
processor_conf: dict = ll_config["processor"]
# 分发器配置
sender_conf: dict = ll_config["sender"]
sender_conf.update(basic_filter)
# 备份器配置
backup_conf: dict = ll_config["backup"]
backup_conf.update(basic_filter)
# 采集器执行
LOGGER.info("采集器开始执行!")
for collect_type, collect_config in collector_conf.items():
collect_factory(collect_type, collect_config)
LOGGER.info("采集器执行完毕!")
# 采集器执行
LOGGER.info("处理器(after_collect): 开始执行!")
for each in processor_conf["after_collect"]:
func_name = each.pop("func")
# 注入查询条件
each.update(basic_filter)
LOGGER.info(f"处理器(after_collect): {func_name} 正在执行...")
processor_dict[func_name](**each)
LOGGER.info("处理器(after_collect): 执行完毕!")
# 分发器执行
LOGGER.info("分发器开始执行!")
send_doc(sender_conf)
LOGGER.info("分发器执行完毕!")
# 备份器执行
LOGGER.info("备份器开始执行!")
backup_doc(backup_conf)
LOGGER.info("备份器执行完毕!")
从 run_liuli_task 方法可知,Liuli 一次任务需要执行:
关于 Liuli 的功能,可以阅读作者本人的文章: [基于 Liuli 构建纯净的 RSS 公众号信息流] ,这里先只关注公众号采集的逻辑。
因为有 ruia 与 playwright 两种不同方式实现的采集器,具体使用哪种,通过配置文件决定,然后通过 import_module 方法动态导入相应的模块,然后运行模块的 run 方法,从而实现公众号文章的采集,相关代码如下:
def collect_factory(collect_type: str, collect_config: dict) -> bool:
"""
采集器工厂函数
:param collect_type: 采集器类型
:param collect_config: 采集器配置
:return:
"""
collect_status = False
try:
# import_module方法动态载入具体的采集模块
collect_module = import_module(f"src.collector.{collect_type}")
collect_status = collect_module.run(collect_config)
except ModuleNotFoundError:
LOGGER.error(f"采集器类型不存在 {collect_type} - {collect_config}")
except Exception as e:
LOGGER.error(f"采集器执行出错 {collect_type} - {collect_config} - {e}")
return collect_status
playwright 是微软出品的自动化库,与 selenium 的作用类似,定位于网页测试,但也被人用于网页信息的获取,可见即可得,使用门槛低,因为要加载网页信息,所以性能比较差,当然一些前端反爬的措施,playwright 也无法突破。
playwright 相比于 selenium,支持 python 的 async,性能有所提升(但还是比不了直接请求),这里贴一下获取某公众号下最新文章的部分逻辑(完整代码太长):
async def playwright_main(wechat_name: str):
"""利用 playwright 获取公众号元信息,输出数据格式见上方
Args:
wechat_name ([str]): 公众号名称
"""
wechat_data = {}
try:
async with async_playwright() as p:
# browser = await p.chromium.launch(headless=False)
browser = await p.chromium.launch()
context = await browser.new_context(user_agent=Config.SPIDER_UA)
page = await context.new_page()
# 进行公众号检索
await page.goto("https://weixin.sogou.com/")
await page.wait_for_load_state()
await page.click('input[name="query"]')
await page.fill('input[name="query"]', wechat_name)
await asyncio.sleep(1)
await page.click("text=搜公众号")
await page.wait_for_load_state()
从上述代码可知,playwright 用法与 selenium 很像,将用户操作网站的流程自动化便可以获取相应的数据了。
ruia 是轻量级的 Python 异步爬虫框架,因为比较轻量,我也将其代码读了一遍,作为下篇文章的内容。
它的用法与 scrapy 有点像,需要定义继承于 ruia.Spider 的子类,然后调用 start 方法实现对目标网站的请求,然后 ruia 会自动调用 parse 方法实现对网页内容的解析,来看一下具体的代码,首先是入口逻辑:
def run(collect_config: dict):
"""微信公众号文章抓取爬虫
Args:
collect_config (dict, optional): 采集器配置
"""
s_nums = 0
wechat_list = collect_config["wechat_list"]
delta_time = collect_config.get("delta_time", 5)
for wechat_name in wechat_list:
SGWechatSpider.wechat_name = wechat_name
SGWechatSpider.request_config = {
"RETRIES": 3,
"DELAY": delta_time,
"TIMEOUT": 20,
}
sg_url = f"https://weixin.sogou.com/weixin?type=1&query={wechat_name}&ie=utf8&s_from=input&_sug_=n&_sug_type_="
SGWechatSpider.start_urls = [sg_url]
try:
# 启动爬虫
SGWechatSpider.start(middleware=ua_middleware)
s_nums += 1
except Exception as e:
err_msg = f" 公众号->{wechat_name} 文章更新失败! 错误信息: {e}"
LOGGER.error(err_msg)
msg = f" 微信公众号文章更新完毕({s_nums}/{len(wechat_list)})!"
LOGGER.info(msg)
上述代码中,通过 SGWechatSpider.start (middleware=ua_middleware) 启动了爬虫,它会自动请求 start_urls 的 url,然后回调 parse 方法,parse 方法代码如下:
async def parse(self, response: Response):
"""解析公众号原始链接数据"""
html = await response.text()
item_list = []
async for item in SGWechatItem.get_items(html=html):
if item.wechat_name == self.wechat_name:
item_list.append(item)
yield self.request(
url=item.latest_href,
metadata=item.results,
# 下一个回调方法
callback=self.parse_real_wechat_url,
)
break
parse 方法中,会通过 self.request 请求新的 url,然后回调 self.parse_real_wechat_url 方法,一切都与 scrapy 如此相似。
至此,采集模块的阅读就结束了(代码中还涉及一些简单的数据清洗,本文就不讨论了),没有特别复杂的部分,从代码上看,也没有发送作者做反爬逻辑的处理,搜狗微信没有反爬?
接着看一下广告文章识别,Liuli 对于广告文章,还是会采集的,采集后,在文章处理模块,会将广告文章标注出来,先理一下广告文章标注的入口逻辑,回到 liuli_schedule.py 的 run_lili_task 方法,关注到 process(文章处理模块)的逻辑,代码如下:
LOGGER.info("处理器(after_collect): 开始执行!")
for each in processor_conf["after_collect"]:
func_name = each.pop("func")
# 注入查询条件
each.update(basic_filter)
LOGGER.info(f"处理器(after_collect): {func_name} 正在执行...")
processor_dict[func_name](**each)
LOGGER.info("处理器(after_collect): 执行完毕!")
从上述代码可知,处理器的主要逻辑是 processor_dict 字典中的方法,该字典的定义的路径为 src/processor/__init__.py
,代码如下:
from .rss_utils import to_rss
from .text_utils import (
ad_marker,
extract_core_html,
extract_keyword_list,
html_to_text_h2t,
str_replace,
)
processor_dict = {
"to_rss": to_rss,
"ad_marker": ad_marker,
"str_replace": str_replace,
}
其中 ad_marker 方法便是识别文章是否为广告文章的方法,其实写的有点绕,核心逻辑就是计算当前文章与采集到的广告文章词频构建向量的余弦值,判断余弦值大小来判断是否为广告文章,简单看一下相关的逻辑。
ad_marker 方法中会调用 model_predict_factory 方法,将当前文章的标题、文章内容以及分类的 cos_value 传入,相关代码如下(清理了代码,只展示了需要部分):
def ad_marker(
cos_value: float = 0.6,
is_force=False,
basic_filter={},
**kwargs,
):
# 基于余弦相似度
cos_model_resp = model_predict_factory(
model_name="cos",
model_path="",
input_dict={"text": doc_name + doc_keywords, "cos_value": cos_value},
# input_dict={"text": doc_name, "cos_value": Config.COS_VALUE},
).to_dict()
cos_value 为 0.6,即如果计算出当前文章与广告文章余弦值大于等于 0.6,则认为当前文章为广告文章,其最终预测逻辑在 classifier/model_base/cos_model_loader.py 的 predict 方法中,代码如下:
def predict(self, text: str, cos_value: float = 0.8) -> dict:
"""
对文本相似度进行预测
:param text: 文本
:param cos_value: 阈值 默认是0.9
:return:
"""
max_pro, result = 0.0, 0
for each in self.train_data:
# 余弦值具体的运算逻辑
cos = CosineSimilarity(self.process_text(text), each)
res_dict = cos.calculate()
value = res_dict["value"]
# 大于等于cos_value,就返回1,则表示当前的文章是广告文章
result = 1 if value >= cos_value else 0
max_pro = value if value > max_pro else max_pro
if result == 1:
break
return {"result": result, "value": max_pro}
余弦值具体的运算逻辑在 CosineSimilarity 的 calculate 方法中,都是数学相关的代码,就不看了,其核心是希望判断当前文章与广告文章的相似度,类似的还可以通过 TFIDF、文本聚类等算法来做,相关的库,几行代码就可以搞定(所以我感觉这里写绕了)。
END
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8