migrator: flexible Kafka offset start
slint committed Aug 15, 2023
1 parent 318a4bd · commit 71e29f9

Showing 1 changed file with 20 additions and 15 deletions:
migrator/zenodo_rdm_migrator/extract/kafka.py (+20, −15)
@@ -88,7 +88,7 @@ class KafkaExtract(Extract):
         ops_topic="zenodo-migration.public",
         tx_topic="zenodo-migration.postgres_transaction",
         last_tx=563385187,
-        from_ts=datetime.utcnow() - timedelta(minutes=5),
+        offset=datetime.utcnow() - timedelta(minutes=5),
         config={
             "bootstrap_servers": [
                 "kafka01.server",
@@ -109,8 +109,9 @@ class KafkaExtract(Extract):
         consumer in one iteration.
     :param max_ops_fetch: Max number of operation messages to fetch from the consumer
         in one iteration.
-    :param from_ts: Offset timestamp from which to start consuming messages from. If
-        not passed we start consuming from the earliest possible offset.
+    :param offset: Offset timestamp from which to start consuming messages from. Can be
+        either a datetime, or the strings "earliest"/"latest" to start from the
+        beginning/end of the topic.
     :param remove_unchanged_fields: If ``True``, removes unchanged fields for UPDATEs.
     :param _dump_dir: Path to dump consumed message to (useful for tests).
     """
@@ -136,16 +137,18 @@ def __init__(
         max_tx_info_fetch=200,
         max_ops_fetch=2000,
         config=None,
-        from_ts=None,
+        offset="earliest",
         remove_unchanged_fields=True,
         _dump_dir=None,
     ):
         """Constructor."""
         self.ops_topic = ops_topic
         self.tx_topic = tx_topic
-        if isinstance(from_ts, datetime):
-            from_ts = int(from_ts.timestamp() * 1000)
-        self.from_ts = from_ts
+        if isinstance(offset, datetime):
+            offset = int(offset.timestamp() * 1000)
+        if isinstance(offset, str):
+            assert offset in ("earliest", "latest")
+        self.offset = offset
         assert last_tx is not None, "`last_tx` is required."
         self.last_tx = last_tx
         self.config = config or {}
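
The constructor now normalizes every accepted input: a datetime collapses to the epoch-millisecond integer that Kafka's time index expects, and strings are validated against the two allowed values, so the rest of the class only ever sees an int or "earliest"/"latest". A quick worked example of the conversion (an aware UTC datetime is used here, since calling .timestamp() on a naive datetime would be interpreted in local time):

    from datetime import datetime, timezone

    dt = datetime(2023, 8, 15, 12, 0, tzinfo=timezone.utc)
    millis = int(dt.timestamp() * 1000)
    print(millis)  # 1692100800000
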
@@ -168,19 +171,21 @@ def _dump_msg(self, topic, msg):
         with outpath.open("a") as fout:
             fout.write(json.dumps(msg._asdict()) + "\n")

-    def _seek_dt_offsets(self, consumer, topic, ts=None):
+    def _seek_offsets(self, consumer, topic, target_offset="earliest"):
         """Seek/set offsets to the given target."""
         partitions = {
             TopicPartition(topic, p): None for p in consumer.partitions_for_topic(topic)
         }
         consumer.assign(partitions)
-        # If we have a target timestamp to start from, use it...
-        if ts:
-            offsets = consumer.offsets_for_times({p: ts for p in partitions})
+        if isinstance(target_offset, int):
+            offsets = consumer.offsets_for_times({p: target_offset for p in partitions})
             partitions.update({p: o for p, (o, _) in offsets.items()})
-        else:
-            # ...else start from the very beginning of the topic partitions
-            partitions = consumer.beginning_offsets(partitions)
+        elif isinstance(target_offset, str):
+            if target_offset == "earliest":
+                partitions = consumer.beginning_offsets(partitions)
+            elif target_offset == "latest":
+                partitions = consumer.end_offsets(partitions)

         for partition, offset in partitions.items():
             consumer.seek(partition, offset)
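
The renamed `_seek_offsets` leans on three kafka-python consumer calls: `offsets_for_times` for the timestamp case, and `beginning_offsets`/`end_offsets` for the two symbolic cases. A standalone sketch of the same seeking pattern (broker address is an illustrative assumption):

    from datetime import datetime, timedelta

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"])
    topic = "zenodo-migration.public"
    partitions = [
        TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)
    ]
    consumer.assign(partitions)

    # Time-based start: maps an epoch-millisecond timestamp to the first offset
    # at or after that time; entries are None if no such message exists.
    ts = int((datetime.utcnow() - timedelta(minutes=5)).timestamp() * 1000)
    by_time = consumer.offsets_for_times({p: ts for p in partitions})

    # Symbolic starts: plain {partition: offset} mappings.
    earliest = consumer.beginning_offsets(partitions)
    latest = consumer.end_offsets(partitions)

    for p in partitions:
        consumer.seek(p, earliest[p])  # or latest[p], or by_time[p].offset
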
@@ -202,10 +207,10 @@ def _get_consumer(self, topic, group_id):
             **self.config,
         )
         if topic not in self._topic_states:
-            self._topic_states[topic] = self._seek_dt_offsets(
+            self._topic_states[topic] = self._seek_offsets(
                 consumer,
                 topic,
-                ts=self.from_ts,
+                target_offset=self.offset,
             )
         else:
             self._seek_committed_offsets(consumer, topic)
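
Note the two-phase behavior this call site preserves: the configured `offset` is consulted only the first time a topic is seen; on subsequent calls the consumer resumes from its committed offsets via `_seek_committed_offsets`. That method is not part of this diff; purely for contrast, a plausible (hypothetical, not the commit's code) committed-offset seek with kafka-python could look like:

    from kafka import TopicPartition

    def seek_committed(consumer, topic):
        """Resume each assigned partition from its last committed offset."""
        for p in consumer.partitions_for_topic(topic):
            tp = TopicPartition(topic, p)
            committed = consumer.committed(tp)  # None if nothing committed yet
            if committed is not None:
                consumer.seek(tp, committed)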