diff --git a/migrator/tests/extract/test_kafka_extract.py b/migrator/tests/extract/test_kafka_extract.py index 37189d95..06f7a7e4 100644 --- a/migrator/tests/extract/test_kafka_extract.py +++ b/migrator/tests/extract/test_kafka_extract.py @@ -15,6 +15,7 @@ from unittest.mock import PropertyMock from invenio_rdm_migrator.extract import Tx +from invenio_rdm_migrator.load.postgresql.transactions.operations import OperationType from zenodo_rdm_migrator.extract import KafkaExtract, KafkaExtractEnd @@ -104,6 +105,9 @@ def _assert_result( ): assert len(result) == count assert all(isinstance(t, Tx) for t in result) + assert all( + all(isinstance(o["op"], OperationType) for o in t.operations) for t in result + ) tx_dict = {t.id: t for t in result} tx_ids = list(tx_dict.keys()) assert tx_ids == sorted(tx_ids) diff --git a/migrator/zenodo_rdm_migrator/extract/kafka.py b/migrator/zenodo_rdm_migrator/extract/kafka.py index 95d883da..afb7c044 100644 --- a/migrator/zenodo_rdm_migrator/extract/kafka.py +++ b/migrator/zenodo_rdm_migrator/extract/kafka.py @@ -15,6 +15,7 @@ import dictdiffer from invenio_rdm_migrator.extract import Extract, Tx +from invenio_rdm_migrator.load.postgresql.transactions.operations import OperationType from invenio_rdm_migrator.logging import Logger from kafka import KafkaConsumer, TopicPartition from sortedcontainers import SortedDict, SortedList @@ -53,6 +54,8 @@ def info(self, val): def append(self, op): """Add a single table row operation to the transaction state.""" + # Convert the "op" key to an enum + op["op"] = OperationType(op["op"].upper()) self.ops.add(op) # Update table row counts with the operations so far