Add files via upload
This commit is contained in:
parent
30b7ff0357
commit
c0c5018a61
4 changed files with 275 additions and 0 deletions
20
communication/communicate_datapacket.py
Normal file
20
communication/communicate_datapacket.py
Normal file
|
@ -0,0 +1,20 @@
|
|||
#%%
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass
|
||||
class DataPacket:
|
||||
|
||||
# data packet consists of message/command and data associated with it
|
||||
data: dict = field(default_factory=dict)
|
||||
message: str = field(default=None)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test = DataPacket()
|
||||
print(test.data)
|
||||
print(test.message)
|
||||
|
||||
|
||||
# %%
|
82
communication/communicate_receiver.py
Normal file
82
communication/communicate_receiver.py
Normal file
|
@ -0,0 +1,82 @@
|
|||
#%%
|
||||
import zmq
|
||||
import time
|
||||
|
||||
from communication.communicate_datapacket import DataPacket
|
||||
|
||||
|
||||
def receiver(url, receiver_processing, VERBOSE=False):
|
||||
|
||||
with zmq.Context() as context:
|
||||
|
||||
# establishing connection...
|
||||
socket = context.socket(zmq.REP)
|
||||
socket.bind(url)
|
||||
data_in = None
|
||||
data_out_cache = None
|
||||
|
||||
while socket.closed is False:
|
||||
|
||||
# MAIN JOB: here we do a fast task!
|
||||
if VERBOSE:
|
||||
print("RECEIVER: doing fast main job")
|
||||
data_out = receiver_processing.update(data_in)
|
||||
data_in = None # clear after processing!
|
||||
if data_out is not None:
|
||||
data_out_cache = data_out
|
||||
|
||||
# CHECK: is packet available?
|
||||
if VERBOSE:
|
||||
print("RECEIVER: Now looking for packets...")
|
||||
try:
|
||||
packet_in = socket.recv_pyobj(zmq.NOBLOCK)
|
||||
packet_available = True
|
||||
if VERBOSE:
|
||||
print(" ...packet available!")
|
||||
except zmq.error.Again:
|
||||
packet_available = False
|
||||
if VERBOSE:
|
||||
print(" ...nothing found")
|
||||
|
||||
flag_stop = False
|
||||
if packet_available:
|
||||
|
||||
# YES, packet is available - interpret it!
|
||||
if packet_in.message == "stop":
|
||||
flag_stop = True
|
||||
if VERBOSE:
|
||||
print("RECEIVER: I have to stop this program!")
|
||||
if packet_in.message == "sender":
|
||||
data_in = packet_in.data
|
||||
|
||||
if VERBOSE:
|
||||
print("RECEIVER: Confirm data packet was received and processed.")
|
||||
packet_out = DataPacket(data=data_out_cache, message="receiver")
|
||||
print(packet_out)
|
||||
socket.send_pyobj(packet_out)
|
||||
data_out_cache = None
|
||||
|
||||
if flag_stop:
|
||||
if VERBOSE:
|
||||
print("RECEIVER: Trying to close socket!")
|
||||
socket.close()
|
||||
|
||||
if VERBOSE:
|
||||
print("RECEIVER: Socket is closed!")
|
||||
if VERBOSE:
|
||||
print("RECEIVER: Context seems to have been released!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
from test_class_animate import TestClassAnimate
|
||||
|
||||
# count = None
|
||||
url: str = "tcp://*:5555" # number specifies communication channel
|
||||
|
||||
class_processing = TestClassAnimate(show_vertical_bar=True, wait_interval=0.1)
|
||||
receiver(url, class_processing, VERBOSE=True)
|
||||
del class_processing
|
||||
|
||||
|
||||
# %%
|
107
communication/communicate_sender.py
Normal file
107
communication/communicate_sender.py
Normal file
|
@ -0,0 +1,107 @@
|
|||
#%%
|
||||
import zmq
|
||||
import time
|
||||
import math
|
||||
|
||||
from communication.communicate_datapacket import DataPacket
|
||||
from communication.test_class_animate import TestClassAnimate
|
||||
|
||||
|
||||
def sender(url, sender_processing, max_processing=math.inf, VERBOSE=False):
|
||||
|
||||
context = zmq.Context()
|
||||
|
||||
socket = context.socket(zmq.REQ)
|
||||
with socket.connect(url):
|
||||
|
||||
try:
|
||||
cur_processing = 0
|
||||
running = True
|
||||
data_in = None
|
||||
while running:
|
||||
|
||||
# MAIN JOB: the following function performs heavy
|
||||
# data processing and sends a result to the receiving process
|
||||
data_out = sender_processing.update(data_in)
|
||||
packet_out = DataPacket(data=data_out, message="sender")
|
||||
socket.send_pyobj(packet_out)
|
||||
|
||||
# CONFIRMATION: Waits indefinitely until receiving a
|
||||
# confirmation data packet. Can be stopped by pressing
|
||||
# CTRL-C, which is handled below as exception
|
||||
if VERBOSE:
|
||||
print("SENDER: Sent, waiting for receiver confirmation...")
|
||||
confirm = False
|
||||
while not confirm:
|
||||
try:
|
||||
packet_in = socket.recv_pyobj(zmq.NOBLOCK)
|
||||
confirm = True
|
||||
except zmq.error.Again:
|
||||
pass
|
||||
if not packet_in.message == "receiver":
|
||||
if VERBOSE:
|
||||
print("SENDER: Something went wrong, exiting!")
|
||||
running = False
|
||||
else:
|
||||
if VERBOSE:
|
||||
print("SENDER: Receiver confirmed")
|
||||
data_in = packet_in.data
|
||||
if data_in:
|
||||
if "exit" in data_in.keys():
|
||||
if VERBOSE:
|
||||
print("Exit received!")
|
||||
raise KeyboardInterrupt
|
||||
|
||||
# CHECK: if maximum iterations reached...
|
||||
cur_processing += 1
|
||||
if max_processing != -1:
|
||||
if cur_processing >= max_processing:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
except KeyboardInterrupt:
|
||||
if VERBOSE:
|
||||
print("SENDER: Someone wants to killlllll meeeeeeeeeeee!")
|
||||
|
||||
# CTRL-C has pressed, try to stop other process
|
||||
if VERBOSE:
|
||||
print("SENDER: Generating and sending packet...")
|
||||
packet_out = DataPacket(message="stop")
|
||||
socket.send_pyobj(packet_out)
|
||||
|
||||
# CONFIRMATION: Loops and waits for confirmation that data was received
|
||||
if VERBOSE:
|
||||
print("SENDER: Waiting for reception confirmation...")
|
||||
confirm = False
|
||||
while not confirm:
|
||||
try:
|
||||
packet_in = socket.recv_pyobj(zmq.NOBLOCK)
|
||||
confirm = True
|
||||
except zmq.error.Again:
|
||||
pass
|
||||
|
||||
if not packet_in.message == "receiver":
|
||||
if VERBOSE:
|
||||
print("SENDER: Something went wrong, but we're exiting anyway!")
|
||||
|
||||
# exit gracefully...
|
||||
if VERBOSE:
|
||||
print("Closing socket and destroying context!")
|
||||
socket.close()
|
||||
context.destroy()
|
||||
if VERBOSE:
|
||||
print("Done with everything!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
VERBOSE: bool = True
|
||||
url: str = "tcp://localhost:5555" # number specifies communication channel
|
||||
|
||||
class_processing = TestClassAnimate(show_horizontal_bar=True, wait_interval=5)
|
||||
sender(
|
||||
url=url, sender_processing=class_processing, max_processing=3, VERBOSE=VERBOSE
|
||||
)
|
||||
del class_processing
|
||||
|
||||
|
||||
# %%
|
66
communication/test_class_animate.py
Normal file
66
communication/test_class_animate.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
#%%
|
||||
import cv2
|
||||
import numpy as np
|
||||
import time
|
||||
|
||||
from communication.communicate_datapacket import DataPacket
|
||||
|
||||
|
||||
class TestClassAnimate:
|
||||
|
||||
dx: int = 400
|
||||
dy: int = 400
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
show_vertical_bar: bool = False,
|
||||
show_horizontal_bar: bool = False,
|
||||
wait_interval: float = 5.0,
|
||||
):
|
||||
self.show_vertical_bar = show_vertical_bar
|
||||
self.show_horizontal_bar = show_horizontal_bar
|
||||
self.count_vertical_bar = 0
|
||||
self.count_horizontal_bar = 0
|
||||
self.wait_interval = wait_interval
|
||||
|
||||
def update(self, data_in: dict):
|
||||
|
||||
if self.show_vertical_bar:
|
||||
self.count_vertical_bar = np.mod(self.count_vertical_bar + 1, self.dx)
|
||||
vertical_image = np.zeros((self.dy, self.dx, 3))
|
||||
vertical_image[:, self.count_vertical_bar, 0] = 1.0
|
||||
cv2.imshow("Vertical Bar", vertical_image)
|
||||
cv2.waitKey(1)
|
||||
|
||||
if self.show_horizontal_bar:
|
||||
self.count_horizontal_bar = np.mod(self.count_horizontal_bar + 2, self.dx)
|
||||
horizontal_image = np.zeros((self.dy, self.dx, 3))
|
||||
horizontal_image[self.count_horizontal_bar, :, 2] = 1.0
|
||||
cv2.imshow("Horizontal Bar", horizontal_image)
|
||||
cv2.waitKey(1)
|
||||
|
||||
print(f"Waiting for {self.wait_interval} seconds...")
|
||||
time.sleep(self.wait_interval)
|
||||
|
||||
print(f"Input data was: {data_in}")
|
||||
print("Generating output data...")
|
||||
data_out = {
|
||||
"CountVert": self.count_vertical_bar,
|
||||
"CountHori": self.count_horizontal_bar,
|
||||
}
|
||||
|
||||
return data_out
|
||||
|
||||
def __del__(self):
|
||||
if self.show_horizontal_bar or self.show_vertical_bar:
|
||||
cv2.destroyAllWindows()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
tcas = TestClassAnimate(wait_interval=0.5, show_horizontal_bar=True)
|
||||
for i in range(50):
|
||||
data_out = tcas.update(None)
|
||||
print(f"Iteration {i}, data is: {data_out}")
|
||||
del tcas
|
||||
|
||||
# %%
|
Loading…
Reference in a new issue