diff --git a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala index f2c76da12..a3db45390 100644 --- a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala @@ -5,7 +5,6 @@ package akka.kafka.internal -import java.io.{PrintWriter, StringWriter} import java.util import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger @@ -137,6 +136,7 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w var commitRequestedOffsets = Map.empty[TopicPartition, Long] var committedOffsets = Map.empty[TopicPartition, Long] var commitRefreshDeadline: Option[Deadline] = None + var initialPoll = true var wakeups = 0 var stopInProgress = false var delayedPollInFlight = false @@ -321,8 +321,8 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w def poll(): Unit = { val wakeupTask = context.system.scheduler.scheduleOnce(settings.wakeupTimeout) { log.warning( - "KafkaConsumer poll has exceeded wake up timeout ({}ms). Waking up consumer to avoid thread starvation.", - settings.wakeupTimeout.toMillis + "KafkaConsumer poll has exceeded wake up timeout ({}). Waking up consumer to avoid thread starvation.", + settings.wakeupTimeout.toCoarsest ) if (settings.wakeupDebug && wakeups > settings.maxWakeups / 2) { val stacks = @@ -346,23 +346,36 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w def tryPoll(timeout: Long): ConsumerRecords[K, V] = try { val records = consumer.poll(timeout) + initialPoll = false wakeups = 0 records } catch { case w: WakeupException => wakeups = wakeups + 1 if (wakeups == settings.maxWakeups) { - log.error("WakeupException limit exceeded, stopping.") + log.error( + "WakeupException limit exceeded during poll({}), stopping (max-wakeups = {}, wakeup-timeout = {}).", + timeout, + settings.maxWakeups, + settings.wakeupTimeout.toCoarsest + ) context.stop(self) } else { - if (log.isWarningEnabled) { - val sw = new StringWriter - w.printStackTrace(new PrintWriter(sw)) + if (log.isWarningEnabled && wakeups > settings.maxWakeups / 2) { log.warning( - s"Consumer interrupted with WakeupException after timeout. Message: ${w.getMessage}. " + - s"Current value of akka.kafka.consumer.wakeup-timeout is ${settings.wakeupTimeout}. Exception: {}", - sw.toString + "Consumer poll({}) interrupted with WakeupException (#{} of max-wakeups = {}, wakeup-timeout = {}).", + timeout, + wakeups, + settings.maxWakeups, + settings.wakeupTimeout.toCoarsest ) + if (initialPoll) { + log.error( + "Initial consumer poll({}) with bootstrap servers {} did not succeed, still trying", + timeout, + settings.properties.getOrElse(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "not set") + ) + } } // If the current consumer is using group assignment (i.e. subscriptions is non empty) the wakeup might @@ -477,9 +490,9 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w new OffsetCommitCallback { override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = { // this is invoked on the thread calling consumer.poll which will always be the actor, so it is safe - val duration = FiniteDuration(System.nanoTime() - startTime, NANOSECONDS) - if (duration > settings.commitTimeWarning) { - log.warning("Kafka commit took longer than `commit-time-warning`: {} ms", duration.toMillis) + val duration = System.nanoTime() - startTime + if (duration > settings.commitTimeWarning.toNanos) { + log.warning("Kafka commit took longer than `commit-time-warning`: {} ms", duration / 1000000L) } commitsInProgress -= 1 if (exception != null) reply ! Status.Failure(exception)