Skip to content

Commit

Permalink
Improve response handling and binary download
Browse files Browse the repository at this point in the history
This commit improves the handling of HTTP requests made via
the Java HttpClient to make sure the response body is *always*
used and the associated InputStream close when avaiable.

This is required otherwise it can result in a never released
SelectorManager thread associated with HttpClient object and
corresponsing memory leak. Read more here https://bugs.openjdk.org/browse/JDK-8308364

It also includes the HTTP status code 303 as a redirection code.

Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso committed Sep 30, 2023
1 parent 1eda4c6 commit bdc7394
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 22 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies {
implementation("io.micronaut.groovy:micronaut-runtime-groovy")
implementation("jakarta.annotation:jakarta.annotation-api")
implementation("io.micronaut:micronaut-validation")
implementation("io.micronaut.rxjava3:micronaut-rxjava3")
implementation "org.codehaus.groovy:groovy-json"
implementation "org.codehaus.groovy:groovy-nio"
implementation 'com.google.guava:guava:32.1.2-jre'
Expand Down
2 changes: 1 addition & 1 deletion src/main/groovy/io/seqera/wave/WaveDefault.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface WaveDefault {
'application/vnd.docker.distribution.manifest.list.v2+json' ) )


final public static int[] HTTP_REDIRECT_CODES = [301, 302, 307, 308]
final public static int[] HTTP_REDIRECT_CODES = [301, 302, 303, 307, 308]

final public static List<Integer> HTTP_SERVER_ERRORS = [502, 503, 504]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
.onRetry((event) -> log.warn("Unable to connect '$endpoint' - event: $event}"))
// make the request
final response = retryable.apply(()-> httpClient.send(request, HttpResponse.BodyHandlers.ofString()))
final body = response.body()
// check the response
if( response.statusCode() == 200 ) {
log.debug "Container registry '$endpoint' login - response: ${StringUtils.trunc(response.body())}"
log.debug "Container registry '$endpoint' login - response: ${StringUtils.trunc(body)}"
return true
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
.onRetry((event) -> log.warn("Unable to connect '$endpoint' - event: $event"))
// submit the request
final response = retryable.apply(()-> httpClient.send(request, HttpResponse.BodyHandlers.ofString()))
final body = response.body()
// check response
final code = response.statusCode()
if( code == 401 ) {
Expand All @@ -92,7 +93,7 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
return new RegistryAuth(endpoint)
}
else {
throw new IllegalArgumentException("Request '$endpoint' unexpected response code: $code; message: ${response.body()} ")
throw new IllegalArgumentException("Request '$endpoint' unexpected response code: $code; message: ${body} ")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package io.seqera.wave.controller

import java.time.Instant

import java.util.concurrent.CompletableFuture
import java.util.function.Consumer
import javax.annotation.Nullable
Expand All @@ -35,16 +35,17 @@ import io.micronaut.http.HttpMethod
import io.micronaut.http.HttpRequest
import io.micronaut.http.HttpResponse
import io.micronaut.http.HttpStatus
import io.micronaut.http.MediaType
import io.micronaut.http.MutableHttpHeaders
import io.micronaut.http.MutableHttpResponse
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Error
import io.micronaut.http.annotation.Get
import io.micronaut.http.server.types.files.StreamedFile
import io.micronaut.http.server.util.HttpClientAddressResolver
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.seqera.wave.ErrorHandler
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.core.RegistryProxyService
Expand Down Expand Up @@ -125,7 +126,6 @@ class RegistryProxyController {
}
}


@Error
HttpResponse<RegistryErrorResponse> handleError(HttpRequest request, Throwable t) {
return errorHandler.handle(request, t, (msg, code) -> new RegistryErrorResponse(msg,code) )
Expand Down Expand Up @@ -338,17 +338,30 @@ class RegistryProxyController {

MutableHttpResponse<?> fromDelegateResponse(final DelegateResponse response){

final Long len = response.headers
final Long contentLength = response.headers
.find {it.key.toLowerCase()=='content-length'}?.value?.first() as Long ?: null

final streamedFile = len
? new StreamedFile(response.body, MediaType.APPLICATION_OCTET_STREAM_TYPE, Instant.now().toEpochMilli(), len)
: new StreamedFile(response.body, MediaType.APPLICATION_OCTET_STREAM_TYPE)
Flowable bodyReader = Flowable.create({ emitter ->
try (final stream = response.body) {
final buffer = new byte[32 * 1024]
int len
while ((len=stream.read(buffer)) != -1) {
emitter.onNext(Arrays.copyOf(buffer, len));
}
emitter.onComplete()
}
catch (Throwable e) {
emitter.onError(e);
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())

final override = contentLength!=null
? Map.<String, String>of('content-length', String.valueOf(contentLength))
: Map.<String, String>of()
HttpResponse
.status(HttpStatus.valueOf(response.statusCode))
.body(streamedFile)
.headers(toMutableHeaders(response.headers))
.body(bodyReader)
.headers(toMutableHeaders(response.headers, override))
}

MutableHttpResponse<?> fromManifestResponse(DelegateResponse resp) {
Expand Down
23 changes: 18 additions & 5 deletions src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import io.seqera.wave.service.CredentialsService
import io.seqera.wave.service.persistence.PersistenceService
import io.seqera.wave.storage.DigestStore
import io.seqera.wave.storage.Storage
import io.seqera.wave.util.RegHelper
import jakarta.inject.Inject
import jakarta.inject.Singleton
import static io.seqera.wave.WaveDefault.HTTP_REDIRECT_CODES
Expand Down Expand Up @@ -136,16 +137,28 @@ class RegistryProxyService {
ProxyClient proxyClient = client(route)
final resp1 = proxyClient.getStream(route.path, headers, false)
final redirect = resp1.headers().firstValue('Location').orElse(null)
if( redirect && resp1.statusCode() in HTTP_REDIRECT_CODES ) {
final status = resp1.statusCode()
if( redirect && status in HTTP_REDIRECT_CODES ) {
// the redirect location can be a relative path i.e. without hostname
// therefore resolve it against the target registry hostname
final target = proxyClient.registry.host.resolve(redirect).toString()
return new DelegateResponse(
final result = new DelegateResponse(
location: target,
statusCode: resp1.statusCode(),
headers:resp1.headers().map())
statusCode: status,
headers:resp1.headers().map(),
body: resp1.body())
// close the response to prevent leaks
RegHelper.closeResponse(resp1)
return result
}


if( redirect ) {
log.warn "Unexpected redirect location '${redirect}' with status code: ${status}"
}
else if( status>=300 && status<400 ) {
log.warn "Unexpected redirect status code: ${status}; headers: ${RegHelper.dumpHeaders(resp1.headers())}"
}

new DelegateResponse(
statusCode: resp1.statusCode(),
headers: resp1.headers().map(),
Expand Down
14 changes: 14 additions & 0 deletions src/main/groovy/io/seqera/wave/util/RegHelper.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.seqera.wave.util

import java.net.http.HttpHeaders
import java.net.http.HttpResponse
import java.nio.charset.Charset
import java.nio.file.Files
import java.nio.file.Path
Expand Down Expand Up @@ -266,4 +267,17 @@ class RegHelper {
return layerName(layer).replace(/.tar.gz/,'')
}

static void closeResponse(HttpResponse<?> response) {
log.debug "Closing HttpClient response: $response"
try {
// close the httpclient response to prevent leaks
// https://bugs.openjdk.org/browse/JDK-8308364
final b0 = response.body()
if( b0 instanceof Closeable )
b0.close()
}
catch (Throwable e) {
log.debug "Unexpected error while closing http response - cause: ${e.message}", e
}
}
}
13 changes: 9 additions & 4 deletions src/main/groovy/io/seqera/wave/util/Retryable.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.seqera.wave.util

import java.net.http.HttpResponse
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.function.Consumer
Expand Down Expand Up @@ -97,14 +98,18 @@ class Retryable<R> {
}

protected RetryPolicy retryPolicy() {
final attempt = new EventListener<ExecutionAttemptedEvent<R>>() {
final retry0 = new EventListener<ExecutionAttemptedEvent<R>>() {
@Override
void accept(ExecutionAttemptedEvent event) throws Throwable {
retryEvent?.accept(new Event("Retry", event.attemptCount, event.lastResult, event.lastFailure))
// close the http response
if( event.lastResult instanceof HttpResponse<?> ) {
RegHelper.closeResponse((HttpResponse<?>) event.lastResult)
}
}
}

final failure = new EventListener<ExecutionCompletedEvent<R>>() {
final failure0 = new EventListener<ExecutionCompletedEvent<R>>() {
@Override
void accept(ExecutionCompletedEvent event) throws Throwable {
retryEvent?.accept(new Event("Failure", event.attemptCount, event.result, event.failure))
Expand All @@ -122,8 +127,8 @@ class Retryable<R> {
.withBackoff(d.toMillis(), m.toMillis(), ChronoUnit.MILLIS)
.withMaxAttempts(a)
.withJitter(j)
.onRetry(attempt)
.onFailure(failure)
.onRetry(retry0)
.onFailure(failure0)
if( handleResult!=null )
policy.handleResultIf(handleResult)
return policy.build()
Expand Down

0 comments on commit bdc7394

Please sign in to comment.