diff --git a/communication/communicate_datapacket.py b/communication/communicate_datapacket.py new file mode 100644 index 0000000..e1d439c --- /dev/null +++ b/communication/communicate_datapacket.py @@ -0,0 +1,20 @@ +#%% + +from dataclasses import dataclass, field + + +@dataclass +class DataPacket: + + # data packet consists of message/command and data associated with it + data: dict = field(default_factory=dict) + message: str = field(default=None) + + +if __name__ == "__main__": + test = DataPacket() + print(test.data) + print(test.message) + + +# %% diff --git a/communication/communicate_receiver.py b/communication/communicate_receiver.py new file mode 100644 index 0000000..74b015e --- /dev/null +++ b/communication/communicate_receiver.py @@ -0,0 +1,82 @@ +#%% +import zmq +import time + +from communication.communicate_datapacket import DataPacket + + +def receiver(url, receiver_processing, VERBOSE=False): + + with zmq.Context() as context: + + # establishing connection... + socket = context.socket(zmq.REP) + socket.bind(url) + data_in = None + data_out_cache = None + + while socket.closed is False: + + # MAIN JOB: here we do a fast task! + if VERBOSE: + print("RECEIVER: doing fast main job") + data_out = receiver_processing.update(data_in) + data_in = None # clear after processing! + if data_out is not None: + data_out_cache = data_out + + # CHECK: is packet available? + if VERBOSE: + print("RECEIVER: Now looking for packets...") + try: + packet_in = socket.recv_pyobj(zmq.NOBLOCK) + packet_available = True + if VERBOSE: + print(" ...packet available!") + except zmq.error.Again: + packet_available = False + if VERBOSE: + print(" ...nothing found") + + flag_stop = False + if packet_available: + + # YES, packet is available - interpret it! + if packet_in.message == "stop": + flag_stop = True + if VERBOSE: + print("RECEIVER: I have to stop this program!") + if packet_in.message == "sender": + data_in = packet_in.data + + if VERBOSE: + print("RECEIVER: Confirm data packet was received and processed.") + packet_out = DataPacket(data=data_out_cache, message="receiver") + print(packet_out) + socket.send_pyobj(packet_out) + data_out_cache = None + + if flag_stop: + if VERBOSE: + print("RECEIVER: Trying to close socket!") + socket.close() + + if VERBOSE: + print("RECEIVER: Socket is closed!") + if VERBOSE: + print("RECEIVER: Context seems to have been released!") + + +if __name__ == "__main__": + + from test_class_animate import TestClassAnimate + + # count = None + url: str = "tcp://*:5555" # number specifies communication channel + + class_processing = TestClassAnimate(show_vertical_bar=True, wait_interval=0.1) + receiver(url, class_processing, VERBOSE=True) + del class_processing + + +# %% diff --git a/communication/communicate_sender.py b/communication/communicate_sender.py new file mode 100644 index 0000000..1345882 --- /dev/null +++ b/communication/communicate_sender.py @@ -0,0 +1,107 @@ +#%% +import zmq +import time +import math + +from communication.communicate_datapacket import DataPacket +from communication.test_class_animate import TestClassAnimate + + +def sender(url, sender_processing, max_processing=math.inf, VERBOSE=False): + + context = zmq.Context() + + socket = context.socket(zmq.REQ) + with socket.connect(url): + + try: + cur_processing = 0 + running = True + data_in = None + while running: + + # MAIN JOB: the following function performs heavy + # data processing and sends a result to the receiving process + data_out = sender_processing.update(data_in) + packet_out = DataPacket(data=data_out, message="sender") + socket.send_pyobj(packet_out) + + # CONFIRMATION: Waits indefinitely until receiving a + # confirmation data packet. Can be stopped by pressing + # CTRL-C, which is handled below as exception + if VERBOSE: + print("SENDER: Sent, waiting for receiver confirmation...") + confirm = False + while not confirm: + try: + packet_in = socket.recv_pyobj(zmq.NOBLOCK) + confirm = True + except zmq.error.Again: + pass + if not packet_in.message == "receiver": + if VERBOSE: + print("SENDER: Something went wrong, exiting!") + running = False + else: + if VERBOSE: + print("SENDER: Receiver confirmed") + data_in = packet_in.data + if data_in: + if "exit" in data_in.keys(): + if VERBOSE: + print("Exit received!") + raise KeyboardInterrupt + + # CHECK: if maximum iterations reached... + cur_processing += 1 + if max_processing != -1: + if cur_processing >= max_processing: + raise KeyboardInterrupt + + except KeyboardInterrupt: + if VERBOSE: + print("SENDER: Someone wants to killlllll meeeeeeeeeeee!") + + # CTRL-C has pressed, try to stop other process + if VERBOSE: + print("SENDER: Generating and sending packet...") + packet_out = DataPacket(message="stop") + socket.send_pyobj(packet_out) + + # CONFIRMATION: Loops and waits for confirmation that data was received + if VERBOSE: + print("SENDER: Waiting for reception confirmation...") + confirm = False + while not confirm: + try: + packet_in = socket.recv_pyobj(zmq.NOBLOCK) + confirm = True + except zmq.error.Again: + pass + + if not packet_in.message == "receiver": + if VERBOSE: + print("SENDER: Something went wrong, but we're exiting anyway!") + + # exit gracefully... + if VERBOSE: + print("Closing socket and destroying context!") + socket.close() + context.destroy() + if VERBOSE: + print("Done with everything!") + + +if __name__ == "__main__": + + VERBOSE: bool = True + url: str = "tcp://localhost:5555" # number specifies communication channel + + class_processing = TestClassAnimate(show_horizontal_bar=True, wait_interval=5) + sender( + url=url, sender_processing=class_processing, max_processing=3, VERBOSE=VERBOSE + ) + del class_processing + + +# %% diff --git a/communication/test_class_animate.py b/communication/test_class_animate.py new file mode 100644 index 0000000..7b6b4da --- /dev/null +++ b/communication/test_class_animate.py @@ -0,0 +1,66 @@ +#%% +import cv2 +import numpy as np +import time + +from communication.communicate_datapacket import DataPacket + + +class TestClassAnimate: + + dx: int = 400 + dy: int = 400 + + def __init__( + self, + show_vertical_bar: bool = False, + show_horizontal_bar: bool = False, + wait_interval: float = 5.0, + ): + self.show_vertical_bar = show_vertical_bar + self.show_horizontal_bar = show_horizontal_bar + self.count_vertical_bar = 0 + self.count_horizontal_bar = 0 + self.wait_interval = wait_interval + + def update(self, data_in: dict): + + if self.show_vertical_bar: + self.count_vertical_bar = np.mod(self.count_vertical_bar + 1, self.dx) + vertical_image = np.zeros((self.dy, self.dx, 3)) + vertical_image[:, self.count_vertical_bar, 0] = 1.0 + cv2.imshow("Vertical Bar", vertical_image) + cv2.waitKey(1) + + if self.show_horizontal_bar: + self.count_horizontal_bar = np.mod(self.count_horizontal_bar + 2, self.dx) + horizontal_image = np.zeros((self.dy, self.dx, 3)) + horizontal_image[self.count_horizontal_bar, :, 2] = 1.0 + cv2.imshow("Horizontal Bar", horizontal_image) + cv2.waitKey(1) + + print(f"Waiting for {self.wait_interval} seconds...") + time.sleep(self.wait_interval) + + print(f"Input data was: {data_in}") + print("Generating output data...") + data_out = { + "CountVert": self.count_vertical_bar, + "CountHori": self.count_horizontal_bar, + } + + return data_out + + def __del__(self): + if self.show_horizontal_bar or self.show_vertical_bar: + cv2.destroyAllWindows() + + +if __name__ == "__main__": + tcas = TestClassAnimate(wait_interval=0.5, show_horizontal_bar=True) + for i in range(50): + data_out = tcas.update(None) + print(f"Iteration {i}, data is: {data_out}") + del tcas + +# %%