--- /dev/null
+#!/usr/bin/env python3
+
+"""Nagios check dispatcher and custom checks
+
+This fulfills two roles:
+ * can be used as an ssh force command and dispatch checks to appropriate nagios plugins
+ * implements various custom checks
+"""
+
+# https://pypi.org/project/nagiosplugin/ may also be useful
+
+# Standard library
+from enum import IntEnum
+import logging
+import os
+import re
+import subprocess
+import sys
+
+# Common third party libs
+import requests
+
+LOGGER = logging.getLogger(__name__)
+
+class Return_Code(IntEnum):
+ OK = 0
+ WARNING = 1
+ CRITICAL = 2
+ UNKNOWN = 3
+
+# TODO(py3.7): use dataclass?
+class NagiosReturn:
+ """Class for returning nagios results"""
+ returncode: Return_Code
+ short: str
+ short_perf: str
+ long_text: str
+ long_perf: str
+
+ def __init__(self, returncode, short, short_perf="", long_text="", long_perf=""):
+ self.returncode = returncode
+ self.short = short
+ self.short_perf = short_perf
+ self.long_text = long_text
+ self.long_perf = long_perf
+
+ def exit(self):
+ assert '\n' not in self.short
+ assert '\n' not in self.short_perf
+ print("%s|%s" % (self.short.strip(), self.short_perf.strip()))
+ print(self.long_text + "|" + self.long_perf)
+ sys.exit(self.returncode)
+
+
+def check_vault(check, hostname):
+ url = "https://%s:8200/v1/sys/seal-status" % (hostname, )
+ result = requests.get(url)
+ if result.status_code == 200:
+ long_perf = "json=%s" % (result.text, )
+ if result.json()['sealed']:
+ returncode = Return_Code.CRITICAL
+ short = "vault is sealed"
+ perf = "sealed=1"
+ return NagiosReturn(returncode, short, perf, long_perf=long_perf)
+ else:
+ returncode = Return_Code.OK
+ short = "vault is unsealed"
+ perf = "sealed=0"
+ return NagiosReturn(returncode, short, perf, long_perf=long_perf)
+ else:
+ returncode = Return_Code.CRITICAL
+ short = "vault seal-status returned %d" % (result.status_code)
+ perf = "status_code=%d" % (result.status_code)
+ return NagiosReturn(returncode, short, perf)
+
+
+FUNCTIONS = dict(
+ check_vault=check_vault,
+)
+
+
+ARG_CHECKER = re.compile('^([a-zA-Z0-9][a-zA-Z0-9.-]*)$')
+
+
+def dispatch():
+ assert len(sys.argv) > 1, "currently must pass args on commandline"
+ if len(sys.argv) > 1:
+ cmd = sys.argv[1]
+ if cmd in FUNCTIONS:
+ args = []
+ for arg in sys.argv[2:]:
+ match = ARG_CHECKER.match(arg)
+ if match:
+ args.append(match.group(1))
+ else:
+ ret = NagiosReturn(Return_Code.UNKNOWN, "invalid arg %s" % (arg, ))
+ break
+ else:
+ # If we got through the loop without breaking, args are fine
+ ret = FUNCTIONS[cmd](cmd, *args)
+ else:
+ ret = NagiosReturn(Return_Code.UNKNOWN, "unknown cmd %s" % (cmd, ))
+ ret.exit()
+
+
+if __name__ == '__main__':
+ #handlers = [journal.JournalHandler()]
+ handlers = []
+ if 'SSH_ORIGINAL_COMMAND' not in os.environ:
+ # Probably being run for diagnostics, so go ahead and log to console
+ handlers.append(logging.StreamHandler())
+ logging.basicConfig(level='INFO',handlers=handlers)
+ dispatch()