Source code for localite.flow.mitm

from localite.flow.ext import EXT
from localite.flow.loc import LOC
from localite.flow.mrk import MRK
from localite.flow.ctrl import CTRL
from localite.flow.payload import Queue
from localite.flow.ext import push
from subprocess import Popen, PIPE
import time
from typing import Tuple


def start_threaded(
    loc_host: str, loc_port: int = 6666, address: Tuple[str, int] = ("127.0.0.1", 6667)
):
    """starts the whole flow-pipeline as threads within the local process

    args
    ----

    loc_host: str
        the ip-adress of the localite PC

    loc_port: int = 6666
        the port of the localite Server

    ext: Tuple[str, int]  ("127.0.0.1", 6667)
        the host:port where the localite-flow server will be setup 
    
    """
    queue = Queue()
    locbox = Queue()
    mrkbox = Queue()
    ext = EXT(host=address[0], port=address[1], queue=queue)
    ctrl = CTRL(queue=queue, loc=locbox, mrk=mrkbox)
    loc = LOC(outbox=queue, inbox=locbox, address=(loc_host, loc_port))
    mrk = MRK(mrk=mrkbox)
    mrk.start()
    loc.start()
    loc.await_running()
    mrk.await_running()
    ctrl.start()
    ctrl.await_running()
    ext.start()
    ext.await_running()


[docs]def kill(ext: Tuple[str, int] = ("127.0.0.1", 6667)): """ kill the localite-flow at the given address, whether it runs as a subprocess or in a local thread args ---- ext: Tuple[str, int] = ("127.0.0.1", 6667) the host:port where the localite-flow server was setup """ push("cmd", "poison-pill", host=ext[0], port=ext[1])
[docs]def start(host: str): """starts the localite-flow as a subprocess You can stop the subprocess gracefully using :meth:`~localite.flow.mitm.kill` args ---- host: str the ip adress of the localite PC """ from localite.flow.ext import available p = Popen(["localite-flow", "--host", host], stderr=PIPE, stdout=PIPE) print("[", end="") while not available(): # pragma no cover print(".", end="") time.sleep(0.5) print("]") return p