Skip to content

Commit

Permalink
Merge pull request #15 from pagopa/NOD-883_async_riversamento
Browse files Browse the repository at this point in the history
Nod 883 async riversamento
  • Loading branch information
aomegax authored May 16, 2024
2 parents 60984b4 + 175d71a commit 54bc932
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 214 deletions.
384 changes: 181 additions & 203 deletions openapi/openapi.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ public ResponseEntity<List<SyncStatusResponse>> standin() {
produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<List<SyncStatusResponse>> cache() {
log.debug("[NODE-CFG-SYNC] Force {} configuration to update", TargetRefreshEnum.cache.label);

Map<String, SyncStatusEnum> syncStatusEnumMap = apiConfigCacheService.syncCache();
apiConfigCacheService.syncRiversamento();

List<SyncStatusResponse> syncStatusResponseList = syncStatusEnumMap.entrySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
public enum TargetRefreshEnum {

cache("api-config-cache"),
standin("stand-in-manager");
standin("stand-in-manager"),
riversamento("riversamento");

public final String label;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import it.gov.pagopa.node.cfgsync.model.SyncStatusEnum;
import it.gov.pagopa.node.cfgsync.model.TargetRefreshEnum;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -16,7 +15,6 @@
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Map;

@Slf4j
@Service
Expand Down Expand Up @@ -54,11 +52,20 @@ public void processEvent(EventContext eventContext) {
TargetRefreshEnum.cache.label,
eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
eventContext.getEventData().getBodyAsString());

try {
apiConfigCacheService.syncCache();
} catch (Exception ex) {
log.error("[{}][ALERT] Generic Error on consumer: {}", TargetRefreshEnum.cache.label, ex.getMessage(), ex);
}

try {
apiConfigCacheService.syncRiversamento();
} catch (Exception ex) {
log.error("[{}][ALERT] Generic Error on consumer: {}", TargetRefreshEnum.riversamento.label, ex.getMessage(), ex);
}


}

public void processError(ErrorContext errorContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,6 @@ public Map<String, SyncStatusEnum> syncCache() {
saveNexiPostgres(syncStatusMap, configCache);
saveNexiOracle(syncStatusMap, configCache);

if(riversamentoEnabled) {
riversamentoElencoServizi();
riversamentoCdiPreferences();
}

return composeSyncStatusMapResult(TargetRefreshEnum.cache.label, syncStatusMap);
} catch (FeignException fEx) {
log.error("[{}] error: {}", TargetRefreshEnum.cache.label, fEx.getMessage(), fEx);
Expand All @@ -160,6 +155,15 @@ public Map<String, SyncStatusEnum> syncCache() {
}
}

@Transactional
public void syncRiversamento() {
if(riversamentoEnabled) {
log.info("riversamento elenco servizi e cdi preferences abilitato");
riversamentoElencoServizi();
riversamentoCdiPreferences();
}
}

private void savePagoPA(Map<String, SyncStatusEnum> syncStatusMap, ConfigCache configCache) {
try {
if(apiConfigCacheWritePagoPa) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class CacheSyncNexiOracleTest {
}

@Test
void nexioracle() {
void nexioracle() throws InterruptedException {

ReflectionTestUtils.setField(cacheManagerService, "riversamentoSource", "nexi-oracle");
ReflectionTestUtils.setField(cacheManagerService, "riversamentoTarget", "nexi-oracle");
Expand Down Expand Up @@ -124,6 +124,7 @@ void nexioracle() {
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(5000);
List<CDIPreferences> all = nexiCdiPreferencesOracleRepository.findAll();
assertThat(all.size()).isEqualTo(size);
assertThat(all.get(0).getSeller()).isEqualTo(arrayList.get(0).getSeller());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CacheSyncNexiPostgresTest {
}

@Test
void nexipostgres() {
void nexipostgres() throws InterruptedException {

ReflectionTestUtils.setField(cacheManagerService, "riversamentoSource", "nexi-postgres");
ReflectionTestUtils.setField(cacheManagerService, "riversamentoTarget", "nexi-postgres");
Expand Down Expand Up @@ -117,8 +117,154 @@ void nexipostgres() {
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(5000);
List<CDIPreferences> all = nexiCdiPreferencesPostgresRepository.findAll();
assertThat(all.size()).isEqualTo(size);

}

@Test
void nexipostgres2() throws InterruptedException {

ReflectionTestUtils.setField(cacheManagerService, "riversamentoEnabled", false);

long size = Math.round(Math.random()*500);
ArrayList<CDIPreferencesView> arrayList = new ArrayList();
for(long i = 0;i<size;i++){
arrayList.add(new CDIPreferencesView(new Long(i),"","", BigDecimal.ZERO,new Long(i)));
}
long originalcount = nexiCdiPreferencesPostgresRepository.count();
nexiCdiPreferencesViewPostgresRepository.deleteAll();
nexiCdiPreferencesViewPostgresRepository.saveAll(arrayList);

Map<String, Collection<String>> headersCustom =
Map.of(
HEADER_CACHE_ID, List.of(String.valueOf(System.currentTimeMillis())),
HEADER_CACHE_TIMESTAMP, List.of(Instant.now().toString()),
HEADER_CACHE_VERSION, List.of(StringUtils.repeat("*", 50))
);
when(apiConfigCacheClient.getCache(anyString())).thenReturn(Response
.builder()
.status(200)
.reason("Mocked")
.headers(headersCustom)
.request(mock(Request.class))
.body(new byte[0])
.build());
cacheManagerService.setApiConfigCacheClient(apiConfigCacheClient);

ResponseEntity<List<SyncStatusResponse>> response = restTemplate.exchange(CACHE_URL, HttpMethod.PUT, null, new ParameterizedTypeReference<>() {});

assertThat(response.getBody()).isNotNull();
assertFalse(response.getHeaders().isEmpty());
assertFalse(response.getBody().isEmpty());
assertEquals(3, response.getBody().size());
assertThat(response.getBody().get(0).getServiceIdentifier()).isEqualTo(PAGOPAPOSTGRES_SI);
assertThat(response.getBody().get(0).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(1).getServiceIdentifier()).isEqualTo(NEXIPOSTGRES_SI);
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(5000);
Long afterCount = nexiCdiPreferencesPostgresRepository.count();
assertThat(afterCount).isEqualTo(originalcount);

}

@Test
void nexipostgres3() throws InterruptedException {

ReflectionTestUtils.setField(cacheManagerService, "riversamentoSource", "exception");
ReflectionTestUtils.setField(cacheManagerService, "riversamentoTarget", "nexi-postgres");

long size = Math.round(Math.random()*500);
ArrayList<CDIPreferencesView> arrayList = new ArrayList();
for(long i = 0;i<size;i++){
arrayList.add(new CDIPreferencesView(new Long(i),"","", BigDecimal.ZERO,new Long(i)));
}
long originalcount = nexiCdiPreferencesPostgresRepository.count();
nexiCdiPreferencesViewPostgresRepository.saveAll(arrayList);

Map<String, Collection<String>> headersCustom =
Map.of(
HEADER_CACHE_ID, List.of(String.valueOf(System.currentTimeMillis())),
HEADER_CACHE_TIMESTAMP, List.of(Instant.now().toString()),
HEADER_CACHE_VERSION, List.of(StringUtils.repeat("*", 50))
);
when(apiConfigCacheClient.getCache(anyString())).thenReturn(Response
.builder()
.status(200)
.reason("Mocked")
.headers(headersCustom)
.request(mock(Request.class))
.body(new byte[0])
.build());
cacheManagerService.setApiConfigCacheClient(apiConfigCacheClient);

ResponseEntity<List<SyncStatusResponse>> response = restTemplate.exchange(CACHE_URL, HttpMethod.PUT, null, new ParameterizedTypeReference<>() {});

assertThat(response.getBody()).isNotNull();
assertFalse(response.getHeaders().isEmpty());
assertFalse(response.getBody().isEmpty());
assertEquals(3, response.getBody().size());
assertThat(response.getBody().get(0).getServiceIdentifier()).isEqualTo(PAGOPAPOSTGRES_SI);
assertThat(response.getBody().get(0).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(1).getServiceIdentifier()).isEqualTo(NEXIPOSTGRES_SI);
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(1000);
long all = nexiCdiPreferencesPostgresRepository.count();
assertThat(all).isEqualTo(originalcount);

}

@Test
void nexipostgres4() throws InterruptedException {

ReflectionTestUtils.setField(cacheManagerService, "riversamentoSource", "nexi-postgres");
ReflectionTestUtils.setField(cacheManagerService, "riversamentoTarget", "exception");

long size = Math.round(Math.random()*500);
ArrayList<CDIPreferencesView> arrayList = new ArrayList();
for(long i = 0;i<size;i++){
arrayList.add(new CDIPreferencesView(new Long(i),"","", BigDecimal.ZERO,new Long(i)));
}
long originalcount = nexiCdiPreferencesPostgresRepository.count();
nexiCdiPreferencesViewPostgresRepository.saveAll(arrayList);

Map<String, Collection<String>> headersCustom =
Map.of(
HEADER_CACHE_ID, List.of(String.valueOf(System.currentTimeMillis())),
HEADER_CACHE_TIMESTAMP, List.of(Instant.now().toString()),
HEADER_CACHE_VERSION, List.of(StringUtils.repeat("*", 50))
);
when(apiConfigCacheClient.getCache(anyString())).thenReturn(Response
.builder()
.status(200)
.reason("Mocked")
.headers(headersCustom)
.request(mock(Request.class))
.body(new byte[0])
.build());
cacheManagerService.setApiConfigCacheClient(apiConfigCacheClient);

ResponseEntity<List<SyncStatusResponse>> response = restTemplate.exchange(CACHE_URL, HttpMethod.PUT, null, new ParameterizedTypeReference<>() {});

assertThat(response.getBody()).isNotNull();
assertFalse(response.getHeaders().isEmpty());
assertFalse(response.getBody().isEmpty());
assertEquals(3, response.getBody().size());
assertThat(response.getBody().get(0).getServiceIdentifier()).isEqualTo(PAGOPAPOSTGRES_SI);
assertThat(response.getBody().get(0).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(1).getServiceIdentifier()).isEqualTo(NEXIPOSTGRES_SI);
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(1000);
long all = nexiCdiPreferencesPostgresRepository.count();
assertThat(all).isEqualTo(originalcount);

}

}
3 changes: 2 additions & 1 deletion src/test/java/it/gov/pagopa/node/cfgsync/CacheSyncTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ void error500ConnectionRefused() {
}

@Test
void trimCacheVersionOnDb() {
void trimCacheVersionOnDb() throws InterruptedException {

long size = Math.round(Math.random()*500);
ArrayList<CDIPreferencesView> arrayList = new ArrayList();
Expand Down Expand Up @@ -205,6 +205,7 @@ void trimCacheVersionOnDb() {
assertThat(response.getBody().get(1).getStatus()).isEqualTo(SyncStatusEnum.DONE);
assertThat(response.getBody().get(2).getServiceIdentifier()).isEqualTo(NEXIORACLE_SI);
assertThat(response.getBody().get(2).getStatus()).isEqualTo(SyncStatusEnum.DONE);
Thread.sleep(5000);
List<CDIPreferences> all = pagoPaCdiPreferencesPostgresRepository.findAll();
assertThat(all.size()).isEqualTo(size);
}
Expand Down

0 comments on commit 54bc932

Please sign in to comment.