diff --git a/.github/workflows/lint_python.yml b/.github/workflows/lint_python.yml index 35063e54..51e3e032 100644 --- a/.github/workflows/lint_python.yml +++ b/.github/workflows/lint_python.yml @@ -8,7 +8,7 @@ jobs: - uses: actions/setup-python@v4 - run: pip install --upgrade pip setuptools wheel - run: pip install black codespell mypy pytest ruff safety - - run: ruff --format=github --ignore=E501,E701,E713,E722,F401,F403,F405,F841 --line-length=263 . + - run: ruff --output-format=github --ignore=E501,E701,E713,E722,F401,F403,F405,F841 --line-length=263 . - run: black --check . || true - run: codespell --ignore-words-list="datas" --skip="./.git/*" - run: pip install -r requirements.txt diff --git a/lib/doublepulsar.py b/lib/doublepulsar.py index 09baf382..c7605092 100644 --- a/lib/doublepulsar.py +++ b/lib/doublepulsar.py @@ -210,7 +210,7 @@ def check_ip_rdp(self): return False, "Status Unknown - Response received but length was %d not 288" % (len(ping_response)) s.close() - except socket.error as e: + except socket.error: return False, "No presence of DOUBLEPULSAR RDP implant" diff --git a/lib/helpers.py b/lib/helpers.py index 8bada6c6..da23a40c 100644 --- a/lib/helpers.py +++ b/lib/helpers.py @@ -8,7 +8,6 @@ import sys import hashlib import string -import struct import traceback import os import re @@ -16,7 +15,7 @@ try: from StringIO import StringIO except ImportError: - from io import StringIO + pass import netaddr import platform import time @@ -66,7 +65,7 @@ def generateHashes(filedata): sha1.update(filedata) sha256.update(filedata) return md5.hexdigest(), sha1.hexdigest(), sha256.hexdigest() - except Exception as e: + except Exception: traceback.print_exc() return 0, 0, 0 @@ -75,7 +74,7 @@ def getPlatformFull(): type_info = "" try: type_info = "%s PROC: %s ARCH: %s" % ( " ".join(platform.win32_ver()), platform.processor(), " ".join(platform.architecture())) - except Exception as e: + except Exception: type_info = " ".join(platform.win32_ver()) return type_info @@ -87,7 +86,7 @@ def setNice(logger): logger.log("INFO", "Init", "Setting LOKI process with PID: %s to priority IDLE" % pid) p.nice(psutil.IDLE_PRIORITY_CLASS) return 1 - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() logger.log("ERROR", "Init", "Error setting nice value of THOR process") @@ -103,7 +102,7 @@ def getExcludedMountpoints(): if not options[0].startswith("/dev/"): if not options[1] == "/": excludes.append(options[1]) - except Exception as e: + except Exception: print ("Error while reading /etc/mtab") finally: mtab.close() @@ -174,7 +173,7 @@ def get_file_type(filePath, filetype_sigs, max_filetype_magics, logger): if res == bytes.fromhex(sig): return filetype_sigs[sig] return "UNKNOWN" - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() return "UNKNOWN" @@ -187,10 +186,10 @@ def removeNonAscii(s, stripit=False): printable = set(string.printable) filtered_string = filter(lambda x: x in printable, s.decode('utf-8')) nonascii = ''.join(filtered_string) - except Exception as e: + except Exception: traceback.print_exc() nonascii = s.hex() - except Exception as e: + except Exception: traceback.print_exc() pass @@ -203,7 +202,7 @@ def removeNonAsciiDrop(s): # Generate a new string without disturbing characters printable = set(string.printable) nonascii = filter(lambda x: x in printable, s) - except Exception as e: + except Exception: traceback.print_exc() pass return nonascii @@ -220,7 +219,7 @@ def getAge(filePath): # Accessed atime=stats.st_atime - except Exception as e: + except Exception: # traceback.print_exc() return (0, 0, 0) @@ -232,7 +231,7 @@ def getAgeString(filePath): timestring = "" try: timestring = "CREATED: %s MODIFIED: %s ACCESSED: %s" % ( time.ctime(ctime), time.ctime(mtime), time.ctime(atime) ) - except Exception as e: + except Exception: timestring = "CREATED: not_available MODIFIED: not_available ACCESSED: not_available" return timestring @@ -269,7 +268,7 @@ def _kill_process_after_a_timeout(pid): watchdog.cancel() # if it's still waiting to run success = not kill_check.isSet() kill_check.clear() - except Exception as e: + except Exception: traceback.print_exc() return output, returnCode diff --git a/lib/lokilogger.py b/lib/lokilogger.py index 6d486a21..ddebd591 100644 --- a/lib/lokilogger.py +++ b/lib/lokilogger.py @@ -13,7 +13,6 @@ import logging from logging import handlers import socket -from .helpers import removeNonAsciiDrop __version__ = '0.51.0' @@ -101,7 +100,7 @@ def log(self, mes_type, module, message): # to stdout try: self.log_to_stdout(message, mes_type) - except Exception as e: + except Exception: print ("Cannot print certain characters to command line - see log file for full unicode encoded log line") self.log_to_stdout(message, mes_type) @@ -174,7 +173,7 @@ def log_to_stdout(self, message, mes_type): else: sys.stdout.write("%s%s\b\b%s %s%s%s%s\n" % (reset_all, base_color, mes_type, message, Back.BLACK,Fore.WHITE,Style.NORMAL)) - except Exception as e: + except Exception: if self.debug: traceback.print_exc() sys.exit(1) @@ -188,7 +187,7 @@ def log_to_file(self, message, mes_type, module): logfile.write(self.Format(self.FILE_CSV, u"{0},{1},{2},{3},{4}{5}", getSyslogTimestamp(), self.hostname, mes_type, module, message, self.linesep)) else: logfile.write(self.Format(self.FILE_LINE, u"{0} {1} LOKI: {2}: MODULE: {3} MESSAGE: {4}{5}", getSyslogTimestamp(), self.hostname, mes_type.title(), module, message, self.linesep)) - except Exception as e: + except Exception: if self.debug: traceback.print_exc() sys.exit(1) diff --git a/lib/pesieve.py b/lib/pesieve.py index 17ee0f1f..4b6d2117 100644 --- a/lib/pesieve.py +++ b/lib/pesieve.py @@ -3,7 +3,6 @@ # PE-Sieve Integration by @hasherezade import os -import sys import json import traceback @@ -66,10 +65,10 @@ def scan(self, pid, pesieveshellc = False): results_raw = json.loads(output) #results = results_raw["scan_report"]["scanned"]["modified"] results = results_raw["scanned"]["modified"] - except ValueError as v: + except ValueError: traceback.print_exc() self.logger.log("DEBUG", "PESieve", "Couldn't parse the JSON output.") - except Exception as e: + except Exception: traceback.print_exc() self.logger.log("ERROR", "PESieve", "Something went wrong during PE-Sieve scan.") return results diff --git a/lib/vuln_checker.py b/lib/vuln_checker.py index ebbe1047..752f6a7c 100644 --- a/lib/vuln_checker.py +++ b/lib/vuln_checker.py @@ -5,7 +5,6 @@ # Different vulnerability checks import subprocess -import traceback class VulnChecker(): @@ -27,11 +26,11 @@ def check_sam_readable(self): output = b'' try: output += subprocess.check_output([r'icacls.exe', r'C:\Windows\System32\config\sam'], stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError: pass try: output += subprocess.check_output([r'icacls.exe', r'C:\Windows\SysNative\config\sam'], stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError: pass # Check the output try: @@ -45,6 +44,6 @@ def check_sam_readable(self): return True else: self.logger.log("DEBUG", "VulnChecker", "SAM Database isn't readable by every user.") - except UnicodeDecodeError as e: + except UnicodeDecodeError: self.logger.log("ERROR", "VulnChecker", "Unicode decode error in SAM check") return False diff --git a/loki-upgrader.py b/loki-upgrader.py index d46724c2..03e58522 100755 --- a/loki-upgrader.py +++ b/loki-upgrader.py @@ -24,10 +24,8 @@ # Win32 Imports if _platform == "win32": try: - import wmi import win32api - from win32com.shell import shell - except Exception as e: + except Exception: platform = "linux" # crazy guess @@ -65,7 +63,7 @@ def needs_update(sig_url): file.write(sha) changed=True return changed - except Exception as e: + except Exception: return True @@ -94,7 +92,7 @@ def update_signatures(self, clean=False): try: self.logger.log("INFO", "Upgrader", "Downloading %s ..." % sig_url) response = urlopen(sig_url) - except Exception as e: + except Exception: if self.debug: traceback.print_exc() self.logger.log("ERROR", "Upgrader", "Error downloading the signature database - " @@ -111,7 +109,7 @@ def update_signatures(self, clean=False): fullOutDir = os.path.join(sigDir, outDir) if not os.path.exists(fullOutDir): os.makedirs(fullOutDir) - except Exception as e: + except Exception: if self.debug: traceback.print_exc() self.logger.log("ERROR", "Upgrader", "Error while creating the signature-base directories") @@ -157,7 +155,7 @@ def update_signatures(self, clean=False): target.close() source.close() - except Exception as e: + except Exception: if self.debug: traceback.print_exc() self.logger.log("ERROR", "Upgrader", "Error while extracting the signature files from the download " @@ -166,7 +164,7 @@ def update_signatures(self, clean=False): else: self.logger.log("INFO", "Upgrader", "%s is up to date." % sig_url) - except Exception as e: + except Exception: if self.debug: traceback.print_exc() return False @@ -185,7 +183,7 @@ def update_loki(self): zip_url = data['assets'][0]['browser_download_url'] self.logger.log("INFO", "Upgrader", "Downloading latest release %s ..." % zip_url) response_zip = urlopen(zip_url) - except Exception as e: + except Exception: if self.debug: traceback.print_exc() self.logger.log("ERROR", "Upgrader", "Error downloading the loki update - check your Internet connection") @@ -208,7 +206,7 @@ def update_loki(self): if not os.path.exists(os.path.dirname(targetFile)): if os.path.dirname(targetFile) != '': os.makedirs(os.path.dirname(targetFile)) - except Exception as e: + except Exception: if self.debug: self.logger.log("DEBUG", "Upgrader", "Cannot create dir name '%s'" % os.path.dirname(targetFile)) traceback.print_exc() @@ -221,19 +219,19 @@ def update_loki(self): if self.debug: self.logger.log("DEBUG", "Upgrader", "Successfully extracted '%s'" % targetFile) target.close() - except Exception as e: + except Exception: self.logger.log("ERROR", "Upgrader", "Cannot extract '%s'" % targetFile) if self.debug: traceback.print_exc() - except Exception as e: + except Exception: if self.debug: traceback.print_exc() self.logger.log("ERROR", "Upgrader", "Error while extracting the signature files from the download package") sys.exit(1) - except Exception as e: + except Exception: if self.debug: traceback.print_exc() return False @@ -253,7 +251,7 @@ def get_application_path(): #if args.debug: # logger.log("DEBUG", "Init", "Application Path: %s" % application_path) return application_path - except Exception as e: + except Exception: print("Error while evaluation of application path") traceback.print_exc() diff --git a/loki.py b/loki.py index 5f45ed0e..441cad1f 100644 --- a/loki.py +++ b/loki.py @@ -34,7 +34,7 @@ import platform import signal as signal_module from sys import platform as _platform -from subprocess import Popen, PIPE +from subprocess import Popen from collections import Counter import datetime from bisect import bisect_left @@ -65,7 +65,7 @@ import win32api from win32com.shell import shell import win32file - except Exception as e: + except Exception: print("Linux System - deactivating process memory check ...") os_platform = "linux" # crazy guess @@ -329,7 +329,7 @@ def scan_path(self, path): try: with open(filePath, 'rb') as f: firstBytes = f.read(4) - except Exception as e: + except Exception: logger.log("DEBUG", "FileScan", "Cannot open file %s (access denied)" % filePathCleaned) # Evaluate Type @@ -463,7 +463,7 @@ def scan_path(self, path): total_score += score reasons.append(message) - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() logger.log("ERROR", "FileScan", "Cannot YARA scan file: %s" % filePathCleaned) @@ -491,7 +491,7 @@ def scan_path(self, path): logger.log(message_type, "FileScan", message_body) - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() sys.exit(1) @@ -551,7 +551,7 @@ def scan_data(self, fileData, fileType="-", fileName=b"-", filePath=b"-", extens yield score, match.rule, description, reference, matched_strings, author - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() @@ -630,12 +630,12 @@ def scan_processes(self, nopesieve, nolisten, excludeprocess, pesieveshellc): try: owner_raw = process.GetOwner() owner = owner_raw[2] - except Exception as e: + except Exception: owner = "unknown" if not owner: owner = "unknown" - except Exception as e: + except Exception: logger.log("ALERT", "ProcessScan", "Error getting all process information. Did you run the scanner 'As Administrator'?") continue @@ -645,7 +645,7 @@ def scan_processes(self, nopesieve, nolisten, excludeprocess, pesieveshellc): # Special Checks ------------------------------------------------------ # better executable path - if not "\\" in cmd and path != "none": + if "\\" not in cmd and path != "none": cmd = path # Process Info @@ -714,7 +714,7 @@ def scan_processes(self, nopesieve, nolisten, excludeprocess, pesieveshellc): elif len(alerts) > 0: for alert in alerts: logger.log("ALERT", "ProcessScan", alert) - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() if path != "none": @@ -744,7 +744,7 @@ def scan_processes(self, nopesieve, nolisten, excludeprocess, pesieveshellc): (process_info, str(results["unreachable_file"]))) else: logger.log("INFO", "ProcessScan", "PE-Sieve reported no anomalies %s" % process_info) - except WindowsError as e: + except WindowsError: if logger.debug: traceback.print_exc() logger.log("ERROR", "ProcessScan", @@ -824,7 +824,7 @@ def scan_processes(self, nopesieve, nolisten, excludeprocess, pesieveshellc): #if name == "svchost.exe" and not ( self.check_svchost_owner(owner) or "unistacksvcgroup" in cmd.lower()): # logger.log("WARNING", "ProcessScan", "svchost.exe process owner is suspicious %s" % process_info) - if name == "svchost.exe" and not " -k " in cmd and cmd != "N/A": + if name == "svchost.exe" and " -k " not in cmd and cmd != "N/A": logger.log("WARNING", "ProcessScan", "svchost.exe process does not contain a -k in its command line %s" % process_info) # Process: lsm.exe @@ -851,7 +851,7 @@ def scan_processes(self, nopesieve, nolisten, excludeprocess, pesieveshellc): # Process: explorer.exe if path != "none": - if name == "explorer.exe" and not t_systemroot.lower() in path.lower(): + if name == "explorer.exe" and t_systemroot.lower() not in path.lower(): logger.log("WARNING", "ProcessScan", "explorer.exe path is not %%SYSTEMROOT%% %s" % process_info) if name == "explorer.exe" and parent_pid > 0: for proc in processes: @@ -874,7 +874,7 @@ def check_process_connections(self, process): # Get psutil info about the process try: p = psutil.Process(pid) - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() return @@ -920,7 +920,7 @@ def check_process_connections(self, process): logger.log("NOTICE", "ProcessScan", "Connection output threshold reached. Output truncated.") return - except Exception as e: + except Exception: if args.debug: traceback.print_exc() sys.exit(1) @@ -940,7 +940,7 @@ def check_rootkit(self): logger.log("ALERT", message) else: logger.log("INFO", "Rootkit", "Double Pulsar RDP check RESULT: %s" % message) - except Exception as e: + except Exception: logger.log("INFO", "Rootkit", "Double Pulsar RDP check failed RESULT: Connection failure") if args.debug: traceback.print_exc() @@ -952,7 +952,7 @@ def check_rootkit(self): logger.log("ALERT", message) else: logger.log("INFO", "Rootkit", "Double Pulsar SMB check RESULT: %s" % message) - except Exception as e: + except Exception: logger.log("INFO", "Rootkit", "Double Pulsar SMB check failed RESULT: Connection failure") if args.debug: traceback.print_exc() @@ -1017,13 +1017,13 @@ def initialize_c2_iocs(self, ioc_directory): # Add to the LOKI iocs self.c2_server[c2.lower()] = last_comment - except Exception as e: + except Exception: logger.log("ERROR", "Init", "Cannot read line: %s" % line) if logger.debug: sys.exit(1) - except OSError as e: + except OSError: logger.log("ERROR", "Init", "No such file or directory") - except Exception as e: + except Exception: traceback.print_exc() logger.log("ERROR", "Init", "Error reading Hash file: %s" % ioc_filename) @@ -1085,13 +1085,13 @@ def initialize_filename_iocs(self, ioc_directory): fioc = {'regex': re.compile(regex), 'score': score, 'description': desc, 'regex_fp': regex_fp_comp} self.filename_iocs.append(fioc) - except Exception as e: + except Exception: logger.log("ERROR", "Init", "Error reading line: %s" % line) if logger.debug: traceback.print_exc() sys.exit(1) - except Exception as e: + except Exception: if 'ioc_filename' in locals(): logger.log("ERROR", "Init", "Error reading IOC file: %s" % ioc_filename) else: @@ -1143,7 +1143,7 @@ def initialize_yara_rules(self): }) logger.log("DEBUG", "Init", "Initializing Yara rule %s" % file) rule_count += 1 - except Exception as e: + except Exception: logger.log("ERROR", "Init", "Error while initializing Yara rule %s ERROR: %s" % (file, sys.exc_info()[1])) traceback.print_exc() if logger.debug: @@ -1153,7 +1153,7 @@ def initialize_yara_rules(self): # Add the rule yaraRules += yara_rule_data - except Exception as e: + except Exception: logger.log("ERROR", "Init", "Error reading signature file %s ERROR: %s" % (yaraRuleFile, sys.exc_info()[1])) if logger.debug: traceback.print_exc() @@ -1171,7 +1171,7 @@ def initialize_yara_rules(self): 'owner': dummy, }) logger.log("INFO", "Init", "Initialized %d Yara rules" % rule_count) - except Exception as e: + except Exception: traceback.print_exc() logger.log("ERROR", "Init", "Error during YARA rule compilation ERROR: %s - please fix the issue in the rule set" % sys.exc_info()[1]) sys.exit(1) @@ -1179,7 +1179,7 @@ def initialize_yara_rules(self): # Add as Lokis YARA rules self.yara_rules.append(compiledRules) - except Exception as e: + except Exception: logger.log("ERROR", "Init", "Error reading signature folder /signatures/") if logger.debug: traceback.print_exc() @@ -1234,7 +1234,7 @@ def initialize_hash_iocs(self, ioc_directory, false_positive=False): self.hashes_sha256[int(hash, 16)] = comment if false_positive: self.false_hashes[int(hash, 16)] = comment - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() logger.log("ERROR", "Init", "Cannot read line: %s" % line) @@ -1252,7 +1252,7 @@ def initialize_hash_iocs(self, ioc_directory, false_positive=False): self.hashes_sha256_list = list(self.hashes_sha256.keys()) self.hashes_sha256_list.sort() - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() sys.exit(1) @@ -1278,10 +1278,10 @@ def initialize_filetype_magics(self, filetype_magics_file): # print "%s - %s" % ( sig, description ) self.filetype_magics[sig] = description - except Exception as e: + except Exception: logger.log("ERROR", "Init", "Cannot read line: %s" % line) - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() sys.exit(1) @@ -1301,12 +1301,12 @@ def initialize_excludes(self, excludes_file): if re.search(r'\w', line): regex = re.compile(line, re.IGNORECASE) excludes.append(regex) - except Exception as e: + except Exception: logger.log("ERROR", "Init", "Cannot compile regex: %s" % line) self.fullExcludes = excludes - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() logger.log("NOTICE", "Init", "Error reading excludes file: %s" % excludes_file) @@ -1318,7 +1318,7 @@ def get_file_data(self, filePath): # Read file complete with open(filePath, 'rb') as f: fileData = f.read() - except Exception as e: + except Exception: if logger.debug: traceback.print_exc() logger.log("DEBUG", "FileScan", "Cannot open file %s (access denied)" % filePath) @@ -1391,7 +1391,7 @@ def get_application_path(): #if args.debug: # logger.log("DEBUG", "Init", "Application Path: %s" % application_path) return application_path - except Exception as e: + except Exception: print("Error while evaluation of application path") traceback.print_exc() if args.debug: @@ -1448,7 +1448,7 @@ def signal_handler(signal_name, frame): try: print("------------------------------------------------------------------------------\n") logger.log('INFO', 'Init', 'LOKI\'s work has been interrupted by a human. Returning to Asgard.') - except Exception as e: + except Exception: print('LOKI\'s work has been interrupted by a human. Returning to Asgard.') sys.exit(0) diff --git a/plugins/loki-plugin-wmi.py b/plugins/loki-plugin-wmi.py index 8f99f2ec..31fa9121 100644 --- a/plugins/loki-plugin-wmi.py +++ b/plugins/loki-plugin-wmi.py @@ -16,7 +16,7 @@ def ScanWMI(): if sys.platform in ("win32", "cygwin"): try: import wmi - except ImportError as e: + except ImportError: wmi = None logger.log("CRITICAL", "WMIScan", "Unable to import wmi") print("Unable to import wmi") @@ -48,21 +48,21 @@ def ScanWMI(): for eventFilter in leventFilter: try: hashEntry = hashlib.md5(str(eventFilter)).hexdigest() - if not hashEntry in knownHashes: + if hashEntry not in knownHashes: logger.log("WARNING", "WMIScan", 'CLASS: __eventFilter MD5: %s NAME: %s QUERY: %s' % (hashEntry, eventFilter.wmi_property('Name').value, eventFilter.wmi_property('Query').value)) except: logger.log("INFO", "WMIScan", repr(str(eventFilter))) for FilterToConsumerBinding in lFilterToConsumerBinding: try: hashEntry = hashlib.md5(str(FilterToConsumerBinding)).hexdigest() - if not hashEntry in knownHashes: + if hashEntry not in knownHashes: logger.log("WARNING", "WMIScan", 'CLASS: __FilterToConsumerBinding MD5: %s CONSUMER: %s FILTER: %s' % (hashEntry, FilterToConsumerBinding.wmi_property('Consumer').value, FilterToConsumerBinding.wmi_property('Filter').value)) except: logger.log("INFO", "WMIScan", repr(str(FilterToConsumerBinding))) for CommandLineEventConsumer in lCommandLineEventConsumer: try: hashEntry = hashlib.md5(str(CommandLineEventConsumer)).hexdigest() - if not hashEntry in knownHashes: + if hashEntry not in knownHashes: logger.log("WARNING", "WMIScan", 'CLASS: CommandLineEventConsumer MD5: %s NAME: %s COMMANDLINETEMPLATE: %s' % (hashEntry, CommandLineEventConsumer.wmi_property('Name').value, CommandLineEventConsumer.wmi_property('CommandLineTemplate').value)) except: logger.log("INFO", "WMIScan", repr(str(CommandLineEventConsumer)))