Skip to content

Commit

Permalink
Renamed batchid.py to jobid.py. added test_jobid.py
Browse files Browse the repository at this point in the history
  • Loading branch information
hailihu@gmail.com authored and hailihu@gmail.com committed Sep 23, 2024
1 parent c1e11b2 commit 81203f0
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 21 deletions.
5 changes: 2 additions & 3 deletions picas/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def cleanup_env(self, *args, **kwargs):
Method which gets called after the run method has completed.
"""


class RunActor(AbstractRunActor):
"""
RunActor class with added stopping functionality.
Expand Down Expand Up @@ -201,8 +201,7 @@ def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=Non

logging.debug("Tasks executed: ", self.tasks_processed)

if (stop_function is not None and
stop_function(**stop_function_args)):
if (stop_function is not None and stop_function(**stop_function_args)):
break

# break if number of tasks processed is max set
Expand Down
6 changes: 3 additions & 3 deletions picas/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import traceback
from uuid import uuid4

from . import batchid
from . import jobid
from .util import merge_dicts, seconds


Expand Down Expand Up @@ -202,7 +202,7 @@ def __init__(self, task=None):
def lock(self):
"""Function which modifies the task such that it is locked."""
self.doc['lock'] = seconds()
batchid.add_batch_management_id(self.doc)
jobid.add_job_id(self.doc)
return self._update_hostname()

def done(self):
Expand Down Expand Up @@ -253,7 +253,7 @@ def scrub(self):
self.doc['scrub_count'] += 1
self.doc['done'] = 0
self.doc['lock'] = 0
batchid.remove_batch_management_id(self.doc)
jobid.remove_job_id(self.doc)
return self._update_hostname()

def error(self, msg=None, exception=None):
Expand Down
1 change: 0 additions & 1 deletion picas/executers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def execute(args, shell=False):
return (proc, proc.returncode, stdout, stderr)



def execute_old(cmd):
"""Helper function to execute an external application.
@param cmd: the command to be executed.
Expand Down
16 changes: 8 additions & 8 deletions picas/batchid.py → picas/jobid.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
from os import environ


def add_batch_management_id(doc):
def add_job_id(doc):
"""
Add job number id of the batch system to a token/document
Adds information of the highest level of batch system,
Add job number id to a token/document. For batch jobs,
adds information of the highest level of batch system,
since job submision systems may be layered e.g:
A glite wms system makes underneath use of a cream system which makes use
of PBS. I such a case only the glite wms id instead of all of them.
of PBS. I such a case only the glite wms id instead of all of them.
"""
dirac_jobid = environ.get("DIRACJOBID")
slurm_jobid = environ.get("SLURM_JOB_ID")
Expand All @@ -23,7 +23,7 @@ def add_batch_management_id(doc):
if slurm_jobid is not None:
doc["slurm_job_id"] = slurm_jobid
if dirac_jobid is not None:
doc["dirac_job_id"] = dirac_jobid
doc["dirac_job_id"] = dirac_jobid
elif wms_jobid is not None:
if not wms_jobid.startswith("http"):
wms_jobid = None
Expand All @@ -34,14 +34,14 @@ def add_batch_management_id(doc):
doc["pbs_job_id"] = pbs_jobid


def remove_batch_management_id(doc):
def remove_job_id(doc):
"""
removes all batch id from doc/token
removes all job id from doc/token
"""
if "slurm_job_id" in doc:
del doc["slurm_job_id"]
if "dirac_job_id" in doc:
del doc["dirac_job_id"]
del doc["dirac_job_id"]
if "wms_job_id" in doc:
del doc["wms_job_id"]
if "cream_job_id" in doc:
Expand Down
9 changes: 4 additions & 5 deletions picas/modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import socket
import time

from os import environ
from . import batchid
from . import jobid


class TokenModifier:
Expand Down Expand Up @@ -70,7 +69,7 @@ def lock(self, token):
}

# try to include job id if present
batchid.add_batch_management_id(token)
jobid.add_job_id(token)

token.update(lock_content)
return token
Expand All @@ -85,7 +84,7 @@ def unlock(self, token):
'hostname': socket.gethostname(),
'lock': 0
}
batchid.remove_batch_management_id(token)
jobid.remove_job_id(token)

token.update(lock_content)
return token
Expand Down Expand Up @@ -113,7 +112,7 @@ def unclose(self, token):
'done': 0
}
token.update(done_content)
batchid.remove_batch_management_id(token)
jobid.remove_job_id(token)
return token

def add_output(self, token, output):
Expand Down
37 changes: 37 additions & 0 deletions tests/test_jobid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import unittest
from os import environ

from picas.jobid import add_job_id, remove_job_id
from picas.documents import Document


class TestJobid(unittest.TestCase):

def setUp(self):
self.doc = Document()
self.env_vars = {
"DIRACJOBID": "dirac_job_id",
"SLURM_JOB_ID": "slurm_job_id",
"GLITE_WMS_JOBID": "wms_job_id",
"CREAM_JOBID": "cream_job_id",
"PBS_JOBID": "pbs_job_id"
}

def test_add_job_id(self):
""" Test if job_id is added to token"""
for test in self.env_vars:
environ[test] = "http/jobid"
add_job_id(self.doc)
self.assertTrue(self.doc[self.env_vars[test]] == "http/jobid")
environ.pop(test)

environ["GLITE_WMS_JOBID"] = "jobid"
add_job_id(self.doc)
self.assertTrue(self.doc["wms_job_id"] is None)

def test_remove_job_id(self):
""" Test if job_id is removed from token"""
for test in self.env_vars:
self.doc[self.env_vars[test]] = "jobid"
remove_job_id(self.doc)
self.assertRaises(KeyError, self.doc.__getitem__, self.env_vars[test])
1 change: 0 additions & 1 deletion tests/test_modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def test_lock(self):
self.modifier.lock(self.token)
self.assertTrue(self.token['hostname'] != "")
self.assertTrue(self.token['lock'] > 0)
self.assertTrue(self.token['dirac_jobid'] is None)

def test_unlock(self):
self.modifier.unlock(self.token)
Expand Down

0 comments on commit 81203f0

Please sign in to comment.