From de66913ddbefcf4e46d2c634f7612e759c574318 Mon Sep 17 00:00:00 2001
From: lemon24
Date: Sat, 11 May 2024 11:52:54 +0300
Subject: [PATCH] Pass entire Reader to _update.Pipeline. #332

(Pipeline will need the name scheme and get_tag() too.)
---
 src/reader/_update.py | 44 +++++++++++++++-----------------------------
 src/reader/core.py    |  2 +-
 2 files changed, 16 insertions(+), 30 deletions(-)

diff --git a/src/reader/_update.py b/src/reader/_update.py
index 70fdb584..a983a5d1 100644
--- a/src/reader/_update.py
+++ b/src/reader/_update.py
@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import logging
-from collections.abc import Callable
 from collections.abc import Iterable
 from dataclasses import dataclass
 from datetime import datetime
@@ -22,7 +21,6 @@
 from ._types import FeedToUpdate
 from ._types import FeedUpdateIntent
 from ._types import ParsedFeed
-from ._types import UpdateHooks
 from ._utils import count_consumed
 from ._utils import PrefixLogger
 from .exceptions import FeedNotFoundError
@@ -35,9 +33,7 @@
 
 
 if TYPE_CHECKING:  # pragma: no cover
-    from ._parser import Parser
     from ._types import FeedFilter
-    from ._types import StorageType
     from ._utils import MapFunction
     from .core import Reader
 
@@ -288,23 +284,10 @@ class Pipeline:
 
     """
 
-    storage: StorageType
-    parser: Parser
-    hooks: UpdateHooks[Any]
-    now: Callable[[], datetime]
+    reader: Reader
     map: MapFunction[Any, Any]
    decider = Decider
 
-    @classmethod
-    def from_reader(cls, reader: Reader, map: MapFunction[Any, Any]) -> Pipeline:
-        return cls(
-            storage=reader._storage,
-            parser=reader._parser,
-            hooks=reader._update_hooks,
-            now=reader._now,
-            map=map,
-        )
-
     def update(self, filter: FeedFilter) -> Iterable[UpdateResult]:
         # global_now is used as first_updated_epoch for all new entries,
         # so that the subset of new entries from an update appears before
@@ -320,7 +303,7 @@ def update(self, filter: FeedFilter) -> Iterable[UpdateResult]:
         #
         # However, added == last_updated for the first update.
         #
-        global_now = self.now()
+        global_now = self.reader._now()
 
         is_parallel = self.map is not map
         process_parse_result = partial(self.process_parse_result, global_now)
@@ -338,16 +321,18 @@ def parser_process_feeds_for_update(
         ) -> Iterable[FeedForUpdate]:
             for feed in feeds:
                 try:
-                    yield self.parser.process_feed_for_update(feed)
+                    yield self.reader._parser.process_feed_for_update(feed)
                 except ParseError as e:
                     parser_process_feeds_for_update_errors.append((feed, e))
 
         # assemble pipeline
-        feeds_for_update = self.storage.get_feeds_for_update(filter)
+        feeds_for_update = self.reader._storage.get_feeds_for_update(filter)
         # feeds_for_update = map(self.parser.process_feed_for_update, feeds_for_update)
         feeds_for_update = parser_process_feeds_for_update(feeds_for_update)
         feeds_for_update = map(self.decider.process_feed_for_update, feeds_for_update)
-        parse_results = self.parser.parallel(feeds_for_update, self.map, is_parallel)
+        parse_results = self.reader._parser.parallel(
+            feeds_for_update, self.map, is_parallel
+        )
         parse_results = chain(parse_results, parser_process_feeds_for_update_errors)
 
         update_results = starmap(process_parse_result, parse_results)
@@ -369,7 +354,7 @@ def process_parse_result(
         result: ParsedFeed | None | ParseError,
     ) -> tuple[str, UpdatedFeed | None | Exception]:
         make_intents = partial(
-            self.decider.make_intents, feed, self.now(), global_now, result
+            self.decider.make_intents, feed, self.reader._now(), global_now, result
         )
 
         try:
@@ -377,7 +362,7 @@ def process_parse_result(
             entry_pairs = self.get_entry_pairs(result)
 
             if result and not isinstance(result, Exception):
-                entry_pairs = self.parser.process_entry_pairs(
+                entry_pairs = self.reader._parser.process_entry_pairs(
                     feed.url, result.mime_type, entry_pairs
                 )
                 entry_pairs, get_total_count = count_consumed(entry_pairs)
@@ -402,7 +387,7 @@ def get_entry_pairs(self, result: ParsedFeed | None | ParseError) -> EntryPairs:
 
         # give storage a chance to consume entries in a streaming fashion
         entries1, entries2 = tee(result.entries)
-        entries_for_update = self.storage.get_entries_for_update(
+        entries_for_update = self.reader._storage.get_entries_for_update(
             (e.feed_url, e.id) for e in entries1
         )
         return zip(entries2, entries_for_update, strict=True)
@@ -413,17 +398,18 @@ def update_feed(
         entries: Iterable[EntryUpdateIntent],
     ) -> tuple[int, int]:
         url = feed.url
+        hooks = self.reader._update_hooks
 
-        self.hooks.run('before_feed_update', (url,), url)
+        hooks.run('before_feed_update', (url,), url)
 
         if entries:
-            self.storage.add_or_update_entries(entries)
-        self.storage.update_feed(feed)
+            self.reader._storage.add_or_update_entries(entries)
+        self.reader._storage.update_feed(feed)
 
         # if feed_for_update.url != parsed_feed.feed.url, the feed was redirected.
         # TODO: Maybe handle redirects somehow else (e.g. change URL if permanent).
 
-        hook_errors = self.hooks.group("got unexpected after-update hook errors")
+        hook_errors = hooks.group("got unexpected after-update hook errors")
 
         new_count = 0
         updated_count = 0
diff --git a/src/reader/core.py b/src/reader/core.py
index 9b5c841c..1e34645f 100644
--- a/src/reader/core.py
+++ b/src/reader/core.py
@@ -973,7 +973,7 @@ def update_feeds_iter(
             self._update_hooks.run('before_feeds_update', None)
 
         with make_map as map:
-            yield from Pipeline.from_reader(self, map).update(filter)
+            yield from Pipeline(self, map).update(filter)
 
         if _call_feeds_update_hooks:
             hook_errors = self._update_hooks.group(
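
For reference, a minimal sketch of the resulting construction, mirroring
the update_feeds_iter() call site above. The Pipeline(reader, map)
signature and the UpdateResult pairs come from the diff itself; the
database path and the default-constructed FeedFilter() ("all feeds") are
illustrative assumptions:

    from reader import make_reader
    from reader._types import FeedFilter
    from reader._update import Pipeline

    reader = make_reader("db.sqlite")  # assumed path, any sqlite file works

    # Pipeline now takes the whole Reader instead of storage, parser,
    # hooks, and now unpacked from it (the old from_reader() classmethod),
    # so it can later reach the name scheme and get_tag() as well.
    pipeline = Pipeline(reader, map)  # builtin map -> serial updates

    # update() lazily yields UpdateResult (url, value) pairs;
    # consuming the iterable performs the actual feed updates.
    for url, value in pipeline.update(FeedFilter()):
        print(url, value)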