Skip to content

Commit

Permalink
Merge pull request #370 from ARGOeu/devel
Browse files Browse the repository at this point in the history
Devel
  • Loading branch information
themiszamani authored Feb 2, 2023
2 parents 615f8cb + 9bb5e7e commit 7474e46
Show file tree
Hide file tree
Showing 21 changed files with 253 additions and 859 deletions.
4 changes: 2 additions & 2 deletions flink_jobs_v2/ApiResourceManager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
<parent>
<groupId>flink.jobs.v2</groupId>
<artifactId>flink_jobs_v2</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
</parent>
<version>2.1.0</version>
<version>2.1.1</version>
<groupId>api.resource.manager</groupId>
<artifactId>ApiResourceManager</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package argo.amr;

public enum ApiResource {
CONFIG, OPS, METRIC, AGGREGATION, THRESHOLDS, TOPOENDPOINTS, TOPOGROUPS, WEIGHTS, DOWNTIMES, RECOMPUTATIONS, MTAGS
CONFIG, OPS, METRIC, AGGREGATION, THRESHOLDS, TOPOENDPOINTS, TOPOGROUPS, WEIGHTS, DOWNTIMES, RECOMPUTATIONS, MTAGS, TENANTFEED
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ApiResourceManager {
private String weightsID;
private RequestManager requestManager;
private ApiResponseParser apiResponseParser;
private boolean isCombined;
//private boolean verify;
//private int timeoutSec;

Expand All @@ -47,8 +48,8 @@ public ApiResourceManager(String endpoint, String token) {
this.reportID = "";
this.date = "";
this.weightsID = "";
this.tenant="";
this.egroup="";
this.tenant = "";
this.egroup = "";
this.requestManager = new RequestManager("", this.token);
this.apiResponseParser = new ApiResponseParser(this.reportName, this.metricID, this.aggregationID, this.opsID, this.threshID, this.tenant, this.egroup);
}
Expand Down Expand Up @@ -182,8 +183,6 @@ public String getEgroup() {
public void setEgroup(String egroup) {
this.egroup = egroup;
}



/**
* Retrieves the remote report configuration based on reportID main class
Expand Down Expand Up @@ -297,7 +296,6 @@ public void getRemoteDowntimes() {
String path = "https://%s/api/v2/downtimes?date=%s";
String fullURL = String.format(path, this.endpoint, this.date);
String content = this.requestManager.getResource(fullURL);

this.data.put(ApiResource.DOWNTIMES, this.apiResponseParser.getJsonData(content, false));

}
Expand Down Expand Up @@ -353,8 +351,8 @@ public void parseReport() {
this.opsID = this.apiResponseParser.getOpsID();
this.threshID = this.apiResponseParser.getThreshID();
this.reportName = this.apiResponseParser.getReportName();
this.tenant=this.apiResponseParser.getTenant();
this.egroup=this.apiResponseParser.getEgroup();
this.tenant = this.apiResponseParser.getTenant();
this.egroup = this.apiResponseParser.getEgroup();
}

/**
Expand Down Expand Up @@ -458,12 +456,43 @@ public MetricProfile[] getListMetrics() {
return rArr;
}

/**
* Retrieves the remote report configuration based on reportID main class
* attribute and stores the content in the enum map
*/
public void getRemoteTenantFeed() {
String path = "https://%s/api/v2/feeds/data";
String fullURL = String.format(path, this.endpoint);
String content = this.requestManager.getResource(fullURL);
if (content != null) {
this.data.put(ApiResource.TENANTFEED, this.apiResponseParser.getJsonData(content, true));
}
}

public String[] getListTenants() {
List<String> results = new ArrayList<String>();
if (!this.data.containsKey(ApiResource.TENANTFEED)) {
String[] rArr = new String[results.size()];
rArr = results.toArray(rArr);
return rArr;
}

String content = this.data.get(ApiResource.TENANTFEED);
results = this.apiResponseParser.getListTenants(content);
String[] rArr = new String[results.size()];
rArr = results.toArray(rArr);
return rArr;
}

/**
* Executes all steps to retrieve the complete amount of the available
* profile, topology, weights and downtime information from argo-web-api
*/
public void getRemoteAll() {
// Start with report and configuration
if (isCombined) {
this.getRemoteTenantFeed();
}
this.getRemoteConfig();
// parse remote report config to be able to get the other profiles

Expand All @@ -489,4 +518,12 @@ public void getRemoteAll() {
this.getRemoteMetricTags();
}

public boolean isIsCombined() {
return isCombined;
}

public void setIsCombined(boolean isCombined) {
this.isCombined = isCombined;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -103,13 +103,11 @@ public void setEgroup(String egroup) {
this.egroup = egroup;
}


/**
* Extract first JSON item from data JSON array in api response
*
* @param content JSON content of the full repsonse (status + data)
* @return First available item in data array as JSON string representation
*
*/
public String getJsonData(String content, boolean asArray) {

Expand All @@ -118,10 +116,17 @@ public String getJsonData(String content, boolean asArray) {
JsonElement jElement = jsonParser.parse(content);
JsonObject jRoot = jElement.getAsJsonObject();
// Get the data array and the first item

if (asArray) {
if (jRoot.get("data") == null) {
return null;
}
return jRoot.get("data").toString();
}
JsonArray jData = jRoot.get("data").getAsJsonArray();
if (!jData.iterator().hasNext()) {
return null;
}
JsonElement jItem = jData.get(0);
return jItem.toString();
}
Expand Down Expand Up @@ -163,6 +168,21 @@ public void parseReport(String content) {

}

public List<String> getListTenants(String content) {
List<String> results = new ArrayList<String>();

JsonParser jsonParser = new JsonParser();
JsonElement jElement = jsonParser.parse(content);
JsonArray jArray = jElement.getAsJsonArray();
JsonObject jRoot = jArray.get(0).getAsJsonObject();
JsonArray tenants = jRoot.get("tenants").getAsJsonArray();
for (int i = 0; i < tenants.size(); i++) {
String jItem = tenants.get(i).getAsString();
results.add(jItem);
}
return results;
}

/**
* Parses the Downtime content retrieved from argo-web-api and provides a
* list of Downtime avro objects to be used in the next steps of the
Expand Down
4 changes: 2 additions & 2 deletions flink_jobs_v2/ProfilesManager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
<parent>
<groupId>flink.jobs.v2</groupId>
<artifactId>flink_jobs_v2</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
</parent>
<groupId>profiles.manager</groupId>
<artifactId>ProfilesManager</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
Expand Down
4 changes: 2 additions & 2 deletions flink_jobs_v2/Timelines/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
<parent>
<groupId>flink.jobs.v2</groupId>
<artifactId>flink_jobs_v2</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
</parent>
<groupId>timeline.manager</groupId>
<artifactId>Timelines</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
Expand Down
4 changes: 2 additions & 2 deletions flink_jobs_v2/ams-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
<parent>
<groupId>flink.jobs.v2</groupId>
<artifactId>flink_jobs_v2</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
</parent>

<groupId>ams.connector</groupId>
<version>2.1.0</version>
<version>2.1.1</version>
<name>ams.connector</name>
<description>Connect to AMS</description>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class ArgoMessagingSource extends RichSourceFunction<String> {
private boolean useProxy = false;
private String proxyURL = "";
private transient Object rateLck; // lock for waiting to establish rate
private boolean advanceOffset = true;


private volatile boolean isRunning = true;

Expand All @@ -49,6 +51,21 @@ public ArgoMessagingSource(String endpoint, String port, String token, String pr
this.runDate=runDate;

}

// second constructor with advanceOffset parametter
public ArgoMessagingSource(String endpoint, String port, String token, String project, String sub, int batch, Long interval, String runDate, boolean advanceOffset) {
this.endpoint = endpoint;
this.port = port;
this.token = token;
this.project = project;
this.sub = sub;
this.interval = interval;
this.batch = batch;
this.verify = true;
this.runDate=runDate;
this.advanceOffset = advanceOffset;

}

/**
* Set verify to true or false. If set to false AMS client will be able to contact AMS endpoints that use self-signed certificates
Expand Down Expand Up @@ -115,8 +132,16 @@ public void open(Configuration parameters) throws Exception {
if (this.useProxy) {
client.setProxy(this.proxyURL);
}
int offset=client.offset(); //get the offset of the subscription, that corresponds to the date
client.modifyOffset(offset); //mofify the offset of the subscription to point to the offset index of the date. if date is null then the index points to the latest offset (max)

// if advanceOffset is set to true (default) advance the offset to latest or based to the run date provided
if (advanceOffset) {
// get the offset of the subscription, that corresponds to the date
int offset=client.offset();
// mofify the offset of the subscription to point to the offset index of the date.
// if date is null then the index points to the latest offset (max)
client.modifyOffset(offset);
}

} catch (KeyManagementException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
Expand Down
6 changes: 3 additions & 3 deletions flink_jobs_v2/ams_ingest_metric/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ language governing permissions and limitations under the License. -->

<groupId>argo.streaming</groupId>
<artifactId>ams-ingest-metric</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
<packaging>jar</packaging>

<name>ARGO AMS Ingest Metric Data job</name>
Expand Down Expand Up @@ -63,7 +63,7 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>ams.connector</groupId>
<artifactId>ams-connector</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
<type>jar</type>
</dependency>
<dependency>
Expand Down Expand Up @@ -382,4 +382,4 @@ language governing permissions and limitations under the License. -->
</configuration> </plugin> </plugins> </pluginManagement> -->

</build>
</project>
</project>
Loading

0 comments on commit 7474e46

Please sign in to comment.