enochecker package

Submodules

enochecker.checkerservice module

Flask service to run a checker as HTTP service.

enochecker.checkerservice.checker_routes(checker_cls, disable_json_logging)

Create a flask app for the given checker class.

Parameters

checker_cls (Type[ForwardRef]) – The checker class to use

Return type

Tuple[Callable[[], Response], Callable[[], Response], Callable[[], CheckerInfoMessage], Callable[[], Response]]

Returns

A flask app that can be passed to a uWSGI server or run using .run().

enochecker.checkerservice.init_service(checker, disable_json_logging=False)

Initialize a flask app that can be used for WSGI or listen directly.

The Engine may communicate with it over socket.

Parameters

checker (Type[ForwardRef]) – the checker class to use for check requests.

Return type

Flask

Returns

a flask app with post and get routes set, ready for checking.

enochecker.enochecker module

Contains the BaseChecker to be used as base for all checkers.

class enochecker.enochecker.BaseChecker(task, storage_dir='/home/runner/work/enochecker/enochecker/docs/.data', use_db_cache=True, json_logging=True)

Bases: object

All you base are belong to us. Also all your flags. And checker scripts.

Override the methods given here, then simply init and .run(). Magic.

property chain_db

Get the team_db entry for the current chain. Shorthand for self.team_db[self.task_chain_id].

Return type

Any

Returns

the team_db entry

connect(host=None, port=None, timeout=None, retries=3)

Open a socket/telnet connection to the remote host.

Use connect(..).get_socket() for the raw socket.

Parameters
  • host (Optional[str]) – the host to connect to (defaults to self.address)

  • port (Optional[int]) – the port to connect to (defaults to self.port)

  • timeout (Optional[float]) – timeout on connection (defaults to self.timeout)

  • retries (int) – the amount of times this socket connection should be retried

Return type

SimpleSocket

Returns

A connected Telnet instance
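
The retries parameter can be pictured as a loop around the connection attempt. The following is a minimal stand-alone sketch using only the standard library, not enochecker's implementation. The names connect_with_retries and flaky_connect are hypothetical, and it is an assumption here that retries counts additional attempts after the first one.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def connect_with_retries(attempt: Callable[[], T], retries: int = 3) -> T:
    """Call attempt() until it succeeds, retrying up to `retries` times.

    Hypothetical helper: re-raises the last OSError once all tries failed.
    """
    last_error: Exception = OSError("no connection attempt was made")
    for _ in range(retries + 1):  # one initial try plus `retries` retries (assumed)
        try:
            return attempt()
        except OSError as error:  # socket errors are OSError subclasses
            last_error = error
    raise last_error


# Usage: a fake connector that fails twice, then succeeds.
calls = {"count": 0}


def flaky_connect() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionRefusedError("service not up yet")
    return "connected"


result = connect_with_retries(flaky_connect, retries=3)
```

In a real checker the attempt would be the socket connection itself; the fake connector only keeps the example free of network access.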

db(name, ignore_locks=False)

Get a (global) db by name.

Subsequent calls will return the same db. Names can be anything, for example the team name, round numbers etc.

Parameters
  • name (str) – The name of the DB

  • ignore_locks (bool) – Should only be set if you’re sure-ish keys are never shared between instances. Manual locking is still possible.

Return type

Union[NoSqlDict, StoredDict]

Returns

A dict that will be self-storing (a NoSqlDict or a StoredDict).

abstract exploit()

Use this method strictly for testing purposes. It should run exploits just like a real team would. It must not use stored data.

Will hopefully not be called during the actual CTF.

Return type

str

Returns

The flag found during the execution of this exploit

Raises

EnoException on Error or if the flag was not found

exploit_variants: int = None

flag_variants: int = None

get_team_db(team_id=None)

Return the database for a specific team.

Subsequent calls will return the same db.

Parameters

team_id – Return the db for another team. If None, the db for the local team will be returned.

Return type

Union[NoSqlDict, StoredDict]

Returns

The team local db

abstract getflag()

Retrieve a flag from the service.

Use self.flag to get the flag that needs to be recovered and self.task_chain_id as a key to retrieve data stored in the database during putflag. On error, raise an EnoException.

Raises

EnoException on error

Return type

None

abstract getnoise()

Retrieve noise in the service.

The noise to be retrieved can be restored from the database using the self.task_chain_id that was used to store it during putnoise. The difference between noise and flag is that noise does not have to remain secret from other teams. This method can be called many times per round. Check which variant is called using self.variant_id. On error, raise an EnoException.

Raises

EnoException on error

Return type

None

property global_db

Get a global storage shared between all teams and rounds.

Subsequent calls will return the same db. Prefer db_team_local or db_round_local.

Return type

Union[NoSqlDict, StoredDict]

Returns

The global db

abstract havoc()

Unleash havoc on the app -> Do whatever you must to prove the service still works. Or not.

On error, raise an EnoException.

Raises

EnoException on Error

Return type

None

havoc_variants: int = None

http(method, route='/', params=None, port=None, scheme='http', raise_http_errors=False, timeout=None, **kwargs)

Perform an http request (requests lib) to the current host.

Caches cookies in self.http_session

Parameters
  • method (str) – The request method

  • params (Any) – The request parameters

  • route (str) – The route

  • port (Optional[int]) – The remote port in case it has not been specified at creation

  • scheme (str) – The scheme (defaults to http)

  • raise_http_errors (bool) – If True, will raise exception on http error codes (4xx, 5xx)

  • timeout (Optional[float]) – How long we’ll try to connect (default: self.timeout)

Return type

requests.Response

Returns

The response

http_get(route='/', params=None, port=None, scheme='http', raise_http_errors=False, timeout=None, **kwargs)

Perform a (http) requests.get to the current host.

Caches cookies in self.http_session

Parameters
  • params (Any) – The request parameters

  • route (str) – The route

  • port (Optional[int]) – The remote port in case it has not been specified at creation

  • scheme (str) – The scheme (defaults to http)

  • raise_http_errors (bool) – If True, will raise exception on http error codes (4xx, 5xx)

  • timeout (Optional[float]) – How long we’ll try to connect

Return type

requests.Response

Returns

The response

http_post(route='/', params=None, port=None, scheme='http', raise_http_errors=False, timeout=None, **kwargs)

Perform a (http) requests.post to the current host.

Caches cookies in self.http_session

Parameters
  • params (Any) – The request parameters

  • route (str) – The route

  • port (Optional[int]) – The remote port in case it has not been specified at creation

  • scheme (str) – The scheme (defaults to http)

  • raise_http_errors (bool) – If True, will raise exception on http error codes (4xx, 5xx)

  • timeout (Optional[float]) – How long we’ll try to connect

Return type

requests.Response

Returns

The response

property http_useragent

Return the useragent for http(s) requests.

Return type

str

Returns

the current useragent

http_useragent_randomize()

Choose a new random http useragent.

Note that http requests will be initialized with a random user agent already. To retrieve a random useragent without setting it, use random_useragent() instead.

Return type

str

Returns

the new useragent

property noise

Creates a stable noise value for the current task chain. Do not use for indexing (use self.task_chain_id instead).

Return type

str

Returns

A noise string, unique for each task_chain_id.

noise_variants: int = None

abstract putflag()

Store a flag in the service.

In case multiple flags are provided, self.variant_id gives the appropriate flag store to target. The flag itself can be retrieved from self.flag. On error, raise an EnoException.

Return type

Optional[str]

Returns

An optional attack info string that will be publicly available to help in exploits (e.g. the username of the user to attack)

Raises

EnoException on error

abstract putnoise()

Store noise in the service.

The noise should later be recoverable. The difference between noise and flag is that noise does not have to remain secret for other teams. This method can be called multiple times per round. Check which variant is called using self.variant_id. On error, raise an EnoException.

Raises

EnoException on error

Return type

None

run(method=None)

Execute the checker and catch errors along the way.

Parameters

method (Optional[CheckerMethod]) – When calling run, you may call a different method than the one passed on Checker creation using this optional param.

Return type

CheckerResult

Returns

A CheckerResult as a representation of the CheckerResult response as defined in the Spec.

search_flag(data)

Search for the flag in the input data using the flag_regex and flag_hash.

Used only in the exploit method.

Return type

Optional[str]

Returns

The flag if it was found, None otherwise

search_flag_bytes(data)

Search for the flag in the input data using the flag_regex and flag_hash, where flag_regex is interpreted as binary regular expression.

Used only in the exploit method.

Return type

Optional[str]

Returns

The flag if it was found, None otherwise

property team_db

Return the database for the current team.

Return type

Union[NoSqlDict, StoredDict]

Returns

The team local db

property time_remaining

Return a remaining time that is safe to be used as timeout.

Includes a buffer of TIME_BUFFER seconds.

Return type

float

Returns

A safe number of seconds that may still be used
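
The idea of a buffered remaining time can be sketched as a deadline computation. This is an illustration under stated assumptions, not the library's code: the TIME_BUFFER value below is hypothetical (the real constant is not shown in this reference), the function name is made up, and clamping at zero is an assumption.

```python
TIME_BUFFER = 5.0  # hypothetical value, chosen only for this example


def safe_time_remaining(started_at: float, timeout: float, now: float) -> float:
    """Seconds left until the deadline, minus the safety buffer, never negative."""
    deadline = started_at + timeout
    return max(deadline - now - TIME_BUFFER, 0.0)


# 10 seconds into a 30 second task: 20 seconds left, 15 of them safe to use.
remaining = safe_time_remaining(started_at=100.0, timeout=30.0, now=110.0)

# 28 seconds in: less than the buffer is left, so nothing is safe to use.
almost_over = safe_time_remaining(started_at=100.0, timeout=30.0, now=128.0)
```

A value computed this way can be handed to a timeout parameter so that a slow service cannot push the checker past its own deadline.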

property time_running

How long this checker has been running for.

Return type

float

Returns

time this checker has been running for in seconds

enochecker.enochecker.parse_args(argv=None)

Return the parsed argparser args.

Parameters

argv (Optional[Sequence[str]]) – Custom argument list. Defaults to sys.argv if not provided.

Return type

Namespace

Returns

args object

enochecker.enochecker.run(checker_cls, args=None, **kwargs)

Run a checker, either from cmdline or as uwsgi script.

Parameters
  • checker_cls – The checker (subclass of BaseChecker) to run

  • force_service – if True (non-default), the server will skip arg parsing and immediately spawn the web service.

  • args (Optional[Sequence[str]]) – optional list of arguments to use

Return type

Optional[CheckerResult]

Returns

Never returns.

enochecker.enochecker.warn_deprecated(old_name, new_name)

Print a warning for the deprecated feature and include the new name in the log. This needs Python development mode or deprecation warnings enabled!

Return type

None

enochecker.logging module

Utilities for sending log messages to a central ELK.

class enochecker.logging.ELKFormatter(checker, fmt=None, datefmt='%Y-%m-%dT%H:%M:%S%z', style='%')

Bases: logging.Formatter

Format log messages for a central ELK.

format(record)

Format a LogRecord as a string.

Parameters

record (LogRecord) – the record to format

Return type

str

Returns

the formatted string

enochecker.logging.exception_to_string(excp)

Format an exception as a string.

Limits the length of the traceback to 3.

Parameters

excp – the exception to format

Return type

str

Returns

The formatted string
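
Limiting a traceback to a few entries can be done with the standard library alone. The sketch below shows one way to do it with traceback.format_exception and its limit argument. The function name exception_to_string_sketch and the recursive helper level are hypothetical, and this is not necessarily how enochecker formats its output.

```python
import traceback


def exception_to_string_sketch(excp: BaseException, limit: int = 3) -> str:
    """Format an exception with at most `limit` traceback entries."""
    lines = traceback.format_exception(
        type(excp), excp, excp.__traceback__, limit=limit
    )
    return "".join(lines)


def level(n: int) -> None:
    """Recurse n times, then fail, to build a deep traceback."""
    if n == 0:
        raise ValueError("boom")
    level(n - 1)


try:
    level(5)
except ValueError as error:
    text = exception_to_string_sketch(error)
```

Although the failing call is several frames deep, the formatted text contains no more than three frame entries followed by the exception line.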

enochecker.nosqldict module

Backend for team_db based on MongoDB.

class enochecker.nosqldict.NoSqlDict(name='default', checker_name='BaseChecker', host=None, port=None, username=None, password=None, logger=None, *args, **kwargs)

Bases: collections.abc.MutableMapping

A dictionary that is MongoDb backed.

dblock = <unlocked _thread.RLock object owner=0 count=0>

classmethod get_client(host, port, username, password, logger)

Lazily try to get the mongo db connection or create a new one.

Parameters
  • host (str) – mongo host

  • port (int) – mongo port

  • username (Optional[str]) – the username to connect with

  • password (Optional[str]) – the password to use

Return type

MongoClient

Returns

persist()

Persist the changes in the backend.

Return type

None

enochecker.nosqldict.value_to_hash(value)

Create a stable hash for a value based on the json representation.

Used to check whether the value in the DB has changed without calling __setitem__, for example when writing to nested dicts.

Parameters

value – the value to hash

Return type

Optional[int]

Returns

hash in string format
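
Why a hash of the JSON representation helps can be shown with a small stand-alone sketch. It is not the library's implementation: the name stable_hash, the use of SHA-256, and returning None for values that cannot be serialized are all assumptions made for illustration.

```python
import hashlib
import json
from typing import Any, Optional


def stable_hash(value: Any) -> Optional[str]:
    """Hash the JSON form of value; None if it cannot be serialized (assumed)."""
    try:
        encoded = json.dumps(value, sort_keys=True).encode("utf-8")
    except (TypeError, ValueError):
        return None
    return hashlib.sha256(encoded).hexdigest()


entry = {"user": "alice", "notes": ["a"]}
before = stable_hash(entry)
entry["notes"].append("b")  # nested mutation: no __setitem__ call on `entry`
after = stable_hash(entry)
```

Comparing before and after reveals the nested change even though the outer dict never saw an assignment. Sorting the keys keeps the hash independent of insertion order.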

enochecker.results module

Collection of Exception classes to signal the status of the service being checked.

exception enochecker.results.BrokenCheckerException(message, internal_message=None)

Bases: enochecker.results.EnoException

Shouldn’t be raised ever since we catch all abstract Errors.

Used internally if something goes horribly wrong.

result: enochecker_core.CheckerTaskResult = 'INTERNAL_ERROR'

exception enochecker.results.BrokenServiceException(message, internal_message=None)

Bases: enochecker.results.EnoException

Indicates a broken Service.

result: enochecker_core.CheckerTaskResult = 'MUMBLE'

class enochecker.results.CheckerResult(result, message=None, attack_info=None, flag=None)

Bases: object

static from_exception(e)

Converts a given Exception to an extended CheckerResult including the message. public_message isn’t used anywhere yet.

Return type

CheckerResult

exception enochecker.results.EnoException(message, internal_message=None)

Bases: Exception, abc.ABC

Base error including the Result. Raise a subclass of me once we know what to do.

message_contains(flag)

Check if the given string is in the message.

Return type

bool

result: enochecker_core.CheckerTaskResult = 'INTERNAL_ERROR'

exception enochecker.results.OfflineException(message, internal_message=None)

Bases: enochecker.results.EnoException

Service was not reachable (at least once) during our checks.

result: enochecker_core.CheckerTaskResult = 'OFFLINE'
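
The pattern used by these exception classes, where each class carries its own result value, can be sketched with stand-in classes. Everything below is illustrative: the class names ending in Sketch and the function result_of are hypothetical, and the "OK" value for a clean run is an assumption, since it does not appear in this reference.

```python
from typing import Callable


class EnoExceptionSketch(Exception):
    """Stand-in base class: the result code lives on the class."""

    result = "INTERNAL_ERROR"


class BrokenServiceSketch(EnoExceptionSketch):
    result = "MUMBLE"


class OfflineSketch(EnoExceptionSketch):
    result = "OFFLINE"


def result_of(check: Callable[[], None]) -> str:
    """Run a check and translate its outcome into a result string."""
    try:
        check()
    except EnoExceptionSketch as error:
        return error.result  # the subclass decides the result
    except Exception:
        return "INTERNAL_ERROR"  # anything unexpected is a checker bug
    return "OK"  # assumed name for the success value


def unreachable() -> None:
    raise OfflineSketch("connection refused")


def garbled() -> None:
    raise BrokenServiceSketch("unexpected reply")


results = [
    result_of(unreachable),
    result_of(garbled),
    result_of(lambda: None),
    result_of(lambda: 1 / 0),
]
```

Checker code only has to raise the matching exception; the translation into a result happens in one place.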

enochecker.storeddict module

Backend for team_db based on a local filesystem directory.

class enochecker.storeddict.StoredDict(name='default', base_path='/home/runner/work/enochecker/enochecker/docs/.data', ignore_locks=False, logger=None, *args, **kwargs)

Bases: collections.abc.MutableMapping

A dictionary that is filesystem backed.

It will write to disk every few seconds and at exit. In case python crashes, changes may be gone. :/ Note: Complex values won’t be tracked.

is_locked(key)

Return if the key is currently locked by this process.

Parameters

key (str) – The key

Return type

bool

Returns

True if locked by this process, False otherwise

lock(key)

Wait for a lock.

Parameters

key (str) – the key to lock

Return type

None

mark_dirty(key)

Manually mark an entry as dirty. It will be updated on disk on the next occasion.

Parameters

key (str) – the key that needs to be stored

Return type

Any

Returns

the value contained in the key
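
Why an entry sometimes has to be marked dirty by hand can be shown with a toy mapping that only tracks assignments. This is a simplified stand-in with hypothetical names (DirtyTrackingDict, and a persist that merely reports which keys it would write), not the StoredDict implementation.

```python
from collections.abc import MutableMapping
from typing import Any, Dict, Iterator, Set


class DirtyTrackingDict(MutableMapping):
    """Toy stand-in for a persisted dict: only __setitem__ marks keys dirty."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}
        self.dirty: Set[str] = set()

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value
        self.dirty.add(key)

    def __delitem__(self, key: str) -> None:
        del self._data[key]
        self.dirty.add(key)

    def __iter__(self) -> Iterator[str]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)

    def mark_dirty(self, key: str) -> Any:
        """Flag the key for the next write and hand back its value."""
        self.dirty.add(key)
        return self._data[key]

    def persist(self) -> Set[str]:
        """Pretend to write dirty keys to disk; return which keys were written."""
        written, self.dirty = self.dirty, set()
        return written


db = DirtyTrackingDict()
db["users"] = ["alice"]
first_write = db.persist()
db["users"].append("bob")  # nested change: __setitem__ is not called
untracked = db.persist()
db.mark_dirty("users").append("carol")
tracked = db.persist()
```

The append on the nested list goes unnoticed until the key is marked dirty explicitly, which is the situation mark_dirty is meant for.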

persist()

Store all dirty data to disk.

If no data is to be stored, it’s basically free to call.

Return type

None

release(locked_key)

Release a file lock.

Parameters

locked_key (str) – the key we locked

Return type

None

reload()

Reload stored values from disk.

There is usually no reason to call this. Non-persisted changes might be lost. The only reason would be if another process fiddles with our data concurrently.

Return type

None

enochecker.storeddict.makedirs(path, exist_ok=True)

Create a directory.

Creates the parent directories if necessary.

Parameters
  • path – the path to create

  • exist_ok – ignore already existing path and do nothing

Return type

None

enochecker.useragents module

Collection of random useragents to make fingerprinting checker traffic harder.

enochecker.useragents.random_useragent()

Return a random useragent.

Return type

str

Returns

A seemingly valid useragent.

enochecker.utils module

Collection of utilities for checker development.

class enochecker.utils.SimpleSocket(host=None, port=0, timeout=<object object>, logger=None, timeout_fun=None, *args, **kwargs)

Bases: pwnlib.tubes.remote.remote

Convenient socket wrapper using pwnlib’s tubes.remote

Read Pwnlib documentation for more info.

property current_default_timeout

Get the timeout default that should currently be used.

Return type

float

Returns

current timeout default, either from self.timeout_fun or from timeout.

expect(regexes, timeout=None)

Read until one of a list of regular expressions matches.

Use this to search for anything.

Parameters
  • regexes (Sequence[Union[Pattern[bytes], bytes, str]]) – The first argument is a list of regular expressions, either compiled (re.Pattern instances) or uncompiled (strings).

  • timeout (Optional[float]) – Timeout in seconds. If none, default will be taken.

Return type

Tuple[int, Optional[Match[bytes]], bytes]

Returns

Return a tuple of three items: the index in the list of the first regular expression that matches; the re.Match object returned; and the text read up till and including the match.
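
The shape of the returned tuple can be illustrated on data that has already been received. The sketch below works on a bytes buffer instead of a socket, so it involves no reading or timeouts. The name expect_in_buffer is hypothetical, and returning an index of -1 when nothing matches is an assumption borrowed from the classic telnetlib behaviour.

```python
import re
from typing import Any, Optional, Sequence, Tuple


def expect_in_buffer(
    regexes: Sequence[Any], data: bytes
) -> Tuple[int, Optional["re.Match[bytes]"], bytes]:
    """Return (index, match, text) for the first listed regex found in data."""
    for index, regex in enumerate(regexes):
        if isinstance(regex, str):
            regex = regex.encode("utf-8")  # patterns must be bytes to search bytes
        pattern = re.compile(regex) if isinstance(regex, bytes) else regex
        match = pattern.search(data)
        if match:
            # Text up to and including the match, as described above.
            return index, match, data[: match.end()]
    return -1, None, data  # assumed convention for "no match"


banner = b"Welcome\nlogin: "
index, match, text = expect_in_buffer(
    [rb"password: ", re.compile(rb"login: ")], banner
)
no_match = expect_in_buffer(["nope"], banner)
```

Here the second pattern matches, so index is 1 and text covers everything up to the end of the prompt.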

read_all()

Read all data until EOF; block until connection closed.

Return type

bytes

Returns

the complete content until EOF

read_n_lines(line_count, delimiter=b'\n')

Read n lines from socket.

Parameters
  • line_count (int) – the amount of lines to read

  • delimiter (Union[str, bytes]) – what delimiter to use for splitting (could also be something other than \n)

Return type

List[bytes]

Returns

a list of lines

read_until(match, timeout=None)

Read until the expected string has been seen, or a timeout is hit (default is default socket timeout).

Parameters
  • match (Union[bytes, str]) – what to look for.

  • timeout (Optional[float]) – default socket timeout override

Return type

bytes

Returns

Returns everything until the given match. When no match is found, return whatever is available instead, possibly the empty string. Raise EOFError if the connection is closed and no cooked data is available.

readline_expect(expected, read_until=b'\n', timeout=None, exception_message=None)

Read to newline (or read_until string) and assert the presence of a string in the response.

Will raise an exception if failed.

Parameters
  • read_until (Union[str, bytes]) – Which parameter to read until

  • expected (Union[str, bytes]) – the expected String to search for in the response

  • timeout (Optional[float]) – The timeout (uses Telnet default if not passed in)

Returns

the bytes read

Return type

bytes

recv(numb=4096, timeout=default) → bytes

Receives up to numb bytes of data from the tube, and returns as soon as any quantity of data is available.

If the request is not satisfied before timeout seconds pass, all data is buffered and an empty string ('') is returned.

Raises:

exceptions.EOFError: The connection is closed

Returns:

A bytes object containing bytes received from the socket, or '' if a timeout occurred while waiting.

Examples:

>>> t = tube()
>>> # Fake a data source
>>> t.recv_raw = lambda n: b'Hello, world'
>>> t.recv() == b'Hello, world'
True
>>> t.unrecv(b'Woohoo')
>>> t.recv() == b'Woohoo'
True
>>> with context.local(log_level='debug'):
...    _ = t.recv() 
[...] Received 0xc bytes:
    b'Hello, world'

write(buffer)

Write a string to the socket.

Can block if the connection is blocked. May raise socket.error if the connection is closed.

Parameters

buffer (Union[str, bytes]) – The buffer to write

Return type

None

enochecker.utils.assert_equals(o1, o2, message=None, autobyteify=False)

Raise a BrokenServiceException if o1 != o2.

Parameters
  • o1 (Any) – the first object

  • o2 (Any) – the second object

  • message (Optional[str]) – The exception message in case of an error (optional)

  • autobyteify (bool) – will call ensure_bytes on both parameters.

Return type

None
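
The effect of autobyteify is easiest to see when comparing a str with bytes. The following is a stand-alone sketch with hypothetical names (ServiceBrokenSketch, to_bytes, assert_equals_sketch). It mimics the documented behaviour but is not the library code, and the UTF-8 conversion is an assumption.

```python
from typing import Any, Optional


class ServiceBrokenSketch(Exception):
    """Stand-in for BrokenServiceException."""


def to_bytes(obj: Any) -> bytes:
    """Leave bytes untouched, encode everything else via str() (assumed UTF-8)."""
    if isinstance(obj, bytes):
        return obj
    return str(obj).encode("utf-8")


def assert_equals_sketch(
    o1: Any, o2: Any, message: Optional[str] = None, autobyteify: bool = False
) -> None:
    """Raise ServiceBrokenSketch if o1 != o2, optionally comparing as bytes."""
    if autobyteify:
        o1, o2 = to_bytes(o1), to_bytes(o2)
    if o1 != o2:
        raise ServiceBrokenSketch(message or f"{o1!r} != {o2!r}")


# Passes: both sides become b"pong" before the comparison.
assert_equals_sketch("pong", b"pong", autobyteify=True)

try:
    assert_equals_sketch("pong", b"pong")  # str != bytes without conversion
    mismatch_detected = False
except ServiceBrokenSketch:
    mismatch_detected = True
```

This matters for socket replies, which arrive as bytes while expected values are often written as str.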

enochecker.utils.assert_in(o1, o2, message=None)

Raise a BrokenServiceException if o1 not in o2.

Parameters
  • o1 (Any) – the object that should be in o2

  • o2 (Any) – the object to look in

  • message (Optional[str]) – An optional message that will be part of the error

Return type

None

enochecker.utils.assert_true(expression, message=None, autobyteify=False)

Raise a BrokenServiceException if expression is not True.

Parameters
  • expression – the element that should be True

  • message (Optional[str]) – The exception message in case of an error (optional)

  • autobyteify (bool) – will call ensure_bytes on both parameters.

Return type

None

enochecker.utils.base64ify(s, altchars=None)

Calculate the base64 representation of a value.

Parameters
  • s (Union[str, bytes]) – the input string

  • altchars (Union[str, bytes, None]) – base64 encodes using the given altchars (or not, if None)

Return type

str

Returns

base64 representation

enochecker.utils.debase64ify(s, altchars=None)

Decode a base64-encoded string.

Parameters
  • s (Union[str, bytes]) – the base64-encoded string

  • altchars (Union[str, bytes, None]) – base64 decodes using the given altchars (or not, if None)

Return type

str

Returns

the decoded value
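
An encode and decode round trip accepting either str or bytes can be sketched with the standard base64 module. The function names below are hypothetical, and decoding the result as UTF-8 is an assumption; the real helpers may differ in detail.

```python
import base64
from typing import Optional, Union

StrOrBytes = Union[str, bytes]


def _as_bytes(value: StrOrBytes) -> bytes:
    return value.encode("utf-8") if isinstance(value, str) else value


def base64ify_sketch(s: StrOrBytes, altchars: Optional[StrOrBytes] = None) -> str:
    """Base64-encode s and return the result as a str."""
    alt = _as_bytes(altchars) if altchars is not None else None
    return base64.b64encode(_as_bytes(s), altchars=alt).decode("ascii")


def debase64ify_sketch(s: StrOrBytes, altchars: Optional[StrOrBytes] = None) -> str:
    """Decode base64 input and return the payload as a str (assumed UTF-8)."""
    alt = _as_bytes(altchars) if altchars is not None else None
    return base64.b64decode(_as_bytes(s), altchars=alt).decode("utf-8")


encoded = base64ify_sketch("hi")
decoded = debase64ify_sketch(encoded)
url_style = base64ify_sketch(b"\xfb\xff", altchars="-_")
```

The altchars pair replaces the "+" and "/" characters of the standard alphabet, which is useful when the encoded value ends up in a URL or file name.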

enochecker.utils.ensure_bytes(obj)

Convert an object to bytes.

If the input is bytes, the value remains unchanged.

Parameters

obj (Union[bytes, str, Any]) – str or bytes (or anything else) to convert to bytes representation

Return type

bytes

Returns

the bytes representation of the object

enochecker.utils.ensure_valid_filename(s, min_length=3)

Get a valid file name from the input.

Parameters
  • s (str) – The input string

  • min_length (int) – if the result is shorter than this, the method will fall back to base64.

Return type

str

Returns

all illegal chars stripped or base64ified if it gets too small
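
The strip-then-fall-back behaviour can be sketched as follows. Which characters count as legal is not stated in this reference, so the whitelist below is an assumption, as are the function name and the use of URL-safe base64 without padding.

```python
import base64
import string

# Assumed whitelist; the real set of legal characters may differ.
ALLOWED = set(string.ascii_letters + string.digits + "-_.")


def valid_filename_sketch(s: str, min_length: int = 3) -> str:
    """Strip illegal characters; fall back to base64 if too little remains."""
    stripped = "".join(ch for ch in s if ch in ALLOWED)
    if len(stripped) < min_length:
        encoded = base64.urlsafe_b64encode(s.encode("utf-8")).decode("ascii")
        return encoded.rstrip("=")  # drop padding to keep the name tidy
    return stripped


cleaned = valid_filename_sketch("team 1/round:7")
fallback = valid_filename_sketch("??")
```

The fallback keeps distinct inputs distinct: two names made only of illegal characters would otherwise both collapse to an empty string.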

enochecker.utils.sha256ify(s)

Calculate the sha256 hash.

Converts the input to bytes if it is a string.

Parameters

s (Union[str, bytes]) – the string

Return type

str

Returns

the hash in hex representation
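
A helper of this kind is a thin wrapper around hashlib. The sketch below uses a hypothetical name and assumes UTF-8 when converting a str to bytes.

```python
import hashlib
from typing import Union


def sha256ify_sketch(s: Union[str, bytes]) -> str:
    """Return the SHA-256 hex digest of s, encoding str input as UTF-8."""
    data = s.encode("utf-8") if isinstance(s, str) else s
    return hashlib.sha256(data).hexdigest()


digest = sha256ify_sketch("abc")
same_digest = sha256ify_sketch(b"abc")
```

Because the str is converted first, the str and bytes forms of the same text produce the same digest.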

enochecker.utils.snake_caseify(camel)

Turn camels into snake (-cases).

Parameters

camel (str) – camelOrSnakeWhatever

Return type

str

Returns

camel_or_snake_whatever
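
One common way to do this conversion is a regular expression that inserts an underscore before each inner capital letter. The sketch below is an illustration with a hypothetical function name; the library may handle edge cases such as consecutive capitals differently.

```python
import re


def snake_caseify_sketch(camel: str) -> str:
    """Insert "_" between a lowercase letter or digit and a capital, then lowercase."""
    with_breaks = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", camel)
    return with_breaks.lower()


converted = snake_caseify_sketch("camelOrSnakeWhatever")
unchanged = snake_caseify_sketch("already_snake")
```

Input that is already in snake case passes through unchanged, so the function is safe to apply more than once.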

Module contents