Commit

Pass entire Reader to _update.Pipeline. #332
(Pipeline will need the name scheme and get_tag() too.)
lemon24 committed May 11, 2024
1 parent 61c7c67 commit de66913
Showing 2 changed files with 16 additions and 30 deletions.
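
In short: Pipeline no longer keeps storage, parser, hooks, and now as separate fields filled in by a from_reader() classmethod; it stores the Reader itself and reaches those dependencies through it (reader._storage, reader._parser, reader._update_hooks, reader._now). A minimal sketch of the new shape, with simplified types; PipelineSketch and update_all are hypothetical stand-ins for illustration, not the real reader._update.Pipeline:

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from reader import Reader


@dataclass
class PipelineSketch:
    # the whole Reader replaces the old storage/parser/hooks/now fields
    reader: Reader
    map: Any  # MapFunction[Any, Any] in the real code

    def update_all(self) -> None:
        # dependencies are looked up on the Reader at the point of use
        storage = self.reader._storage
        parser = self.reader._parser
        hooks = self.reader._update_hooks
        now = self.reader._now()


# construction changes from Pipeline.from_reader(reader, map) to Pipeline(reader, map)
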
44 changes: 15 additions & 29 deletions src/reader/_update.py
@@ -1,7 +1,6 @@
 from __future__ import annotations

 import logging
-from collections.abc import Callable
 from collections.abc import Iterable
 from dataclasses import dataclass
 from datetime import datetime
@@ -22,7 +21,6 @@
 from ._types import FeedToUpdate
 from ._types import FeedUpdateIntent
 from ._types import ParsedFeed
-from ._types import UpdateHooks
 from ._utils import count_consumed
 from ._utils import PrefixLogger
 from .exceptions import FeedNotFoundError
@@ -35,9 +33,7 @@


 if TYPE_CHECKING:  # pragma: no cover
-    from ._parser import Parser
     from ._types import FeedFilter
-    from ._types import StorageType
     from ._utils import MapFunction
     from .core import Reader

@@ -288,23 +284,10 @@ class Pipeline:
"""

storage: StorageType
parser: Parser
hooks: UpdateHooks[Any]
now: Callable[[], datetime]
reader: Reader
map: MapFunction[Any, Any]
decider = Decider

@classmethod
def from_reader(cls, reader: Reader, map: MapFunction[Any, Any]) -> Pipeline:
return cls(
storage=reader._storage,
parser=reader._parser,
hooks=reader._update_hooks,
now=reader._now,
map=map,
)

def update(self, filter: FeedFilter) -> Iterable[UpdateResult]:
# global_now is used as first_updated_epoch for all new entries,
# so that the subset of new entries from an update appears before
@@ -318,7 +301,7 @@ def update(self, filter: FeedFilter) -> Iterable[UpdateResult]:
         #
         # However, added == last_updated for the first update.
         #
-        global_now = self.now()
+        global_now = self.reader._now()

         is_parallel = self.map is not map
         process_parse_result = partial(self.process_parse_result, global_now)
@@ -338,16 +321,18 @@ def parser_process_feeds_for_update(
         ) -> Iterable[FeedForUpdate]:
             for feed in feeds:
                 try:
-                    yield self.parser.process_feed_for_update(feed)
+                    yield self.reader._parser.process_feed_for_update(feed)
                 except ParseError as e:
                     parser_process_feeds_for_update_errors.append((feed, e))

         # assemble pipeline
-        feeds_for_update = self.storage.get_feeds_for_update(filter)
+        feeds_for_update = self.reader._storage.get_feeds_for_update(filter)
         # feeds_for_update = map(self.parser.process_feed_for_update, feeds_for_update)
         feeds_for_update = parser_process_feeds_for_update(feeds_for_update)
         feeds_for_update = map(self.decider.process_feed_for_update, feeds_for_update)
-        parse_results = self.parser.parallel(feeds_for_update, self.map, is_parallel)
+        parse_results = self.reader._parser.parallel(
+            feeds_for_update, self.map, is_parallel
+        )
         parse_results = chain(parse_results, parser_process_feeds_for_update_errors)
         update_results = starmap(process_parse_result, parse_results)

@@ -369,15 +354,15 @@ def process_parse_result(
         result: ParsedFeed | None | ParseError,
     ) -> tuple[str, UpdatedFeed | None | Exception]:
         make_intents = partial(
-            self.decider.make_intents, feed, self.now(), global_now, result
+            self.decider.make_intents, feed, self.reader._now(), global_now, result
         )

         try:
             # assemble pipeline
             entry_pairs = self.get_entry_pairs(result)

             if result and not isinstance(result, Exception):
-                entry_pairs = self.parser.process_entry_pairs(
+                entry_pairs = self.reader._parser.process_entry_pairs(
                     feed.url, result.mime_type, entry_pairs
                 )
                 entry_pairs, get_total_count = count_consumed(entry_pairs)
@@ -402,7 +387,7 @@ def get_entry_pairs(self, result: ParsedFeed | None | ParseError) -> EntryPairs:

         # give storage a chance to consume entries in a streaming fashion
         entries1, entries2 = tee(result.entries)
-        entries_for_update = self.storage.get_entries_for_update(
+        entries_for_update = self.reader._storage.get_entries_for_update(
             (e.feed_url, e.id) for e in entries1
         )
         return zip(entries2, entries_for_update, strict=True)
@@ -413,17 +398,18 @@ def update_feed(
         entries: Iterable[EntryUpdateIntent],
     ) -> tuple[int, int]:
         url = feed.url
+        hooks = self.reader._update_hooks

-        self.hooks.run('before_feed_update', (url,), url)
+        hooks.run('before_feed_update', (url,), url)

         if entries:
-            self.storage.add_or_update_entries(entries)
-            self.storage.update_feed(feed)
+            self.reader._storage.add_or_update_entries(entries)
+            self.reader._storage.update_feed(feed)

         # if feed_for_update.url != parsed_feed.feed.url, the feed was redirected.
         # TODO: Maybe handle redirects somehow else (e.g. change URL if permanent).

-        hook_errors = self.hooks.group("got unexpected after-update hook errors")
+        hook_errors = hooks.group("got unexpected after-update hook errors")

         new_count = 0
         updated_count = 0
2 changes: 1 addition & 1 deletion src/reader/core.py
@@ -973,7 +973,7 @@ def update_feeds_iter(
         self._update_hooks.run('before_feeds_update', None)

         with make_map as map:
-            yield from Pipeline.from_reader(self, map).update(filter)
+            yield from Pipeline(self, map).update(filter)

         if _call_feeds_update_hooks:
             hook_errors = self._update_hooks.group(
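
The parenthetical in the commit message points at the motivation: with the whole Reader in hand, the pipeline can later use Reader's public API as well, such as get_tag() and the reserved-name scheme. A hedged illustration of the kind of access this enables; read_update_config, the "update" key, and the example feed URL are made up for this sketch, and nothing like it is added by this commit:

from reader import Reader, make_reader


def read_update_config(reader: Reader, feed_url: str) -> object:
    # ".reader.update" under the default reserved name scheme
    key = reader.make_reader_reserved_name("update")
    # passing a default avoids TagNotFoundError for feeds without the tag
    return reader.get_tag(feed_url, key, None)


reader = make_reader(":memory:")
reader.add_feed("https://example.com/feed.xml")
config = read_update_config(reader, "https://example.com/feed.xml")  # None (no tag set)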
