import requests import feedparser # from dataclasses import dataclass, field # 已移除 from typing import Optional # , List, Dict # List 和 Dict 不再需要 from datetime import datetime import time from ..schemas import PodcastEpisode, PodcastChannel def _parse_date(date_str: Optional[str]) -> Optional[datetime]: if not date_str: return None try: # feedparser 已经将日期解析为 time.struct_time 类型 # 我们将其转换为 datetime 类型 if isinstance(date_str, time.struct_time): return datetime.fromtimestamp(time.mktime(date_str)) # 如果 feedparser 解析失败或返回字符串,则回退使用其他字符串格式解析 # 这是一种常见的 RSS 日期格式 return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %z') except (ValueError, TypeError): try: return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %Z') # 处理 GMT, EST 等时区 except (ValueError, TypeError): # 如果时区缺失或无法解析,则尝试不带时区解析 try: return datetime.strptime(date_str[:-6], '%a, %d %b %Y %H:%M:%S') except (ValueError, TypeError): print(f"Warning: Could not parse date string: {date_str}") return None def fetch_rss_content(rss_url: str) -> Optional[bytes]: """ 通过 HTTP 请求获取 RSS feed 的内容。 参数: rss_url: 播客 RSS feed 的 URL。 返回: bytes 类型的 RSS 内容,如果获取失败则返回 None。 """ headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36' } try: response = requests.get(rss_url, headers=headers, timeout=30) response.raise_for_status() # 针对 HTTP 错误抛出异常 return response.content except requests.exceptions.RequestException as e: print(f"获取 RSS feed 时出错: {e}") return None def parse_rss_xml_content(rss_content: bytes) -> Optional[PodcastChannel]: """ 解析播客 RSS XML 内容,并返回其主要信息和剧集详情。 参数: rss_content: bytes 类型的 RSS XML 内容。 返回: 一个包含已解析信息的 PodcastChannel 对象,如果解析失败则返回 None。 """ feed = feedparser.parse(rss_content) if feed.bozo: # 如果 feed 格式不正确,bozo 为 True # feed.bozo_exception 包含异常信息 print(f"警告: RSS feed 可能格式不正确。Bozo 异常: {feed.bozo_exception}") # 即使格式不完全正确,feedparser 通常仍会尝试解析,所以我们不在此处直接返回 None # 但如果关键的 feed 或 channel_info 缺失,后续会自然失败 channel_info = feed.get('feed', {}) if not channel_info: # 如果连基本的 feed 结构都没有,则认为解析失败 print("错误: RSS 内容无法解析为有效的 feed 结构。") return None podcast_channel = PodcastChannel( title=channel_info.get('title'), link=channel_info.get('link'), description=channel_info.get('subtitle') or channel_info.get('description'), language=channel_info.get('language'), image_url=channel_info.get('image', {}).get('href') if channel_info.get('image') else None, author=channel_info.get('author') or channel_info.get('itunes_author'), last_build_date=_parse_date(channel_info.get('updated_parsed') or channel_info.get('published_parsed')) ) for entry in feed.entries: # 确定 shownotes:优先使用 content:encoded,然后是 itunes:summary,其次是 description/summary shownotes = None # 1. 优先尝试 # entry.content 是一个 FeedParserDict 对象列表 if 'content' in entry and entry.content: for content_item in entry.content: # 检查 content_item 是否有 value 属性并且该值非空 if hasattr(content_item, 'value') and content_item.value: shownotes = content_item.value break # 找到第一个有效的 content:encoded,停止查找 # 2. 如果没有从 content:encoded 获得,尝试 itunes:summary if not shownotes and 'itunes_summary' in entry: shownotes = entry.itunes_summary # 3. 最后回退到 summary 或 description if not shownotes: # 回退到 summary 或 description shownotes = entry.get('summary') or entry.get('description') # 从 enclosures 获取音频 URL audio_url = None if 'enclosures' in entry: for enc in entry.enclosures: if enc.get('type', '').startswith('audio/'): audio_url = enc.get('href') break # 解析特定于剧集的 iTunes 标签 itunes_season = None try: itunes_season_str = entry.get('itunes_season') if itunes_season_str: itunes_season = int(itunes_season_str) except (ValueError, TypeError): pass # 如果不是有效整数则忽略 itunes_episode_number = None try: itunes_episode_number_str = entry.get('itunes_episode') if itunes_episode_number_str: itunes_episode_number = int(itunes_episode_number_str) except (ValueError, TypeError): pass # 如果不是有效整数则忽略 episode = PodcastEpisode( title=entry.get('title'), link=entry.get('link'), published_date=_parse_date(entry.get('published_parsed')), summary=entry.get('summary'), # 这通常是较短的版本 shownotes=shownotes, # 这是我们尝试获取的更详细版本 audio_url=audio_url, guid=entry.get('id') or entry.get('guid'), duration=entry.get('itunes_duration'), episode_type=entry.get('itunes_episodetype'), season=itunes_season, episode_number=itunes_episode_number ) podcast_channel.episodes.append(episode) return podcast_channel def parse_podcast_rss(rss_url: str) -> Optional[PodcastChannel]: """ 从给定的 RSS URL 获取并解析播客数据。 参数: rss_url: 播客 RSS feed 的 URL。 返回: 一个包含已解析信息的 PodcastChannel 对象,如果获取或解析失败则返回 None。 """ rss_content = fetch_rss_content(rss_url) if rss_content: return parse_rss_xml_content(rss_content) return None