diff --git a/testing/metacat-rucio-to-sam b/testing/metacat-rucio-to-sam index 644b84d..96be360 100755 --- a/testing/metacat-rucio-to-sam +++ b/testing/metacat-rucio-to-sam @@ -17,9 +17,178 @@ # See the License for the specific language governing permissions and # limitations under the License. # +""" + +Set up your environment with something like this: + +source /cvmfs/dune.opensciencegrid.org/products/dune/setup_dune.sh +setup rucio +setup sam_web_client +export SAM_EXPERIMENT=dune +setup metacat +metacat auth login -m x509 dunepro + +""" import sys +import copy +import json +import datetime import argparse +import subprocess +import rucio.client + +def getListFromSAM(samQuery): + try: + outcome = subprocess.run('samweb list-files "%s" ' % samQuery, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=True, + encoding='utf-8' + ) + except Exception as e: + print('samweb list-files fails with ' + str(e)) + return None + + return outcome.stdout.splitlines() + +def getListFromMetaCat(mql): + try: + outcome = subprocess.run('metacat query --json --metadata all "%s" ' % mql, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=True, + encoding='utf-8' + ) + filesList = json.loads(outcome.stdout) + except Exception as e: + print('metacat query fails with ' + str(e)) + return None + + return filesList + +def makeMetadataForSAM(metaCatFile): + + samMetadata = copy.deepcopy(metaCatFile['metadata']) + + # From pdjson2metadata + coreAttributes = { + "event_count" : "core.event_count", + "file_type" : "core.file_type", + "file_format" : "core.file_format", + "data_tier" : "core.data_tier", + "data_stream" : "core.data_stream", + "events" : "core.events", + "first_event" : "core.first_event_number", + "last_event" : "core.last_event_number", + "event_count" : "core.event_count" + } + + # Rename MetaCat core attributes + for sk in coreAttributes: + if coreAttributes[sk] in samMetadata: + samMetadata[sk] = samMetadata[coreAttributes[sk]] + samMetadata.pop(coreAttributes[sk], None) + + # Set dune.* and dune_mc.* to uppercase + for sk in samMetadata: + if sk.startswith('dune.') or sk.startswith('dune_mc.'): + (prefix, suffix) = sk.split('.', 1) + samMetadata[prefix.upper() + '.' + suffix] = samMetadata[sk] + samMetadata.pop(sk, None) + + samMetadata.pop('DUNE.workflow', None) + + try: + samMetadata['application'] = { + "name" : samMetadata['core.application.name'], + "family" : samMetadata['core.application.family'], + "version" : samMetadata['core.application.version'] } + except: + pass + + for mk in ['core.application.name', 'core.application.family', + 'core.application.version', 'core.application' ]: + samMetadata.pop(mk, None) + + samMetadata['runs'] = [] + + for subrunStr in samMetadata['core.runs_subruns']: + subrun = int(subrunStr) + run = int(subrun / 100000) + sub = subrun - 100000 * run + samMetadata['runs'].append([run, sub, samMetadata['core.run_type']]) + + samMetadata.pop('core.run_type', None) + samMetadata.pop('core.runs', None) + samMetadata.pop('core.runs_subruns', None) + + if 'checksums' in metaCatFile and 'adler32' in metaCatFile['checksums']: + samMetadata['checksum'] = \ + [ 'adler32:' + metaCatFile['checksums']['adler32'] ] + + samMetadata['file_name'] = metaCatFile['name'] + samMetadata['file_size'] = metaCatFile['size'] + samMetadata['user'] = metaCatFile['creator'] + samMetadata['group'] = 'dune' + samMetadata.pop('core.user', None) + samMetadata.pop('core.group', None) + + for sk in ['start_time', 'end_time']: + if sk in samMetadata: + samMetadata[sk] = datetime.datetime.fromtimestamp( + samMetadata['core.' + sk], + datetime.timezone.utc).isoformat('T') + samMetadata.pop('core.' + sk, None) + + return samMetadata + +def processMissingFiles(number, samList, metaCatList, repClient): + + numberDeclared = 0 + + for metaCatFile in metaCatList: + if metaCatFile['name'] in samList: + # SAM already has that file, so skip + continue + + try: + rucioFilesList = repClient.list_replicas( + [ { 'scope' : metaCatFile['namespace'], + 'name' : metaCatFile['name'] + } ], + schemes=['root'], + domain='wan') + rucioFile = next(rucioFilesList) + except Exception as e: + print('Skip %s:%s - failed to get replicas: %s' + % (metaCatFile['namespace'], metaCatFile['name'], str(e))) + continue + + samMetadata = makeMetadataForSAM(metaCatFile) + if not samMetadata: + print('Failed converting metadata for %s:%s - skipping' + % (metaCatFile['namespace'], metaCatFile['name'])) + continue + + print('=========================================') + print(json.dumps(samMetadata, indent = 4)) + print('=========================================') + + with open('m-r-t-s.json', 'w') as f: + f.write(json.dumps(samMetadata, indent = 4) + '\n') + + print('samweb declare-file m-r-t-s.json') + + for pfn in rucioFile['pfns']: + if not 'tape_backed' in pfn: + print('samweb add-file-location %s %s' % (metaCatFile['name'], pfn)) + + numberDeclared += 1 + if numberDeclared >= number: + break + + print('\n%d files declared to SAM\n' % numberDeclared) # # PROGRAM MAIN !!! @@ -36,7 +205,7 @@ parser.add_argument("--sam-query", type = str, help = "SAM query") -parser.add_argument("--count", +parser.add_argument("--number", type = int, help = "SAM declares per invocation") @@ -53,8 +222,28 @@ if not args['sam_query']: print('--sam-query missing', file=sys.stderr) sys.exit(1) -if not args['count']: - print('--count missing', file=sys.stderr) +if not args['number']: + print('--number missing', file=sys.stderr) + sys.exit(1) + +samList = getListFromSAM(args['sam_query']) +if not samList: sys.exit(1) +#print(samList) + +metaCatList = getListFromMetaCat(args['mql']) +if not metaCatList: + sys.exit(1) +#print(metaCatList) + +# Connect to Rucio +try: + repClient = rucio.client.replicaclient.ReplicaClient() +except Exception as e: + print("Connect to Rucio fails with: " + str(e)) + sys.exit(2) -print(args) +sys.exit(processMissingFiles(args['number'], + samList, + metaCatList, + repClient))