Skip to content

Commit

Permalink
Set update_after based on config tags. #332
Browse files Browse the repository at this point in the history
  • Loading branch information
lemon24 committed Jun 12, 2024
1 parent 13c9ccb commit c57558c
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/reader/_storage/_feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,12 @@ def set_feed_stale(self, url: str, stale: bool) -> None:

@wrap_exceptions()
def update_feed(self, intent: FeedUpdateIntent) -> None:
url, _, value = intent
url, _, _, value = intent

context: dict[str, Any] = {
'url': url,
'last_retrieved': adapt_datetime(intent.last_retrieved),
'update_after': adapt_datetime(intent.update_after),
}
expressions: list[str] = []

Expand Down
3 changes: 3 additions & 0 deletions src/reader/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ class FeedUpdateIntent(NamedTuple):
#: The time at the start of updating this feed.
last_retrieved: datetime

#: The earliest time the feed will next be updated.
update_after: datetime

#: One of:
#: feed data and metadata (the feed was updated),
#: None (the feed is unchanged)
Expand Down
92 changes: 88 additions & 4 deletions src/reader/_update.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import random
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime
Expand All @@ -12,6 +13,7 @@
from typing import NamedTuple
from typing import Optional
from typing import TYPE_CHECKING
from typing import TypedDict

from ._types import EntryData
from ._types import EntryForUpdate
Expand Down Expand Up @@ -59,6 +61,7 @@ class Decider:
old_feed: FeedForUpdate
now: datetime
global_now: datetime
config: Config
log: Any = log

@classmethod
Expand All @@ -85,13 +88,15 @@ def make_intents(
old_feed: FeedForUpdate,
now: datetime,
global_now: datetime,
config: Config,
parsed_feed: ParsedFeed | None | ParseError,
entry_pairs: EntryPairs,
) -> tuple[FeedUpdateIntent, Iterable[EntryUpdateIntent]]:
decider = cls(
old_feed,
now,
global_now,
config,
PrefixLogger(log, ["update feed %r" % old_feed.url]),
)
return decider.update(parsed_feed, entry_pairs)
Expand Down Expand Up @@ -250,16 +255,80 @@ def update(
value = self.get_feed_to_update(parsed_feed, bool(entries_to_update))

# We always return a FeedUpdateIntent because
# we always want to set last_retrieved and update_after (FIXME #332),
# we always want to set last_retrieved and update_after,
# and clear last_exception (if set before the update).

return FeedUpdateIntent(self.url, self.now, value), entries_to_update
update_after = next_update_after(self.global_now, **self.config)
return (
FeedUpdateIntent(self.url, self.now, update_after, value),
entries_to_update,
)


class UpdateReasons(NamedTuple):
    """Counts of entries treated as updated, broken down by reason."""

    # entries treated as updated only because their stored content hash
    # changed (no `updated` signal from the feed) — presumably used for
    # logging/stats; confirm against callers outside this view
    hash_changed: int = 0


class Config(TypedDict):
    """Feed update scheduling configuration (see DEFAULT_CONFIG)."""

    # seconds between updates; update_after is aligned to multiples of
    # this, counted from UPDATE_AFTER_START
    interval: int
    # fraction of `interval` by which the next update may be randomly
    # delayed; validated to be between 0 and 1 (see flatten_config)
    jitter: float


# default scheduling: update hourly, with no random jitter
DEFAULT_CONFIG = Config(interval=3600, jitter=0)
# reserved tag name holding the update config;
# namespaced via make_reader_reserved_name() before lookup
CONFIG_KEY = 'update'


def flatten_config(config: Any, default: Config) -> Config:
    """Merge a user-provided update config dict over *default*.

    Invalid input can never break updates: a non-dict *config* and any
    missing/invalid individual value fall back to *default* (with a
    warning), so the result always has usable ``interval`` and
    ``jitter`` values.

    Args:
        config: The raw tag value; expected to be a dict.
        default: Base config providing fallback values (not modified).

    Returns:
        A new Config.

    """
    rv = default.copy()

    if not isinstance(config, dict):
        log.warning(
            "invalid update config, expected dict, got %s", type(config).__name__
        )
        return rv

    # BUG FIX: the key was misspelled 'inteval', which silently ignored
    # any user-configured interval (set_number skips missing keys).
    set_number('interval', config, rv, int)  # type: ignore
    set_number('jitter', config, rv, float, max=1)  # type: ignore
    return rv


def set_number(name, src, dst, type, min=0, max=float('inf')):  # type: ignore
    """Copy ``src[name]`` into ``dst[name]``, coerced to *type*.

    A missing key is skipped silently; a value that cannot be coerced,
    or that falls outside [*min*, *max*], is skipped with a warning.
    """
    if name not in src:
        return

    try:
        value = type(src[name])
    except (TypeError, ValueError) as e:
        log.warning("invalid update config .%s: %s", name, e)
        return

    if min <= value <= max:
        dst[name] = value
    else:
        log.warning(
            "invalid update config .%s: must be between %s and %s: %s",
            name,
            min,
            max,
            value,
        )


# start on a Monday, so weekly amounts of seconds line up
UPDATE_AFTER_START = datetime(1970, 1, 5)
# seconds between the Unix epoch and UPDATE_AFTER_START (4 days);
# kept for backwards compatibility with other users of this module
EPOCH_OFFSET = (UPDATE_AFTER_START - datetime(1970, 1, 1)).total_seconds()


def next_update_after(now: datetime, interval: int, jitter: float = 0) -> datetime:
    """Return the earliest time the next update should happen.

    That is: the next multiple of *interval* seconds after *now*
    (counted from UPDATE_AFTER_START, so e.g. weekly intervals always
    land on a Monday), pushed forward by a random fraction of *interval*
    scaled by *jitter*.  The result keeps *now*'s tzinfo.

    Args:
        now: Current time; naive or aware.
        interval: Update interval, in seconds.
        jitter: Fraction of *interval* (0 to 1) to randomly delay by.

    Returns:
        The next update time, with the same tzinfo as *now*.

    """
    # local import, to avoid touching the module import block
    from datetime import timedelta

    elapsed_s = (now.replace(tzinfo=None) - UPDATE_AFTER_START).total_seconds()
    # next interval boundary after now, plus up to jitter * interval;
    # random.random() is in [0, 1), so jitter=1 stays below the boundary after
    after_s = int((elapsed_s // interval + 1 + random.random() * jitter) * interval)
    # timedelta arithmetic instead of datetime.utcfromtimestamp(),
    # which is deprecated since Python 3.12; the math is identical
    return (UPDATE_AFTER_START + timedelta(seconds=after_s)).replace(
        tzinfo=now.tzinfo
    )


@dataclass(frozen=True)
class Pipeline:
"""Update multiple feeds.
Expand Down Expand Up @@ -303,8 +372,12 @@ def update(self, filter: FeedFilter) -> Iterable[UpdateResult]:
#
global_now = self.reader._now()

config_key = self.reader.make_reader_reserved_name(CONFIG_KEY)
config = flatten_config(self.reader.get_tag((), config_key, {}), DEFAULT_CONFIG)

process_parse_result = partial(self.process_parse_result, global_now, config)

is_parallel = self.map is not map
process_parse_result = partial(self.process_parse_result, global_now)

# ಠ_ಠ
# The pipeline is not equipped to handle ParseErrors
Expand Down Expand Up @@ -350,11 +423,22 @@ def parser_process_feeds_for_update(
def process_parse_result(
self,
global_now: datetime,
config: Config,
feed: FeedForUpdate,
result: ParsedFeed | None | ParseError,
) -> tuple[str, UpdatedFeed | None | Exception]:
# TODO: don't duplicate code from update()
# TODO: the feed tag value should come from get_feeds_for_update()
config_key = self.reader.make_reader_reserved_name(CONFIG_KEY)
config = flatten_config(self.reader.get_tag(feed, config_key, {}), config)

make_intents = partial(
self.decider.make_intents, feed, self.reader._now(), global_now, result
self.decider.make_intents,
feed,
self.reader._now(),
global_now,
config,
result,
)

try:
Expand Down
29 changes: 28 additions & 1 deletion tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def test_update_feed_updated(reader, update_feed, caplog):
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 2),
last_retrieved=datetime(2010, 1, 2),
update_after=datetime(2010, 1, 2, 1),
)
assert set(reader.get_entries()) == {
entry_one.as_entry(
Expand All @@ -105,6 +106,7 @@ def test_update_feed_updated(reader, update_feed, caplog):
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 3),
last_retrieved=datetime(2010, 1, 3),
update_after=datetime(2010, 1, 3, 1),
)
assert set(reader.get_entries()) == {
entry_one.as_entry(
Expand Down Expand Up @@ -132,6 +134,7 @@ def test_update_feed_updated(reader, update_feed, caplog):
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 3, 12),
last_retrieved=datetime(2010, 1, 3, 12),
update_after=datetime(2010, 1, 3, 13),
)
assert reader.get_feed(feed) == feed
assert "feed hash changed, treating as updated" in caplog.text
Expand All @@ -151,6 +154,7 @@ def test_update_feed_updated(reader, update_feed, caplog):
updated=datetime(2009, 1, 1),
last_updated=datetime(2010, 1, 4),
last_retrieved=datetime(2010, 1, 4),
update_after=datetime(2010, 1, 4, 1),
)
assert set(reader.get_entries()) == {
entry_one.as_entry(
Expand Down Expand Up @@ -187,6 +191,7 @@ def test_update_feed_updated(reader, update_feed, caplog):
last_updated=datetime(2010, 1, 4),
# changes always
last_retrieved=datetime(2010, 1, 4, 12),
update_after=datetime(2010, 1, 4, 13),
)
assert set(reader.get_entries()) == {
entry_one.as_entry(
Expand Down Expand Up @@ -217,6 +222,7 @@ def test_update_feed_updated(reader, update_feed, caplog):
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 5),
last_retrieved=datetime(2010, 1, 5),
update_after=datetime(2010, 1, 5, 1),
)

with caplog.at_level(logging.DEBUG, logger='reader'):
Expand Down Expand Up @@ -271,6 +277,7 @@ def test_update_entry_updated(reader, update_feed, caplog, monkeypatch):
added=datetime(2010, 2, 1),
last_updated=datetime(2010, 2, 2),
last_retrieved=datetime(2010, 2, 2),
update_after=datetime(2010, 2, 2, 1),
)

assert set(reader.get_entries()) == {
Expand All @@ -295,6 +302,7 @@ def test_update_entry_updated(reader, update_feed, caplog, monkeypatch):
updated=datetime(2010, 1, 1),
last_updated=datetime(2010, 2, 2),
last_retrieved=datetime(2010, 2, 3),
update_after=datetime(2010, 2, 3, 1),
)
assert set(reader.get_entries()) == {
old_entry.as_entry(
Expand All @@ -320,6 +328,7 @@ def test_update_entry_updated(reader, update_feed, caplog, monkeypatch):
added=datetime(2010, 2, 1),
last_updated=datetime(2010, 2, 3, 12),
last_retrieved=datetime(2010, 2, 3, 12),
update_after=datetime(2010, 2, 3, 13),
)
assert set(reader.get_entries()) == {
new_entry.as_entry(
Expand All @@ -344,6 +353,7 @@ def test_update_entry_updated(reader, update_feed, caplog, monkeypatch):
added=datetime(2010, 2, 1),
last_updated=datetime(2010, 2, 4),
last_retrieved=datetime(2010, 2, 4),
update_after=datetime(2010, 2, 4, 1),
)
assert set(reader.get_entries()) == {
new_entry.as_entry(
Expand Down Expand Up @@ -392,6 +402,7 @@ def test_update_no_updated(reader, chunk_size, update_feed):
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 1),
last_retrieved=datetime(2010, 1, 1),
update_after=datetime(2010, 1, 1, 1),
)

assert set(reader.get_feeds()) == {feed}
Expand All @@ -413,6 +424,7 @@ def test_update_no_updated(reader, chunk_size, update_feed):
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 2),
last_retrieved=datetime(2010, 1, 2),
update_after=datetime(2010, 1, 2, 1),
)

assert set(reader.get_feeds()) == {feed}
Expand Down Expand Up @@ -525,6 +537,7 @@ def test_update_new(reader):
added=datetime(2010, 1, 1, 12),
last_updated=datetime(2010, 1, 2),
last_retrieved=datetime(2010, 1, 2),
update_after=datetime(2010, 1, 2, 1),
)
assert len(set(reader.get_feeds())) == 2
assert set(reader.get_entries()) == {
Expand All @@ -542,6 +555,7 @@ def test_update_new(reader):
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 3),
last_retrieved=datetime(2010, 1, 3),
update_after=datetime(2010, 1, 3, 1),
)
assert len(set(reader.get_feeds())) == 2
assert set(reader.get_entries()) == {
Expand All @@ -551,7 +565,10 @@ def test_update_new(reader):
last_updated=datetime(2010, 1, 3),
),
entry_two.as_entry(
feed=two._replace(last_retrieved=datetime(2010, 1, 3)),
feed=two._replace(
last_retrieved=datetime(2010, 1, 3),
update_after=datetime(2010, 1, 3, 1),
),
added=datetime(2010, 1, 2),
last_updated=datetime(2010, 1, 2),
),
Expand Down Expand Up @@ -989,11 +1006,13 @@ def test_update_feed(reader, feed_arg):
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 1),
last_retrieved=datetime(2010, 1, 1),
update_after=datetime(2010, 1, 1, 1),
)
two = two.as_feed(
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 1),
last_retrieved=datetime(2010, 1, 1),
update_after=datetime(2010, 1, 1, 1),
)

reader.add_feed(one.url)
Expand Down Expand Up @@ -1316,11 +1335,13 @@ def test_add_remove_get_feeds(reader, feed_arg):
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 2),
last_retrieved=datetime(2010, 1, 2),
update_after=datetime(2010, 1, 2, 1),
)
two = two.as_feed(
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 2),
last_retrieved=datetime(2010, 1, 2),
update_after=datetime(2010, 1, 2, 1),
)
entry_one = entry_one.as_entry(
feed=one, added=datetime(2010, 1, 2), last_updated=datetime(2010, 1, 2)
Expand Down Expand Up @@ -1501,6 +1522,7 @@ def test_data_roundtrip(reader):
added=datetime(2010, 1, 2),
last_updated=datetime(2010, 1, 3),
last_retrieved=datetime(2010, 1, 3),
update_after=datetime(2010, 1, 3, 1),
),
added=datetime(2010, 1, 3),
last_updated=datetime(2010, 1, 3),
Expand Down Expand Up @@ -1576,6 +1598,7 @@ def test_get_entry(reader, entry_arg):
added=datetime(2010, 1, 2),
last_updated=datetime(2010, 1, 3),
last_retrieved=datetime(2010, 1, 3),
update_after=datetime(2010, 1, 3, 1),
),
added=datetime(2010, 1, 3),
last_updated=datetime(2010, 1, 3),
Expand Down Expand Up @@ -1749,6 +1772,7 @@ def test_change_feed_url_feed(reader):
last_updated=None,
last_exception=None,
last_retrieved=None,
update_after=None,
)


Expand Down Expand Up @@ -1830,6 +1854,7 @@ def test_change_feed_url_second_update(reader, new_feed_url):
updated=None,
last_updated=None,
last_retrieved=None,
update_after=None,
)

reader._parser.feed(
Expand All @@ -1850,6 +1875,7 @@ def test_change_feed_url_second_update(reader, new_feed_url):
updated=datetime(2010, 1, 2),
last_updated=datetime(2010, 1, 3),
last_retrieved=datetime(2010, 1, 3),
update_after=datetime(2010, 1, 3, 1),
title='new title',
author='new author',
link='new link',
Expand Down Expand Up @@ -2492,6 +2518,7 @@ def test_add_entry(reader):
added=datetime(2010, 1, 1),
last_updated=datetime(2010, 1, 3),
last_retrieved=datetime(2010, 1, 3),
update_after=datetime(2010, 1, 3, 1),
),
)

Expand Down
1 change: 1 addition & 0 deletions tests/test_reader_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class datetime_mock(datetime):
added=utc_datetime(2010, 1, 1),
last_updated=utc_datetime(2010, 1, 2),
last_retrieved=utc_datetime(2010, 1, 2),
update_after=utc_datetime(2010, 1, 2, 1),
)

assert feed == expected_feed
Expand Down
Loading

0 comments on commit c57558c

Please sign in to comment.