Skip to content

Commit

Permalink
Add pool handling for Jackson mapper objects (#666)
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso authored Oct 3, 2024
1 parent afeaf83 commit 1e5c64a
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 9 deletions.
30 changes: 21 additions & 9 deletions src/main/groovy/io/seqera/wave/util/JacksonHelper.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ import groovy.transform.CompileStatic
@CompileStatic
class JacksonHelper {

static private ObjectMapper DEFAULT_JSON_MAPPER = defaultJsonMapper()
static private SimplePool<ObjectMapper> defaultJsonMapper = new SimplePool<>(()-> defaultJsonMapper())

static private ObjectMapper PUBLIC_JSON_MAPPER = publicViewJsonMapper()
static private SimplePool<ObjectMapper> publicJsonMapper = new SimplePool<>(()-> publicViewJsonMapper())

static private ObjectMapper YAML_MAPPER = defaultYamlMapper()
static private SimplePool<ObjectMapper> yamlMapper = new SimplePool<>(()-> defaultYamlMapper())

static private ObjectMapper createMapper0(boolean yaml=false, boolean failOnUnknownProperties=false) {
// GString serializer
Expand Down Expand Up @@ -77,11 +77,15 @@ class JacksonHelper {
* @return A concrete instance of {@code T}
*/
static <T> T fromJson(String str, Class<T> type) {
str != null ? DEFAULT_JSON_MAPPER.readValue(str, type) : null
if( str==null )
return null
return defaultJsonMapper.apply((mapper)-> mapper.readValue(str, type))
}

static <T> T fromJson(String str, TypeReference<T> type) {
str != null ? DEFAULT_JSON_MAPPER.readValue(str, type) : null
if( str==null )
return null
return defaultJsonMapper.apply((mapper)->mapper.readValue(str, type))
}

/**
Expand All @@ -92,11 +96,15 @@ class JacksonHelper {
* @return A json representation of the specified object
*/
static String toJson(Object config) {
config != null ? DEFAULT_JSON_MAPPER.writeValueAsString(config) : null
if( config==null )
return null
return defaultJsonMapper.apply((mapper)-> mapper.writeValueAsString(config))
}

static String toJsonWithPublicView(Object config) {
config != null ? PUBLIC_JSON_MAPPER.writerWithView(Views.Public).writeValueAsString(config) : null
if( config==null )
return null
return publicJsonMapper.apply((mapper)-> mapper.writerWithView(Views.Public).writeValueAsString(config))
}

/**
Expand All @@ -106,7 +114,9 @@ class JacksonHelper {
* @return A concrete instance of {@code T}
*/
static <T> T fromYaml(String str, Class<T> type) {
str != null ? YAML_MAPPER.readValue(str, type) : null
if( str==null )
return null
return yamlMapper.apply((mapper)->mapper.readValue(str, type))
}

/**
Expand All @@ -117,7 +127,9 @@ class JacksonHelper {
* @return A yaml representation of the specified object
*/
static String toYaml(Object config) {
config != null ? YAML_MAPPER.writeValueAsString(config) : null
if( config==null )
return null
return yamlMapper.apply((mapper)->mapper.writeValueAsString(config))
}

}
93 changes: 93 additions & 0 deletions src/main/groovy/io/seqera/wave/util/SimplePool.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.seqera.wave.util

import java.lang.ref.SoftReference
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.function.Function
import java.util.function.Supplier

import groovy.transform.CompileStatic
/**
* Implement a simple object pool retaining objects using a {@link SoftReference}.
*
* The pool is meant to be created specifying a {@link Supplier} function as argument to
* the {@link SimplePool} constructor to instantiate pooled objects.
*
* Pool object need to be accessed by {@link SimplePool#borrow()} and release
* once used via {@link SimplePool#release(Object)}.
*
* Alternatively a function can be applied to a pooled object by using {@link SimplePool#apply(Function)}
* method which takes care of borrowing an object and releasing it automatically when done. If the pool
* is empty a new object is created implicitly and returned to the pool on completion.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@CompileStatic
class SimplePool<T> {

// A thread-safe queue to hold SoftReferences to pooled objects
private final Queue<SoftReference<T>> pool = new ConcurrentLinkedQueue<>()

// Factory for creating new objects
private final Supplier<T> factory

/**
* Create a pool with the given supplier function. The supplier behave as factory
* to instantiate new object instances when the pool is empty.
*
* @param factory A {@link Supplier} function that creates instances of the pooled objects.
*/
SimplePool(Supplier<T> factory) {
this.factory = factory
}

/**
* Borrow an object from the pool. The object is expected to be returned by using the {@link #release(Object)}
* method.
*
* @return The object instance taken from the pool or a new instance when the pool is empty.
*/
T borrow() {
// Attempt to retrieve a non-collected object from the pool
T result
while (!pool.isEmpty()) {
SoftReference<T> ref = pool.poll()
if( ref!=null && (result=ref.get())!=null ) {
return result
}
}

// If no valid object was found, create a new one
return factory.get()
}

/**
* Return an object to the pool.
*
* @param obj The object to be returned
*/
void release(T obj) {
if (obj != null) {
pool.offer(new SoftReference<>(obj))
}
}

/**
* Apply the specified function to an object in the pool or create a new object if needed. Once
* the function has been carried out the object is automatically returned to the pool.
*
* This method is a shortcut to the {@link #borrow()}, operation and {@link #release} pattern
*
* @param function A {@link Function} to be applied to the pooled object.
* @return The value returned by the {@link Function} execution.
*/
<R> R apply(Function<T,R> function) {
final object = borrow()
try {
return function.apply(object)
}
finally {
release(object)
}
}

}
86 changes: 86 additions & 0 deletions src/test/groovy/io/seqera/wave/util/SimplePoolTest.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.seqera.wave.util

import spock.lang.Specification

import java.util.concurrent.CompletableFuture

/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class SimplePoolTest extends Specification {

static class MyObject {

}

def 'should borrow the same object' () {
given:
def pool = new SimplePool(()-> new MyObject())

when:
def o1 = pool.borrow()
pool.release(o1)
and:
def o2 = pool.borrow()
pool.release(o2)
then:
// is the same object
o1.is(o2)
}

def 'should borrow a new object' () {
given:
def pool = new SimplePool(()-> new MyObject())

when:
def o1 = pool.borrow()
and:
def o2 = pool.borrow()
then:
// is not the same object
!o1.is(o2)
}

def 'should apply a function and use the same object' () {
given:
def objects = new HashSet()
and:
def pool = new SimplePool(()-> new MyObject())

expect:
// the first time is added to the `objects` set
pool.apply( (it) -> { assert objects.add(it); return it } ) instanceof MyObject
and:
// the following invocations use the same instance, the object is not added anymore
// to the set because it already contains it
pool.apply( (it) -> { assert !objects.add(it); return it } ) instanceof MyObject
pool.apply( (it) -> { assert !objects.add(it); return it } ) instanceof MyObject
and:
objects.size()==1
}


def 'should apply a function and use a new object instance' () {
given:
def objects = new HashSet()
def started = new CompletableFuture()
and:
def pool = new SimplePool(()-> new MyObject())

when:
// the first time is added to the `objects` set
def thread = Thread.start { pool.apply( (obj) -> { started.complete('yes'); sleep 300; assert objects.add(obj); return obj } ) }
and:
started.join()
then:
// the following invocations use a same instance, because the long running thread keep using the other instance
pool.apply( (it) -> { assert objects.add(it); return it } ) instanceof MyObject

when:
thread.join()
then:
objects.size()==2
}

}

0 comments on commit 1e5c64a

Please sign in to comment.