diff --git a/fedcloudclient/secret.py b/fedcloudclient/secret.py index 0010e69..9e7cf9c 100644 --- a/fedcloudclient/secret.py +++ b/fedcloudclient/secret.py @@ -4,6 +4,7 @@ import base64 import json import os +import sys import click import hvac @@ -72,33 +73,40 @@ def read_data_from_file(input_format, input_file): """ if input_format is None or input_format == "auto-detect": - if input_file.endswith(".yaml"): - input_format = "yaml" - elif input_file.endswith(".json"): + if input_file.endswith(".json"): input_format = "json" else: + # default format input_format = "yaml" try: + + # read text/binary files to strings + if input_format == "binary": + with open(input_file, "rb") as f: + return base64.b64encode(f.read()).decode() + if input_format == "text": + with open(input_file, "r") as f: + return f.read() + + # reading YAML or JSON to dict with open(input_file) as f: if input_format == "yaml": data = yaml.safe_load(f) elif input_format == "json": data = json.load(f) - else: - data = f.read() - if input_format in ("yaml", "json"): - data = dict(data) + return dict(data) + except (ValueError, FileNotFoundError, YAMLError) as e: raise SystemExit( - f"Error: Error when reading file {input_file}. Error message: {e}" + f"Error: Error when reading file {input_file}. Error message: {type(e).__name__}: {e}" ) - return data -def secret_params_to_dict(params): +def secret_params_to_dict(params, binary_file=False): """ Convert secret params "key=value" to dict {"key":"value"} + :param binary_file: if reading files as binary :param params: input string in format "key=value" :return: dict {"key":"value"} """ @@ -122,7 +130,10 @@ def secret_params_to_dict(params): f"Error: Expecting 'key=value' arguments for secrets. '{param}' provided." ) if value.startswith("@"): - value = read_data_from_file("text", value[1:]) + if binary_file: + value = read_data_from_file("binary", value[1:]) + else: + value = read_data_from_file("text", value[1:]) result[key] = value return result @@ -177,25 +188,57 @@ def decrypt_data(decrypt_key, secrets): raise SystemExit(f"Error: Error during decryption. {e}") -def print_secrets(output_format, secrets): +def print_secrets(output_file, output_format, secrets): """ Print secrets in different formats + :param output_file: :param output_format: :param secrets: :return: """ - if output_format == "JSON": - print(json.dumps(secrets, indent=4)) - elif output_format == "YAML": - print(yaml.dump(secrets, sort_keys=False)) - else: - print(tabulate(secrets.items(), headers=["key", "value"])) + + try: + with open(output_file, "wt") if output_file else sys.stdout as f: + if output_format == "JSON": + json.dump(secrets, f, indent=4) + elif output_format == "YAML": + yaml.dump(secrets, f, sort_keys=False) + else: + print(tabulate(secrets.items(), headers=["key", "value"]), file=f) + + except (ValueError, FileNotFoundError, YAMLError) as e: + raise SystemExit( + f"Error: Error when writing file {output_file}. Error message: {type(e).__name__}: {e}" + ) + + +def print_value(output_file, binary_file, value): + """ + Print secrets in different formats + :param output_file: + :param binary_file: + :param value: + :return: + """ + + try: + if binary_file: + with open(output_file, "wb") if output_file else sys.stdout.buffer as f: + f.write(base64.b64decode(value.encode())) + else: + with open(output_file, "wt") if output_file else sys.stdout as f: + f.write(value) + + except (ValueError, FileNotFoundError, TypeError) as e: + raise SystemExit( + f"Error: Error when writing file {output_file}. Error message: {type(e).__name__}: {e}" + ) @click.group() def secret(): """ - Commands for accessing secrets + Commands for accessing secret objects """ @@ -205,30 +248,53 @@ def secret(): "--output-format", "-f", required=False, + help="Output format", type=click.Choice(["text", "YAML", "JSON"], case_sensitive=False), ) -@click.option("--decrypt-key", required=False) @click.argument("short_path", metavar="[secret path]") @click.argument("key", metavar="[key]", required=False) +@click.option( + "--decrypt-key", + "-d", + metavar="[key]", + required=False, + help="Decryption key or passphrase", +) +@click.option( + "--binary-file", + "-b", + required=False, + is_flag=True, + help="True for writing secrets to binary files", +) +@click.option( + "--output-file", + "-o", + metavar="[filename]", + required=False, + help="Name of output file", +) def get( access_token, short_path, key, output_format, decrypt_key, + binary_file, + output_file, ): """ - Get a secret from the path. If a key is given, print only the value of the key + Get the secret object in the path. If a key is given, print only the value of the key """ response = secret_client(access_token, "read_secret", short_path, None) if decrypt_key: decrypt_data(decrypt_key, response["data"]) if not key: - print_secrets(output_format, response["data"]) + print_secrets(output_file, output_format, response["data"]) else: if key in response["data"]: - print(response["data"][key]) + print_value(output_file, binary_file, response["data"][key]) else: raise SystemExit(f"Error: {key} not found in {short_path}") @@ -241,7 +307,7 @@ def list_( short_path, ): """ - List secrets in the path + List secret objects in the path """ response = secret_client(access_token, "list_secrets", short_path, None) @@ -252,18 +318,32 @@ def list_( @oidc_params @click.argument("short_path", metavar="[secret path]") @click.argument("secrets", nargs=-1, metavar="[key=value...]") -@click.option("--encrypt-key", required=False) +@click.option( + "--encrypt-key", + "-e", + metavar="[key]", + required=False, + help="Encryption key or passphrase", +) +@click.option( + "--binary-file", + "-b", + required=False, + is_flag=True, + help="True for reading secrets from binary files", +) def put( access_token, short_path, secrets, encrypt_key, + binary_file, ): """ - Put a secret to the path. Secrets are provided in form key=value + Put a secret object to the path. Secrets are provided in form key=value """ - secret_dict = secret_params_to_dict(secrets) + secret_dict = secret_params_to_dict(secrets, binary_file) if encrypt_key: encrypt_data(encrypt_key, secret_dict) secret_client(access_token, "put", short_path, secret_dict) @@ -277,7 +357,7 @@ def delete( short_path, ): """ - Delete the secret in the path + Delete the secret object in the path """ secret_client(access_token, "delete_secret", short_path, None)