
Commit

Merge pull request #339 from ARGOeu/devel
Version 2.0.0
themiszamani authored Jun 29, 2022
2 parents 10fb722 + 9eb88eb commit 0542a72
Showing 494 changed files with 58,779 additions and 3,473 deletions.
3 changes: 2 additions & 1 deletion Jenkinsfile
@@ -47,7 +47,8 @@ pipeline {
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/batch_status/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/ams_ingest_metric/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/ams_ingest_sync/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/status_trends/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/status_trends/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs_v2/pom.xml
"""
junit '**/target/surefire-reports/*.xml'
cobertura coberturaReportFile: '**/target/site/cobertura/coverage.xml'
15 changes: 15 additions & 0 deletions README.md
@@ -446,4 +446,19 @@ Job requires parameters:
`--proxy` : (optional) proxy URL
`--clearMongo` : (optional) defines whether the mongo collections will be cleared of previous documents; if false or missing, the collections remain as they are

## Flink Jobs V2


Flink Jobs V2 is a parent project containing module projects that share common code and can also run independently.
To submit each job to Flink, the projects must first be packaged as jar files.
First, install all project dependencies:
- `cd flink_jobs_v2`
- `mvn clean install`
Then, generate the jar for each job:
- `mvn clean && mvn package`
Finally, run the jar in Flink with the required job parameters, as described earlier in this README (see the sketch below).
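A minimal end-to-end sketch of the sequence; the module directory, main class and jar names below are placeholders, not actual names from this repository:

```bash
# install the shared dependencies of all modules once, from the parent project
cd flink_jobs_v2
mvn clean install

# build the jar for a single job module (<job-module> is a placeholder)
cd <job-module>
mvn clean && mvn package

# submit the resulting jar to Flink, passing the job parameters described above
flink run -c <main.Class> target/<job-module>.jar --run.date 2022-06-28 ...
```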





25 changes: 13 additions & 12 deletions bin/ar_job_submit.py
@@ -3,7 +3,7 @@
import sys
import os
import argparse
import datetime
from datetime import datetime, date, timedelta
from snakebite.client import Client
import logging
from urllib.parse import urlparse
@@ -40,7 +40,7 @@ def compose_hdfs_commands(year, month, day, args, config):

hdfs_user = config.get("HDFS", "user")
tenant = args.tenant


hdfs_metric = config.get("HDFS", "path_metric")

@@ -52,7 +52,7 @@ def compose_hdfs_commands(year, month, day, args, config):

# file location of previous day's metric data (local or hdfs)
hdfs_commands["--pdata"] = hdfs_check_path(
hdfs_metric + "/" + str(datetime.date(year, month, day) - datetime.timedelta(1)), client)
hdfs_metric + "/" + str(date(year, month, day) - timedelta(1)), client)

# file location of target day's metric data (local or hdfs)
hdfs_commands["--mdata"] = hdfs_check_path(
@@ -68,8 +68,8 @@ def compose_command(config, args, hdfs_commands, dry_run=False):
Args:
config (obj.): argo configuration object
args (dict): command line arguments of this script
hdfs_commands (list): a list of hdfs related arguments to be passed in flink job
dry_run (bool, optional): signifies a dry-run execution context; if yes, no mongodb clean-up is performed.
hdfs_commands (list): a list of hdfs related arguments to be passed in flink job
dry_run (bool, optional): signifies a dry-run execution context; if yes, no mongodb clean-up is performed.
Defaults to False.
Returns:
@@ -128,18 +128,20 @@ def compose_command(config, args, hdfs_commands, dry_run=False):
cmd_command.append("--api.endpoint")
cmd_command.append(api_endpoint.hostname)

# get the api token
# get the api token
cmd_command.append("--api.token")
cmd_command.append(config.get("API","access_token"))
cmd_command.append(config.get("API",args.tenant+"_key"))

# get report id

# get report id
cmd_command.append("--report.id")
cmd_command.append(config.get("TENANTS:"+args.tenant,"report_"+args.report))




# get optional api proxy
# get optional parameter for api proxy
proxy = config.get("API", "proxy")

if proxy is not None:
cmd_command.append("--api.proxy")
cmd_command.append(proxy.geturl())
@@ -170,7 +172,6 @@ def main(args=None):
hdfs_commands = compose_hdfs_commands(year, month, day, args, config)

cmd_command = compose_command(config, args, hdfs_commands, args.dry_run)

# submit the script's command
flink_job_submit(config, cmd_command, None, args.dry_run)

@@ -202,4 +203,4 @@ def main(args=None):
action="store_true", dest="dry_run")

# Pass the arguments to main method
sys.exit(main(parser.parse_args()))
sys.exit(main(parser.parse_args()))
216 changes: 216 additions & 0 deletions bin/multi_job_submit.py
@@ -0,0 +1,216 @@
#!/usr/bin/env python
import sys
import os
import argparse
from snakebite.client import Client

from datetime import datetime, date, timedelta
import logging
from urllib.parse import urlparse
from utils.argo_mongo import ArgoMongoClient
from utils.common import cmd_to_string, date_rollback, flink_job_submit, hdfs_check_path, get_log_conf, get_config_paths
from utils.update_profiles import ArgoProfileManager
from utils.argo_config import ArgoConfig


log = logging.getLogger(__name__)


def compose_hdfs_commands(year, month, day, args, config):
"""Checks hdfs for available files back in time and prepares the correct hdfs arguments
Args:
year (int): year part of the date to check for hdfs files
month (int): month part of the date to check for hdfs files
day (int): day part of the date to check for hdfs files
config (obj.): argo configuration object
Returns:
list: A list of all hdfs arguments to be used in flink job submission
"""

# set up the hdfs client to be used in order to check the files
namenode = config.get("HDFS", "namenode")
client = Client(namenode.hostname, namenode.port, use_trash=False)

# hdfs sync path for the tenant

hdfs_user = config.get("HDFS", "user")
tenant = args.tenant
hdfs_sync = config.get("HDFS", "path_sync")
hdfs_sync = hdfs_sync.fill(namenode=namenode.geturl(
), hdfs_user=hdfs_user, tenant=tenant).geturl()

hdfs_metric = config.get("HDFS", "path_metric")

hdfs_metric = hdfs_metric.fill(
namenode=namenode.geturl(), hdfs_user=hdfs_user, tenant=tenant).geturl()

# dictionary holding all the commands with their respective arguments' name
hdfs_commands = dict()

# file location of previous day's metric data (local or hdfs)
hdfs_commands["--pdata"] = hdfs_check_path(
hdfs_metric + "/" + str(date(year, month, day) - timedelta(1)), client)

# file location of target day's metric data (local or hdfs)
hdfs_commands["--mdata"] = hdfs_check_path(
hdfs_metric+"/"+args.date, client)

return hdfs_commands


def compose_command(config, args, hdfs_commands, dry_run=False):
"""Composes a command line execution string for submitting a flink job. Also calls mongodb
clean up procedure before composing the command
Args:
config (obj.): argo configuration object
args (dict): command line arguments of this script
hdfs_commands (list): a list of hdfs related arguments to be passed in flink job
dry_run (bool, optional): signifies a dry-run execution context; if yes, no mongodb clean-up is performed.
Defaults to False.
Returns:
list: A list of all command line arguments for performing the flink job submission
"""

# job submission command
cmd_command = []

if args.sudo is True:
cmd_command.append("sudo")

# flink executable
cmd_command.append(config.get("FLINK", "path"))

cmd_command.append("run")

cmd_command.append("-c")

# Job's class inside the jar
cmd_command.append(config.get("CLASSES", "batch-multi"))

# jar to be submitted to flink
cmd_command.append(config.get("JARS", "batch-multi"))

# date the report will run for
cmd_command.append("--run.date")
cmd_command.append(args.date)

# MongoDB uri for outputting the results to (e.g. mongodb://localhost:21017/example_db)
cmd_command.append("--mongo.uri")
group_tenant = "TENANTS:" + args.tenant
mongo_endpoint = config.get("MONGO", "endpoint").geturl()
mongo_uri = config.get(group_tenant, "mongo_uri").fill(
mongo_endpoint=mongo_endpoint, tenant=args.tenant)
cmd_command.append(mongo_uri.geturl())

# if args.method == "insert" and dry_run == False:
# argo_mongo_client = ArgoMongoClient(args, config, ["status_metrics", "status_endpoints", "status_services",
# "status_endpoint_groups"])

# argo_mongo_client.mongo_clean_status(mongo_uri, dry_run)

# MongoDB method to be used when storing the results, either insert or upsert
cmd_command.append("--mongo.method")
cmd_command.append(args.method)

# add the hdfs commands
for command in hdfs_commands:
cmd_command.append(command)
cmd_command.append(hdfs_commands[command])

# get the api endpoint
api_endpoint = config.get("API","endpoint")
if api_endpoint:
cmd_command.append("--api.endpoint")
cmd_command.append(api_endpoint.hostname)

# get the api token
cmd_command.append("--api.token")
cmd_command.append(config.get("API",args.tenant + "_key"))

cmd_command.append("--report.id")
cmd_command.append(config.get("TENANTS:"+args.tenant,"report_"+args.report))

# get optional api proxy
proxy = config.get("API", "proxy")
if proxy is not None:
cmd_command.append("--api.proxy")
cmd_command.append(proxy.geturl())

# check clear
if args.clear_results is not None:
cmd_command.append("--clearMongo")
cmd_command.append("true")

# check what to compute
if args.calculate is not None:
if "ar" not in args.calculate:
cmd_command.append("--calcAR")
cmd_command.append("OFF")
if "status" not in args.calculate:
cmd_command.append("--calcStatus")
cmd_command.append("OFF")
if "trends" not in args.calculate:
cmd_command.append("--calcTrends")
cmd_command.append("OFF")
return cmd_command


def main(args=None):

# Get configuration paths
conf_paths = get_config_paths(args.config)

# Get logger config file
get_log_conf(conf_paths['log'])

# Get main configuration and schema
config = ArgoConfig(conf_paths["main"], conf_paths["schema"])

# check if configuration for the given tenant exists
if not config.has("TENANTS:"+args.tenant):
log.fatal("Tenant: "+args.tenant+" doesn't exist.")
sys.exit(1)

year, month, day = [int(x) for x in args.date.split("-")]

# dictionary containing the argument's name and the command associated with each name
hdfs_commands = compose_hdfs_commands(year, month, day, args, config)

cmd_command = compose_command(config, args, hdfs_commands, args.dry_run)

# submit the script's command
flink_job_submit(config, cmd_command, None, args.dry_run)


if __name__ == "__main__":

today = datetime.today().strftime('%Y-%m-%d')

parser = argparse.ArgumentParser(
description="Batch Status Job submit script")
parser.add_argument(
"-t", "--tenant", metavar="STRING", help="Name of the tenant", required=True, dest="tenant")
parser.add_argument(
"-r", "--report", metavar="STRING", help="Name of the report", required=True, dest="report")
parser.add_argument(
"-d", "--date", metavar="DATE(YYYY-MM-DD)", help="Date to run the job for", required=False, dest="date", default=today)
parser.add_argument(
"-m", "--method", metavar="KEYWORD(insert|upsert)", help="Insert or Upsert data in mongoDB", required=False, dest="method", default="insert")
parser.add_argument(
"-x", "--calculate", metavar="STRING", help="Comma separated list of what to calculate (ar,status,trends)", required=False, dest="calculate", default=None)
parser.add_argument(
"-c", "--config", metavar="PATH", help="Path for the config file", dest="config")
parser.add_argument(
"-u", "--sudo", help="Run the submit job as superuser", action="store_true")
parser.add_argument("--clear-prev-results", help="Clear previous results from datastore",
action="store_true", dest="clear_results")
parser.add_argument("--dry-run", help="Runs in test mode without actually submitting the job",
action="store_true", dest="dry_run")

# Pass the arguments to main method
sys.exit(main(parser.parse_args()))
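For reference, a hypothetical invocation of the new multi_job_submit.py script using the arguments defined above; the tenant, report, date and config path are placeholders, not values taken from this commit:

```bash
# Submit the combined batch job for one tenant/report, computing only A/R and
# status (trends is switched off because it is absent from the -x list).
# Tenant "EXAMPLE", report "Critical" and the config path are placeholders.
./bin/multi_job_submit.py \
    -t EXAMPLE \
    -r Critical \
    -d 2022-06-28 \
    -m insert \
    -x ar,status \
    -c /path/to/argo-engine.conf \
    --dry-run   # test mode: compose the command without actually submitting the job
```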
23 changes: 11 additions & 12 deletions bin/status_job_submit.py
@@ -2,7 +2,7 @@
import sys
import os
import argparse
import datetime
from datetime import datetime, date, timedelta
from snakebite.client import Client

import logging
@@ -12,7 +12,6 @@
from utils.update_profiles import ArgoProfileManager
from utils.argo_config import ArgoConfig

from datetime import datetime

log = logging.getLogger(__name__)

@@ -53,7 +52,7 @@ def compose_hdfs_commands(year, month, day, args, config):

# file location of previous day's metric data (local or hdfs)
hdfs_commands["--pdata"] = hdfs_check_path(
hdfs_metric + "/" + str(datetime.date(year, month, day) - datetime.timedelta(1)), client)
hdfs_metric + "/" + str(date(year, month, day) - timedelta(1)), client)

# file location of target day's metric data (local or hdfs)
hdfs_commands["--mdata"] = hdfs_check_path(
@@ -69,8 +68,8 @@ def compose_command(config, args, hdfs_commands, dry_run=False):
Args:
config (obj.): argo configuration object
args (dict): command line arguments of this script
hdfs_commands (list): a list of hdfs related arguments to be passed in flink job
dry_run (bool, optional): signifies a dry-run execution context; if yes, no mongodb clean-up is performed.
hdfs_commands (list): a list of hdfs related arguments to be passed in flink job
dry_run (bool, optional): signifies a dry-run execution context; if yes, no mongodb clean-up is performed.
Defaults to False.
Returns:
@@ -129,12 +128,12 @@ def compose_command(config, args, hdfs_commands, dry_run=False):
cmd_command.append("--api.endpoint")
cmd_command.append(api_endpoint.hostname)

# get the api token
# get the api token
cmd_command.append("--api.token")
cmd_command.append(config.get("API","access_token"))
cmd_command.append(config.get("API",args.tenant + "_key"))

# get report id

# get report id

cmd_command.append("--report.id")
cmd_command.append(config.get("TENANTS:"+args.tenant,"report_"+args.report))

@@ -185,9 +184,9 @@ def main(args=None):
parser.add_argument(
"-r", "--report", metavar="STRING", help="Report status", required=True, dest="report")
parser.add_argument(
"-d", "--date", metavar="DATE(YYYY-MM-DD)", help="Date to run the job for", required=True, dest="date", default=today)
"-d", "--date", metavar="DATE(YYYY-MM-DD)", help="Date to run the job for", required=False, dest="date", default=today)
parser.add_argument(
"-m", "--method", metavar="KEYWORD(insert|upsert)", help="Insert or Upsert data in mongoDB", required=True, dest="method", default="insert")
"-m", "--method", metavar="KEYWORD(insert|upsert)", help="Insert or Upsert data in mongoDB", required=False, dest="method", default="insert")
parser.add_argument(
"-c", "--config", metavar="PATH", help="Path for the config file", dest="config")
parser.add_argument(
@@ -202,4 +201,4 @@ def main(args=None):
action="store_true", dest="dry_run")

# Pass the arguments to main method
sys.exit(main(parser.parse_args()))
sys.exit(main(parser.parse_args()))