enochecker package¶
Submodules¶
enochecker.checkerservice module¶
Flask service to run a checker as HTTP service.
-
enochecker.checkerservice.
checker_routes
(checker_cls, disable_json_logging)¶ Create a flask app for the given checker class.
- Parameters
checker_cls (
Type
[ForwardRef
]) – The checker class to use- Return type
Tuple
[Callable
[[],Response
],Callable
[[],Response
],Callable
[[],CheckerInfoMessage
],Callable
[[],Response
]]- Returns
A flask app that can be passed to a uWSGI server or run using .run().
-
enochecker.checkerservice.
init_service
(checker, disable_json_logging=False)¶ Initialize a flask app that can be used for WSGI or listen directly.
The Engine may communicate with it over socket.
- Parameters
checker (
Type
[ForwardRef
]) – the checker class to use for check requests.- Return type
Flask
- Returns
a flask app with post and get routes set, ready for checking.
enochecker.enochecker module¶
Contains the BaseChecker to be used as base for all checkers.
-
class
enochecker.enochecker.
BaseChecker
(task, storage_dir='/home/runner/work/enochecker/enochecker/docs/.data', use_db_cache=True, json_logging=True)¶ Bases:
object
All you base are belong to us. Also all your flags. And checker scripts.
Override the methods given here, then simply init and .run(). Magic.
-
property
chain_db
¶ get the team_db entry for the current chain. Short hand version for self.team_db[self.task_chain_id]
- Return type
Any
- Returns
the team_db entry
-
connect
(host=None, port=None, timeout=None, retries=3)¶ Open a socket/telnet connection to the remote host.
Use connect(..).get_socket() for the raw socket.
- Parameters
host (
Optional
[str
]) – the host to connect to (defaults to self.address)port (
Optional
[int
]) – the port to connect to (defaults to self.port)timeout (
Optional
[float
]) – timeout on connection (defaults to self.timeout)retries (
int
) – the amount of times this socket connection should be retried
- Return type
- Returns
A connected Telnet instance
-
db
(name, ignore_locks=False)¶ Get a (global) db by name.
Subsequent calls will return the same db. Names can be anything, for example the team name, round numbers etc.
- Parameters
name (
str
) – The name of the DBignore_locks (
bool
) – Should only be set if you’re sure-ish keys are never shared between instances. Manual locking ist still possible.
- Return type
Union
[NoSqlDict
,StoredDict
]- Returns
A dict that will be self storing. Alternatively,
-
abstract
exploit
()¶ Use this method strictly for testing purposes. It should run exploits just like a real team would. It may not use stored data.
Will hopefully not be called during the actual CTF.
- Return type
str
- Returns
The flag found during the execution of this exploit
- Raises
EnoException on Error or if the flag was not found
-
exploit_variants
: int = None¶
-
flag_variants
: int = None¶
-
get_team_db
(team_id=None)¶ Return the database for a specific team.
Subsequent calls will return the same db.
- Parameters
team – Return a db for an other team. If none, the db for the local team will be returned.
- Return type
Union
[NoSqlDict
,StoredDict
]- Returns
The team local db
-
abstract
getflag
()¶ Retrieve a flag from the service.
Use self.flag to get the flag that needs to be recovered and self.task_chain_id as a key to retrieve data from the database stored during putflag On error, raise an EnoException.
- Raises
EnoException on error
- Return type
None
-
abstract
getnoise
()¶ Retrieve noise in the service.
The noise to be retrieved, can be restored from the database by using the self.task_chain_id used to store it during putflag. The difference between noise and flag is, tht noise does not have to remain secret for other teams. This method can be called many times per round. Check which variant is called using self.variant_id. On error, raise an EnoException.
- Raises
EnoException on error
- Return type
None
-
property
global_db
¶ Get a global storage shared between all teams and rounds.
Subsequent calls will return the same db. Prefer db_team_local or db_round_local
- Return type
Union
[NoSqlDict
,StoredDict
]- Returns
The global db
-
abstract
havoc
()¶ Unleash havoc on the app -> Do whatever you must to prove the service still works. Or not.
On error, raise an EnoException.
- Raises
EnoException on Error
- Return type
None
-
havoc_variants
: int = None¶
-
http
(method, route='/', params=None, port=None, scheme='http', raise_http_errors=False, timeout=None, **kwargs)¶ Perform an http request (requests lib) to the current host.
Caches cookies in self.http_session
- Parameters
method (
str
) – The request methodparams (
Any
) – The parameterroute (
str
) – The routeport (
Optional
[int
]) – The remote port in case it has not been specified at creationscheme (
str
) – The scheme (defaults to http)raise_http_errors (
bool
) – If True, will raise exception on http error codes (4xx, 5xx)timeout (
Optional
[float
]) – How long we’ll try to connect (default: self.timeout)
- Return type
requests.Response
- Returns
The response
-
http_get
(route='/', params=None, port=None, scheme='http', raise_http_errors=False, timeout=None, **kwargs)¶ Perform a (http) requests.get to the current host.
Caches cookies in self.http_session
- Parameters
params (
Any
) – The parameterroute (
str
) – The routeport (
Optional
[int
]) – The remote port in case it has not been specified at creationscheme (
str
) – The scheme (defaults to http)raise_http_errors (
bool
) – If True, will raise exception on http error codes (4xx, 5xx)timeout (
Optional
[float
]) – How long we’ll try to connect
- Return type
requests.Response
- Returns
The response
-
http_post
(route='/', params=None, port=None, scheme='http', raise_http_errors=False, timeout=None, **kwargs)¶ Perform a (http) requests.post to the current host.
Caches cookies in self.http_session
- Parameters
params (
Any
) – The parameterroute (
str
) – The routeport (
Optional
[int
]) – The remote port in case it has not been specified at creationscheme (
str
) – The scheme (defaults to http)raise_http_errors (
bool
) – If True, will raise exception on http error codes (4xx, 5xx)timeout (
Optional
[float
]) – How long we’ll try to connect
- Return type
requests.Response
- Returns
The response
-
property
http_useragent
¶ Return the useragent for http(s) requests.
- Return type
str
- Returns
the current useragent
-
http_useragent_randomize
()¶ Choose a new random http useragent.
Note that http requests will be initialized with a random user agent already. To retrieve a random useragent without setting it, use random instead.
- Return type
str
- Returns
the new useragent
-
property
noise
¶ Creates a stable noise value for the current task chain. Do not use for indexing (use self.task_chain_id instead).
- Return type
str
- Returns
A noise string, unique for each task_chain_id.
-
noise_variants
: int = None¶
-
abstract
putflag
()¶ Store a flag in the service.
In case multiple flags are provided, self.variant_id gives the appropriate flag store to target. The flag itself can be retrieved from self.flag. On error, raise an Eno Exception.
- Return type
Optional
[str
]- Returns
An optional attack info string that will be publicly available to help in exploits (e.g. the username of the user to attack)
- Raises
EnoException on error
-
abstract
putnoise
()¶ Store noise in the service.
The noise should later be recoverable. The difference between noise and flag is that noise does not have to remain secret for other teams. This method can be called multiple times per round. Check which variant is called using self.variant_id. On error, raise an EnoException.
- Raises
EnoException on error
- Return type
None
-
run
(method=None)¶ Execute the checker and catch errors along the way.
- Parameters
method (
Optional
[CheckerMethod
]) – When calling run, you may call a different method than the one passed on Checker creation using this optional param.- Return type
- Returns
A CheckerResult as a representation of the CheckerResult response as definded in the Spec.
-
search_flag
(data)¶ Search for the flag in the input data using the flag_regex and flag_hash.
Used only in the exploit method.
- Return type
Optional
[str
]- Returns
The flag if it was found, None otherwise
-
search_flag_bytes
(data)¶ Search for the flag in the input data using the flag_regex and flag_hash, where flag_regex is interpreted as binary regular expression.
Used only in the exploit method.
- Return type
Optional
[str
]- Returns
The flag if it was found, None otherwise
-
property
team_db
¶ Return the database for the current team.
- Return type
Union
[NoSqlDict
,StoredDict
]- Returns
The team local db
-
property
time_remaining
¶ Return a remaining time that is safe to be used as timeout.
Includes a buffer of TIME_BUFFER seconds.
- Return type
float
- Returns
A safe number of seconds that may still be used
-
property
time_running
¶ How long this checker has been running for.
- Return type
float
- Returns
time this checker has been running for in seconds
-
property
-
enochecker.enochecker.
parse_args
(argv=None)¶ Return the parsed argparser args.
- Parameters
argv (
Optional
[Sequence
[str
]]) – argv. Custom argvs. Will default to sys.argv if not provided.- Return type
Namespace
- Returns
args object
-
enochecker.enochecker.
run
(checker_cls, args=None, **kwargs)¶ Run a checker, either from cmdline or as uwsgi script.
- Parameters
checker – The checker (subclass of basechecker) to run
force_service – if True (non-default), the server will skip arg parsing and immediately spawn the web service.
args (
Optional
[Sequence
[str
]]) – optional parameter, providing parameters
- Return type
Optional
[CheckerResult
]- Returns
Never returns.
-
enochecker.enochecker.
warn_deprecated
(old_name, new_name)¶ Print a warning for the deprecated feature, include the new name in the log This needs python development mode or deprecation warnings enabled!
- Return type
None
enochecker.logging module¶
Utilities for sending log messages to a central ELK.
-
class
enochecker.logging.
ELKFormatter
(checker, fmt=None, datefmt='%Y-%m-%dT%H:%M:%S%z', style='%')¶ Bases:
logging.Formatter
Format log messages for a central ELK.
-
format
(record)¶ Format a LogRecord as a string.
- Parameters
record (
LogRecord
) – the record to format- Return type
str
- Returns
the formatted string
-
-
enochecker.logging.
exception_to_string
(excp)¶ Format an exception as a string.
Limits the length of the traceback to 3. :param ecxp: the exception to format :rtype:
str
:return: The formatted string
enochecker.nosqldict module¶
Backend for team_db based on MongoDB.
-
class
enochecker.nosqldict.
NoSqlDict
(name='default', checker_name='BaseChecker', host=None, port=None, username=None, password=None, logger=None, *args, **kwargs)¶ Bases:
collections.abc.MutableMapping
A dictionary that is MongoDb backed.
-
dblock
= <unlocked _thread.RLock object owner=0 count=0>¶
-
classmethod
get_client
(host, port, username, password, logger)¶ Lazily try to get the mongo db connection or creates a new one.
- Parameters
host (
str
) – mongo hostport (
int
) – mongo portusername (
Optional
[str
]) – the username to connect topassword (
Optional
[str
]) – the password to use
- Return type
MongoClient
- Returns
-
persist
()¶ Persist the changes in the backend.
- Return type
None
-
-
enochecker.nosqldict.
value_to_hash
(value)¶ Create a stable hash for a value based on the json representation.
Used to check whether the value in the DB has changed without calling __setitem__, for example when writing to nested dicts.
- Parameters
key – the value to hash
- Return type
Optional
[int
]- Returns
hash in string format
enochecker.results module¶
Collection of Exception classes to signal the status of the service being checked.
-
exception
enochecker.results.
BrokenCheckerException
(message, internal_message=None)¶ Bases:
enochecker.results.EnoException
Shouldn’t be raised ever since we catch all abstract Errors.
Used internally if something goes horribly wrong.
-
result
: enochecker_core.CheckerTaskResult = 'INTERNAL_ERROR'¶
-
-
exception
enochecker.results.
BrokenServiceException
(message, internal_message=None)¶ Bases:
enochecker.results.EnoException
Indicates a broken Service.
-
result
: enochecker_core.CheckerTaskResult = 'MUMBLE'¶
-
-
class
enochecker.results.
CheckerResult
(result, message=None, attack_info=None, flag=None)¶ Bases:
object
-
static
from_exception
(e)¶ Converts a given Exception to an extended CheckerResult including Message public_message isn’t used anywhere yet
- Return type
-
static
-
exception
enochecker.results.
EnoException
(message, internal_message=None)¶ Bases:
Exception
,abc.ABC
Base error including the Result. Raise a subclass of me once we know what to do.
-
message_contains
(flag)¶ ” If the string is in the message
- Return type
bool
-
result
: enochecker_core.CheckerTaskResult = 'INTERNAL_ERROR'¶
-
-
exception
enochecker.results.
OfflineException
(message, internal_message=None)¶ Bases:
enochecker.results.EnoException
Service was not reachable (at least once) during our checks.
-
result
: enochecker_core.CheckerTaskResult = 'OFFLINE'¶
-
enochecker.storeddict module¶
Backend for team_db based on a local filesystem directory.
-
class
enochecker.storeddict.
StoredDict
(name='default', base_path='/home/runner/work/enochecker/enochecker/docs/.data', ignore_locks=False, logger=None, *args, **kwargs)¶ Bases:
collections.abc.MutableMapping
A dictionary that is filesystem backed.
It will write to disk every few seconds and at exit. In case python crashes, changes may be gone. :/ Note: Complex won’t be tracked.
-
is_locked
(key)¶ Return if the key is currently locked by this process.
- Parameters
key (
str
) – The key- Return type
bool
- Returns
True if locked by this process, False otherwise
-
lock
(key)¶ Wait for a lock.
- Parameters
key (
str
) – the key to lock- Return type
None
-
mark_dirty
(key)¶ Manually mark an entry as dirty. It will be updated on disk on the next occasion.
- Parameters
key (
str
) – the key that needs to be stored- Return type
Any
- Returns
the value contained in the key
-
persist
()¶ Store all dirty data to disk.
If no data is to be stored, it’s basically free to call.
- Return type
None
-
release
(locked_key)¶ Release a file lock.
- Parameters
locked_key (
str
) – the key we locked- Return type
None
-
reload
()¶ Reload stored values from disk.
There is usually no reason to call this. Non persisted changes might be lost. Only reason would be if another process fiddles with our data concurrently.
- Return type
None
-
-
enochecker.storeddict.
makedirs
(path, exist_ok=True)¶ Create a directory.
Creates the parent directories if necessary.
param path: the path to create param exist_ok: ignore already existing path and do nothing
- Return type
None
enochecker.useragents module¶
Collection of random usernames to make fingerprinting checker traffic harder.
-
enochecker.useragents.
random_useragent
()¶ Return a random useragent.
- Return type
str
- Returns
A seemingly valid useragent.
enochecker.utils module¶
Collection of utilities for checker development.
-
class
enochecker.utils.
SimpleSocket
(host=None, port=0, timeout=<object object>, logger=None, timeout_fun=None, *args, **kwargs)¶ Bases:
pwnlib.tubes.remote.remote
Convenient socket wrapper using pwnlib’s tubes.remote
Read Pwnlib documentation for more info.
-
property
current_default_timeout
¶ Get the timeout default that should currently be used.
- Return type
float
- Returns
current timeout default, either from self.timeout_fun or from timeout.
-
expect
(regexes, timeout=None)¶ Read until one from a list of a regular expressions matches.
Use this to search for anything.
- Parameters
regexes (
Sequence
[Union
[Pattern
[bytes
],bytes
,str
]]) – The first argument is a list of regular expressions, either compiled (re.Pattern instances) or uncompiled (strings).timeout (
Optional
[float
]) – Timeout in seconds. If none, default will be taken.
- Return type
Tuple
[int
,Optional
[Match
[bytes
]],bytes
]- Returns
Return a tuple of three items: the index in the list of the first regular expression that matches; the re.Match object returned; and the text read up till and including the match.
-
read_all
()¶ Read all data until EOF; block until connection closed.
- Return type
bytes
- Returns
the complete content until EOF
-
read_n_lines
(line_count, delimiter=b'\\n')¶ Read n lines from socket.
- Parameters
line_count (
int
) – the amount of lines to readdelimiter (
Union
[str
,bytes
]) – what delimeter to use for splitting (could also be non-n)
- Return type
List
[bytes
]- Returns
a list of lines
-
read_until
(match, timeout=None)¶ Read until the expected string has been seen, or a timeout is hit (default is default socket timeout).
- Parameters
match (
Union
[bytes
,str
]) – what to look for.timeout (
Optional
[float
]) – default socket timeout override
- Return type
bytes
- Returns
Returns everything until the given math. When no match is found, return whatever is available instead, possibly the empty string. Raise EOFError if the connection is closed and no cooked data is available.
-
readline_expect
(expected, read_until=b'\\n', timeout=None, exception_message=None)¶ Read to newline (or read_until string) and assert the presence of a string in the response.
Will raise an exception if failed.
- Parameters
read_until (
Union
[str
,bytes
]) – Which parameter to read untilexpected (
Union
[str
,bytes
]) – the expected String to search for in the responsetimeout (
Optional
[float
]) – The timeout (uses Telnet default if not passed in)
- Return read
the bytes read
- Return type
bytes
-
recv
(numb=4096, timeout=default) → bytes¶ Receives up to numb bytes of data from the tube, and returns as soon as any quantity of data is available.
If the request is not satisfied before
timeout
seconds pass, all data is buffered and an empty string (''
) is returned.- Raises:
exceptions.EOFError: The connection is closed
- Returns:
A bytes object containing bytes received from the socket, or
''
if a timeout occurred while waiting.
Examples:
>>> t = tube() >>> # Fake a data source >>> t.recv_raw = lambda n: b'Hello, world' >>> t.recv() == b'Hello, world' True >>> t.unrecv(b'Woohoo') >>> t.recv() == b'Woohoo' True >>> with context.local(log_level='debug'): ... _ = t.recv() [...] Received 0xc bytes: b'Hello, world'
-
write
(buffer)¶ Write a string to the socket.
Can block if the connection is blocked. May raise socket.error if the connection is closed.
- Parameters
buffer (
Union
[str
,bytes
]) – The buffer to write- Return type
None
-
property
-
enochecker.utils.
assert_equals
(o1, o2, message=None, autobyteify=False)¶ Raise a BrokenServiceException if o1 != o2.
- Parameters
o1 (
Any
) – the first objecto2 (
Any
) – the second objectmessage (
Optional
[str
]) – The exception message in case of an error (optional)autobyteify (
bool
) – will call ensure_bytes on both parameters.
- Return type
None
-
enochecker.utils.
assert_in
(o1, o2, message=None)¶ Raise a BrokenServiceException if o1 not in o2.
- Parameters
o1 (
Any
) – the object that should be in o2o2 (
Any
) – the object to look inmessage (
Optional
[str
]) – An optional message that will be part of the error
- Return type
None
-
enochecker.utils.
assert_true
(expression, message=None, autobyteify=False)¶ Raise a BrokenServiceException if expression is not True.
- Parameters
o1 – the element that should be True
message (
Optional
[str
]) – The exception message in case of an error (optional)autobyteify (
bool
) – will call ensure_bytes on both parameters.
- Return type
None
-
enochecker.utils.
base64ify
(s, altchars=None)¶ Calculate the base64 representation of a value.
- Parameters
s (
Union
[str
,bytes
]) – the input stringaltchars (
Union
[str
,bytes
,None
]) – base64 encodes using the given altchars (or not, if None)
- Return type
str
- Returns
base64 representation
-
enochecker.utils.
debase64ify
(s, altchars=None)¶ Decode a base64-encoded string.
- Parameters
s (
Union
[str
,bytes
]) – the base64-encoded stringaltchars (
Union
[str
,bytes
,None
]) – base64 decodes using the given altchars (or not, if None)
- Return type
str
- Returns
the decoded value
-
enochecker.utils.
ensure_bytes
(obj)¶ Convert an object to bytes.
If the input is bytes, the value remains unchanged.
- Parameters
obj (
Union
[bytes
,str
,Any
]) – str or bytes (or anything else) to convert to bytes representation- Return type
bytes
- Returns
the bytes representation of the object
-
enochecker.utils.
ensure_valid_filename
(s, min_length=3)¶ Get a valid file name from the input.
- Parameters
s (
str
) – The input stringmin_length (
int
) – if the result is smaller than this, the method will fall back to base64.
- Return type
str
- Returns
all illegal chars stripped or base64ified if it gets too small
-
enochecker.utils.
sha256ify
(s)¶ Calculate the sha256 hash.
Converts the input to bytes if it is a string.
- Parameters
s (
Union
[str
,bytes
]) – the string- Return type
str
- Returns
the hash in hex representation
-
enochecker.utils.
snake_caseify
(camel)¶ Turn camels into snake (-cases).
- Parameters
camel (
str
) – camelOrSnakeWhatever- Return type
str
- Returns
camel_or_snake_whatever