From 9c3bd03327c1a52b4f33af97219833dda042e204 Mon Sep 17 00:00:00 2001
From: Jonathan Poholarz
Date: Mon, 8 Mar 2021 12:50:05 -0600
Subject: [PATCH 1/3] Added event emit Meter and kafka max lag Gauge

---
 .../trident/KafkaTridentSpoutEmitter.java     | 105 +++++++++++++-----
 .../KafkaTridentSpoutEmitterEmitTest.java     |  23 +++-
 2 files changed, 100 insertions(+), 28 deletions(-)

diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 69df4618d2f..961e1590af8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -25,6 +25,8 @@
 import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_TIMESTAMP;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -38,10 +40,13 @@
 import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.RecordTranslator;
@@ -60,6 +65,14 @@ public class KafkaTridentSpoutEmitter implements Serializable {
     private static final long serialVersionUID = -7343927794834130435L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
+    // Metrics
+    public static final String UNDERSCORE = "_";
+    public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
+    public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
+    protected transient Gauge<Double> kafkaClientMaxLag;
+    public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
+    protected transient Meter eventEmitRate;
+
     // Kafka
     private final Consumer<K, V> consumer;
     private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
@@ -87,7 +100,7 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig kafkaSpoutConfig,
 
     @VisibleForTesting
     KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext,
-            ConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
+        ConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
         this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
         this.topologyContext = topologyContext;
@@ -97,13 +110,48 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig kafkaSpoutConfig,
         this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
         this.startTimeStamp = kafkaSpoutConfig.getStartTimeStamp();
         LOG.debug("Created {}", this.toString());
+
+        registerMetric();
     }
 
+    /**
+     * Acquires metric instances through registration with the TopologyContext
+     */
+    private void registerMetric() {
+        LOG.info("Registering Spout Metrics");
+
+        String configGroupId = "";
+        if (kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG) != null) {
+            configGroupId = kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG).toString() + UNDERSCORE;
+        }
+
+        eventEmitRate = topologyContext.registerMeter(
+            configGroupId + EVENT_EMIT_METRIC_NAME);
+        kafkaClientMaxLag = topologyContext.registerGauge(
+            configGroupId + KAFKA_CLIENT_MAX_LAG_METRIC_NAME,
+            new Gauge<Double>() {
+                @Override
+                public Double getValue() {
+                    if (consumer == null) {
+                        return 0.0;
+                    }
+                    // Extract spout lag from consumer's internal metrics
+                    for (Map.Entry<MetricName, ? extends Metric> metricKeyVal : consumer.metrics().entrySet()) {
+                        Metric metric = metricKeyVal.getValue();
+                        if (metric.metricName().name().equals(INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC)) {
+                            return metric.value();
+                        }
+                    }
+                    return 0.0;
+                }
+            });
+    }
+
     /**
      * Emit a batch that has already been emitted.
      */
     public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
-            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> currBatch) {
+        KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> currBatch) {
 
         final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
 
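A note on the gauge registered above: it returns the first metric named records-lag-max regardless of its group or tags, and the Kafka client reports that metric as NaN until fetch lag has actually been sampled, so the gauge can yield NaN as well as 0.0. A standalone sketch of the same lookup, using only the kafka-clients API this patch already imports (the class and method names below are illustrative, not part of the change):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    // Standalone equivalent of the gauge's records-lag-max lookup.
    public final class MaxLagLookup {
        static double maxLag(Consumer<?, ?> consumer) {
            if (consumer == null) {
                return 0.0; // mirrors the patch's guard for a not-yet-created consumer
            }
            for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
                if ("records-lag-max".equals(entry.getKey().name())) {
                    // metricValue() is the non-deprecated form of the Metric.value()
                    // call the patch uses; both surface the same sample.
                    Object value = entry.getValue().metricValue();
                    return value instanceof Double ? (Double) value : 0.0;
                }
            }
            return 0.0; // no fetch metrics recorded yet
        }
    }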
@@ -115,12 +163,12 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
         if (!topologyContext.getStormId().equals(currBatchMeta.getTopologyId())
             && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
             LOG.debug("Skipping re-emit of batch that was originally emitted by another topology,"
-                    + " because the current first poll offset strategy ignores committed offsets.");
+                + " because the current first poll offset strategy ignores committed offsets.");
             return;
         }
 
         LOG.debug("Re-emitting batch: [transaction= {}], [currBatchPartition = {}], [currBatchMetadata = {}], [collector = {}]",
-                tx, currBatchPartition, currBatch, collector);
+            tx, currBatchPartition, currBatch, collector);
 
         try {
             // pause other topic-partitions to only poll from current topic-partition
@@ -129,9 +177,9 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
             long seekOffset = currBatchMeta.getFirstOffset();
             if (seekOffset < 0 && currBatchMeta.getFirstOffset() == currBatchMeta.getLastOffset()) {
                 LOG.debug("Skipping re-emit of batch with negative starting offset."
-                        + " The spout may set a negative starting offset for an empty batch that occurs at the start of a partition."
-                        + " It is not expected that Trident will replay such an empty batch,"
-                        + " but this guard is here in case it tries to do so. See STORM-2990, STORM-3279 for context.");
+                    + " The spout may set a negative starting offset for an empty batch that occurs at the start of a partition."
+                    + " It is not expected that Trident will replay such an empty batch,"
+                    + " but this guard is here in case it tries to do so. See STORM-2990, STORM-3279 for context.");
                 return;
             }
             LOG.debug("Seeking to offset [{}] for topic partition [{}]", seekOffset, currBatchTp);
@@ -146,9 +194,9 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
                 }
                 if (record.offset() > currBatchMeta.getLastOffset()) {
                     throw new RuntimeException(String.format("Error when re-emitting batch. Overshot the end of the batch."
-                            + " The batch end offset was [{%d}], but received [{%d}]."
-                            + " Ensure log compaction is disabled in Kafka, since it is incompatible with non-opaque transactional spouts.",
-                            currBatchMeta.getLastOffset(), record.offset()));
+                        + " The batch end offset was [{%d}], but received [{%d}]."
+                        + " Ensure log compaction is disabled in Kafka, since it is incompatible with non-opaque transactional spouts.",
+                        currBatchMeta.getLastOffset(), record.offset()));
                 }
                 emitTuple(collector, record);
             }
@@ -157,17 +205,17 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
             LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
         }
         LOG.debug("Re-emitted batch: [transaction = {}], [currBatchPartition = {}], [currBatchMetadata = {}], "
-                + "[collector = {}]", tx, currBatchPartition, currBatchMeta, collector);
+            + "[collector = {}]", tx, currBatchPartition, currBatchMeta, collector);
     }
 
     /**
      * Emit a new batch.
      */
     public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
-            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
+        KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
 
         LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
-                tx, currBatchPartition, lastBatch, collector);
+            tx, currBatchPartition, lastBatch, collector);
 
         final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
 
@@ -208,28 +256,31 @@ public Map emitPartitionBatchNew(TransactionAttempt tx, TridentC
             LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
         }
         LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
-                + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
+            + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
 
         return currentBatch.toMap();
     }
 
     private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
         return firstPollOffsetStrategy == FirstPollOffsetStrategy.EARLIEST
-                || firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST;
+            || firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST;
     }
 
     private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
         final Set<TopicPartition> assignments = consumer.assignment();
         if (!assignments.contains(currBatchTp)) {
             throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned."
-                    + " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
-                    + " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "].");
+                + " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
+                + " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "].");
         }
     }
 
     private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record) {
         final List<Object> tuple = translator.apply(record);
         collector.emit(tuple);
+
+        // Track the number of records emitted
+        eventEmitRate.mark(tuple.size());
         LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
     }
 
@@ -247,8 +298,8 @@ private void emitTuple(TridentCollector collector, ConsumerRecord record)
      */
     private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
         if (isFirstPollSinceExecutorStarted(tp)) {
-            boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
-                    || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
+            boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
+                || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
             if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
                 LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
                 consumer.seekToBeginning(Collections.singleton(tp));
@@ -269,7 +320,7 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMet
                 consumer.seekToEnd(Collections.singleton(tp));
             } else if (firstPollOffsetStrategy == UNCOMMITTED_TIMESTAMP) {
                 LOG.debug("First poll for topic partition [{}] with no last batch metadata, "
-                        + "seeking to partition based on startTimeStamp", tp);
+                    + "seeking to partition based on startTimeStamp", tp);
                 seekOffsetByStartTimeStamp(tp);
             }
             tpToFirstSeekOffset.put(tp, consumer.position(tp));
@@ -284,7 +335,7 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMet
             long initialFetchOffset = tpToFirstSeekOffset.get(tp);
             consumer.seek(tp, initialFetchOffset);
             LOG.debug("First poll for topic partition [{}], no last batch metadata present."
-                    + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
+                + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
         }
 
         final long fetchOffset = consumer.position(tp);
@@ -327,7 +378,7 @@ public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Ma
         final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(sortedPartitions);
         LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
-                allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
+            allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
         return allPartitions;
     }
 
@@ -335,7 +386,7 @@ public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Ma
     public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
-            List<KafkaTridentSpoutTopicPartition> allPartitionInfoSorted) {
+        List<KafkaTridentSpoutTopicPartition> allPartitionInfoSorted) {
         List<TopicPartition> tps = allPartitionInfoSorted.stream()
             .map(kttp -> kttp.getTopicPartition())
             .collect(Collectors.toList());
@@ -377,8 +428,8 @@ public void close() {
     @Override
     public final String toString() {
         return super.toString()
-                + "{kafkaSpoutConfig=" + kafkaSpoutConfig
-                + '}';
+            + "{kafkaSpoutConfig=" + kafkaSpoutConfig
+            + '}';
     }
 
     /**
@@ -389,13 +440,13 @@ private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceLi
         @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             LOG.info("Partitions revoked. [consumer={}, topic-partitions={}]",
-                    consumer, partitions);
+                consumer, partitions);
         }
 
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             LOG.info("Partitions reassignment. [consumer={}, topic-partitions={}]",
-                    consumer, partitions);
+                consumer, partitions);
         }
     }
 }
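One behavioral detail of the new emitTuple(): Meter.mark(long) is passed tuple.size(), the number of values in the emitted tuple, not the number of emitted tuples. The two agree only when the configured RecordTranslator yields single-field tuples, which appears to be the case in the tests below; a multi-field translator would inflate the count relative to the records emitted. An illustration with hypothetical values, using only Dropwizard Metrics:

    import com.codahale.metrics.Meter;
    import java.util.Arrays;
    import java.util.List;

    // Illustration of Meter.mark(long) versus tuple arity.
    public class MeterArityExample {
        public static void main(String[] args) {
            Meter meter = new Meter();
            List<Object> tuple = Arrays.<Object>asList("key", "value", 42L); // a 3-field tuple
            meter.mark(tuple.size()); // advances the count by 3 for one record
            meter.mark();             // advances by 1: one event per emitted tuple
            System.out.println(meter.getCount()); // prints 4
        }
    }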
+ + " Ensure log compaction is disabled in Kafka, since it is incompatible with non-opaque transactional spouts.", + currBatchMeta.getLastOffset(), record.offset())); } emitTuple(collector, record); } @@ -157,17 +205,17 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) { LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions); } LOG.debug("Re-emitted batch: [transaction = {}], [currBatchPartition = {}], [currBatchMetadata = {}], " - + "[collector = {}]", tx, currBatchPartition, currBatchMeta, collector); + + "[collector = {}]", tx, currBatchPartition, currBatchMeta, collector); } /** * Emit a new batch. */ public Map emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, - KafkaTridentSpoutTopicPartition currBatchPartition, Map lastBatch) { + KafkaTridentSpoutTopicPartition currBatchPartition, Map lastBatch) { LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]", - tx, currBatchPartition, lastBatch, collector); + tx, currBatchPartition, lastBatch, collector); final TopicPartition currBatchTp = currBatchPartition.getTopicPartition(); @@ -208,28 +256,31 @@ public Map emitPartitionBatchNew(TransactionAttempt tx, TridentC LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions); } LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " - + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector); + + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector); return currentBatch.toMap(); } private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() { return firstPollOffsetStrategy == FirstPollOffsetStrategy.EARLIEST - || firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST; + || firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST; } private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) { final Set assignments = consumer.assignment(); if (!assignments.contains(currBatchTp)) { throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned." - + " This indicates a bug in the TopicFilter or ManualPartitioner implementations." - + " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "]."); + + " This indicates a bug in the TopicFilter or ManualPartitioner implementations." 
+ + " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "]."); } } private void emitTuple(TridentCollector collector, ConsumerRecord record) { final List tuple = translator.apply(record); collector.emit(tuple); + + // Track the number of records emitted + eventEmitRate.mark(tuple.size()); LOG.debug("Emitted tuple {} for record [{}]", tuple, record); } @@ -247,8 +298,8 @@ private void emitTuple(TridentCollector collector, ConsumerRecord record) */ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) { if (isFirstPollSinceExecutorStarted(tp)) { - boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null - || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId()); + boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null + || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId()); if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) { LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp); consumer.seekToBeginning(Collections.singleton(tp)); @@ -269,7 +320,7 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMet consumer.seekToEnd(Collections.singleton(tp)); } else if (firstPollOffsetStrategy == UNCOMMITTED_TIMESTAMP) { LOG.debug("First poll for topic partition [{}] with no last batch metadata, " - + "seeking to partition based on startTimeStamp", tp); + + "seeking to partition based on startTimeStamp", tp); seekOffsetByStartTimeStamp(tp); } tpToFirstSeekOffset.put(tp, consumer.position(tp)); @@ -284,7 +335,7 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMet long initialFetchOffset = tpToFirstSeekOffset.get(tp); consumer.seek(tp, initialFetchOffset); LOG.debug("First poll for topic partition [{}], no last batch metadata present." - + " Using stored initial fetch offset [{}]", tp, initialFetchOffset); + + " Using stored initial fetch offset [{}]", tp, initialFetchOffset); } final long fetchOffset = consumer.position(tp); @@ -327,7 +378,7 @@ public List getOrderedPartitions(final List allPartitions = newKafkaTridentSpoutTopicPartitions(sortedPartitions); LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ", - allPartitions, topologyContext.getThisTaskIndex(), getNumTasks()); + allPartitions, topologyContext.getThisTaskIndex(), getNumTasks()); return allPartitions; } @@ -335,7 +386,7 @@ public List getOrderedPartitions(final List getPartitionsForTask(int taskId, int numTasks, - List allPartitionInfoSorted) { + List allPartitionInfoSorted) { List tps = allPartitionInfoSorted.stream() .map(kttp -> kttp.getTopicPartition()) .collect(Collectors.toList()); @@ -377,8 +428,8 @@ public void close() { @Override public final String toString() { return super.toString() - + "{kafkaSpoutConfig=" + kafkaSpoutConfig - + '}'; + + "{kafkaSpoutConfig=" + kafkaSpoutConfig + + '}'; } /** @@ -389,13 +440,13 @@ private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceLi @Override public void onPartitionsRevoked(Collection partitions) { LOG.info("Partitions revoked. [consumer={}, topic-partitions={}]", - consumer, partitions); + consumer, partitions); } @Override public void onPartitionsAssigned(Collection partitions) { LOG.info("Partitions reassignment. 
From 07bd08e9d8c1b3e58d204d35a8c343ce6ae08ff1 Mon Sep 17 00:00:00 2001
From: Jonathan Poholarz
Date: Mon, 8 Mar 2021 14:01:40 -0600
Subject: [PATCH 2/3] Fixed missing period and line length

---
 .../trident/KafkaTridentSpoutEmitter.java | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 961e1590af8..0fdf0bd5ea4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -115,7 +115,7 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig kafkaSpoutConfig,
     }
 
     /**
-     * Acquires metric instances through registration with the TopologyContext
+     * Acquires metric instances through registration with the TopologyContext.
      */
     private void registerMetric() {
         LOG.info("Registering Spout Metrics");
@@ -193,10 +193,12 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
                     break;
                 }
                 if (record.offset() > currBatchMeta.getLastOffset()) {
-                    throw new RuntimeException(String.format("Error when re-emitting batch. Overshot the end of the batch."
-                        + " The batch end offset was [{%d}], but received [{%d}]."
-                        + " Ensure log compaction is disabled in Kafka, since it is incompatible with non-opaque transactional spouts.",
-                        currBatchMeta.getLastOffset(), record.offset()));
+                    throw new RuntimeException(String.format(
+                        "Error when re-emitting batch. Overshot the end of the batch."
+                            + " The batch end offset was [{%d}], but received [{%d}]."
+                            + " Ensure log compaction is disabled in Kafka, since it is"
+                            + " incompatible with non-opaque transactional spouts.",
+                        currBatchMeta.getLastOffset(), record.offset()));
                 }
                 emitTuple(collector, record);
             }
@@ -269,9 +271,10 @@ private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
 
     private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
         final Set<TopicPartition> assignments = consumer.assignment();
         if (!assignments.contains(currBatchTp)) {
-            throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned."
-                + " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
-                + " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "].");
+            throw new IllegalStateException(
+                "The spout is asked to emit tuples on a partition it is not assigned."
+                    + " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
+                    + " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "].");
         }
     }
 
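Worth flagging while this message is being reformatted: the braces in "[{%d}]" are literal to String.format, so the rendered text is e.g. "[{10}]". They look like leftovers from SLF4J-style {} placeholders; "[%d]" was probably intended. A quick demonstration:

    // The braces in "[{%d}]" are literal; only %d is interpreted.
    public class FormatBraces {
        public static void main(String[] args) {
            System.out.println(String.format("The batch end offset was [{%d}], but received [{%d}].", 10L, 11L));
            // -> The batch end offset was [{10}], but received [{11}].
        }
    }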
From 449ded9a096e64d88b856db3899d199278623f2e Mon Sep 17 00:00:00 2001
From: Jonathan Poholarz
Date: Mon, 8 Mar 2021 14:42:25 -0600
Subject: [PATCH 3/3] Fixed variable name typo

---
 .../kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
index a1ce86831ba..0ed363c8998 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
@@ -85,7 +85,7 @@ public class KafkaTridentSpoutEmitterEmitTest {
     @BeforeEach
     public void setUp() {
         when(topologyContextMock.getStormId()).thenReturn(topologyId);
-        when(topologyContextMock.registerMeter(KafkaTridentSpoutEmitter.EVENT_COUNT_METRIC_NAME))
+        when(topologyContextMock.registerMeter(KafkaTridentSpoutEmitter.EVENT_EMIT_METRIC_NAME))
            .thenReturn(new Meter());
         consumer.assign(Collections.singleton(partition));
         consumer.updateBeginningOffsets(Collections.singletonMap(partition, firstOffsetInKafka));
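With the rename, setUp() now stubs the same name registerMetric() requests, and the series compiles end to end. The kafkaClientMaxLag gauge, however, is left untested. A possible follow-up helper (sketch only; the class name and test wiring are hypothetical, and the Mockito/kafka-clients calls are limited to APIs the patch already uses):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    // Builds a metrics map a mocked Consumer could return, so the gauge's
    // records-lag-max lookup can be exercised end to end.
    public class MaxLagGaugeTestSupport {
        static Map<MetricName, Metric> lagMetrics(double lag) {
            MetricName name = new MetricName("records-lag-max",
                "consumer-fetch-manager-metrics", "max lag sketch", Collections.emptyMap());
            Metric metric = mock(Metric.class);
            when(metric.metricName()).thenReturn(name);
            when(metric.value()).thenReturn(lag); // value() is what the patch reads
            return Collections.singletonMap(name, metric);
        }
        // In a test: doReturn(lagMetrics(42.0)).when(consumerMock).metrics();
        // then assert that emitter.kafkaClientMaxLag.getValue() is 42.0.
    }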