Skip to content

Commit

Permalink
Add metacat-rucio-to-sam
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew-McNab-UK committed Aug 1, 2023
1 parent dcaa384 commit 4d8413d
Showing 1 changed file with 193 additions and 4 deletions.
197 changes: 193 additions & 4 deletions testing/metacat-rucio-to-sam
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,178 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Set up your environment with something like this:
source /cvmfs/dune.opensciencegrid.org/products/dune/setup_dune.sh
setup rucio
setup sam_web_client
export SAM_EXPERIMENT=dune
setup metacat
metacat auth login -m x509 dunepro
"""

import sys
import copy
import json
import datetime
import argparse
import subprocess
import rucio.client

def getListFromSAM(samQuery):
try:
outcome = subprocess.run('samweb list-files "%s" ' % samQuery,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
encoding='utf-8'
)
except Exception as e:
print('samweb list-files fails with ' + str(e))
return None

return outcome.stdout.splitlines()

def getListFromMetaCat(mql):
try:
outcome = subprocess.run('metacat query --json --metadata all "%s" ' % mql,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
encoding='utf-8'
)
filesList = json.loads(outcome.stdout)
except Exception as e:
print('metacat query fails with ' + str(e))
return None

return filesList

def makeMetadataForSAM(metaCatFile):

samMetadata = copy.deepcopy(metaCatFile['metadata'])

# From pdjson2metadata
coreAttributes = {
"event_count" : "core.event_count",
"file_type" : "core.file_type",
"file_format" : "core.file_format",
"data_tier" : "core.data_tier",
"data_stream" : "core.data_stream",
"events" : "core.events",
"first_event" : "core.first_event_number",
"last_event" : "core.last_event_number",
"event_count" : "core.event_count"
}

# Rename MetaCat core attributes
for sk in coreAttributes:
if coreAttributes[sk] in samMetadata:
samMetadata[sk] = samMetadata[coreAttributes[sk]]
samMetadata.pop(coreAttributes[sk], None)

# Set dune.* and dune_mc.* to uppercase
for sk in samMetadata:
if sk.startswith('dune.') or sk.startswith('dune_mc.'):
(prefix, suffix) = sk.split('.', 1)
samMetadata[prefix.upper() + '.' + suffix] = samMetadata[sk]
samMetadata.pop(sk, None)

samMetadata.pop('DUNE.workflow', None)

try:
samMetadata['application'] = {
"name" : samMetadata['core.application.name'],
"family" : samMetadata['core.application.family'],
"version" : samMetadata['core.application.version'] }
except:
pass

for mk in ['core.application.name', 'core.application.family',
'core.application.version', 'core.application' ]:
samMetadata.pop(mk, None)

samMetadata['runs'] = []

for subrunStr in samMetadata['core.runs_subruns']:
subrun = int(subrunStr)
run = int(subrun / 100000)
sub = subrun - 100000 * run
samMetadata['runs'].append([run, sub, samMetadata['core.run_type']])

samMetadata.pop('core.run_type', None)
samMetadata.pop('core.runs', None)
samMetadata.pop('core.runs_subruns', None)

if 'checksums' in metaCatFile and 'adler32' in metaCatFile['checksums']:
samMetadata['checksum'] = \
[ 'adler32:' + metaCatFile['checksums']['adler32'] ]

samMetadata['file_name'] = metaCatFile['name']
samMetadata['file_size'] = metaCatFile['size']
samMetadata['user'] = metaCatFile['creator']
samMetadata['group'] = 'dune'
samMetadata.pop('core.user', None)
samMetadata.pop('core.group', None)

for sk in ['start_time', 'end_time']:
if sk in samMetadata:
samMetadata[sk] = datetime.datetime.fromtimestamp(
samMetadata['core.' + sk],
datetime.timezone.utc).isoformat('T')
samMetadata.pop('core.' + sk, None)

return samMetadata

def processMissingFiles(number, samList, metaCatList, repClient):

numberDeclared = 0

for metaCatFile in metaCatList:
if metaCatFile['name'] in samList:
# SAM already has that file, so skip
continue

try:
rucioFilesList = repClient.list_replicas(
[ { 'scope' : metaCatFile['namespace'],
'name' : metaCatFile['name']
} ],
schemes=['root'],
domain='wan')
rucioFile = next(rucioFilesList)
except Exception as e:
print('Skip %s:%s - failed to get replicas: %s'
% (metaCatFile['namespace'], metaCatFile['name'], str(e)))
continue

samMetadata = makeMetadataForSAM(metaCatFile)
if not samMetadata:
print('Failed converting metadata for %s:%s - skipping'
% (metaCatFile['namespace'], metaCatFile['name']))
continue

print('=========================================')
print(json.dumps(samMetadata, indent = 4))
print('=========================================')

with open('m-r-t-s.json', 'w') as f:
f.write(json.dumps(samMetadata, indent = 4) + '\n')

print('samweb declare-file m-r-t-s.json')

for pfn in rucioFile['pfns']:
if not 'tape_backed' in pfn:
print('samweb add-file-location %s %s' % (metaCatFile['name'], pfn))

numberDeclared += 1
if numberDeclared >= number:
break

print('\n%d files declared to SAM\n' % numberDeclared)

#
# PROGRAM MAIN !!!
Expand All @@ -36,7 +205,7 @@ parser.add_argument("--sam-query",
type = str,
help = "SAM query")

parser.add_argument("--count",
parser.add_argument("--number",
type = int,
help = "SAM declares per invocation")

Expand All @@ -53,8 +222,28 @@ if not args['sam_query']:
print('--sam-query missing', file=sys.stderr)
sys.exit(1)

if not args['count']:
print('--count missing', file=sys.stderr)
if not args['number']:
print('--number missing', file=sys.stderr)
sys.exit(1)

samList = getListFromSAM(args['sam_query'])
if not samList:
sys.exit(1)
#print(samList)

metaCatList = getListFromMetaCat(args['mql'])
if not metaCatList:
sys.exit(1)
#print(metaCatList)

# Connect to Rucio
try:
repClient = rucio.client.replicaclient.ReplicaClient()
except Exception as e:
print("Connect to Rucio fails with: " + str(e))
sys.exit(2)

print(args)
sys.exit(processMissingFiles(args['number'],
samList,
metaCatList,
repClient))

0 comments on commit 4d8413d

Please sign in to comment.