mirror of
https://github.com/caperren/project_archives.git
synced 2025-11-08 13:31:14 +00:00
Added ground station mockup
This commit is contained in:
131
rover_base_station_mockup/Framework/FlasherTestCore.py
Normal file
131
rover_base_station_mockup/Framework/FlasherTestCore.py
Normal file
@@ -0,0 +1,131 @@
|
||||
from PyQt5 import QtWidgets, QtCore, QtGui
|
||||
import time
|
||||
|
||||
|
||||
class Flasher(QtCore.QThread):
|
||||
THREAD_DELAY = 100
|
||||
FLASH_PERIOD = 1 # Per Second
|
||||
|
||||
update_label_status__signal = QtCore.pyqtSignal()
|
||||
|
||||
def __init__(self, label_gui_element, subject_text, good_text, warning_text, bad_text):
|
||||
super(Flasher, self).__init__()
|
||||
|
||||
self.subject_text = subject_text
|
||||
|
||||
self.label = label_gui_element # type: QtWidgets.QLabel
|
||||
|
||||
self.not_abort = True
|
||||
|
||||
self.statuses = {
|
||||
"GOOD": {
|
||||
"style": "background-color: green;",
|
||||
"text": good_text
|
||||
},
|
||||
"WARNING": {
|
||||
"style": "background-color: yellow;",
|
||||
"text": warning_text
|
||||
},
|
||||
"BAD": {
|
||||
"style": "background-color: red;",
|
||||
"text": bad_text
|
||||
},
|
||||
"NEUTRAL": {
|
||||
"style": ""
|
||||
},
|
||||
"UNKNOWN": {
|
||||
"style": "background-color: black;",
|
||||
"text": "STATUS UNKNOWN"
|
||||
}
|
||||
}
|
||||
|
||||
self.label_status = "BAD"
|
||||
|
||||
self.current_label_status = self.label_status
|
||||
self.last_label_status = self.current_label_status
|
||||
|
||||
self.connect_signals_to_slots()
|
||||
|
||||
self.last_time = time.time()
|
||||
|
||||
def run(self):
|
||||
while self.not_abort:
|
||||
if (time.time() - self.last_time) >= self.FLASH_PERIOD:
|
||||
if self.current_label_status in ["BAD", "WARNING", "UNKNOWN"]:
|
||||
self.current_label_status = "NEUTRAL"
|
||||
elif self.current_label_status == "NEUTRAL":
|
||||
self.current_label_status = self.label_status
|
||||
|
||||
self.update_label_status__signal.emit()
|
||||
self.last_time = time.time()
|
||||
|
||||
self.msleep(self.THREAD_DELAY)
|
||||
|
||||
def connect_signals_to_slots(self):
|
||||
self.update_label_status__signal.connect(self.set_style__slot)
|
||||
|
||||
def set_label_status__slot(self, status):
|
||||
text = self.subject_text
|
||||
|
||||
if status in self.statuses:
|
||||
self.label_status = status
|
||||
text += "\n" + self.statuses[self.label_status]["text"]
|
||||
else:
|
||||
self.label_status = "UNKNOWN"
|
||||
|
||||
self.label.setText(text)
|
||||
|
||||
def set_style__slot(self):
|
||||
self.label.setStyleSheet(self.statuses[self.current_label_status]["style"])
|
||||
|
||||
|
||||
class FlasherTest(QtCore.QThread):
|
||||
set_style_sheets_signal = QtCore.pyqtSignal()
|
||||
start_flashers__signal = QtCore.pyqtSignal()
|
||||
|
||||
def __init__(self, main_window):
|
||||
super(FlasherTest, self).__init__()
|
||||
|
||||
self.main_window = main_window
|
||||
|
||||
self.not_abort = True
|
||||
|
||||
self.label = self.main_window.label # type: QtWidgets.QLabel
|
||||
self.label_2 = self.main_window.label_3 # type: QtWidgets.QLabel
|
||||
self.label_3 = self.main_window.label_4 # type: QtWidgets.QLabel
|
||||
|
||||
self.rover_connected_flasher = Flasher(self.label, "ROVER", "CONNECTED", "INTERMITTENT", "DISCONNECTED")
|
||||
self.controller_connected_flasher = Flasher(self.label_2, "CONTROLLER", "CONNECTED", "INTERMITTENT", "DISCONNECTED")
|
||||
self.frsky_controller_connected_flasher = Flasher(self.label_3, "FRSKY", "CONNECTED", "INTERMITTENT", "DISCONNECTED")
|
||||
|
||||
self.rover_connected_flasher.set_label_status__slot("GOOD")
|
||||
self.controller_connected_flasher.set_label_status__slot("WARNING")
|
||||
self.frsky_controller_connected_flasher.set_label_status__slot("BAD")
|
||||
|
||||
self.start_flashers__signal.connect(self.rover_connected_flasher.start)
|
||||
self.start_flashers__signal.connect(self.controller_connected_flasher.start)
|
||||
self.start_flashers__signal.connect(self.frsky_controller_connected_flasher.start)
|
||||
|
||||
self.start_flashers__signal.emit()
|
||||
|
||||
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
|
||||
|
||||
def run(self):
|
||||
start_time = time.time()
|
||||
while self.not_abort:
|
||||
if time.time() - start_time > 1:
|
||||
self.set_style_sheets_signal.emit()
|
||||
start_time = time.time()
|
||||
self.msleep(100)
|
||||
|
||||
def set_style_sheets(self):
|
||||
if not self.label_last_color:
|
||||
self.label_2.setStyleSheet("background-color: yellow;")
|
||||
self.label_last_color = "asdf"
|
||||
else:
|
||||
self.label_2.setStyleSheet("")
|
||||
self.label_last_color = ""
|
||||
|
||||
def on_kill_threads__slot(self):
|
||||
self.rover_connected_flasher.terminate()
|
||||
self.not_abort = False
|
||||
164
rover_base_station_mockup/Framework/VideoCore.py
Normal file
164
rover_base_station_mockup/Framework/VideoCore.py
Normal file
@@ -0,0 +1,164 @@
|
||||
from PyQt5 import QtWidgets, QtCore, QtGui, QtMultimediaWidgets
|
||||
import time
|
||||
import cv2
|
||||
import qimage2ndarray
|
||||
import PIL.Image
|
||||
from PIL.ImageQt import ImageQt
|
||||
|
||||
CV_CAP_PROP_FRAME_WIDTH = 3
|
||||
CV_CAP_PROP_FRAME_HEIGHT = 4
|
||||
CV_CAP_PROP_FPS = 5
|
||||
|
||||
|
||||
class Compass(QtCore.QThread):
|
||||
compass_ready__signal = QtCore.pyqtSignal()
|
||||
|
||||
def __init__(self, main_window):
|
||||
super(Compass, self).__init__()
|
||||
|
||||
self.main_window = main_window
|
||||
|
||||
self.not_abort = True
|
||||
|
||||
self.image = PIL.Image.open("Resources/Images/compass.png").resize((300, 300)) # PIL.Image
|
||||
self.compass_dir = 1
|
||||
|
||||
self.compass_label = self.main_window.compass_label # type: QtWidgets.QLabel
|
||||
self.compass_frame = None
|
||||
self.compass_current_degrees_rotation = 0
|
||||
|
||||
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
|
||||
self.compass_ready__signal.connect(self.on_compass_ready__slot)
|
||||
|
||||
self.start()
|
||||
|
||||
def run(self):
|
||||
while self.not_abort:
|
||||
new = self.image.rotate(int(self.compass_current_degrees_rotation))
|
||||
|
||||
if self.compass_current_degrees_rotation == 90:
|
||||
self.compass_dir = 0
|
||||
elif self.compass_current_degrees_rotation == -90:
|
||||
self.compass_dir = 1
|
||||
|
||||
if self.compass_dir:
|
||||
self.compass_current_degrees_rotation += 1
|
||||
else:
|
||||
self.compass_current_degrees_rotation -= 1
|
||||
|
||||
self.compass_frame = QtGui.QPixmap.fromImage(ImageQt(new))
|
||||
self.compass_ready__signal.emit()
|
||||
|
||||
self.msleep(50)
|
||||
|
||||
def on_compass_ready__slot(self):
|
||||
self.compass_label.setPixmap(self.compass_frame)
|
||||
|
||||
def on_kill_threads__slot(self):
|
||||
self.not_abort = False
|
||||
|
||||
|
||||
class VideoReader(QtCore.QThread):
|
||||
video_frame_ready__signal = QtCore.pyqtSignal()
|
||||
|
||||
def __init__(self, main_window):
|
||||
super(VideoReader, self).__init__()
|
||||
|
||||
self.main_window = main_window
|
||||
|
||||
self.not_abort = True
|
||||
|
||||
self.primary_display_label = self.main_window.primary_display_label # type: QtWidgets.QLabel
|
||||
|
||||
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
|
||||
self.video_frame_ready__signal.connect(self.on_compass_ready__slot)
|
||||
|
||||
# self.video = cv2.imread("Resources/Videos/ROSS1.MOV")
|
||||
|
||||
self.video_frame = None
|
||||
|
||||
self.start()
|
||||
|
||||
def run(self):
|
||||
while self.not_abort:
|
||||
frame = self.video_camera.read()[1]
|
||||
|
||||
# frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
# frame = cv2.resize(frame, (1280, 720))
|
||||
# self.video_frame = QtGui.QPixmap.fromImage(qimage2ndarray.array2qimage(frame))
|
||||
|
||||
self.video_frame_ready__signal.emit()
|
||||
|
||||
self.msleep(50)
|
||||
|
||||
def on_compass_ready__slot(self):
|
||||
self.primary_display_label.setPixmap(self.video_frame)
|
||||
|
||||
def on_kill_threads__slot(self):
|
||||
self.not_abort = False
|
||||
|
||||
|
||||
class Video(QtCore.QThread):
|
||||
image_ready_signal = QtCore.pyqtSignal()
|
||||
|
||||
def __init__(self, main_window):
|
||||
super(Video, self).__init__()
|
||||
|
||||
self.main_window = main_window
|
||||
|
||||
self.not_abort = True
|
||||
|
||||
self.primary_display_label = self.main_window.primary_display_label # type: QtWidgets.QLabel
|
||||
self.secondary_display_label = self.main_window.secondary_display_label # type: QtWidgets.QLabel
|
||||
self.tertiary_display_label = self.main_window.tertiary_display_label # type: QtWidgets.QLabel
|
||||
|
||||
self.video_camera = cv2.VideoCapture(0)
|
||||
|
||||
self.video_camera.set(CV_CAP_PROP_FRAME_WIDTH, 1280)
|
||||
self.video_camera.set(CV_CAP_PROP_FRAME_HEIGHT, 720)
|
||||
self.video_camera.set(CV_CAP_PROP_FPS, 15)
|
||||
|
||||
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
|
||||
self.image_ready_signal.connect(self.on_image_ready__slot)
|
||||
|
||||
self.raw_frame = None
|
||||
self.large_frame = None
|
||||
self.small_frame = None
|
||||
|
||||
self.compass = Compass(self.main_window)
|
||||
# self.video_reader = VideoReader(self.main_window)
|
||||
|
||||
self.start()
|
||||
|
||||
def run(self):
|
||||
frame_count = 0
|
||||
start_time = time.time()
|
||||
|
||||
while self.not_abort:
|
||||
frame = self.video_camera.read()[1]
|
||||
|
||||
self.raw_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
self.large_frame = self.raw_frame
|
||||
self.small_frame = cv2.resize(self.raw_frame, (640, 360))
|
||||
self.large_frame = QtGui.QPixmap.fromImage(qimage2ndarray.array2qimage(self.large_frame))
|
||||
self.small_frame = QtGui.QPixmap.fromImage(qimage2ndarray.array2qimage(self.small_frame))
|
||||
|
||||
self.image_ready_signal.emit()
|
||||
|
||||
frame_count += 1
|
||||
fps = frame_count / (time.time() - start_time)
|
||||
# print("FPS:", fps)
|
||||
|
||||
self.msleep(1)
|
||||
|
||||
self.compass.wait()
|
||||
#self.video_reader.wait()
|
||||
|
||||
def on_image_ready__slot(self):
|
||||
pass
|
||||
self.primary_display_label.setPixmap(self.large_frame)
|
||||
self.secondary_display_label.setPixmap(self.small_frame)
|
||||
self.tertiary_display_label.setPixmap(self.small_frame)
|
||||
|
||||
def on_kill_threads__slot(self):
|
||||
self.not_abort = False
|
||||
Reference in New Issue
Block a user