PNG  IHDR* pHYs+ IDATx]n#; cdLb Ǚ[at¤_:uP}>!Usă cag޿ ֵNu`ݼTâabO7uL&y^wFٝA"l[|ŲHLN밪4*sG3|Dv}?+y߉{OuOAt4Jj.u]Gz*҉sP'VQKbA1u\`& Af;HWj hsO;ogTu uj7S3/QzUr&wS`M$X_L7r2;aE+ώ%vikDA:dR+%KzƉo>eOth$z%: :{WwaQ:wz%4foɹE[9<]#ERINƻv溂E%P1i01 |Jvҗ&{b?9g=^wζXn/lK::90KwrюO\!ջ3uzuGv^;騢wq<Iatv09:tt~hEG`v;3@MNZD.1]L:{ծI3`L(÷ba")Y.iljCɄae#I"1 `3*Bdz>j<fU40⨬%O$3cGt]j%Fߠ_twJ;ABU8vP3uEԑwQ V:h%))LfraqX-ۿX]v-\9I gl8tzX ]ecm)-cgʒ#Uw=Wlێn(0hPP/ӨtQ“&J35 $=]r1{tLuǮ*i0_;NƝ8;-vݏr8+U-kruȕYr0RnC]*ެ(M:]gE;{]tg(#ZJ9y>utRDRMdr9㪩̞zֹb<ģ&wzJM"iI( .ꮅX)Qw:9,i좜\Ԛi7&N0:asϓc];=ΗOӣ APqz93 y $)A*kVHZwBƺnWNaby>XMN*45~ղM6Nvm;A=jֲ.~1}(9`KJ/V F9[=`~[;sRuk]rєT!)iQO)Y$V ی ۤmzWz5IM Zb )ˆC`6 rRa}qNmUfDsWuˤV{ Pݝ'=Kֳbg,UҘVz2ﴻnjNgBb{? ߮tcsͻQuxVCIY۠:(V뺕 ٥2;t`@Fo{Z9`;]wMzU~%UA蛚dI vGq\r82iu +St`cR.6U/M9IENDB`#!/opt/imunify360/venv/bin/python3 """This script is a cPanel hook script for several Filemanager related events. Based on: * https://documentation.cpanel.net/display/DD/Guide+to+Standardized+Hooks * https://documentation.cpanel.net/display/DD/Guide+to+Standardized+Hooks+-+Hook+Action+Code """ # noqa: E501 import logging import json import signal import socket import sys from tempfile import NamedTemporaryFile from typing import Callable, List, TextIO from defence360agent import sentry from im360 import aibolit_job logger = logging.getLogger("cpanel_fileman_hook") # _PATH_HOOK contains the location of this hook on the server _PATH_HOOK = "/usr/libexec/imunify360/cpanel_fileman_hook" DESCRIBE_DATA = [ { "blocking": 1, "escalateprivs": 0, "category": "Cpanel", "event": "UAPI::Fileman::upload_files", "stage": "pre", "hook": _PATH_HOOK + " --upload", "exectype": "script", }, { "blocking": 1, "escalateprivs": 0, "category": "Cpanel", "event": "UAPI::Fileman::save_file_content", "stage": "pre", "hook": _PATH_HOOK + " --save", "exectype": "script", }, { "blocking": 1, "escalateprivs": 0, "category": "Cpanel", "event": "Api2::Fileman::savefile", "stage": "pre", "hook": _PATH_HOOK + " --save", "exectype": "script", }, ] class Context: def __init__( self, stdin: TextIO, stdout: TextIO, stderr: TextIO, args: List[str], checker: Callable[[str], bool], ): self.stdin = stdin self.stdout = stdout self.stderr = stderr self.args = args self.checker = checker def status_text( allowed: bool, method=None, filename=None, folder=None, user=None ) -> str: if not allowed: logger.info( "0 BAILOUT malware detected when %s '%s' in %s for user %s", method, filename, folder, user, ) return "1" if allowed else "0 BAILOUT malware detected" def status_code(allowed: bool) -> int: return 0 def describe_action(ctx: Context) -> int: ctx.stdout.write(json.dumps(DESCRIBE_DATA)) return 0 def check_upload(ctx: Context) -> int: logger.info("upload action") suffix = "-key" path = "" filename = "" allowed = True data = json.loads(ctx.stdin.read())["data"] for k, v in data["args"].items(): if k.endswith(suffix): path = data["args"][k[: -len(suffix)]] # "file-eicar.com-key":"file-0" # [len("file-"):-len(suffix)] -> eicar.com filename = k[len("file-"):-len(suffix)] break if path != "": allowed = ctx.checker(path) ctx.stdout.write( status_text( allowed, "upload", filename, data["args"].get("dir"), data["user"] ) ) return status_code(allowed) def check_save(ctx: Context) -> int: logger.info("save action") allowed = True data = json.loads(ctx.stdin.read())["data"] content = data["args"].get("content") if content: with NamedTemporaryFile(mode="w") as ntf: ntf.write(content) ntf.flush() allowed = ctx.checker(ntf.name) ctx.stdout.write( status_text( allowed, "save", data["args"].get("filename") or data["args"].get("file"), data["args"].get("dir") or data["args"].get("path"), data["user"], ) ) return status_code(allowed) KNOWN_ACTIONS = { "describe": describe_action, "upload": check_upload, "save": check_save, } def aibolit_checker(file_to_scan: str) -> bool: # FOLLOWING IS MOSTLY COPIED FROM modsec_scan_real.py resident_dir_path = aibolit_job.RESIDENT_DIR # to include the import time, we could read the start time of the # process https://gist.github.com/westhood/1073585 remaining_time = aibolit_job.create_remaining_time_func( aibolit_job.UPLOAD_TIMEOUT ) # signals we'll be waiting for from aibolit sigset = {signal.SIGUSR1, signal.SIGUSR2} # block the signal in all threads signal.pthread_sigmask(signal.SIG_BLOCK, sigset) # submit the uploaded file for scanning # create PID.upload_job in the resident dir aibolit_job.create_upload_job( files=[file_to_scan], resident_dir_path=resident_dir_path, timeout=remaining_time(), ) logger.info("file %s is sent for scanning", file_to_scan) # notify aibolit about the new job aibolit_job.notify_aibolit_start_it_if_necessary(timeout=remaining_time()) # wait for response while True: # use sigtimedwait() instead of signal() to get the uid # note: ignore a possible race on retry inside sigtimedwait() on # receiving a signal (see sigtimedwait()'s Python docs) si = signal.sigtimedwait(sigset, remaining_time()) if si is None: # timed out logger.warning("timed out while scanning %s", file_to_scan) return True if si.si_uid == 0: # the signal is from root if si.si_signo == signal.SIGUSR1: return False elif si.si_signo == signal.SIGUSR2: return True else: assert 0, "shouldn't happen" # pragma: no cover def setup_logging() -> None: """Setup logging carefully for ossec to capture. When the hook prints logs on stderr, cpanel captures logs and prints them to /usr/local/cpanel/logs/error_log file. We need to make sure the logging format matches the syslog format for ossec to decode it perfectly. """ global logger hostname = socket.getfqdn() logger.setLevel(logging.DEBUG) handler = logging.StreamHandler() formatter = logging.Formatter( f"%(asctime)s {hostname} %(name)s[%(process)d]: %(message)s", datefmt="%b %d %H:%M:%S", ) handler.setFormatter(formatter) logger.addHandler(handler) sentry.configure_sentry() def do_main(ctx: Context) -> int: if len(ctx.args) < 2: print("No command is given.", file=ctx.stderr) return 1 if not ctx.args[1].startswith("--"): print("Wrong argument:", ctx.args[1], file=ctx.stderr) return 1 action = ctx.args[1][2:] if action not in KNOWN_ACTIONS: print("Unknown action:", action, file=ctx.stderr) return 1 return KNOWN_ACTIONS[action](ctx) def main(ctx: Context) -> int: try: return do_main(ctx) except Exception as e: print("1 Exception:", e, file=ctx.stderr) logger.exception("internal error: %s", e) return 1 if __name__ == "__main__": setup_logging() # First line of log does not get its own line but prepended with # cpanel logging format, e.g.(/usr/local/cpanel/logs/error_log): # # [2024-07-16 07:42:40 +0000] info [uapi] STDERR output from hook: /usr/libexec/imunify360/cpanel_fileman_hook --upload # [2024-07-16 07:42:40 +0000] info [uapi] Jul 16 07:42:39 cl7x64.cltest.com cpanel_fileman_hook[101676]: Starting imunify fileman hook # Jul 16 07:42:39 cl7x64.cltest.com cpanel_fileman_hook[101676]: upload action # Jul 16 07:42:39 cl7x64.cltest.com cpanel_fileman_hook[101676]: file /home/user228/tmp/Cpanel_Form_file.upload.bb6355b0 is sent for scanning # Jul 16 07:42:40 cl7x64.cltest.com cpanel_fileman_hook[101676]: 0 BAILOUT malware detected when upload 'eicar.com' in public_html for user user228 # Jul 16 07:42:40 cl7x64.cltest.com cpanel_fileman_hook[101676]: exiting with code 0 # # [2024-07-16 07:42:40 +0000] info [uapi] End STDERR from hook # # This is the reason we issue a log during startup logger.info("Starting imunify fileman hook") ctx = Context( sys.stdin, sys.stdout, sys.stderr, sys.argv, aibolit_checker, ) code = main(ctx) ctx.stdout.flush() ctx.stderr.flush() logger.info("exiting with code %s", code) sys.exit(code)