Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
fabrizzio-dotCMS committed Oct 3, 2024
1 parent 69fa89a commit 5559853
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.dotcms.jobs.business.queue.error.JobQueueDataException;
import com.dotmarketing.exception.DotDataException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand Down Expand Up @@ -138,6 +139,13 @@ String createJob(String queueName, Map<String, Object> parameters)
*/
void setRetryStrategy(String queueName, RetryStrategy retryStrategy);

/**
* Retrieves the retry strategy for a specific queue.
* @param jobId The ID of the job
* @return The processor instance, or an empty optional if not found
*/
Optional<JobProcessor> getInstance(final String jobId);

/**
* @return The CircuitBreaker instance
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.dotcms.jobs.business.api.events.RealTimeJobMonitor;
import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.ErrorDetail;
import com.dotcms.jobs.business.error.JobProcessorInstantiationException;
import com.dotcms.jobs.business.error.JobProcessorNotFoundException;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
Expand All @@ -38,6 +39,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -247,14 +249,20 @@ public Map<String,Class<? extends JobProcessor>> getQueueNames() {
public String createJob(final String queueName, final Map<String, Object> parameters)
throws JobProcessorNotFoundException, DotDataException {
final String queueNameLower = queueName.toLowerCase();
if (!processors.containsKey(queueNameLower)) {
final Class<? extends JobProcessor> clazz = processors.get(queueNameLower);
if (null == clazz) {
final var error = new JobProcessorNotFoundException(queueName);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}

//first attempt instantiating the processor, cuz if we cant no use to create an entry in the db
final var processor = newInstanceOfProcessor(queueNameLower).orElseThrow();
// now that we know we can instantiate the processor, we can add it to the map of instances
// But first we need the job id
try {
String jobId = jobQueue.createJob(queueNameLower, parameters);
final String jobId = jobQueue.createJob(queueNameLower, parameters);
addInstanceRef(jobId, processor);
eventProducer.getEvent(JobCreatedEvent.class).fire(
new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters)
);
Expand Down Expand Up @@ -359,6 +367,12 @@ public void setRetryStrategy(final String queueName, final RetryStrategy retrySt
retryStrategies.put(queueName, retryStrategy);
}

@Override
@VisibleForTesting
public Optional<JobProcessor> getInstance(final String jobId) {
return Optional.ofNullable(processorInstancesByJobId.get(jobId));
}

@Override
@VisibleForTesting
public CircuitBreaker getCircuitBreaker() {
Expand Down Expand Up @@ -459,7 +473,7 @@ private void processJobs() {

try {

boolean jobProcessed = processNextJob();
final boolean jobProcessed = processNextJob();
if (jobProcessed) {
emptyQueueCount = 0;
} else {
Expand Down Expand Up @@ -620,11 +634,8 @@ private void handleNonRetryableFailedJob(final Job job) throws DotDataException
*/
private void processJob(final Job job) throws DotDataException {

Class<? extends JobProcessor> processorClass = processors.get(job.queueName());
if (processorClass != null) {

final JobProcessor processor = newJobProcessorInstance(job,
processorClass);
final JobProcessor processor = processorInstancesByJobId.get(job.id());
if (processor != null) {

final ProgressTracker progressTracker = new DefaultProgressTracker();
Job runningJob = job.markAsRunning().withProgressTracker(progressTracker);
Expand Down Expand Up @@ -669,8 +680,6 @@ private void processJob(final Job job) throws DotDataException {
handleJobFailure(
runningJob, processor, e, e.getMessage(), "Job execution"
);
} finally {
processorInstancesByJobId.remove(job.id());
}
} else {

Expand All @@ -682,22 +691,40 @@ private void processJob(final Job job) throws DotDataException {
}

/**
* Instantiate a new JobProcessor instance for a specific job. and store the reference in a map.
* @param job
* @param processorClass
* @return
* Creates a new instance of a JobProcessor for a specific queue.
* @param queueName
* @return An optional containing the new JobProcessor instance, or an empty optional if the processor could not be created.
*/
Optional<JobProcessor> newInstanceOfProcessor(final String queueName) {
final var processorClass = processors.get(queueName);
if (processorClass != null) {
try {
return Optional.of(processorClass.getDeclaredConstructor().newInstance());
} catch (Exception e) {
Logger.error(this, "Error creating job processor", e);
throw new JobProcessorInstantiationException(processorClass,e);
}
}
return Optional.empty();
}

/**
* Once we're sure a processor can be instantiated, we add it to the map of instances.
* @param jobId The ID of the job
* @param processor The processor to add
* @return The processor instance
*/
private JobProcessor newJobProcessorInstance(final Job job, final Class<? extends JobProcessor> processorClass) {
private void addInstanceRef(final String jobId, final JobProcessor processor) {
//Get an instance and put it in the map
return processorInstancesByJobId.computeIfAbsent(
job.id(), k -> {
try {
return processorClass.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new DotRuntimeException("Error creating job processor", e);
}
}
);
processorInstancesByJobId.putIfAbsent(jobId, processor);
}

/**
* Removes a processor instance from the map of instances.
* @param jobId The ID of the job
*/
private void removeInstanceRef(final String jobId) {
processorInstancesByJobId.remove(jobId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.dotcms.jobs.business.error;

import com.dotcms.jobs.business.processor.JobProcessor;

/**
* Exception thrown when an error occurs while attempting to instantiate a new JobProcessor.
*/
public class JobProcessorInstantiationException extends RuntimeException{

/**
* Constructs a new JobProcessorInstantiationException with the specified processor class and cause.
*
* @param processorClass The class of the JobProcessor that could not be instantiated
* @param cause The underlying cause of the error (can be null)
*/
public JobProcessorInstantiationException(Class<? extends JobProcessor> processorClass, Throwable cause) {
super("Failed to instantiate a new JobProcessor out of the provided class: " + processorClass.getName(), cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -48,7 +49,7 @@ public class JobQueueManagerAPIIntegrationTest {
*/
@BeforeAll
static void setUp() throws Exception {

System.out.println("JobQueueManagerAPIIntegrationTest.setUp");
// Initialize the test environment
IntegrationTestInitService.getInstance().init();

Expand All @@ -63,15 +64,20 @@ static void setUp() throws Exception {
*/
@AfterAll
static void cleanUp() throws Exception {

jobQueueManagerAPI.close();
System.out.println("JobQueueManagerAPIIntegrationTest.cleanUp");
if(null != jobQueueManagerAPI) {
jobQueueManagerAPI.close();
}
clearJobs();
}

@BeforeEach
void reset() {
System.out.println("JobQueueManagerAPIIntegrationTest.reset");
// Reset circuit breaker
jobQueueManagerAPI.getCircuitBreaker().reset();
if(null != jobQueueManagerAPI) {
jobQueueManagerAPI.getCircuitBreaker().reset();
}
}

/**
Expand All @@ -81,9 +87,9 @@ void reset() {
*/
@Test
void test_CreateAndProcessJob() throws Exception {

System.out.println("JobQueueManagerAPIIntegrationTest.test_CreateAndProcessJob");
// Register a test processor
jobQueueManagerAPI.registerProcessor("testQueue", new TestJobProcessor());
jobQueueManagerAPI.registerProcessor("testQueue", TestJobProcessor.class);

// Start the JobQueueManagerAPI
if (!jobQueueManagerAPI.isStarted()) {
Expand Down Expand Up @@ -126,8 +132,8 @@ void test_CreateAndProcessJob() throws Exception {
*/
@Test
void test_FailingJob() throws Exception {

jobQueueManagerAPI.registerProcessor("failingQueue", new FailingJobProcessor());
System.out.println("JobQueueManagerAPIIntegrationTest.test_FailingJob");
jobQueueManagerAPI.registerProcessor("failingQueue", FailingJobProcessor.class);
RetryStrategy contentImportRetryStrategy = new ExponentialBackoffRetryStrategy(
5000, 300000, 2.0, 0
);
Expand Down Expand Up @@ -174,17 +180,21 @@ void test_FailingJob() throws Exception {
*/
@Test
void test_CancelJob() throws Exception {

CancellableJobProcessor processor = new CancellableJobProcessor();
jobQueueManagerAPI.registerProcessor("cancellableQueue", processor);
System.out.println("JobQueueManagerAPIIntegrationTest.test_CancelJob");
jobQueueManagerAPI.registerProcessor("cancellableQueue", CancellableJobProcessor.class);

if (!jobQueueManagerAPI.isStarted()) {
jobQueueManagerAPI.start();
jobQueueManagerAPI.awaitStart(5, TimeUnit.SECONDS);
}

Map<String, Object> parameters = new HashMap<>();
String jobId = jobQueueManagerAPI.createJob("cancellableQueue", parameters);
final String jobId = jobQueueManagerAPI.createJob("cancellableQueue", parameters);

//Get the instance of the job processor immediately after creating the job cuz once it gets cancelled, it will be removed from the map
final Optional<JobProcessor> instance = jobQueueManagerAPI.getInstance(jobId);
assertTrue(instance.isPresent(),()->"Should have been able to create an instance of the job processor");
final CancellableJobProcessor processor = (CancellableJobProcessor)instance.get();

Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> {
Expand Down Expand Up @@ -212,6 +222,7 @@ void test_CancelJob() throws Exception {
Job job = jobQueueManagerAPI.getJob(jobId);
assertEquals(JobState.CANCELED, job.state(),
"Job should be in CANCELED state");

assertTrue(processor.wasCanceled(),
"Job processor should have been canceled");
});
Expand All @@ -226,10 +237,9 @@ void test_CancelJob() throws Exception {
*/
@Test
void test_JobRetry() throws Exception {

int maxRetries = 3;
RetryingJobProcessor processor = new RetryingJobProcessor(maxRetries);
jobQueueManagerAPI.registerProcessor("retryQueue", processor);
System.out.println("JobQueueManagerAPIIntegrationTest.test_JobRetry");
final int maxRetries = RetryingJobProcessor.MAX_RETRIES;
jobQueueManagerAPI.registerProcessor("retryQueue", RetryingJobProcessor.class);

RetryStrategy retryStrategy = new ExponentialBackoffRetryStrategy(
100, 1000, 2.0, maxRetries
Expand All @@ -243,6 +253,9 @@ void test_JobRetry() throws Exception {

Map<String, Object> parameters = new HashMap<>();
String jobId = jobQueueManagerAPI.createJob("retryQueue", parameters);
final Optional<JobProcessor> instance = jobQueueManagerAPI.getInstance(jobId);
assertTrue(instance.isPresent(),()->"Should be able to create an instance of the job processor");
RetryingJobProcessor processor = (RetryingJobProcessor)instance.get();

CountDownLatch latch = new CountDownLatch(1);
jobQueueManagerAPI.watchJob(jobId, job -> {
Expand Down Expand Up @@ -274,10 +287,9 @@ void test_JobRetry() throws Exception {
*/
@Test
void test_JobWithProgressTracker() throws Exception {

System.out.println("JobQueueManagerAPIIntegrationTest.test_JobWithProgressTracker");
// Register a processor that uses progress tracking
ProgressTrackingJobProcessor processor = new ProgressTrackingJobProcessor();
jobQueueManagerAPI.registerProcessor("progressQueue", processor);
jobQueueManagerAPI.registerProcessor("progressQueue", ProgressTrackingJobProcessor.class);

// Start the JobQueueManagerAPI
if (!jobQueueManagerAPI.isStarted()) {
Expand Down Expand Up @@ -336,11 +348,11 @@ void test_JobWithProgressTracker() throws Exception {
*/
@Test
void test_CombinedScenarios() throws Exception {

System.out.println("JobQueueManagerAPIIntegrationTest.test_CombinedScenarios");
// Register processors for different scenarios
jobQueueManagerAPI.registerProcessor("successQueue", new TestJobProcessor());
jobQueueManagerAPI.registerProcessor("failQueue", new FailingJobProcessor());
jobQueueManagerAPI.registerProcessor("cancelQueue", new CancellableJobProcessor());
jobQueueManagerAPI.registerProcessor("successQueue", TestJobProcessor.class);
jobQueueManagerAPI.registerProcessor("failQueue", FailingJobProcessor.class);
jobQueueManagerAPI.registerProcessor("cancelQueue", CancellableJobProcessor.class);

// Set up retry strategy for failing jobs
RetryStrategy retryStrategy = new ExponentialBackoffRetryStrategy(
Expand Down Expand Up @@ -424,7 +436,7 @@ void test_CombinedScenarios() throws Exception {
});
}

private static class ProgressTrackingJobProcessor implements JobProcessor {
static class ProgressTrackingJobProcessor implements JobProcessor {
@Override
public void process(Job job) {
ProgressTracker tracker = job.progressTracker().orElseThrow(
Expand All @@ -445,22 +457,23 @@ public Map<String, Object> getResultMetadata(Job job) {
}
}

private static class RetryingJobProcessor implements JobProcessor {
static class RetryingJobProcessor implements JobProcessor {

private final int maxRetries;
public static final int MAX_RETRIES = 3;
private int attempts = 0;

public RetryingJobProcessor(int maxRetries) {
this.maxRetries = maxRetries;
public RetryingJobProcessor() {
// needed for instantiation purposes
}

@Override
public void process(Job job) {
attempts++;
if (attempts <= maxRetries) {
if (attempts <= MAX_RETRIES) {
throw new RuntimeException("Simulated failure, attempt " + attempts);
}
// If we've reached here, we've exceeded maxRetries and the job should succeed
System.out.println("Job succeeded after " + attempts + " attempts");
}

@Override
Expand All @@ -475,7 +488,7 @@ public int getAttempts() {
}
}

private static class FailingJobProcessor implements JobProcessor {
static class FailingJobProcessor implements JobProcessor {

@Override
public void process(Job job) {
Expand All @@ -488,7 +501,7 @@ public Map<String, Object> getResultMetadata(Job job) {
}
}

private static class CancellableJobProcessor implements JobProcessor, Cancellable {
static class CancellableJobProcessor implements JobProcessor, Cancellable {

private final AtomicBoolean canceled = new AtomicBoolean(false);
private final AtomicBoolean wasCanceled = new AtomicBoolean(false);
Expand Down Expand Up @@ -519,7 +532,7 @@ public boolean wasCanceled() {
}
}

private static class TestJobProcessor implements JobProcessor {
static class TestJobProcessor implements JobProcessor {

@Override
public void process(Job job) {
Expand Down
Loading

0 comments on commit 5559853

Please sign in to comment.