summaryrefslogtreecommitdiff
path: root/src/saml2test/__init__.py
blob: a4812f962542e35cd8c10a3fcfdbf209d22f9ac7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import logging
import time
import traceback
import requests
import sys
import socket

from subprocess import Popen, PIPE
from saml2test.check import CRITICAL

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Author tag for this module.
__author__ = 'rolandh'


class FatalError(Exception):
    """
    Plain ``Exception`` subclass adding no behaviour of its own.

    NOTE(review): judging by the name it marks an unrecoverable failure;
    no raise site is visible in this part of the file - confirm with callers.
    """


class CheckError(Exception):
    """
    Plain ``Exception`` subclass adding no behaviour of its own.

    NOTE(review): presumably raised when a check fails; no raise site is
    visible in this part of the file - confirm with callers.
    """


class HTTP_ERROR(Exception):
    """
    Plain ``Exception`` subclass used to report a failed HTTP request.

    ``get_page`` in this module raises it when the response status code is
    not 200.
    """


class Unknown(Exception):
    """
    Plain ``Exception`` subclass adding no behaviour of its own.

    NOTE(review): what exactly is "unknown" cannot be told from this part
    of the file - confirm with callers.
    """


class OperationError(Exception):
    """
    Plain ``Exception`` subclass adding no behaviour of its own.

    NOTE(review): presumably raised when an operation fails; no raise site
    is visible in this part of the file - confirm with callers.
    """


class ContextFilter(logging.Filter):
    """
    This is a filter which injects time-lapse information into the log.

    Every record passing through :meth:`filter` gets a ``delta`` attribute
    holding the number of seconds elapsed since the clock was (re)started
    with :meth:`start`.
    """

    # Reference timestamp (a ``time.time()`` value); ``None`` until the
    # clock has been started.  It is deliberately stored under a different
    # name than the ``start`` method: assigning it to ``self.start`` would
    # replace the method with a float and make a second ``start()`` call fail.
    _start = None

    def start(self):
        """
        (Re)set the reference time to the current time.
        """
        self._start = time.time()

    def filter(self, record):
        """
        Attach the elapsed time to ``record`` as ``record.delta``.

        If :meth:`start` has not been called yet, the clock is started now
        instead of failing, so the first record gets a delta close to zero.

        :param record: the log record being processed
        :return: always ``True``, so no record is dropped by this filter
        """
        if self._start is None:
            self.start()
        record.delta = time.time() - self._start
        return True


def start_script(path, *args):
    """
    Launch the program at ``path`` as a child process.

    :param path: the program to execute
    :param args: extra command-line arguments, passed on in the given order
    :return: the ``Popen`` object of the child; its stdout and stderr are
        connected to pipes
    """
    command = [path] + list(args)
    return Popen(command, stderr=PIPE, stdout=PIPE)


def stop_script_by_name(name):
    """
    Send SIGKILL to every process whose ``ps -A`` output line contains
    ``name``.

    The match is a plain substring test against the whole line printed by
    ``ps``, so it is not limited to the command column.

    :param name: text to look for in the ``ps -A`` output; an empty value
        is ignored because it would match every process
    :raises OSError: if a matching process cannot be signalled for a reason
        other than having already exited (e.g. insufficient permissions)
    """
    import errno
    import subprocess
    import signal
    import os

    if not name:
        # An empty string is contained in every line; never kill everything.
        return

    p = subprocess.Popen(['ps', '-A'], stdout=subprocess.PIPE)
    out, _ = p.communicate()

    # On Python 3 the pipe yields bytes; decode so that a text ``name`` can
    # be searched for.  If the caller passed bytes, keep comparing bytes.
    if isinstance(out, bytes) and not isinstance(name, bytes):
        out = out.decode("utf-8", "replace")

    for line in out.splitlines():
        if name not in line:
            continue

        try:
            pid = int(line.split(None, 1)[0])
        except (IndexError, ValueError):
            # Header line or otherwise malformed row: no PID to act on.
            continue

        try:
            os.kill(pid, signal.SIGKILL)
        except OSError as err:
            # The process may have exited between ``ps`` and ``kill``;
            # that is fine, anything else is a real error.
            if err.errno != errno.ESRCH:
                raise


def stop_script_by_pid(pid):
    """
    Send SIGKILL to the process with the given process id.

    :param pid: id of the process to kill
    """
    from os import kill
    from signal import SIGKILL

    kill(pid, SIGKILL)


def get_page(url):
    """
    Fetch ``url`` with an HTTP GET and return the response body as text.

    :param url: the address to fetch
    :return: the decoded response body (``resp.text``)
    :raises HTTP_ERROR: if the response status code is not 200; the status
        code is passed as the exception argument
    """
    resp = requests.get(url)
    if resp.status_code != 200:
        # A requests ``Response`` has no ``status`` attribute; the numeric
        # code lives in ``status_code``.
        raise HTTP_ERROR(resp.status_code)
    return resp.text


def exception_trace(tag, exc, log=None):
    """
    Build a result dictionary describing the exception being handled.

    The traceback text is taken from ``sys.exc_info()``, i.e. from the
    exception currently being handled, not from ``exc`` itself.

    :param tag: not used by this function
    :param exc: the exception (or other object) rendered into the message
    :param log: not used by this function
    :return: a dict with the keys ``"status"`` (always ``CRITICAL``),
        ``"message"`` (``"Exception: ..."``) and ``"content"`` (the
        formatted traceback as one string)
    """
    trace_lines = traceback.format_exception(*sys.exc_info())

    try:
        _exc = "Exception: %s" % exc
    except UnicodeEncodeError:
        # Formatting failed on non-encodable text.  Prefer the ``message``
        # attribute when the exception has one; it does not exist on
        # Python 3 exceptions, so fall back to the repr rather than failing
        # with an AttributeError while reporting another error.
        detail = getattr(exc, "message", None)
        if detail is None:
            _exc = "Exception: %r" % (exc,)
        else:
            _exc = "Exception: %s" % detail.encode("utf-8", "replace")

    return {"status": CRITICAL, "message": _exc,
            "content": "".join(trace_lines)}


def ip_addresses():
    """
    List the IP addresses the local host name resolves to.

    The addresses come from ``socket.gethostbyname_ex`` applied to
    ``socket.gethostname()``; entries starting with ``127.`` are left out.

    :return: a list of address strings
    """
    _host, _aliases, addresses = socket.gethostbyname_ex(socket.gethostname())
    return list(filter(lambda addr: not addr.startswith("127."), addresses))