Add files via upload

This commit is contained in:
David Rotermund 2023-07-31 15:30:18 +02:00 committed by GitHub
parent c4400d6a2f
commit fc3f078eb9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 396 additions and 0 deletions

325
WebCam.py Normal file
View file

@ -0,0 +1,325 @@
#%%
#
# WebCam.py
# ========================================================
# interface to cv2 for using a webcam or for reading from
# a video file
#
# Version 1.0, before 30.03.2023:
# written by David...
#
# Version 1.1, 30.03.2023:
# thrown out test image
# added test code
# added code to "capture" from video file
#
# Version 1.2, 20.06.2023:
# added code to capture wirelessly from "GoPro" camera
# added test code for "GoPro"
#
# Version 1.3, 23.06.2023
# test display in pyplot or cv2
#
# Version 1.4, 28.06.2023
# solved Windows DirectShow problem
#
from PIL import Image
import os
import cv2
import torch
import torchvision as tv
# for GoPro
import time
import socket
try:
print("Trying to import GoPro modules...")
from goprocam import GoProCamera, constants
gopro_exists = True
except:
print("...not found, continuing!")
gopro_exists = False
import platform
class WebCam:
# test_pattern: torch.Tensor
# test_pattern_gray: torch.Tensor
source: int
framesize: tuple[int, int]
fps: float
cap_frame_width: int
cap_frame_height: int
cap_fps: float
webcam_is_ready: bool
cap_frames_available: int
default_dtype = torch.float32
def __init__(
self,
source: str | int = 1,
framesize: tuple[int, int] = (720, 1280), # (1920, 1080), # (640, 480),
fps: float = 30.0,
):
super().__init__()
assert fps > 0
self.source = source
self.framesize = framesize
self.cap = None
self.fps = fps
self.webcam_is_ready = False
def open_cam(self) -> bool:
if self.cap is not None:
self.cap.release()
self.cap = None
self.webcam_is_ready = False
# handle GoPro...
if self.source == "GoProWireless":
if not gopro_exists:
print("No GoPro driver/support!")
self.webcam_is_ready = False
return False
print("GoPro: Starting access")
gpCam = GoProCamera.GoPro()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("GoPro: Socket created!!!")
self.t = time.time()
gpCam.livestream("start")
gpCam.video_settings(res="1080p", fps="30")
gpCam.gpControlSet(
constants.Stream.WINDOW_SIZE, constants.Stream.WindowSize.R720
)
self.cap = cv2.VideoCapture("udp://10.5.5.9:8554", cv2.CAP_FFMPEG)
print("GoPro: Video capture started!!!")
else:
self.sock = None
self.t = -1
if platform.system().lower() == "windows":
self.cap = cv2.VideoCapture(self.source, cv2.CAP_DSHOW)
else:
self.cap = cv2.VideoCapture(self.source)
print("Normal capture started!!!")
assert self.cap is not None
if self.cap.isOpened() is not True:
self.webcam_is_ready = False
return False
if type(self.source) != str:
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.framesize[0])
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.framesize[1])
self.cap_frames_available = None
else:
# ...for files and GoPro...
self.cap_frames_available = self.cap.get(cv2.CAP_PROP_FRAME_COUNT)
self.cap_frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.cap_frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.cap_fps = float(self.cap.get(cv2.CAP_PROP_FPS))
print(
(
f"Capturing or reading with: {self.cap_frame_width:.0f} x "
f"{self.cap_frame_height:.0f} @ "
f"{self.cap_fps:.1f}."
)
)
self.webcam_is_ready = True
return True
def close_cam(self) -> None:
if self.cap is not None:
self.cap.release()
def get_frame(self) -> torch.Tensor | None:
if self.cap is None:
return None
else:
if self.sock:
dt_min = 0.015
success, frame = self.cap.read()
t_next = time.time()
t_prev = t_next
while t_next - t_prev < dt_min:
t_prev = t_next
success, frame = self.cap.read()
t_next = time.time()
if self.t >= 0:
if time.time() - self.t > 2.5:
print("GoPro-Stream must be kept awake!...")
self.sock.sendto(
"_GPHD_:0:0:2:0.000000\n".encode(), ("10.5.5.9", 8554)
)
self.t = time.time()
else:
success, frame = self.cap.read()
if success is False:
self.webcam_is_ready = False
return None
output = (
torch.tensor(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
.movedim(-1, 0)
.type(dtype=self.default_dtype)
/ 255.0
)
return output
# for testing the code if module is executed from command line
if __name__ == "__main__":
import matplotlib.pyplot as plt
import cv2
TEST_FILEREAD = False
TEST_WEBCAM = True
TEST_GOPRO = False
display = "cv2"
n_capture = 200
delay_capture = 0.001
print("Testing the WebCam interface")
if TEST_FILEREAD:
file_name = "level1.mp4"
# open
print("Opening video file")
w = WebCam(file_name)
if not w.open_cam():
raise OSError(f"Opening file with name {file_name} failed!")
# print information
print(
f"Frame size {w.cap_frame_width} x {w.cap_frame_height} at {w.cap_fps} fps."
)
# capture three frames and show them
for i in range(min([n_capture, w.cap_frames_available])): # TODO: available?
frame = w.get_frame()
if frame == None:
raise OSError(f"Can not get frame from file with name {file_name}!")
print(f"frame {i} has shape {frame.shape}")
frame_numpy = (frame.movedim(0, -1) * 255).type(dtype=torch.uint8).numpy()
if display == "pyplot":
plt.imshow(frame_numpy)
plt.show()
if display == "cv2":
cv2.imshow("File", frame_numpy[:, :, (2, 1, 0)])
cv2.waitKey(1)
time.sleep(delay_capture)
# close
print("Closing file")
w.close_cam()
if TEST_WEBCAM:
camera_index = 0
# open
print("Opening camera")
w = WebCam(camera_index)
if not w.open_cam():
raise OSError(f"Opening web cam with index {camera_index} failed!")
# print information
print(
f"Frame size {w.cap_frame_width} x {w.cap_frame_height} at {w.cap_fps} fps."
)
# capture three frames and show them
for i in range(n_capture):
frame = w.get_frame()
if frame == None:
raise OSError(
f"Can not get frame from camera with index {camera_index}!"
)
print(f"frame {i} has shape {frame.shape}")
frame_numpy = (frame.movedim(0, -1) * 255).type(dtype=torch.uint8).numpy()
if display == "pyplot":
plt.imshow(frame_numpy)
plt.show()
if display == "cv2":
cv2.imshow("WebCam", frame_numpy[:, :, (2, 1, 0)])
cv2.waitKey(1)
time.sleep(delay_capture)
# close
print("Closing camera")
w.close_cam()
if TEST_GOPRO:
camera_name = "GoProWireless"
# open
print("Opening GoPro")
w = WebCam(camera_name)
if not w.open_cam():
raise OSError(f"Opening GoPro with index {camera_index} failed!")
# print information
print(
f"Frame size {w.cap_frame_width} x {w.cap_frame_height} at {w.cap_fps} fps."
)
w.cap.set(cv2.CAP_PROP_BUFFERSIZE, 0)
# capture three frames and show them
# print("Empty Buffer...")
# for i in range(500):
# print(i)
# frame = w.get_frame()
# print("Buffer Emptied...")
for i in range(n_capture):
frame = w.get_frame()
if frame == None:
raise OSError(
f"Can not get frame from camera with index {camera_index}!"
)
print(f"frame {i} has shape {frame.shape}")
frame_numpy = (frame.movedim(0, -1) * 255).type(dtype=torch.uint8).numpy()
if display == "pyplot":
plt.imshow(frame_numpy)
plt.show()
if display == "cv2":
cv2.imshow("GoPro", frame_numpy[:, :, (2, 1, 0)])
cv2.waitKey(1)
time.sleep(delay_capture)
# close
print("Closing Cam/File/GoPro")
w.close_cam()
if display == "cv2":
cv2.destroyAllWindows()
# %%

14
online_receiver.py Normal file
View file

@ -0,0 +1,14 @@
#%%
from communication.communicate_receiver import receiver
from processing_chain.OnlinePerception import OnlinePerception
online_perception = OnlinePerception("cv2", use_gui=True)
url: str = "tcp://*:5555" # number specifies communication channel
receiver(url, online_perception, VERBOSE=False)
# del online_perception
# %%

19
online_sender.py Normal file
View file

@ -0,0 +1,19 @@
#%%
from processing_chain.OnlineEncoding import OnlineEncoding
from communication.communicate_sender import sender
verbose = "cv2"
source = "GoProWireless"
online_encoding = OnlineEncoding(source=source, verbose=verbose)
VERBOSE: bool = False
url: str = "tcp://localhost:5555" # number specifies communication channel
sender(url=url, sender_processing=online_encoding, max_processing=-1, VERBOSE=VERBOSE)
del online_encoding
# %%

19
online_sender_gopro.py Normal file
View file

@ -0,0 +1,19 @@
#%%
from processing_chain.OnlineEncoding import OnlineEncoding
from communication.communicate_sender import sender
verbose = "cv2"
source = "GoProWireless"
online_encoding = OnlineEncoding(source=source, verbose=verbose)
VERBOSE: bool = False
url: str = "tcp://localhost:5555" # number specifies communication channel
sender(url=url, sender_processing=online_encoding, max_processing=-1, VERBOSE=VERBOSE)
del online_encoding
# %%

19
online_sender_webcam0.py Normal file
View file

@ -0,0 +1,19 @@
#%%
from processing_chain.OnlineEncoding import OnlineEncoding
from communication.communicate_sender import sender
verbose = "cv2"
source = 0
online_encoding = OnlineEncoding(source=source, verbose=verbose)
VERBOSE: bool = False
url: str = "tcp://localhost:5555" # number specifies communication channel
sender(url=url, sender_processing=online_encoding, max_processing=-1, VERBOSE=VERBOSE)
del online_encoding
# %%