Skip to content

Commit

Permalink
Improve wakeup logging (#524)
Browse files Browse the repository at this point in the history
* Delay wakeup warnings until more than `maxWakeups / 2` consecutive wakeups have occurred
* Don't log the stack trace (but make the message clear that the wakeup happened during poll)
* Log on error level if the initial poll fails more than `maxWakeups / 2` times, printing the bootstrap servers
* Avoid allocation of `FiniteDuration` after committing
  • Loading branch information
ennru authored Jun 26, 2018
1 parent 44f0348 commit 2ace54d
Showing 1 changed file with 26 additions and 13 deletions.
39 changes: 26 additions & 13 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package akka.kafka.internal

import java.io.{PrintWriter, StringWriter}
import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
Expand Down Expand Up @@ -137,6 +136,7 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w
var commitRequestedOffsets = Map.empty[TopicPartition, Long]
var committedOffsets = Map.empty[TopicPartition, Long]
var commitRefreshDeadline: Option[Deadline] = None
var initialPoll = true
var wakeups = 0
var stopInProgress = false
var delayedPollInFlight = false
Expand Down Expand Up @@ -321,8 +321,8 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w
def poll(): Unit = {
val wakeupTask = context.system.scheduler.scheduleOnce(settings.wakeupTimeout) {
log.warning(
"KafkaConsumer poll has exceeded wake up timeout ({}ms). Waking up consumer to avoid thread starvation.",
settings.wakeupTimeout.toMillis
"KafkaConsumer poll has exceeded wake up timeout ({}). Waking up consumer to avoid thread starvation.",
settings.wakeupTimeout.toCoarsest
)
if (settings.wakeupDebug && wakeups > settings.maxWakeups / 2) {
val stacks =
Expand All @@ -346,23 +346,36 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w
def tryPoll(timeout: Long): ConsumerRecords[K, V] =
try {
val records = consumer.poll(timeout)
initialPoll = false
wakeups = 0
records
} catch {
case w: WakeupException =>
wakeups = wakeups + 1
if (wakeups == settings.maxWakeups) {
log.error("WakeupException limit exceeded, stopping.")
log.error(
"WakeupException limit exceeded during poll({}), stopping (max-wakeups = {}, wakeup-timeout = {}).",
timeout,
settings.maxWakeups,
settings.wakeupTimeout.toCoarsest
)
context.stop(self)
} else {
if (log.isWarningEnabled) {
val sw = new StringWriter
w.printStackTrace(new PrintWriter(sw))
if (log.isWarningEnabled && wakeups > settings.maxWakeups / 2) {
log.warning(
s"Consumer interrupted with WakeupException after timeout. Message: ${w.getMessage}. " +
s"Current value of akka.kafka.consumer.wakeup-timeout is ${settings.wakeupTimeout}. Exception: {}",
sw.toString
"Consumer poll({}) interrupted with WakeupException (#{} of max-wakeups = {}, wakeup-timeout = {}).",
timeout,
wakeups,
settings.maxWakeups,
settings.wakeupTimeout.toCoarsest
)
if (initialPoll) {
log.error(
"Initial consumer poll({}) with bootstrap servers {} did not succeed, still trying",
timeout,
settings.properties.getOrElse(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "not set")
)
}
}

// If the current consumer is using group assignment (i.e. subscriptions is non empty) the wakeup might
Expand Down Expand Up @@ -477,9 +490,9 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w
new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
// this is invoked on the thread calling consumer.poll which will always be the actor, so it is safe
val duration = FiniteDuration(System.nanoTime() - startTime, NANOSECONDS)
if (duration > settings.commitTimeWarning) {
log.warning("Kafka commit took longer than `commit-time-warning`: {} ms", duration.toMillis)
val duration = System.nanoTime() - startTime
if (duration > settings.commitTimeWarning.toNanos) {
log.warning("Kafka commit took longer than `commit-time-warning`: {} ms", duration / 1000000L)
}
commitsInProgress -= 1
if (exception != null) reply ! Status.Failure(exception)
Expand Down

0 comments on commit 2ace54d

Please sign in to comment.