Skip to content

Commit

Permalink
migrator: convert "op" to OperationType enum
Browse files Browse the repository at this point in the history
  • Loading branch information
slint committed Aug 16, 2023
1 parent 71e29f9 commit 3e5e608
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 0 deletions.
4 changes: 4 additions & 0 deletions migrator/tests/extract/test_kafka_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from unittest.mock import PropertyMock

from invenio_rdm_migrator.extract import Tx
from invenio_rdm_migrator.load.postgresql.transactions.operations import OperationType

from zenodo_rdm_migrator.extract import KafkaExtract, KafkaExtractEnd

Expand Down Expand Up @@ -104,6 +105,9 @@ def _assert_result(
):
assert len(result) == count
assert all(isinstance(t, Tx) for t in result)
assert all(
all(isinstance(o["op"], OperationType) for o in t.operations) for t in result
)
tx_dict = {t.id: t for t in result}
tx_ids = list(tx_dict.keys())
assert tx_ids == sorted(tx_ids)
Expand Down
3 changes: 3 additions & 0 deletions migrator/zenodo_rdm_migrator/extract/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import dictdiffer
from invenio_rdm_migrator.extract import Extract, Tx
from invenio_rdm_migrator.load.postgresql.transactions.operations import OperationType
from invenio_rdm_migrator.logging import Logger
from kafka import KafkaConsumer, TopicPartition
from sortedcontainers import SortedDict, SortedList
Expand Down Expand Up @@ -53,6 +54,8 @@ def info(self, val):

def append(self, op):
"""Add a single table row operation to the transaction state."""
# Convert the "op" key to an enum
op["op"] = OperationType(op["op"].upper())
self.ops.add(op)

# Update table row counts with the operations so far
Expand Down

0 comments on commit 3e5e608

Please sign in to comment.