From bdc73943954194462863d40dfd6b26cccb989ff8 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sat, 30 Sep 2023 17:34:15 +0200 Subject: [PATCH] Improve response handling and binary download This commit improves the handling of HTTP requests made via the Java HttpClient to make sure the response body is *always* consumed and the associated InputStream is closed when available. This is required; otherwise it can result in a never-released SelectorManager thread associated with the HttpClient object and a corresponding memory leak. Read more here https://bugs.openjdk.org/browse/JDK-8308364 It also includes the HTTP status code 303 as a redirection code. Signed-off-by: Paolo Di Tommaso --- build.gradle | 1 + .../groovy/io/seqera/wave/WaveDefault.groovy | 2 +- .../wave/auth/RegistryAuthServiceImpl.groovy | 3 +- .../auth/RegistryLookupServiceImpl.groovy | 3 +- .../controller/RegistryProxyController.groovy | 33 +++++++++++++------ .../wave/core/RegistryProxyService.groovy | 23 ++++++++++--- .../io/seqera/wave/util/RegHelper.groovy | 14 ++++++++ .../io/seqera/wave/util/Retryable.groovy | 13 +++++--- 8 files changed, 70 insertions(+), 22 deletions(-) diff --git a/build.gradle b/build.gradle index 56744853d..7d4b08205 100644 --- a/build.gradle +++ b/build.gradle @@ -48,6 +48,7 @@ dependencies { implementation("io.micronaut.groovy:micronaut-runtime-groovy") implementation("jakarta.annotation:jakarta.annotation-api") implementation("io.micronaut:micronaut-validation") + implementation("io.micronaut.rxjava3:micronaut-rxjava3") implementation "org.codehaus.groovy:groovy-json" implementation "org.codehaus.groovy:groovy-nio" implementation 'com.google.guava:guava:32.1.2-jre' diff --git a/src/main/groovy/io/seqera/wave/WaveDefault.groovy b/src/main/groovy/io/seqera/wave/WaveDefault.groovy index 401e8b30b..2c395a522 100644 --- a/src/main/groovy/io/seqera/wave/WaveDefault.groovy +++ b/src/main/groovy/io/seqera/wave/WaveDefault.groovy @@ -39,7 +39,7 @@ interface WaveDefault { 
'application/vnd.docker.distribution.manifest.list.v2+json' ) ) - final public static int[] HTTP_REDIRECT_CODES = [301, 302, 307, 308] + final public static int[] HTTP_REDIRECT_CODES = [301, 302, 303, 307, 308] final public static List HTTP_SERVER_ERRORS = [502, 503, 504] diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy index b4340e000..d53f4b9c0 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy @@ -120,9 +120,10 @@ class RegistryAuthServiceImpl implements RegistryAuthService { .onRetry((event) -> log.warn("Unable to connect '$endpoint' - event: $event}")) // make the request final response = retryable.apply(()-> httpClient.send(request, HttpResponse.BodyHandlers.ofString())) + final body = response.body() // check the response if( response.statusCode() == 200 ) { - log.debug "Container registry '$endpoint' login - response: ${StringUtils.trunc(response.body())}" + log.debug "Container registry '$endpoint' login - response: ${StringUtils.trunc(body)}" return true } else { diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy index 5f9b33ed7..89812476c 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy @@ -77,6 +77,7 @@ class RegistryLookupServiceImpl implements RegistryLookupService { .onRetry((event) -> log.warn("Unable to connect '$endpoint' - event: $event")) // submit the request final response = retryable.apply(()-> httpClient.send(request, HttpResponse.BodyHandlers.ofString())) + final body = response.body() // check response final code = response.statusCode() if( code == 401 ) { @@ -92,7 +93,7 @@ class RegistryLookupServiceImpl implements 
RegistryLookupService { return new RegistryAuth(endpoint) } else { - throw new IllegalArgumentException("Request '$endpoint' unexpected response code: $code; message: ${response.body()} ") + throw new IllegalArgumentException("Request '$endpoint' unexpected response code: $code; message: ${body} ") } } diff --git a/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy b/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy index 1f5144b1d..3bfcd99f9 100644 --- a/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy @@ -18,7 +18,7 @@ package io.seqera.wave.controller -import java.time.Instant + import java.util.concurrent.CompletableFuture import java.util.function.Consumer import javax.annotation.Nullable @@ -35,16 +35,17 @@ import io.micronaut.http.HttpMethod import io.micronaut.http.HttpRequest import io.micronaut.http.HttpResponse import io.micronaut.http.HttpStatus -import io.micronaut.http.MediaType import io.micronaut.http.MutableHttpHeaders import io.micronaut.http.MutableHttpResponse import io.micronaut.http.annotation.Controller import io.micronaut.http.annotation.Error import io.micronaut.http.annotation.Get -import io.micronaut.http.server.types.files.StreamedFile import io.micronaut.http.server.util.HttpClientAddressResolver import io.micronaut.scheduling.TaskExecutors import io.micronaut.scheduling.annotation.ExecuteOn +import io.reactivex.rxjava3.core.BackpressureStrategy +import io.reactivex.rxjava3.core.Flowable +import io.reactivex.rxjava3.schedulers.Schedulers import io.seqera.wave.ErrorHandler import io.seqera.wave.configuration.HttpClientConfig import io.seqera.wave.core.RegistryProxyService @@ -125,7 +126,6 @@ class RegistryProxyController { } } - @Error HttpResponse handleError(HttpRequest request, Throwable t) { return errorHandler.handle(request, t, (msg, code) -> new RegistryErrorResponse(msg,code) ) @@ 
-338,17 +338,30 @@ class RegistryProxyController { MutableHttpResponse fromDelegateResponse(final DelegateResponse response){ - final Long len = response.headers + final Long contentLength = response.headers .find {it.key.toLowerCase()=='content-length'}?.value?.first() as Long ?: null - final streamedFile = len - ? new StreamedFile(response.body, MediaType.APPLICATION_OCTET_STREAM_TYPE, Instant.now().toEpochMilli(), len) - : new StreamedFile(response.body, MediaType.APPLICATION_OCTET_STREAM_TYPE) + Flowable bodyReader = Flowable.create({ emitter -> + try (final stream = response.body) { + final buffer = new byte[32 * 1024] + int len + while ((len=stream.read(buffer)) != -1) { + emitter.onNext(Arrays.copyOf(buffer, len)); + } + emitter.onComplete() + } + catch (Throwable e) { + emitter.onError(e); + } + }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()) + final override = contentLength!=null + ? Map.of('content-length', String.valueOf(contentLength)) + : Map.of() HttpResponse .status(HttpStatus.valueOf(response.statusCode)) - .body(streamedFile) - .headers(toMutableHeaders(response.headers)) + .body(bodyReader) + .headers(toMutableHeaders(response.headers, override)) } MutableHttpResponse fromManifestResponse(DelegateResponse resp) { diff --git a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy index 8086d21ba..73d01e3cb 100644 --- a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy +++ b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy @@ -38,6 +38,7 @@ import io.seqera.wave.service.CredentialsService import io.seqera.wave.service.persistence.PersistenceService import io.seqera.wave.storage.DigestStore import io.seqera.wave.storage.Storage +import io.seqera.wave.util.RegHelper import jakarta.inject.Inject import jakarta.inject.Singleton import static io.seqera.wave.WaveDefault.HTTP_REDIRECT_CODES @@ -136,16 +137,28 @@ class RegistryProxyService { 
ProxyClient proxyClient = client(route) final resp1 = proxyClient.getStream(route.path, headers, false) final redirect = resp1.headers().firstValue('Location').orElse(null) - if( redirect && resp1.statusCode() in HTTP_REDIRECT_CODES ) { + final status = resp1.statusCode() + if( redirect && status in HTTP_REDIRECT_CODES ) { // the redirect location can be a relative path i.e. without hostname // therefore resolve it against the target registry hostname final target = proxyClient.registry.host.resolve(redirect).toString() - return new DelegateResponse( + final result = new DelegateResponse( location: target, - statusCode: resp1.statusCode(), - headers:resp1.headers().map()) + statusCode: status, + headers:resp1.headers().map(), + body: resp1.body()) + // close the response to prevent leaks + RegHelper.closeResponse(resp1) + return result } - + + if( redirect ) { + log.warn "Unexpected redirect location '${redirect}' with status code: ${status}" + } + else if( status>=300 && status<400 ) { + log.warn "Unexpected redirect status code: ${status}; headers: ${RegHelper.dumpHeaders(resp1.headers())}" + } + new DelegateResponse( statusCode: resp1.statusCode(), headers: resp1.headers().map(), diff --git a/src/main/groovy/io/seqera/wave/util/RegHelper.groovy b/src/main/groovy/io/seqera/wave/util/RegHelper.groovy index 45aeaf6b9..649f1bc8d 100644 --- a/src/main/groovy/io/seqera/wave/util/RegHelper.groovy +++ b/src/main/groovy/io/seqera/wave/util/RegHelper.groovy @@ -19,6 +19,7 @@ package io.seqera.wave.util import java.net.http.HttpHeaders +import java.net.http.HttpResponse import java.nio.charset.Charset import java.nio.file.Files import java.nio.file.Path @@ -266,4 +267,17 @@ class RegHelper { return layerName(layer).replace(/.tar.gz/,'') } + static void closeResponse(HttpResponse response) { + log.debug "Closing HttpClient response: $response" + try { + // close the httpclient response to prevent leaks + // https://bugs.openjdk.org/browse/JDK-8308364 + final b0 = 
response.body() + if( b0 instanceof Closeable ) + b0.close() + } + catch (Throwable e) { + log.debug "Unexpected error while closing http response - cause: ${e.message}", e + } + } } diff --git a/src/main/groovy/io/seqera/wave/util/Retryable.groovy b/src/main/groovy/io/seqera/wave/util/Retryable.groovy index db60d98d8..51a0a6376 100644 --- a/src/main/groovy/io/seqera/wave/util/Retryable.groovy +++ b/src/main/groovy/io/seqera/wave/util/Retryable.groovy @@ -18,6 +18,7 @@ package io.seqera.wave.util +import java.net.http.HttpResponse import java.time.Duration import java.time.temporal.ChronoUnit import java.util.function.Consumer @@ -97,14 +98,18 @@ class Retryable { } protected RetryPolicy retryPolicy() { - final attempt = new EventListener>() { + final retry0 = new EventListener>() { @Override void accept(ExecutionAttemptedEvent event) throws Throwable { retryEvent?.accept(new Event("Retry", event.attemptCount, event.lastResult, event.lastFailure)) + // close the http response + if( event.lastResult instanceof HttpResponse ) { + RegHelper.closeResponse((HttpResponse) event.lastResult) + } } } - final failure = new EventListener>() { + final failure0 = new EventListener>() { @Override void accept(ExecutionCompletedEvent event) throws Throwable { retryEvent?.accept(new Event("Failure", event.attemptCount, event.result, event.failure)) @@ -122,8 +127,8 @@ class Retryable { .withBackoff(d.toMillis(), m.toMillis(), ChronoUnit.MILLIS) .withMaxAttempts(a) .withJitter(j) - .onRetry(attempt) - .onFailure(failure) + .onRetry(retry0) + .onFailure(failure0) if( handleResult!=null ) policy.handleResultIf(handleResult) return policy.build()