Added 2016-2017 rover code, and a ROB assignment

This commit is contained in:
2017-11-29 12:46:14 -08:00
parent 4566d98b5f
commit cb3ce5dafc
1414 changed files with 5874 additions and 0 deletions

View File

@@ -0,0 +1,162 @@
import numpy
import scipy
import matplotlib.pyplot as pyplot
import time
numpy.random.seed(int(time.time()))
PART_1_COEFFICIENTS = numpy.array([-0.1, 4.0, -0.1, 10.0], float)
PART_1_X_LIMITS = [-10.0, 25.0]
def plot_form(axis_handle, x_limit=None, title="", x_label="x", y_label="f(x)"):
if x_limit is not None:
axis_handle.set_xlim(x_limit)
axis_handle.set_title(title)
axis_handle.set_xlabel(x_label)
axis_handle.set_ylabel(y_label)
def part_1_polynomial(x_input):
return numpy.polyval(PART_1_COEFFICIENTS, x_input)
def part_2_plot():
x_limit_min = PART_1_X_LIMITS[0]
x_limit_max = PART_1_X_LIMITS[1]
temp = numpy.linspace(x_limit_min, x_limit_max, 351, dtype=float)
function_handle_1, axis_handle_1 = pyplot.subplots()
axis_handle_1.plot(temp, part_1_polynomial(temp), "b-")
plot_form(axis_handle_1, PART_1_X_LIMITS, "Original Polynomial")
function_handle_1.savefig("figures/hw0_original_polynomial.pdf", bbox_inches="tight")
def part_3():
x_limit_min = PART_1_X_LIMITS[0]
x_limit_max = PART_1_X_LIMITS[1]
bin_width = (x_limit_max-x_limit_min) / 14.0
x_bin = numpy.arange(x_limit_min, x_limit_max, bin_width, float)
y_bin = part_1_polynomial(x_bin)
function_handle, axis_handle = pyplot.subplots()
axis_handle.bar(x_bin + bin_width/2.0, y_bin, width=bin_width, edgecolor="k")
plot_form(axis_handle, PART_1_X_LIMITS, "Discretized Bins")
function_handle.savefig("figures/hw0_discretized_bins.pdf", bbox_inches="tight")
def part_4():
x_limit_min = PART_1_X_LIMITS[0]
x_limit_max = PART_1_X_LIMITS[1]
bin_width = (x_limit_max - x_limit_min) / 14.0
x_bin = numpy.arange(x_limit_min, x_limit_max, bin_width, float)
y_bin = part_1_polynomial(x_bin)
y_bin_normalized = y_bin / y_bin.sum()
function_handle, axis_handle = pyplot.subplots()
axis_handle.bar(x_bin + bin_width / 2.0, y_bin_normalized, width=bin_width, edgecolor="k")
plot_form(axis_handle, PART_1_X_LIMITS, "Discretized Bins (Normalized) sum=%s" % y_bin_normalized.sum(), y_label="p(k)")
function_handle.savefig("figures/hw0_discretized_bins_normalized.pdf", bbox_inches="tight")
def part_5_1():
num_samples = 500
x_rand_values = numpy.arange(1, num_samples+1, 1, int)
y_rand_values = numpy.random.random(num_samples)
function_handle, axis_handle = pyplot.subplots()
pyplot.plot(x_rand_values, y_rand_values, "k+")
plot_form(axis_handle, x_limit=[1, num_samples], title="%s Samples, Uniformly Distributed" % num_samples)
function_handle.savefig("figures/hw0_%s_random_samples.pdf" % num_samples, bbox_inches="tight")
return (x_rand_values, y_rand_values)
def part_5_2(vals):
num_samples = 500
x_limit_min = PART_1_X_LIMITS[0]
x_limit_max = PART_1_X_LIMITS[1]
bin_width = (x_limit_max - x_limit_min) / 14.0
x_bin = numpy.arange(x_limit_min, x_limit_max, bin_width, float)
x_rand_values = vals[0]
y_rand_values = vals[1]
y_random_scaled = y_rand_values * ((x_limit_max - x_limit_min) + x_limit_min)
function_handle, axis_handle = pyplot.subplots()
pyplot.plot(x_rand_values, y_random_scaled, "k+")
for i in range(0, len(x_bin)):
axis_handle.plot([1, num_samples], [x_bin[0], x_bin[1]])
plot_form(axis_handle, [1, num_samples], "Random Samples Mapped to X Ranges Of Bins")
function_handle.savefig("figures/hw0_random_bins_to_ranges.pdf", bbox_inches="tight")
def part_5_3():
y_count_incorrect = numpy.zeros(x_bin.shape)
for i in range(0, len(y_rand_scaled)):
for j in range(len(x_bin), 0, -1):
if y_rand_scaled[i] > x_bin[j-1]:
y_count_incorrect[j-1] += 1
break
function_handle, axis_handle = pyplot.subplots()
pyplot.plot(x_bin+b_width/2.0, y_random_incorrect, "k+")
plot_form(axis_handle, PART_1_X_LIMITS, "Samples per bin (incorrect)", bbox_inches="tight")
#savefig "hw0_samples_per_bin_incorrect.pdf"
def part_5_4():
y_bin_cdf = y_bin_normalized.copy()
i = 0
while i < len(y_bin_cdf) - 1:
i += 1
y_bin_cdf[i] += y_bin_cdf[i-1]
function_handle, axis_handle = pyplot.subplots()
axis_handle.plot(x_rand, y_rand, "k+")
for i in range(0, len(y_bin_cdf)):
axis_handle.plot([1, num_samples], [y_bin_cdf[0], y_bin_cdf[1]])
axis_handle.set_title("Dividing up the samples according to bin height")
function_handle.savefig("hw0_correct_sample_division.pdf", bbox_inches="tight")
y_count_correct = numpy.zeros(x_bin.shape)
for i in range(0, len(y_rand)):
for j in range(len_bin_cdf):
if y_rand[i] < y_bin_cdf[j]:
y_count_correct[j] += 1
break
function_handle_1, axis_handle_1 = pyplot.subplots()
axis_handle_1.bar(x_bin + b_width/2.0, y_count_correct, width=b_width, edgecolor="k")
plot_form(axis_handle_1, x_limit=PART_1_X_LIMITS, "Samples per bin (correct)", y_label="samples")
function_handle.savefig("hw0_samples_per_bin_correct.pdf", bbox_inches="tight")
def real_part_2():
pass
if __name__ == '__main__':
# part_2_plot()
# part_3()
# part_4()
# vals = part_5_1()
# part_5_2(vals)
real_part_2()
pyplot.show()

View File

@@ -0,0 +1,25 @@
set(PACKETS_SOURCES
"${CMAKE_BINARY_DIR}/common/packets.cpp"
)
set(PACKETS_HEADERS
"${CMAKE_BINARY_DIR}/common/packets.h"
)
add_custom_command(
OUTPUT ${PACKETS_SOURCES} ${PACKETS_HEADERS}
COMMAND python3 basepackets.py
ARGS --dest="${CMAKE_BINARY_DIR}/common/"
ARGS --src="${CMAKE_SOURCE_DIR}/base/serial/"
DEPENDS "${CMAKE_SOURCE_DIR}/common/basepackets.py"
DEPENDS "${CMAKE_SOURCE_DIR}/base/serial/packets.h" "${CMAKE_SOURCE_DIR}/base/serial/packets.cpp"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/common/"
)
qt5_wrap_cpp(GENERATED_MOC ${PACKETS_HEADERS})
add_library(packets STATIC ${GENERATED_MOC} ${PACKETS_SOURCES})
target_link_libraries(packets Qt5::Widgets)

View File

@@ -0,0 +1,5 @@
# Common Libraries
This folder contains tools and libraries that can be used by both the electrical and software team. Currently has files for packet IO between the base station and miniboard.
This was not written by me, but by Nick Ames

View File

@@ -0,0 +1,87 @@
# Radio Communication Specification
## Packet Structure
The data radios provide a bidirectional pipe between the rover and the computer (similar to a serial cable). Data is sent LSB first, just as in RS-232. This byte stream is divided into command packets. Each command packet has the following format:
```
<start byte (0x01)>, <length byte>, <2 byte CRC (little-endian)>, <command byte>, [0 or more data bytes]
└────────────────────────Packet Header──────────────────────────┘└────────────Packet Body──────────────┘
```
A start byte (value 0x01) begins each packet. (Before receiving a start byte, the receiver should ignore all non-start input bytes.) Following the start byte is a single byte indicating the packet length (as an unsigned integer). The length takes into account all following bytes (the CRC, command bytes, and data byte(s)). After the length byte is the CRC, which is used to validate the contents of the packet after reception. (See below for details on CRC calculation. Packets with an invalid CRC should be silently ignored.) Collectively, the start byte, length byte, and CRC are the packet header.
Following the packet header is the packet body, which is composed of a command byte (see Packet Types) and zero or more data bytes. Up to 127 bytes of packet data are permitted. The number and meaning of data bytes depends on the command byte, but will be reflected in the length byte in the header.
Special bytes:
Start - 0x01
## CRC Calculation
The CRC is 16-bit with a polynomial of 0x1021 and an initial value 0xFFFF. The CRC is calculated over the packet body (command byte and data byte(s)). The following function will calculate the CRC:
```c
uint16_t calc_crc(uint8_t *body, int body_length){
uint16_t remainder = 0xFFFF;
for (int i = 0; i < body_length; i++){
remainder ^= body[i] << 8;
for (int bit = 8; bit > 0; bit--){
if (remainder & 0x8000){
remainder = (remainder << 1) ^ 0x1021;
} else {
remainder = (remainder << 1);
}
}
}
return remainder;
}
```
After the CRC has been calculated, it should be checked for agreement with the CRC in the packet header. If they disagree, the packet should be silently discarded.
## Packet Types
Commands are sent from the computer to the rover, and result in a reply from the rover to the computer. Command execution and the reply occur after the entire length of the packet is received and verified against the CRC. Conceptually, commands can be thought of as reading or writing values in the rover's memory. The MSB (bit 7, 0x80) of the command byte indicates whether that command is reading or writing (0 = writing, 1 = reading). When reading, the command should contain no data.
For each command, the rover sends a reply containing the command (in the same read/write form as it received it) and any additional data. For write commands, no data is sent in the reply. For read commands, the requested data is sent in the reply. If the rover receives a command it does not recognize, it sends a reply with a command byte of 0x00. If the computer attempts to write a read-only register, the rover acts as if the write succeeded.
Multi-byte values are transmitted little-endian. Twos complement is used for signed values.
In the following table, argument specifications begin with a code (i8 = 8-bit int, u16 = 16-bit unsigned int, etc) indicating the size of each argument. Any variable length arguments are specified using a star (\*). The leftmost argument is the first in the packet data.
Camera video data is transmitted through a separate interface. Camera commands and camera selection are sent through this command protocol.
Note: when adding commands to the table, each argument's name must be unique among all commands. Since they are used in the rover firmware,
don't change the name of existing command arguments.
## Packet Type Specification
| Name | RW | Command Code | Arguments | Default values | Notes |
| ---- | --- | ------------ | --------- | -------------- | ----- |
| Command not Recognized | - | 0x00 | u8 wrong_command | - | Sent as a reply to unknown commands. |
| Pause | RW | 0x05 | u8 pause_state | 1 | 0 = pause (no rover motion) 1 = unpause |
| Battery Voltage | R | 0x06 | u16 battery_voltage | - | Battery voltage in mV |
| Drive Motor Power | RW | 0x10 | i8 l_f_drive, i8 l_m_drive, i8 l_b_drive, i8 r_f_drive, i8 r_m_drive, i8 r_b_drive | 0,0,0,0,0,0 | -127 = full reverse 128 = full forward, r = right, l = left, f = front, m = middle, b = back |
| Swerve Drive State | RW | 0x11 | u8 swerve_state | 0 | 0x00 = Off (no motion), 0x01 = Straight, 0x02 = Turn |
| Arm Motors | RW | 0x12 | i8 arm_motor_1, i8 arm_motor_2, i8 arm_motor_3, i8 arm_motor_4, i8 arm_motor_5 | 0,0,0,0,0 | -127 = full reverse 128 = full forward TODO: Define motor->joint mapping
| Servo | W | 0x14 | u8 ax12_addr, u16 ax12_angle | 0,512 | Set the target angle of an AX12 servo. |
| S Bus Values 1 | R | 0x15 | u16 sbus_1, u16 sbus_2, u16 sbus_3, u16 sbus_4, u16 sbus_5, u16 sbus_6, u16 sbus_7, u16 sbus_8 | - | S-Bus channel values. |
| S Bus Values 2 | R | 0x16 | u16 sbus_9, u16 sbus_10, u16 sbus_11, u16 sbus_12, u16 sbus_13, u16 sbus_14, u16 sbus_15, u16 sbus_16, u8 sbus_active | - | S-Bus channel values plus a status bit indicating if the failsafe has been set. |
| Select Camera | RW | 0x20 | u8 selected_camera | 5 | 1-6; select camera feed to send to the base station and to send commands to. Note: camera output will be disabled unless the callsign has been set. TODO: define which camera corresponds to which number. |
| Callsign | RW | 0x21 | u8 callsign_data_length, * callsign_data| - | ASCII string of callsign (use numerals and capital letters only) |
| Camera Command | W | 0x22 | u8 camera_data_length, * camera_data | - | Custom camera commands defined in camera manual. camera_data_length defines the number of data bytes in the command (0-255). camera_data is the command to be sent to the camera.|
| GPS Position | R | 0x23 | u8 gps_pos_valid, i64 latitude, i64 longitude, i32 altitude | 0,0,0,0 | GPS Position. Good when valid != 0. Sign meaning: +=north/east. Latitude and longitude are in 10e-6 minute units. Altitude is in meters. |
| GPS Track | R | 0x24 | u8 gps_track_valid, i16 gps_heading, u16 gps_speed | 0,0,0 | GPS Heading, in hundredths of a degree. Speed in meters/hour. |
| Magnetometer | R | 0x26 | i16 mag_x, i16 mag_y, i16 mag_z | 0,0,0 | External magnetometer reading. TODO: define units and axis directions. |
| Accelerometer | R | 0x27 | i16 accel_x, i16 accel_y, i16 accel_z | 0,0,0 | IMU accelerometer reading. TODO: define units and axis directions. |
| Gyroscope | R | 0x28 | i16 gyro_x, i16 gyro_y, i16 gyro_z | 0,0,0 | IMU gyroscope reading. TODO: define units and axis directions. |
| Compass Heading | R | 0x29 | u8 compass_heading_valid, i16 compass_heading | 0,0 | Magnetic compass heading. TODO: define units and axis directions. |
| Pan Tilt Speed | RW | 0x2B | i8 pan_speed, i8 tilt_speed | 0,0 | Pan/tilt speeds. Both range from -128 to 127. The total angle will be limited to safe values.|
| AX12 Arm Mode | RW | 0x2C | u8 arm_mode | 0 | 0 = arm AX12s not in use, 1 = arm ax12s in use |
| End Effector Speed | RW | 0x2D | i16 ee_speed | 0 | Speed of end effector pitch control. Range -1023 to 1023. |
| Grabber | RW | 0x2E | i16 grabber_speed, i16 grabber_rotation_speed | 0,0 | Speed of grabber motors. Range -1023 to 1023. |
| Container Sealer | RW | 0x2F | u16 cflex1_speed, u16 cflex2_speed, i16 cseal_speed | 0,0,0 | Angles and speed of container motors. |
| GPIO Read State | R | 0x32 | u8 gpio_state | 0 | GPIO pin directions. 1 = high, 0 = low. Mapping: MSB X X 5 4 3 2 1 X LSB. |
| Sample Camera Action | RW | 0x35 | u8 cam_action | 0 | Sample cam actions. 0 = none, 4 = shutter, 3 = focus, 1 = zoom in, 2 = zoom out |
| Navigation Camera Action | RW | 0x36 | u8 nav_action | 0 | Naviagation cam actions. 0 = none, 1 = zoom in, 2 = zoom out |
| Soil Sensor Send | W | 0x40 | u8 soil_send_data_length, * soil_send_data| - | Data string to send to the soil sensor. |
| Soil Sensor Recv | RW | 0x41 | u8 soil_recv_data_length, * soil_recv_data| - | Reply string received from the soil sensor. |
| Soil Measure | RW | 0x42 | u8 soil_measure | 0 | 0 = do nothing, 1 = take measurement, 2 = measurement complete |
| Soil Measurements | R | 0x43 | i32 moisture, i32 temperature, i32 salinity | 0, 0, 0 | Divide each by 1000 to get the floating-point value. |
| Joystick | RW | 0x50 | i8 fr_joylh, i8 fr_joylv, i8 fr_joyrh, i8 fr_joyrv, i8 fr_potl, i8 fr_potr, i8 fr_sidel, i8 fr_sider, u8 fr_buttons, i8 xbox_joylh, i8 xbox_joylv, i8 xbox_joyrh, i8 xbox_joyrv, i8 xbox_triggerl, i8 xbox_triggerr, u8 xbox_buttons_high, u8 xbox_buttons_low | 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 | Frsky buttons are packed (MSB) H ..A (LSB). MSB of xbox_buttons_high is enable; must be 1! XBOX low byte -> (MSB)(RSC)(LSC)(RB)(LB)(Y)(X)(B)(A)(LSB) XBOX high byte (MSB)(1)(DPD)(DPU)(DPR)(DPL)(HOME)(STRT)(SEL)(LSB)|
| Autonomous Enable | RW | 0x60 | u8 auton_en | 0 | If 1, do autonomous traversal. |
| Autonomous Waypoint 1 | RW | 0x61 | i64 auton_way1_lat, i64 auton_way1_lon, u16 auton_way1_speed | 0,0,0 | Autonomous target waypoint. Speed in mm/s. |
| Autonomous Waypoint 2 | RW | 0x63 | i64 auton_way2_lat, i64 auton_way2_lon, u16 auton_way2_speed | 0,0,0 | Autonomous next waypoint. Speed in mm/s. |
| Time ms | R | 0x64 | u32 time_ms | 0 | Time since rover was turned on, in ms. |

View File

@@ -0,0 +1,618 @@
#!/usr/bin/env python3
"""This packet generates C++ files for handling packets on the
side of the base station."""
import re
from collections import OrderedDict
class BasePackets(object):
"""
Autogenerates C++ code to encode and decode packets sent between the base
station and the miniboard.
This class takes in template C++ source and header files and expands them
to include Qt signals emitted when a packet of a specific type is parsed
successfully, as well as slots that allow for sending packets to the
miniboard. The input files should contain comments with an "@" followed by a
command or attributes.
Atttributes are variables defined in the header files that will be used in
constructing autogenerated functions. They are specified with the
attribute name followed by a colon and the name of the variable in the
source file. For example, writing "@datastream: m_datastream" will specify
that the QDataStream class to read and write data from is called
m_datastream. Currently, available attributes are "datastream", designating
the QDataStream to use, the "types_enum", the enum that stores the codes
for the packet types, and the "crc", the name of the CRC function.
Additionally, functions denoted by an "@" in the source provide the location
for expanding the file during the autogeneration process. These functions
match the names in this class. Available functions are:
"packet_types_header", defining where to enumerate the packet codes,
"recieved_signals_header", header functions for the recieved signals,
"write_slots_header", header functions for slots that send packets with
for writing,
"read_slots_header", header functions for slots that read data from the
miniboard,
"parse_packets", the source function that parses a packet, it takes in
a parameter specifying the size of the packet, such as
"@parse_packets(size)",
"write_slots_source", write the source code for write slots, and
"read_slots_source", source code for read slots.
"""
def __init__(self, specification=None, files={}):
"""
Creates an instance of the BasePackets class.
Parameters
----------
specification : string
The path to the packet specification file.
files : dictionary
Dictionary of the input and output files. Input files contain
special commands that the parser will recognize. The autogenerated
files will be written to the output files. The format for the
dictionary is {"input_file": "output_file"}.
"""
# TODO: normalized naming to template file (low priority)
# files {"input_file": "output_file"}
self._specification = specification
self.files = files
# packets generated by the extract_table function
self._packets = None
# parameters extracted from header files, such as "datastream", etc.
self._params = {"start_byte": 0x01}
self._tabsize = 4
# requires removing all whitespace
# TODO: make not use directly
# regular expressions for various parsing and extraction
self._param_extractor = re.compile(r"(?<=:)[a-zA-Z0-9_]+")
# TODO: try (?<=^class) (low priority)
self._class_extractor = re.compile(r"(?<=class )[a-zA-Z]+")
self._function_arg_extractor = re.compile(r"\(.+\)")
self._arg_size_extractor = re.compile(r"(\d|\*)+$")
def packets():
doc = """A list of packets and their properties. The format is
specified in the extrac_table function."""
def fget(self):
return self._packets
return locals()
packets = property(**packets())
def specification():
doc = """The path to the packet specification file."""
def fget(self):
return self._specification
def fset(self, value):
self._specification = value
return locals()
specification = property(**specification())
def files():
doc = """Dictionary of files used in the parsing and autogeneration
process. Files are stored in an ordered dictionary with source
(.cpp) files at the end."""
def fget(self):
return self._files
def fset(self, value):
self._files = OrderedDict(sorted(value.items(),
key=lambda x: x[0].endswith(".cpp")))
return locals()
files = property(**files())
def write_files(self):
"""Parses the input files and outputs the autogenerated files. This is
the function to call after the necessary files have been specified."""
for t, s in self._files.items():
self._write_file(str(t), str(s))
def _write_file(self, template, output):
"""Parses an individual file. Extracts packets from the specification if
not done so already, then looks for matching functions in the template
and calls the respective function.
Parameters
----------
template : string
The path to the template used for autogenerating the output.
output : string
The path to the output file.
"""
if self._specification is None:
raise ValueError("The specification file has not been specified.")
if self._packets is None:
self.extract_table()
with open(template) as f:
file_template = f.read().splitlines()
with open(output, "w+") as f:
for line in file_template:
f.write(line + "\n")
if "class" in line:
if "enum" in line:
continue
self._params.update({"class":
re.search(self._class_extractor, line).group(0)})
if "@" not in line:
continue
elif "@datastream" in line:
self._params.update({"datastream":
self._extract_param(line)})
elif "@types_enum" in line:
self._params.update({"types_enum":
self._extract_param(line)})
elif "@crc" in line:
self._params.update({"crc":
self._extract_param(line)})
elif "@packet_types_header" in line:
f.write(self.packet_types_header(line=line))
elif "@recieved_signals_header" in line:
f.write(self.packet_signals_header(line=line))
elif "@write_slots_header" in line:
f.write(self.write_slots_header(line=line))
elif "@read_slots_header" in line:
f.write(self.read_slots_header(line=line))
elif "@parse_packets" in line:
f.write(self.parse_packets(line=line))
elif "@write_slots_source" in line:
f.write(self.write_slots_source(line=line))
elif "@read_slots_source" in line:
f.write(self.read_slots_source(line=line))
def extract_table(self):
"""Extract the command table from the text of the specification file.
Creates a list of dictionaries with the following members:
"name" : str, name of command
"rw" : str, contains r is command can be read, w if command
can be written, - if neither
"code" : int, command code (0-127)
"argument" : list, list of 0 or more tuples, each with the
following structure:
tuple(format code:str, command format code, u8, i16,
etc, or * to indicate a variable number of bytes.
name:str))
"default" : list of default values of arguments, as strings
"notes" : str, notes on command."""
with open(self._specification, "r") as f:
file_str = f.read()
s = file_str.find("\n| Name | RW | Command Code")
s += 1 # Skip over first newline
e = file_str[s:].find("\n\n")
if e < s:
table_str = file_str[s:]
else:
table_str = file_str[s:s + e]
self._packets = []
for l in table_str.split("\n"):
if len(l) == 0: continue
if l[0] == "|":
l = l[1:]
col = [c.strip() for c in l.split("|")]
if col[0][0:3] == "---": continue
if col[0] == "Name": continue
name = col[0]
rw = col[1].lower()
code = int(col[2], 0)
notes = col[5]
argument = []
for a in [c.strip() for c in col[3].split(",")]:
a = a.split(" ")
if len(a) == 2:
argument.append((a[0], a[1]))
default = []
if len(argument) > 0 and col[4].strip() != "-":
for d in [c.strip() for c in col[4].split(",")]:
default.append(d)
if len(default) != len(argument) and len(default) != 0:
raise RuntimeError(
"Default list length mismatch on command %s" % name
)
self._packets.append({
"name": name,
"rw": rw,
"code": code,
"argument": argument,
"default": default,
"notes": notes
})
# removing redundant packet arguments
for packet in self._packets:
for i, arg in enumerate(packet["argument"]):
if "length" in arg[1]:
packet["argument"].pop(i)
def packet_types_header(self, line=None):
"""
Generates header function for the packet types.
Parameters
----------
line : string, optional
The string line that contains the command. Used for controlling
the amount of trailing whitespace.
"""
string = ""
ws = self._whitespace(line)
for packet in self._packets:
uppercase_name = packet["name"].upper().replace(" ", "_")
string += ws + ("%s = %i,\n" % (uppercase_name, packet["code"]))
return string
def packet_signals_header(self, line=None):
"""
Generates header function for the packet signals.
Parameters
----------
line : string, optional
The string line that contains the command. Used for controlling
the amount of trailing whitespace.
"""
string = ""
ws = self._whitespace(line)
for packet in self._packets:
if "r" in packet["rw"]:
string += (ws + "void " + self._signal_name(packet) + "(" +
self._argument_proto(packet) + ");\n")
return string
def write_slots_header(self, line=None):
string = ""
ws = self._whitespace(line)
for packet in self._packets:
if "w" in packet["rw"]:
string += (ws + "void write" + self._camelcase(packet["name"]) +
"(" + self._argument_proto(packet) + ");\n")
return string
def read_slots_header(self, line=None):
string = ""
ws = self._whitespace(line)
for packet in self._packets:
if "r" in packet["rw"]:
string += (ws + "void read" + self._camelcase(packet["name"]) +
"();\n")
return string
def parse_packets(self, line=None):
# note that parse_packets assumes that the datastream head is at the
# beginning of the crc
ws = self._whitespace(line)
string = ""
# bitshift in the crc, and the packet type
string += ws + "quint16 _read_crc;\n"
string += ws + ("%s >> _read_crc;\n" % self._params["datastream"])
string += ws + "quint8 _packetType;\n"
string += ws + ("%s >> _packetType;\n" % self._params["datastream"])
# index to specify whether to use "if" or "else if"
i = 0
for packet in self._packets:
uppercase_name = packet["name"].upper().replace(" ", "_")
if "r" not in packet["rw"]:
continue
if i is 0:
i += 1
conditional = "if"
else:
conditional = "else if"
string += (ws + "%s(_packetType == (static_cast<quint8>"
"(%s::%s) | 0x80)) {\n" % (
conditional,
self._params["types_enum"],
uppercase_name)
)
packet_size = self._packet_size(packet)
# TODO: what if the packet size is variable? This requires adding
# in a size argument to the parse_packets function, creating a data
# type of that size, and bitshifting in the data.
# Note: this will probably require using readRawBytes or readBytes
# function.
if packet_size is not None:
for arg in packet["argument"]:
# for every argument in the packet, create a new variable
# and bitshift it in from the stream
arg_type = self._expand_argument(arg[0])
string += ws + "\t%s %s;\n" % (arg_type, arg[1])
string += ws + "\t%s >> %s;\n" % (
self._params["datastream"], arg[1]
)
string += ws + '\tqDebug() << "argument: " << "%s" << "; value: " << %s;\n' % (
arg[1], arg[1]
)
string += self._crc_calculation(packet, ws=ws, write=True)
# TODO: checking against crc (high priority)
# then emit
string += ws + "\tif(_crc == _read_crc) {\n"
string += ws + '\t\tqDebug() << "CRC check successful.";\n'
string += ws + "\t\temit " + self._signal_name(packet) + "("
string += "".join(map(lambda x: x[1] + ", ",
packet["argument"])).strip(" ,")
string += ");\n"
string += ws + "\t}\n"
else:
string += ws + "\tchar _data[size];\n" # TODO: remove hard coding
string += ws + "\t%s.readRawData(_data, size);\n" % (
self._params["datastream"],
)
string += ws + "\tQByteArray _byte_array = QByteArray(_data);\n"
string += ws + "\tquint16 _crc = 0xFFFF;\n"
string += ws + "\t_crc = %s(&_packetType, sizeof(quint8), _crc);\n" % (
self._params["crc"],
)
string += ws + "\t_crc = %s(&_byte_array, size, _crc);\n" % (
self._params["crc"]
)
string += ws + '\tqDebug() << "argument: %s; value: "' % (
packet["argument"][-1][1]
)
string += " << _byte_array;\n"
string += ws + "\tif(_crc == _read_crc) {\n"
string += ws + "\t\temit %s(_byte_array);\n" % (
self._signal_name(packet)
)
string += ws + "\t}\n"
# TODO
string += ws + "}\n"
for packet in self._packets:
uppercase_name = packet["name"].upper().replace(" ", "_")
if "w" not in packet["rw"]:
continue
string += (ws + "else if(_packetType == static_cast<quint8>"
"(%s::%s)) {\n") % (
self._params["types_enum"],
uppercase_name
)
string += ws + "\tquint16 _crc = 0xFFFF;\n"
string += ws + "\t_crc = %s(&_packetType, sizeof(quint8), _crc);\n" % (
self._params["crc"],
)
string += ws + "\tif(_crc == _read_crc) {\n"
string += ws + '\t\tqDebug() << "Confirmed packet reception.";\n'
string += ws + "\t}\n"
# TODO: don't do anything yet, but to be added
# TODO: like a qDebug statement
string += ws + "}\n"
return string.expandtabs(self._tabsize)
def write_slots_source(self, line=None):
string = ""
for packet in self._packets:
if "w" not in packet["rw"]:
continue
string += ("void %s::write%s(%s)\n{\n" % (
self._params["class"],
self._camelcase(packet["name"]),
self._argument_proto(packet),
))
string += '\tqDebug() << "Beginning packet write.";\n'
string += "\tquint8 _packetType = static_cast<quint8>(%s::%s);\n" % (
self._params["types_enum"],
self._uppercase_name(packet),
)
string += self._crc_calculation(packet)
string += "\t%s << (quint8)%s;\n" % (
self._params["datastream"],
self._params["start_byte"],
)
if self._packet_size(packet) is not None:
string += "\t%s << (quint8)%s;\n" % (
self._params["datastream"],
self._packet_size(packet)
)
else:
string += "\t%s << static_cast<quint8>(%s.size() + 3);\n" % (
self._params["datastream"],
packet["argument"][-1][1],
)
string += "\t%s << _crc;\n" % self._params["datastream"]
string += "\t%s << _packetType;\n" % (
self._params["datastream"],
)
if self._packet_size(packet) is not None:
for arg in packet["argument"]:
string += '\tqDebug("writing: %s; value: %i",' + '"%s", %s);\n' % (
arg[1], arg[1]
)
string += "\t%s << %s;\n" % (
self._params["datastream"], arg[1]
)
string += '\tqDebug() << "argument:" << "%s" << "; value:" << %s;\n' % (
arg[1], arg[1]
)
else:
string += "\t%s.writeRawData(%s.constData(), %s.size());\n" % (
self._params["datastream"],
packet["argument"][-1][1],
packet["argument"][-1][1],
)
string += "}\n\n"
return string.expandtabs(self._tabsize)
def _crc_calculation(self, packet, ws="", write=True, packet_type="_packetType"):
string = ""
string += ws + "\tquint16 _crc = 0xFFFF;\n"
string += ws + "\t_crc = %s(&%s, sizeof(quint8), _crc);\n" % (
self._params["crc"],
packet_type,
)
if not write:
return string
if self._packet_size(packet) is not None:
for arg in packet["argument"]:
string += ws + "\t_crc = %s(&%s, sizeof(%s), _crc);\n" % (
self._params["crc"],
arg[1],
arg[1], # TODO: fix this, unworking for variable size packets
)
else:
string += ws + "\t_crc = %s(&%s, %s.size(), _crc);\n" % (
self._params["crc"],
packet["argument"][-1][1],
packet["argument"][-1][1],
)
return string
def read_slots_source(self, line=None):
string = ""
for packet in self._packets:
if "r" not in packet["rw"]:
continue
string += "void %s::read%s()\n{\n" % (
self._params["class"],
self._camelcase(packet["name"]),
)
string += '\tqDebug() << "Beginning packet write.";\n'
string += ("\tquint8 _packetType = static_cast<quint8>(%s::%s) "
"| 0x80;\n") % (
self._params["types_enum"],
self._uppercase_name(packet),
)
string += self._crc_calculation(packet, write=False)
string += "\t%s << (quint8)%s;\n" % (
self._params["datastream"],
self._params["start_byte"],
)
string += "\t%s << (quint8)3;\n" % self._params["datastream"]
string += "\t%s << _crc;\n" % self._params["datastream"]
string += "\t%s << _packetType;\n" % (
self._params["datastream"],
)
string += "}\n\n"
return string.expandtabs(self._tabsize)
def _parse_function_args(self, str):
arguments = re.search(self._function_arg_extractor, str).group(0)
if arguments is None:
return None
arguments = arguments.strip("() ")
arguments = list(map(lambda x: x.strip(" ,"), arguments))
return arguments
def _signal_name(self, packet):
return (self._camelcase(packet["name"], capitalize_first_word=False) +
"Received")
def _expand_argument(self, arg):
if arg == "*":
arg_type = "QByteArray"
elif "u" in arg:
arg_type = "q" + arg.replace("u", "uint")
elif "i" in arg:
arg_type = "q" + arg.replace("i", "int")
else:
raise ValueError("Argument type %s id not recognized" % arg)
return arg_type
def _argument_proto(self, packet):
argument_str = ""
for arg in packet["argument"]:
arg_type = self._expand_argument(arg[0])
argument_str += arg_type + " " + arg[1] + ", "
argument_str = argument_str.strip(" ,")
return argument_str
def _camelcase(self, string, capitalize_first_word=True):
"""Takes a string of words and returns a camelcase version.
Parameters
----------
string : string
This is a string of space-separated words to be used in constructing
a camelcase string.
capitalize_first_word : bool, optional
Specifies whether to capitalize the first word.
Returns
-------
words : string
A camelcase version of the input string. For example, "this is a
string" would return "ThisIsAString" or "thisIsAString".
"""
words = string.split(" ")
words = map(lambda x: x.capitalize(), words)
words = "".join(words)
if not capitalize_first_word:
words = words[0].lower() + words[1:]
return words
def _whitespace(self, reference):
if reference is None:
return ""
num_spaces = (len(reference) - len(reference.lstrip(" ")))
return " " * num_spaces
def _packet_size(self, packet):
# in bytes
# add two bytes for CRC, and one byte for packet type
size = 3
for arg in packet["argument"]:
arg_size = re.search(self._arg_size_extractor, arg[0]).group(0)
if arg_size is None:
raise ValueError("Cannot determine size of packet.")
elif arg_size == "*":
return None
arg_size = int(arg_size)
if (arg_size % 8 is not 0):
raise ValueError("Argument sizes must be multiples of 8.")
size += (arg_size // 8)
return size
def _uppercase_name(self, packet):
return packet["name"].upper().replace(" ", "_")
def _extract_param(self, string):
s = string.replace(" ", "")
return re.search(self._param_extractor, s).group(0)
if __name__ == "__main__":
import argparse
import os.path
parser = argparse.ArgumentParser()
parser.add_argument("--src", type=str, default=".")
parser.add_argument("--dest", type=str, default=".")
parser.add_argument("--name", type=str, default="packets")
parser.add_argument("--spec", type=str, default="SPECIFICATION.md")
args = parser.parse_args()
header_template = os.path.join(args.src, args.name) + ".h"
header_output = os.path.join(args.dest, args.name) + ".h"
source_template = os.path.join(args.src, args.name) + ".cpp"
source_output = os.path.join(args.dest, args.name) + ".cpp"
specification = args.spec
b = BasePackets(
specification=specification,
files = {header_template: header_output, source_template: source_output}
)
b.write_files()

View File

@@ -0,0 +1,190 @@
#!/usr/bin/env python3
#Generates communications code for the miniboard firmware, based on
#a command protocol specification document (see docparse.py) supplied
#as the first command-line argument.
#
#This code generation system has two goals: to allow commands to be easily added
#to the spec prior to their implementation, and to ensure that all commands
#function consistently. To accomplish this, all command-specific communication
#code is generated automatically from the specification document.
#For this to be possible, the code that actually carries out each command
#must be placed elsewhere.
#
#Conceptually, commands to the rover read and write registers in its memory.
#This model makes separating communication and action easy. The automatically-generated
#communication code reads and writes fields in a large global struct (also automatically
#generated), with each field corresponding to a command argument.
#
#The output of this program is a source file and a header file containing the auto-generated
#communication code.
#The global structure is called DataReal. Due to issues with the volatile qualifier
#(the main code needs it, but the ISR doesn't), the main code accesses DataReal through
#a pointer called Data.
#The parse_packet(uint8_t *buf, uint16_t count) function initiates packet
#parsing and dispatches the appropriate reply. buf must contain the command
#byte as the first byte in the buffer. count must be the number of bytes in
#the packet, not including the start or end bytes, and with escape bytes removed.
#
#The host program must provide a send_packet(uint8_t *data, uint16_t count) function
#which adds start/end bytes and escapes special characters, then sends the packet
#over the radio.
#The host program must also provide a memcpy() function.
#
#Additionally, there is an asynchronous command element beyond the simple read/write memory
#model. When a command includes a variable-length argument, a <command name>_trigger(uint8_t *data, uint16_t *len)
#function will be called when the command is received. This function must be implemented by
#the host code. The purpose of this mechanism is to notify the action code when the data
#changes. The trigger function must be capable of being run in an interrupt.
import sys
from docparse import *
def get_all_args(cmd_list):
"""Return a list of tuples (format code, argument name, note comment)."""
l = list()
for c in cmd_list:
for a in c["argument"]:
l += [(a[0], a[1], c["notes"])]
return l
def gen_struct_def(cmd_list):
s = "struct comm_data_t {\n"
for c in get_all_args(cmd_list):
dt = format_code_to_cstdint(c[0])
if "*" in dt:
#Arrays
s += "\t" + dt[0:-2] + " " + c[1] + "[%d];"%(2 ** int(dt[4:5])) + " /* " + c[2] + " */\n"
else:
s += "\t" + dt + " " + c[1] + ";" " /* " + c[2] + " */\n"
s += "};\n\n"
return s
def gen_build_str_def():
"""Return a declaration for a progmem string containing information about the build."""
return ""
def gen_build_str_dec():
"""Return a definition of a PROGMEM string containing information about the build."""
#Get name of person building firmware
#git config --get-all user.name
#Get repo revision
#git log | head -1 | cut -d " " -f 2
#Get branch
#git branch | grep "\*" | cut -d " " -f 2
#Get modified status
#Date, time, gcc version (__VERSION__)
s = "Miniboard Firmware rev "
return ""
def gen_header(cmd_list):
"""Return a string containing the C header for the communication module."""
s = "/* Warning: This file is automatically generated. Do not modify. */\n"
s += "#ifndef COMMGEN_H\n"
s += "#define COMMGEN_H\n\n"
s += "#ifdef __cplusplus\n"
s += "extern \"C\" {\n"
s += "#endif\n\n"
s += "#include <stdint.h>\n\n"
s += gen_struct_def(cmd_list)
s += "/* To avoid the volatile qualifier being a pain in the ass, the main loop\n"
s += " * accesses the DataReal struct through this pointer. */\n"
s += "extern volatile struct comm_data_t *Data;\n\n"
s += "/* Parse a packet, update the struct, and send a reply. */\n"
s += "void parse_packet(uint8_t *buf, uint16_t count);\n\n"
for c in cmd_list:
s += gen_send_proto(c) + "\n"
s + gen_parse_proto(c) + "\n"
s += gen_packing_protos()
s += gen_build_str_dec()
#s += "void send_packet(uint8_t *data, uint16_t count);\n\n"
s += "#ifdef __cplusplus\n"
s += "}\n"
s += "#endif\n\n"
s += "#endif\n"
return s
def gen_struct_dec(cmd_list):
s = "struct comm_data_t DataReal = {\n"
for c in cmd_list:
for i,d in zip(list(range(0, len(c["default"]))), c["default"]):
s += "\t." + c["argument"][i][1] + " = " + d + ",\n"
s += "};\n"
s += "volatile struct comm_data_t *Data = &DataReal;\n"
return s
def gen_source(cmd_list):
s = "/* Warning: This file is automatically generated. Do not modify. */\n"
s += "#include <stdint.h>\n"
s += "#include <string.h>\n"
s += "#include \"commgen.h\"\n\n"
s += "#include \"comm.h\"\n\n"
s += gen_struct_dec(cmd_list)
for c in cmd_list:
s += gen_parse_func(c) + "\n"
s += gen_send_func(c, False) + "\n"
s += gen_packing_funcs()
s += gen_parse_packet_source(cmd_list)
s += gen_build_str_def()
return s
def gen_parse_packet_source(cmd_list):
#TODO: check for count == 0
"""Return a string containing the source code to the
parse_packet(uint8_t *buf, uint16_t count)
function, which parses a packet, updates values in the global Data structure,
and dispatches a reply.
The function relies on the following special functions:
send_packet(uint8_t *data, uint16_t count) - send the given packet across the radio link.
The send_packet() function must add a start and end byte and
escape characters where necessary.
as well as the send_* and parse_* functions."""
s = ""
s += "void parse_packet(uint8_t *buf, uint16_t count){\n"
s += "\tuint8_t cmd = buf[0];\n"
s += "\tswitch(cmd){\n"
for c in cmd_list:
s += "\t\t/* %s */\n"%(c["name"])
s += "\t\tcase 0x%02X: /* (Write form) */\n"%c["code"]
s += "\t\t\tparse_%s(buf, "%cannon_name(c["name"])
add_trigger = False
for a in c["argument"]:
if a[0] == "*":
s += "DataReal.%s, "%(a[1])
add_trigger = True;
else:
s += "&(DataReal.%s), "%(a[1])
s = s[0:-2] + ");\n"
s += "\t\t\tbuf[0] = cmd;\n"
s += "\t\t\tsend_packet(buf, 1);\n"
if add_trigger:
s += "\t\t\t%s_trigger();\n"%cannon_name(c["name"])
s += "\t\t\tbreak;\n"
s += "\t\tcase 0x%02X: /* (Read form) */\n"%(c["code"] | 0x80)
s += "\t\t\tsend_%s("%cannon_name(c["name"])
for a in c["argument"]:
s += "DataReal.%s, "%(a[1])
s = s[0:-2] + ");\n"
s += "\t\t\tbreak;\n"
s += "\t\tdefault:\n"
s += "\t\t\tbuf[0] = 0;\n"
s += "\t\t\tsend_packet(buf, 1);\n"
s += "\t\t\tbreak;\n"
s += "\t}\n}\n"
return s
#TODO: writeable stuff
def main():
if len(sys.argv) != 4:
sys.stderr.write("error: wrong number of arguments. Expected path to spec file, source file, and header file.")
with open(sys.argv[1], "r") as f:
cmds = extract_table(f.read())
with open(sys.argv[3], "w") as f:
f.write(gen_header(cmds))
with open(sys.argv[2], "w") as f:
f.write(gen_source(cmds))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,289 @@
#!/usr/bin/env python3
#Parses a markdown file (first command line argument)
#containing a table with the header whose header starts with:
#| Name | RW | Command Code
import textwrap
import sys
import math
def extract_table(file_str):
"""Extract the command table from the text of
specification.md. Returns a list of dictionaries with the following
members:
"name" : str, name of command
"rw" : str, contains r is command can be read, w if command can be written, - if neither
"code" : int, command code (0-127)
"argument" : list( #list of 0 or more tuples, each with the following structure:
tuple(format code:str, #command format code, u8, i16, etc, or * to indicate a
#a variable number of bytes controlled by the previous argument.
name:str))
"default" : list of default values of arguments, as strings
"notes" : str, notes on command."""
s = file_str.find("\n| Name | RW | Command Code")
s+=1 #Skip over first newline
e = file_str[s:].find("\n\n")
if e < s:
table_str = file_str[s:]
else:
table_str = file_str[s:s + e]
commands = []
for l in table_str.split("\n"):
if len(l) == 0: continue
if l[0] == "|":
l = l[1:]
col = [c.strip() for c in l.split("|")]
if col[0][0:3] == "---": continue
if col[0] == "Name": continue
name = col[0]
rw = col[1].lower()
code = int(col[2], 0)
notes = col[5]
argument = []
for a in [c.strip() for c in col[3].split(",")]:
a = a.split(" ")
if len(a) == 2:
argument.append((a[0], a[1]))
default = []
if len(argument) > 0 and col[4].strip() != "-":
for d in [c.strip() for c in col[4].split(",")]:
default.append(d)
if len(default) != len(argument) and len(default) != 0:
sys.stderr.write("warning: default list length mismatch on command '%s'\n"%name)
commands.append({
"name" : name,
"rw" : rw,
"code" : code,
"argument" : argument,
"default" : default,
"notes" : notes})
return commands
def format_code_to_cstdint(fmt_name):
"""Convert a format code (i8, u16, *, etc) to a c standard int (int8_t, uint16_t, uint8_t *)."""
if fmt_name == "*":
return "uint8_t *"
elif fmt_name[0] == "i":
return "int%s_t"%fmt_name[1:]
elif fmt_name[0] == "u":
return "uint%s_t"%fmt_name[1:]
else:
raise ValueError("Unknown format code \"%s\""%fmt_name)
def format_code_size(fmt_name):
"""Return the number of bytes required for a type with the given format code."""
if fmt_name == "*":
return 255
else:
return int(fmt_name[1:])//8
def cannon_name(command):
"""Given the name of a command, return the canonical function name
(lowercase, with spaces converted to underscores)."""
return command.lower().replace(" ", "_")
def gen_send_proto(cmd_dict):
"""Given a command dictionary (from extract_table), return the prototype
of a function that transmits the command."""
cmd = cmd_dict
#Comment
comment = "/* Note: %s */\n"%textwrap.fill(cmd["notes"], 80).replace("\n", "\n * ")
#Prototype
need_trigger = False
proto = "void "
proto += "send_" + cannon_name(cmd["name"]) + "("
for a in cmd["argument"]:
proto += "" + format_code_to_cstdint(a[0]) + " " + a[1] + ", "
if a[0] == "*": need_trigger = True
proto = proto[0:-2] #Remove last commma
proto += ");\n"
#Trigger
trigger = ""
if need_trigger:
trigger = "void " + cannon_name(cmd["name"]) + "_trigger(void);\n"
return comment + proto + trigger
def gen_send_func(cmd_dict, write_mode):
"""Given a command dictionary (from extract_table()), return a function that constructs
a packet of that type. If write_mode is true, the command is transmitted in the
write form; otherwise, the read form is used.
The function relies on the following functions:
send_packet(uint8_t *data, uint16_t count) - send the given packet across the radio link.
The send_packet() function must add a start and end byte and
escape characters where necessary.
pack*(uint8_t *data, uint16_t pos, uint*_t value) - pack the given value (where * = 8, 16, 32, or 64 bits)
into the given buffer position in a little-endian format.
memcpy(void *dest, void *src, uint16_t count) - copy bytes"""
cmd = cmd_dict
#Comment
comment = "/* Note: %s */\n"%textwrap.fill(cmd["notes"], 80).replace("\n", "\n * ")
#Prototype
proto = "void "
proto += "send_" + cannon_name(cmd["name"]) + "("
for a in cmd["argument"]:
proto += "" + format_code_to_cstdint(a[0]) + " " + a[1] + ", "
proto = proto[0:-2] #Remove last commma
proto += "){\n"
#Command
command = "\tbuf[0] = 0x%X;\n"%(cmd["code"] | (int(not write_mode) << 7))
#Packet stuffing
body = ""
totalsize = 1 #current byte in packet buffer
prev_a = None
for a in cmd["argument"]:
if a[0] == "*":
if prev_a == None:
raise ValueError("In command %s, variable-length argument used before length controlling argument.")
body += "\tmemcpy(buf + b, %s, %s);\n"%(a[1], prev_a[1])
body += "\tb += %s;\n"%prev_a[1]
totalsize += 255
else:
s = format_code_size(a[0])
body += "\tpack%d(buf, b, %s);\n"%(s*8, a[1])
body += "\tb += %d;\n"%s
totalsize += s
prev_a = a
#Send command
send = "\tsend_packet(buf, b);\n}\n"
#Declarations
declarations = "\tuint16_t b = 1;\n\tuint8_t buf[%d];\n"%totalsize
return comment + proto + declarations + command + body + send
def gen_parse_proto(cmd_dict):
"""Given a command dictionary (from extract_table()), return a function that
Extracts a packet of that type from a buffer. The first byte in the buffer
should be the command byte, escape characters must be removed, and it need
not contain the end byte.
The generated function relies on the following functions:
unpack*(uint8_t *data, uint16_t pos, uint*_t *result) - (where * = 8, 16, 32, etc.)
unpack the little-endian value from the buffer and write it
to a variable.
memcpy(void *dest, void *src, uint16_t count) - copy bytes"""
cmd = cmd_dict
#Comment
comment = "/* Note: %s */\n"%textwrap.fill(cmd["notes"], 80).replace("\n", "\n * ")
#Prototype
proto = "void "
proto += "parse_" + cannon_name(cmd["name"]) + "("
proto += "uint8_t *packet, "
for a in cmd["argument"]:
typename = "" + format_code_to_cstdint(a[0]).strip()
if typename[-1] == "*":
typename = typename[0:-1]
proto += typename + " *" + a[1] + ", "
proto = proto[0:-2] #Remove last commma
proto += ";\n"
return comment + proto
def gen_parse_func(cmd_dict):
"""Given a command dictionary (from extract_table()), return a function that
Extracts a packet of that type from a buffer. The first byte in the buffer
should be the command byte, escape characters must be removed, and it need
not contain the end byte.
The generated function relies on the following functions:
unpack*(uint8_t *data, uint16_t pos, uint*_t *result) - (where * = 8, 16, 32, etc.)
unpack the little-endian value from the buffer and write it
to a variable.
memcpy(void *dest, void *src, uint16_t count) - copy bytes"""
cmd = cmd_dict
#Comment
comment = "/* Note: %s */\n"%textwrap.fill(cmd["notes"], 80).replace("\n", "\n * ")
#Prototype
proto = "void "
proto += "parse_" + cannon_name(cmd["name"]) + "("
proto += "uint8_t *packet, "
for a in cmd["argument"]:
typename = "" + format_code_to_cstdint(a[0]).strip()
if typename[-1] == "*":
typename = typename[0:-1]
proto += typename + " *" + a[1] + ", "
proto = proto[0:-2] #Remove last commma
proto += "){\n"
#Packet stuffing
body = ""
totalsize = 1 #current byte in packet buffer
prev_a = None
for a in cmd["argument"]:
if a[0] == "*":
if prev_a == None:
raise ValueError("In command %s, variable-length argument used before length controlling argument."%cmd["name"])
body += "\tmemcpy(%s, packet + b, *%s);\n"%(a[1], prev_a[1])
body += "\tb += *%s;\n"%prev_a[1]
totalsize += 255
else:
s = format_code_size(a[0])
body += "\tunpack%d(packet, b, (uint%d_t *) %s);\n"%(s*8, s*8, a[1])
body += "\tb += %d;\n"%s
totalsize += s
prev_a = a
#Declarations
declarations = "\tuint16_t b = 1;\n"
return comment + proto + declarations + body + "}\n"
def gen_packing_protos():
"""Return a string containing the prototypes of the functions:
unpack*(uint8_t *data, uint16_t pos, uint*_t *result) - (where * = 8, 16, 32, etc.)
unpack the little-endian value from the buffer and write it
to a variable.
pack*(uint8_t *data, uint16_t pos, uint*_t value) - pack the given value (where * = 8, 16, 32, or 64 bits)
into the given buffer position in a little-endian format.
where * = 8, 16, 32, and 64."""
s = ""
for n in [8, 16, 32, 64]:
s += "void pack%d(uint8_t *data, uint16_t pos, uint%d_t value);\n"%(n,n)
s += "void unpack%d(uint8_t *data, uint16_t pos, uint%d_t *result);\n"%(n,n)
return s
def gen_packing_funcs():
"""Return a string containing the source code to the functions:
unpack*(uint8_t *data, uint16_t pos, uint*_t *result) - (where * = 8, 16, 32, etc.)
unpack the little-endian value from the buffer and write it
to a variable.
pack*(uint8_t *data, uint16_t pos, uint*_t value) - pack the given value (where * = 8, 16, 32, or 64 bits)
into the given buffer position in a little-endian format.
where * = 8, 16, 32, and 64."""
s = ""
for n in [8, 16, 32, 64]:
s += "void pack%d(uint8_t *data, uint16_t pos, uint%d_t value){\n"%(n,n)
for i in range(0, n//8):
s += "\t*(data + pos + %d) = (value >> %d) & 0xFF;\n"%(i,i*8)
s += "}\n"
s += "void unpack%d(uint8_t *data, uint16_t pos, uint%d_t *result){\n"%(n,n)
s += "\t*result = "
for i in range(0, n//8):
s += "(((uint%d_t) *(data + pos + %d) << %d)) | "%(n,i, i * 8)
s = s[0:-3] #trim last " | "
s += ";\n}\n"
return s
if __name__ == "__main__":
import sys
with open(sys.argv[1], "r") as f:
cmds = extract_table(f.read())
for c in cmds:
print(c["argument"])

View File

@@ -0,0 +1,408 @@
#!/usr/bin/env python
Help = """Miniboard GUI Test Tool
From a SPECIFICATION.md file (supplied as the first command-line
argument), this program provides a GUI that can be used to get/set
all registers on the miniboard."""
import sys
import math
import docparse #TODO: Add a try/catch and pop-up dialog.
from PyQt4.QtGui import *
from PyQt4.QtCore import *
import struct
import serial
import signal
import os
from serial.tools.list_ports import comports
signal.signal(signal.SIGINT, signal.SIG_DFL) #Make Ctrl-C quit the program
#TODO: Check lengths everywhere, for validation
#TODO: Add serial port selection
#TODO: Add big pause/unpause buttons
#TODO: Fix issue with spinbox that prevents typing 0 or -
#SerialPortPath = "/dev/ttyUSB0"
SerialPortPath = comports()[0].device
#SerialPortPath = "/dev/ttyACM0"
class MiniboardIO():
"""Handles reading and writing from the miniboard."""
path = SerialPortPath
baud = 115200
def __init__(self):
os.system("stty -F %s -hupcl"%self.path)
self.__tty = serial.Serial(port=self.path,
baudrate=self.baud,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=1)
def calc_crc(self, body):
body = [ord(b) for b in body]
remainder = 0xFFFF
for i in range(0, len(body)):
remainder ^= body[i] << 8
remainder &= 0xFFFF
for bit in reversed(range(1,9)):
if remainder & 0x8000:
remainder = (remainder << 1) ^ 0x1021
remainder &= 0xFFFF
else:
remainder <<= 1
remainder &= 0xFFFF
return remainder
#DON'T USE DIRECTLY RIGHT NOW! Use writeread()
def write(self, packet_contents):
"""Write a packet to the miniboard, inserting the header first."""
print " send: ",
packet_contents = "".join(packet_contents)
plen = len(packet_contents) + 2
crc = self.calc_crc(packet_contents)
packet_contents = chr(0x01) + chr(plen) + chr(crc & 0xFF) + chr(crc >> 8) + packet_contents
for c in packet_contents:
print "0x%02X, "%ord(c),
print "\n",
self.__tty.write(packet_contents)
#DON'T USE DIRECTLY RIGHT NOW! Use writeread()
def read(self):
"""Read a packet from the miniboard and return the contents
(with header removed).
If the miniboard takes too long to reply, or if an error occurs,
an exception will be raised. TODO: Exception type."""
print " recv: ",
reply = self.__tty.read(size=10000000)
for c in reply:
print "0x%02X, "%ord(c),
print "\n",
if len(reply) == 0:
print "Error: no reply."
return ""
if reply[0] != chr(0x01):
print "Error: missing start byte."
return ""
if len(reply) < 5:
print "Error: no enough bytes."
return ""
if (len(reply) - 2) != ord(reply[1]):
print "Error: length mismatch (header says %d, packet is %d)"%(ord(reply[1]), len(reply))
return ""
if self.calc_crc(reply[4:]) != (ord(reply[3])<<8 | ord(reply[2])):
print "Error: CRC mismatch."
return ""
return "".join(reply[4:])
def writeread(self, packet_contents):
"""Write a packet to the miniboard and return the reply."""
#self.__tty = serial.Serial(port=self.path,
#baudrate=self.baud,
#parity=serial.PARITY_NONE,
#stopbits=serial.STOPBITS_ONE,
#bytesize=serial.EIGHTBITS,
#timeout=0.1)
self.write(packet_contents)
reply = self.read()
#self.__tty.close()
return reply
def port_info(self):
"""Return a tuple (serial_port_path, baud_rate)."""
return (self.path, self.baud)
class RegisterController():
"""Handles reading/writing a single register and controlling that
register's UI widgets."""
def setup(self, reg_dict, widgets, io):
"""arg_row is the register specification from extract_table().
widgets is a list of the widgets corresponding to each register,
in the order they appear in the reg_dict.
io is the MiniboardIO class to use."""
self.reg = reg_dict
self.widgets = widgets
self.io = io
self.fmtcodes = {"u8":"<B", "i8":"<b", "u16":"<H", "i16":"<h", "u32":"<I", "i32":"<i", "i64":"<Q", "i64":"<q"}
def writefunc(self):
"""Return a function (due to pyqt weirdness) for writing this register to the miniboard."""
def func():
print "Write reg 0x%02X (%s): "%(self.reg["code"], self.reg["name"])
p = [chr(self.reg["code"] | 0x00)]
for a,w,i in zip(self.reg["argument"], self.widgets, range(0, len(self.widgets))):
if a[0] == "*":
self.widgets[i-1].setValue(len(str(w.text()).decode('string_escape')))
for a,w,i in zip(self.reg["argument"], self.widgets, range(0, len(self.widgets))):
if a[0] != "*":
value = w.value()
p += list(struct.pack(self.fmtcodes[a[0]], value))
else:
p += str(w.text()).decode('string_escape')
reply = self.io.writeread(p)
return func
def readfunc(self):
"""Return a function (due to pyqt weirdness) for reading this register from the miniboard."""
def func():
print "Read reg 0x%02X (%s): "%(self.reg["code"], self.reg["name"])
p = [chr(self.reg["code"] | 0x80)]
reply = self.io.writeread(p)
if reply[0] != p[0]:
print "Error: incorrect command code. Expected 0x%02X, got 0x%02X"%(ord(p[0]), ord(reply[0]))
b = 1
vs = []
for a,w,i in zip(self.reg["argument"], self.widgets, range(0, len(self.widgets))):
if a[0] != "*":
s = struct.calcsize(self.fmtcodes[a[0]])
value = struct.unpack(self.fmtcodes[a[0]], reply[b:b+s])[0]
vs.append(value)
b += s
else:
s = vs[i-1]
value = reply[b:b+s]
b+=s
try:
w.setValue(value)
except:
w.setText(value)
return func
def update(self):
"""Re-process widget data after editing."""
def horizontalLine():
#From http://stackoverflow.com/questions/5671354/how-to-programmatically-make-a-horizontal-line-in-qt
line = QFrame()
line.setFrameShape(QFrame.HLine)
line.setFrameShadow(QFrame.Sunken)
return line
def argtype_minval(argtype):
"""Return the minimum value for a numerical argument type."""
if argtype[0] == "u":
return 0
elif argtype[0] == "i":
return -(2**(int(argtype[1:])-1))
else:
return 0
def argtype_maxval(argtype):
"""Return the maximum value for a numerical argument type."""
if argtype[0] == "u":
return 2**(int(argtype[1:])) - 1
elif argtype[0] == "i":
return 2**(int(argtype[1:])-1) - 1
else:
return 0
def argtype_minwidth(argtype):
"""Return the minimum width of a spinbox for the given argument type."""
m = 2.5 + math.ceil(math.log10(argtype_maxval(argtype)))
return m * 9
class BigIntSpinBox(QAbstractSpinBox):
"""From http://stackoverflow.com/questions/26841036/pyqt4-spinbox-with-64bit-integers"""
def __init__(self, parent=None):
super(BigIntSpinBox, self).__init__(parent)
self._singleStep = 1
self._minimum = -18446744073709551616
self._maximum = 18446744073709551615
self.lineEdit = QLineEdit(self)
rx = QRegExp("-[0-9]\\d{0,20}")
validator = QRegExpValidator(rx, self)
self.lineEdit.setValidator(validator)
self.setLineEdit(self.lineEdit)
def value(self):
try:
return int(self.lineEdit.text())
except:
raise
return 0
def setValue(self, value):
if self._valueInRange(value):
self.lineEdit.setText(str(value))
def stepBy(self, steps):
self.setValue(self.value() + steps*self.singleStep())
def stepEnabled(self):
return self.StepUpEnabled | self.StepDownEnabled
def setSingleStep(self, singleStep):
assert isinstance(singleStep, int)
# don't use negative values
self._singleStep = abs(singleStep)
def singleStep(self):
return self._singleStep
def minimum(self):
return self._minimum
def setMinimum(self, minimum):
assert isinstance(minimum, int) or isinstance(minimum, long)
self._minimum = minimum
def maximum(self):
return self._maximum
def setMaximum(self, maximum):
assert isinstance(maximum, int) or isinstance(maximum, long)
self._maximum = maximum
def _valueInRange(self, value):
if value >= self.minimum() and value <= self.maximum():
return True
else:
return False
AutoreadTimer = None
def setup(window, spec_table, io):
ww = QWidget(window)
window.setWindowTitle("Miniboard GUI Test Tool")
flayout = QFormLayout()
vlayout = QVBoxLayout()
read_list = []
write_list = []
autoread_list = []
for r in spec_table:
label = QLabel(r["name"])
hl = QHBoxLayout()
annotated_args = []
control_widgets = []
rc = RegisterController()
for a,i in zip(r["argument"],range(0, len(r["argument"]))):
a = [a[0], a[1], False] #Type, Name, Is_Size_Controller
annotated_args.append(a)
if a[0] == "*":
annotated_args[i-1][2] = True
for a,i in zip(annotated_args,range(0, len(annotated_args))):
vl = QVBoxLayout()
if a[0] == "*":
widget = QLineEdit()
elif a[0] == "u64" or a[0] == "i64" or a[0] == "u32":
widget = BigIntSpinBox()
widget.setMinimum(argtype_minval(a[0]))
widget.setMaximum(argtype_maxval(a[0]))
widget.setMinimumSize(QSize(argtype_minwidth(a[0]), 0))
else:
widget = QSpinBox()
widget.setMinimum(argtype_minval(a[0]))
widget.setMaximum(argtype_maxval(a[0]))
widget.setMinimumSize(QSize(argtype_minwidth(a[0]), 0))
if len(r["default"]) == 0:
widget.setValue(0)
else:
widget.setValue(int(r["default"][i]))
if a[2] or "w" not in r["rw"]:
widget.setEnabled(False)
control_widgets.append(widget)
subtitle = QLabel(a[1])
subtitle.setFont(QFont("", 8))
vl.addWidget(widget)
vl.addWidget(subtitle)
hl.addLayout(vl)
rc.setup(r, control_widgets, io)
rbtn = QToolButton()
readfunc = rc.readfunc()
writefunc = rc.writefunc()
rbtn.setText("Read")
rbtn.pressed.connect(readfunc)
wbtn = QToolButton()
wbtn.setText("Write")
wbtn.pressed.connect(writefunc)
ar_check = QCheckBox()
if "r" not in r["rw"]:
rbtn.setEnabled(False)
ar_check.setEnabled(False)
else:
read_list.append(readfunc)
autoread_list.append((ar_check, readfunc))
if "w" not in r["rw"]:
wbtn.setEnabled(False)
else:
write_list.append(writefunc)
bvl = QVBoxLayout()
hbvl = QHBoxLayout()
hbvl.addWidget(ar_check)
hbvl.addWidget(rbtn)
hbvl.addWidget(wbtn)
bvl.addLayout(hbvl)
bvl.addSpacing(21)
hl.addStretch(1)
hl.addLayout(bvl)
flayout.addRow(label, hl)
gh = QHBoxLayout()
gh.addWidget(QLabel("Port: %s Baud: %d"%io.port_info()))
gh.addStretch(1)
def read_all():
for f in read_list:
f()
def write_all():
for f in write_list:
f()
def autoread():
for t in autoread_list:
if t[0].isChecked():
print "autoread"
t[1]()
rbtn = QToolButton()
rbtn.setText("Read All")
rbtn.pressed.connect(read_all)
wbtn = QToolButton()
wbtn.setText("Write All")
wbtn.pressed.connect(write_all)
ar_lbl = QLabel("Auto Read");
timer = QTimer(ar_lbl)
timer.timeout.connect(autoread)
timer.start(2000);
gh.addWidget(ar_lbl)
gh.addWidget(rbtn)
gh.addWidget(wbtn)
vlayout.addLayout(gh)
vlayout.addWidget(horizontalLine())
scrollarea = QScrollArea()
scrollarea.setWidgetResizable(True)
inner = QFrame(scrollarea)
inner.setLayout(flayout)
scrollarea.setWidget(inner)
vlayout.addWidget(scrollarea)
ww.setLayout(vlayout)
window.setCentralWidget(ww)
sh = flayout.sizeHint()
window.resize(QSize(sh.width() + 50, 700))
def get_specpath():
"""Return the path to the specification file,
using a hardcoded default, command-line argument, pop-up dialog, etc."""
return "SPECIFICATION.md"
def main():
app = QApplication(sys.argv)
w = QMainWindow()
spec = None
with open(get_specpath(), "r") as f:
spec = docparse.extract_table(f.read());
io = MiniboardIO()
setup(w, spec, io)
w.show()
sys.exit(app.exec_())
main()

View File

@@ -0,0 +1,193 @@
#!/usr/bin/env python
Help = """Miniboard Test-Drive
From a SPECIFICATION.md file (supplied as the first command-line
argument), this program provides takes in joystick input and drives
the rover."""
import sys
import math
import docparse #TODO: Add a try/catch and pop-up dialog.
from PyQt4.QtGui import *
from PyQt4.QtCore import *
import struct
import serial
import signal
import os
import pygame
import time
import struct
signal.signal(signal.SIGINT, signal.SIG_DFL) #Make Ctrl-C quit the program
SerialPortPath = "/dev/ttyUSB0"
class MiniboardIO():
"""Handles reading and writing from the miniboard."""
path = SerialPortPath
baud = 115200
def __init__(self):
os.system("stty -F %s -hupcl"%self.path)
self.__tty = serial.Serial(port=self.path,
baudrate=self.baud,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=0.5)
def calc_crc(self, body):
body = [ord(b) for b in body]
remainder = 0xFFFF
for i in range(0, len(body)):
remainder ^= body[i] << 8
remainder &= 0xFFFF
for bit in reversed(range(1,9)):
if remainder & 0x8000:
remainder = (remainder << 1) ^ 0x1021
remainder &= 0xFFFF
else:
remainder <<= 1
remainder &= 0xFFFF
return remainder
#DON'T USE DIRECTLY RIGHT NOW! Use writeread()
def write(self, packet_contents):
"""Write a packet to the miniboard, inserting the header first."""
print " send: ",
packet_contents = "".join(packet_contents)
plen = len(packet_contents) + 2
crc = self.calc_crc(packet_contents)
packet_contents = chr(0x01) + chr(plen) + chr(crc & 0xFF) + chr(crc >> 8) + packet_contents
for c in packet_contents:
print "0x%02X, "%ord(c),
print "\n",
self.__tty.write(packet_contents)
#DON'T USE DIRECTLY RIGHT NOW! Use writeread()
def read(self):
"""Read a packet from the miniboard and return the contents
(with header removed).
If the miniboard takes too long to reply, or if an error occurs,
an exception will be raised. TODO: Exception type."""
print " recv: ",
reply = self.__tty.read(size=10000000)
for c in reply:
print "0x%02X, "%ord(c),
print "\n",
if len(reply) == 0:
print "Error: no reply."
return ""
if reply[0] != chr(0x01):
print "Error: missing start byte."
return ""
if len(reply) < 5:
print "Error: no enough bytes."
return ""
if (len(reply) - 2) != ord(reply[1]):
print "Error: length mismatch (header says %d, packet is %d)"%(ord(reply[1]), len(reply))
return ""
if self.calc_crc(reply[4:]) != (ord(reply[3])<<8 | ord(reply[2])):
print "Error: CRC mismatch."
return ""
return "".join(reply[4:])
def writeread(self, packet_contents):
"""Write a packet to the miniboard and return the reply."""
#self.__tty = serial.Serial(port=self.path,
#baudrate=self.baud,
#parity=serial.PARITY_NONE,
#stopbits=serial.STOPBITS_ONE,
#bytesize=serial.EIGHTBITS,
#timeout=0.1)
self.write(packet_contents)
reply = self.read()
#self.__tty.close()
return reply
def port_info(self):
"""Return a tuple (serial_port_path, baud_rate)."""
return (self.path, self.baud)
def horizontalLine():
#From http://stackoverflow.com/questions/5671354/how-to-programmatically-make-a-horizontal-line-in-qt
line = QFrame()
line.setFrameShape(QFrame.HLine)
line.setFrameShadow(QFrame.Sunken)
return line
def setup(window, spec_table, io):
ww = QWidget(window)
window.setWindowTitle("Miniboard GUI Test Drive")
def get_specpath():
"""Return the path to the specification file,
using a hardcoded default, command-line argument, pop-up dialog, etc."""
return "SPECIFICATION.md"
#def main():
#app = QApplication(sys.argv)
#w = QMainWindow()
#spec = None
#with open(get_specpath(), "r") as f:
#spec = docparse.extract_table(f.read());
#io = MiniboardIO()
#setup(w, spec, io)
#w.show()
#sys.exit(app.exec_())
def set_drive_power(io, left, right):
"""Left and right are floats from -1 to 1."""
l = int(127 * left)
r = int(127 * right)
s = "".join([struct.pack('b', i) for i in [0x10, l, l, l, r, r, r]])
io.writeread(s)
def set_swerve_mode(io, mode):
if mode == 1 or mode == 2:
print "Swerve mode: ", mode
s = "".join([struct.pack('b', i) for i in [0x11, mode]])
io.writeread(s)
def get_joystick(joystick):
"""Return a tuple (l,r) of motor powers from the joystick."""
x = joystick.get_axis(0)
y = joystick.get_axis(1)
#print x, y
#theta = math.atan2(y, x)
#rad = math.sqrt(x**2 + y**2)
#print theta, rad
#if(rad > 1):
#rad = 1
#l=x
#r=-y
return (x,y)
def main():
with open(get_specpath(), "r") as f:
spec = docparse.extract_table(f.read());
io = MiniboardIO()
pygame.init()
pygame.joystick.init()
joysticks = [pygame.joystick.Joystick(x) for x in range(pygame.joystick.get_count())]
j = joysticks[0]
print j.get_name()
j.init()
oldl = 0
oldr = 0
while(True):
pygame.init()
pygame.event.get()
l,r = get_joystick(j)
#print "Motor power L=", l, "R=", r
if(oldl != l or oldr != r):
set_drive_power(io, l, r)
oldl = l
oldr = r
if int(j.get_button(6)) == 1:
set_swerve_mode(io, 2)
elif int(j.get_button(7)) == 1:
set_swerve_mode(io, 1)
else:
set_swerve_mode(io, 0)
time.sleep(30)
main()

View File

@@ -0,0 +1,19 @@
import sys
to_parse = ""
def __parse_and_show_science_data(string):
split_data = string[3:].split(",")
TempC = float(split_data[0])
Moisture = float(split_data[2])
Cond = float(split_data[4])
PermR = float(split_data[6])
PermI = float(split_data[8])
Salinity = Cond * 6.4
print("Temp in C: " + str(TempC))
print("Moisture: " + str(Moisture * 100) + "%")
print("Salinity: " + str(Salinity) + "g/L")
if __name__ == '__main__':
__parse_and_show_science_data(sys.argv[1])

View File

@@ -0,0 +1,100 @@
#!/usr/bin/env python
import sys
import math
import struct
import serial
import signal
import os
import time
import struct
SerialPath="/dev/ttyUSB0"
class SoilSensor():
def __init__(self, path):
self.path = path
self.__tty = serial.Serial(port=self.path,
baudrate=9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=0.2)
def set_mode_rx(self):
"""Set the transceiver to receive mode."""
#Set DTR for receive mode, clear for transmit
self.__tty.setDTR(True)
def set_mode_tx(self):
"""Set the transceiver to transmit mode."""
self.__tty.setDTR(False)
def send_command(self, addr_str, command_str):
"""Send a command to the soil sensor."""
self.set_mode_tx()
time.sleep(.04)
self.__tty.write(addr_str + command_str + "\r")
self.__tty.flush()
time.sleep(.05)
self.__tty.write("\n")
time.sleep(.005)
self.set_mode_rx()
reply = self.__tty.read(size=10000000)
return reply
def set_data(self, addr_str, command_str, data_str):
"""Set data in the soil sensor."""
self.set_mode_tx()
time.sleep(.04)
self.__tty.write(addr_str + command_str + "=" + data_str + "\r\n")
self.__tty.flush()
time.sleep(.05)
self.__tty.write("\n")
time.sleep(.005)
self.set_mode_rx()
reply = self.__tty.read(size=10000000)
return reply
def get_data(self, addr_str, command_str):
"""Get data from the sensor, returning the data.
command_str is the two-character string."""
self.set_mode_tx()
time.sleep(.04)
self.__tty.write(addr_str + command_str + "=?" + "\r")
self.__tty.flush()
time.sleep(.05)
self.__tty.write("\n")
time.sleep(.005)
self.set_mode_rx()
reply = self.__tty.read(size=10000000)
return reply
def get_measurement(self, addr_str):
Moisture = float(data[2])
Cond = float(data[4])
"""Take and return a soil measurement."""
addr = addr_str
self.send_command(addr, "TR")
time.sleep(1)
data = self.send_command(addr, "T3")
print data
data = data[3:]
data = data.split(",")
print "Raw Values (reading set 3):", data
TempC = float(data[0])
PermR = float(data[6])
PermI = float(data[8])
Salinity = Cond * 6.4
return {"TempC":TempC, "Moisture":Moisture, "Salinity":Salinity}
def main():
s = SoilSensor(SerialPath)
addr = s.get_data("///", "SN")[0:3]
s.set_data(addr, "PE", "1")
time.sleep(1)
while True:
print s.get_measurement(addr)
print ""
time.sleep(10)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,156 @@
"""
This file contains the live logs page sub-class
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore, QtWidgets, QtGui
import logging
from inputs import devices, GamePad
import time
#####################################
# Global Variables
#####################################
GAME_CONTROLLER_NAME = "FrSky FrSky Taranis Joystick"
CONTROLLER_DATA_UPDATE_FREQUENCY = 50 # Times per second
#####################################
# Controller Class Definition
#####################################
class FreeSkyController(QtCore.QThread):
# ########## Signals ##########
controller_connection_aquired = QtCore.pyqtSignal(bool)
controller_update_ready_signal = QtCore.pyqtSignal()
def __init__(self, main_window):
super(FreeSkyController, self).__init__()
# ########## Reference to top level window ##########
self.main_window = main_window # type: QtWidgets.QMainWindow
# ########## Get the settings instance ##########
self.settings = QtCore.QSettings()
# ########## Get the Pick And Plate instance of the logger ##########
self.logger = logging.getLogger("RoverBaseStation")
# ########## Thread Flags ##########
self.run_thread_flag = True
self.setup_controller_flag = True
self.data_acquisition_and_broadcast_flag = True
self.controller_aquired = False
# ########## Class Variables ##########
self.gamepad = None # type: GamePad
self.controller_states = {
"left_stick_x_axis": 0,
"left_stick_y_axis": 0,
"right_stick_x_axis": 0,
"right_stick_y_axis": 0,
"sa_state": 0,
"sb_state": 0,
"sc_state": 0,
"sd_state": 0,
"se_state": 0,
"sf_state": 0,
"sg_state": 0,
"sh_state": 0,
"ls_axis": 0,
"rs_axis": 0,
"s1_axis": 0,
"s2_axis": 0
}
self.raw_mapping_to_class_mapping = {
"ABS_RX": "left_stick_x_axis",
"ABS_X": "left_stick_y_axis",
"ABS_Z": "right_stick_x_axis",
"ABS_Y": "right_stick_y_axis",
"BTN_SOUTH": "sa_state",
"BTN_EAST": "sb_state",
"BTN_C": "sc_state",
"BTN_NORTH": "sd_state",
"BTN_WEST": "se_state",
"BTN_Z": "sf_state",
"BTN_TL": "sg_state",
"BTN_TR": "sh_state",
"ABS_RY": "ls_axis",
"ABS_RUDDER": "rs_axis",
"ABS_RZ": "s1_axis",
"ABS_THROTTLE" : "s2_axis"
}
self.last_time = time.time()
def run(self):
self.logger.debug("FreeSky Thread Starting...")
while self.run_thread_flag:
if self.setup_controller_flag:
self.controller_aquired = self.__setup_controller()
self.setup_controller_flag = False
if self.data_acquisition_and_broadcast_flag:
self.__get_controller_data()
self.__broadcast_if_ready()
self.logger.debug("FreeSky Thread Stopping...")
# noinspection PyUnresolvedReferences
def connect_signals_to_slots__slot(self):
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
def __setup_controller(self):
for device in devices.gamepads:
if device.name == GAME_CONTROLLER_NAME:
self.gamepad = device
self.controller_connection_aquired.emit(True)
return True
self.logger.info("FrySky Failed to Connect")
self.controller_connection_aquired.emit(False)
return False
def __get_controller_data(self):
if self.controller_aquired:
events = self.gamepad.read()
# self.logger.debug(events[0].manager)
for event in events:
if event.code in self.raw_mapping_to_class_mapping:
self.controller_states[self.raw_mapping_to_class_mapping[event.code]] = event.state
# self.logger.debug(event.state)
# if event.code not in self.raw_mapping_to_class_mapping and event.code != "SYN_REPORT":
# self.logger.debug(str(event.code) + " : " + str(event.state))
#
# if event.code == "ABS_RUDDER":
# self.logger.debug(str(event.code) + " : " + str(event.state))
# self.logger.debug("Frksy: " + str(self.controller_states))
def __broadcast_if_ready(self):
current_time = time.time()
if (current_time - self.last_time) > (1/CONTROLLER_DATA_UPDATE_FREQUENCY):
self.controller_update_ready_signal.emit()
self.last_time = current_time
# self.logger.debug(self.controller_states)
def on_kill_threads__slot(self):
self.terminate() # DON'T normally do this, but fine in this instance
self.run_thread_flag = False

View File

@@ -0,0 +1,130 @@
"""
This file contains the logging core sub-class
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore
from os import makedirs, rename, walk, unlink
from os.path import exists, getmtime
import logging
from datetime import datetime
#####################################
# Global Variables
#####################################
MAX_NUM_LOG_FILES = 30
#####################################
# Logger Definition
#####################################
class Logger(QtCore.QObject):
def __init__(self, console_output=True):
super(Logger, self).__init__()
# ########## Local class variables ##########
self.console_output = console_output
# ########## Get the settings instance ##########
self.settings = QtCore.QSettings()
# # ########## Get the Pick And Plate instance of the logger ##########
self.logger = logging.getLogger("RoverBaseStation")
# ########## Set variables with useful paths ##########
self.appdata_base_directory = self.settings.value("appdata_directory", type=str)
self.log_directory = self.appdata_base_directory + "/logs"
self.log_file_path = self.log_directory + "/log.txt"
# ########## Cleanup old log files ##########
self.__cleanup_log_files()
# ########## Set up logger with desired settings ##########
self.__setup_logger()
# ########## Place divider in log file to see new program launch ##########
self.__add_startup_log_buffer_text()
def __setup_logger(self):
# Get the appdata directory and make the log path if it doesn't exist
if not exists(self.log_directory):
makedirs(self.log_directory)
# Set the debugging level
self.logger.setLevel(logging.DEBUG)
# Make a formatter with the log line format wanted
formatter = logging.Formatter(fmt='%(levelname)s : %(asctime)s : %(message)s', datefmt='%m/%d/%y %H:%M:%S')
# Set up a file handler so everything can be saved and attach it to the logger
file_handler = logging.FileHandler(filename=self.log_file_path)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG)
self.logger.addHandler(file_handler)
# Enable console output if requested
if self.console_output:
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.DEBUG)
self.logger.addHandler(console_handler)
def __cleanup_log_files(self):
# This copies the existing log.txt file to an old version with a datetime stamp
# It then checks if there are too many log files, and if so, deletes the oldest
if exists(self.log_directory):
# Get the number of log files
num_log_files = self.__get_num_files_in_directory(self.log_directory)
# Check that we actually have log files
if num_log_files > 0:
date_time = datetime.now().strftime("%Y%m%d-%H%M%S")
# If we do, move the current logfile to a backup in the format old_log_datetime
if exists(self.log_file_path):
rename(self.log_file_path, self.log_directory + "/old_log_" + date_time + ".txt")
# If we have more than the max log files delete the oldest one
if num_log_files >= MAX_NUM_LOG_FILES:
unlink(self.__get_name_of_oldest_file(self.log_directory))
def __add_startup_log_buffer_text(self):
# Prints a header saying when the program started
self.logger.info("########## Application Starting ##########")
@staticmethod
def __get_name_of_oldest_file(input_path):
oldest_file_path = None
previous_oldest_time = 0
# Walk the directory passed in to get all folders and files
for dir_path, dir_names, file_names in walk(input_path):
# Go through all of the filenames found
for file in file_names:
# Recreate the full path and get the modified time of the file
current_path = dir_path + "/" + file
time = getmtime(current_path)
# Default case for if the variables above have not been initially set
if previous_oldest_time == 0:
previous_oldest_time = time
oldest_file_path = current_path
# Saves the oldest time and file path of the current file if it's older (lower timestamp) than the
# last file saved in the variables
if time < previous_oldest_time:
previous_oldest_time = time
oldest_file_path = current_path
# Returns the path to the oldest file after checking all the files
return oldest_file_path
@staticmethod
def __get_num_files_in_directory(input_path):
# Walk the directory passed in to get all the files
for _, _, file_names in walk(input_path):
# Return the number of files found in the directory
return len(file_names)

View File

@@ -0,0 +1,247 @@
"""
This file contains the live logs page sub-class
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore, QtWidgets, QtGui
from Resources.Libraries.goompy import GooMPy
from PIL.ImageQt import ImageQt
from PIL import ImageDraw
import logging
#####################################
# Global Variables
#####################################
INITIAL_LATITUDE = 38.4064262
INITIAL_LONGITUDE = -110.794115
INITIAL_ZOOM = 18 # 15,16, and 18 18 seems best
MAPTYPE = 'satellite'
POSSIBLE_ZOOM_LEVELS = [18]
WIDTH = 2 * 640
HEIGHT = 640
HAB_X_OFFSET = 82
HAB_Y_OFFSET = -9
#####################################
# Live Logs Class Definition
#####################################
class MapProcessor(QtCore.QThread):
update_map_label_signal = QtCore.pyqtSignal()
def __init__(self, main_window):
super(MapProcessor, self).__init__()
# ########## Reference to top level window ##########
self.main_window = main_window # type: QtWidgets.QMainWindow
# ########## Get the settings instance ##########
self.settings = QtCore.QSettings()
# ########## Get the Pick And Plate instance of the logger ##########
self.logger = logging.getLogger("RoverBaseStation")
# ########## Thread Flags ##########
self.run_thread_flag = True
self.setup_goompy_flag = True
# ########## References to GUI Elements ##########
self.map_label = self.main_window.map_label # type: QtWidgets.QLabel
# ########## Class Variables ##########
self.goompy = None
self.pillow = None
self.pixmap = QtGui.QPixmap("Resources/Images/waiting.png")
self.gps_valid = None
self.lat = None
self.lon = None
self.alt = None # {'gps_pos_valid': 1, 'altitude': 21052, 'longitude': -668740150, 'latitude': 229797144}
self.gps_track_valid = None
self.gps_speed = None
self.gps_heading = None
self.current_zoom_index = POSSIBLE_ZOOM_LEVELS.index(INITIAL_ZOOM)
self.last_zoom_level_index = self.current_zoom_index
self.num_zoom_levels = len(POSSIBLE_ZOOM_LEVELS)
# ########## Gui Event Variables ##########
self.x_drag = 0
self.y_drag = 0
def run(self):
self.logger.debug("Map Processor Thread Starting...")
self.update_map_label_signal.emit()
while self.run_thread_flag:
if self.setup_goompy_flag:
self.__setup_goompy()
self.setup_goompy_flag = False
else:
self.__get_goompy_pillow_image()
#self.__add_map_point_from_gps(38.406404, -110.783481)
self.__show_map_plus_overlays()
self.msleep(1)
self.logger.debug("Map Processor Thread Stopping...")
def __setup_goompy(self):
self.goompy = GooMPy(WIDTH, HEIGHT, INITIAL_LATITUDE, INITIAL_LONGITUDE, INITIAL_ZOOM, MAPTYPE, 1500)
self.goompy.move(HAB_X_OFFSET, HAB_Y_OFFSET)
def __get_goompy_pillow_image(self):
if self.last_zoom_level_index != self.current_zoom_index:
self.logger.debug("Changed to zoom level: " + str(POSSIBLE_ZOOM_LEVELS[self.current_zoom_index]))
self.goompy.useZoom(POSSIBLE_ZOOM_LEVELS[self.current_zoom_index])
self.last_zoom_level_index = self.current_zoom_index
self.pillow = self.goompy.getImage().convert("RGBA")
def __show_map_plus_overlays(self):
self.__draw_circle_at_point(self.pillow, self.goompy.width // 2, self.goompy.height // 2, 3)
qim = ImageQt(self.pillow)
self.pixmap = QtGui.QPixmap.fromImage(qim)
self.update_map_label_signal.emit()
def __add_map_point_from_gps(self, lat, lon):
self.logger.debug("Beginning")
if self.goompy:
viewport_lat_nw, viewport_lon_nw = self.goompy.northwest
viewport_lat_se, viewport_lon_se = self.goompy.southeast
viewport_lat_diff = viewport_lat_nw - viewport_lat_se
viewport_lon_diff = viewport_lon_se - viewport_lon_nw
viewport_upper_y = self.goompy.uppery
viewport_left_x = self.goompy.leftx
viewport_width = self.goompy.width
viewport_height = self.goompy.height
bigimage_width = self.goompy.bigimage.size[0]
bigimage_height = self.goompy.bigimage.size[1]
pixel_per_lat = bigimage_height / viewport_lat_diff
pixel_per_lon = bigimage_width / viewport_lon_diff
#
# viewport_left_percentage = viewport_left_x / bigimage_width
# viewport_right_percentage = (viewport_left_x + viewport_width) / bigimage_width
# viewport_top_percentage = viewport_upper_y / bigimage_height
# viewport_bottom_percentage = (viewport_upper_y + viewport_height) / bigimage_height
#
new_lat_gps_range_percentage = (viewport_lat_nw - lat)
new_lon_gps_range_percentage = (lon - viewport_lon_nw)
point_x = new_lon_gps_range_percentage * pixel_per_lon
point_y = new_lat_gps_range_percentage * pixel_per_lat
self.__draw_circle_at_point(self.goompy.bigimage, int(point_x), int(point_y), 3)
#
# self.logger.debug("vlp: " + str(viewport_left_percentage) + " nlatp: " + str(new_lon_gps_range_percentage) + " vrp: " + str(viewport_right_percentage))
# self.logger.debug("vtp: " + str(viewport_top_percentage) + " nlonp: " + str(new_lat_gps_range_percentage) + " vbp: " + str(viewport_bottom_percentage))
#
# x = new_lon_gps_range_percentage * bigimage_width
# y = new_lat_gps_range_percentage * bigimage_height
#
# self.__draw_circle_at_point(self.goompy.bigimage, x, y, 3)
#
# if viewport_left_percentage < new_lon_gps_range_percentage < viewport_right_percentage and \
# viewport_top_percentage < new_lat_gps_range_percentage < viewport_bottom_percentage:
# self.logger.debug("valid point")
# else:
# self.logger.debug("invalid point")
self.logger.debug("Made it here")
def __draw_circle_at_point(self, image, x, y, r):
draw = ImageDraw.Draw(image)
draw.ellipse((x - r, y - r, x + r, y + r), fill=(255, 0, 0, 255))
def __on_map_label_update_requested__slot(self):
self.map_label.setPixmap(self.pixmap)
def on_gps_coordinates_updated__slot(self, sdict):
self.gps_valid = sdict["gps_pos_valid"]
self.lat = (sdict["latitude"] * 10 ** -5) / 60
self.lon = (sdict["longitude"] * 10 ** -5) / 60
self.alt = (sdict["altitude"] / 10)
if self.gps_valid:
pass
#self.logger.debug("LAT: " + str(self.lat) + " | LON: " + str(self.lon) + " | ALT: " + str(self.alt))
else:
pass
#self.logger.debug("Waiting for GPS lock")
def on_gps_track_updated__slot(self, sdict):
self.gps_track_valid = sdict["gps_track_valid"]
self.gps_speed = sdict["gps_speed"]
self.gps_heading = sdict["gps_heading"]
if self.gps_track_valid:
pass
# self.logger.debug("SPEED: " + str(self.gps_speed) + " | HEADING: " + str(self.gps_heading))
else:
pass
def __mouse_released_on_map_event(self, _):
self.x_drag = -1
self.y_drag = -1
def __mouse_scrolled_on_map(self, event):
if self.goompy:
if event.angleDelta().y() > 0:
self.current_zoom_index += 1
else:
self.current_zoom_index -= 1
self.current_zoom_index = self.clamp(self.current_zoom_index, 0, self.num_zoom_levels - 1)
def __mouse_moved_on_map_label(self, event):
if self.x_drag != -1 and self.y_drag != -1 and self.goompy:
buttons = event.buttons()
if buttons == QtCore.Qt.LeftButton:
x_pos = event.pos().x()
y_pos = event.pos().y()
dx = x_pos - self.x_drag
dy = y_pos - self.y_drag
if dy < 300 and dx < 300:
# self.logger.debug(str(dx) + " : " + str(dy))
self.goompy.move(-dx, -dy)
self.x_drag = x_pos
self.y_drag = y_pos
else:
self.x_drag = event.pos().x()
self.y_drag = event.pos().y()
else:
self.x_drag = event.pos().x()
self.y_drag = event.pos().y()
# noinspection PyUnresolvedReferences
def connect_signals_to_slots__slot(self):
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
self.update_map_label_signal.connect(self.__on_map_label_update_requested__slot)
self.main_window.miniboard_class.data_gps_position.connect(self.on_gps_coordinates_updated__slot)
self.main_window.miniboard_class.data_gps_track.connect(self.on_gps_track_updated__slot)
self.map_label.mouseReleaseEvent = self.__mouse_released_on_map_event
self.map_label.mouseMoveEvent = self.__mouse_moved_on_map_label
self.map_label.wheelEvent = self.__mouse_scrolled_on_map
def on_kill_threads__slot(self):
self.run_thread_flag = False
@staticmethod
def clamp(n, minn, maxn):
return max(min(maxn, n), minn)

View File

@@ -0,0 +1,305 @@
#!/usr/bin/env python3
"""
This file contains the rover communication code.
"""
import sys
import os
import serial
from PyQt5 import QtCore, QtWidgets, QtGui
import logging
import struct
import signal
import time
from serial.tools.list_ports import comports
from io import StringIO, BytesIO
import enum
# Import docparse
docparsepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + "/common/"
sys.path.append(docparsepath)
import docparse
with open(docparsepath + "/SPECIFICATION.md") as p:
RoverCmdTable = docparse.extract_table(p.read())
RoverCmdDict = {cmd["code"]: cmd for cmd in RoverCmdTable}
for k in RoverCmdDict:
expected_size = 5
for a in RoverCmdDict[k]["argument"]:
if a[0] == "*":
expected_size += 256
else:
expected_size += int(a[0][1:]) // 8
RoverCmdDict[k]["expected_size"] = expected_size
# Create read/write functions
def create_funcs(module_vars, cmd_table):
"""Create functions of the form
read_<command_name>(signal)
write_<command_name>(signal, *args)
such as
read_pause(signal)
write_pause(signal, pause_state)
in the current module."""
log = logging.getLogger("Miniboard Command Functions")
def build_read_func(cmd):
"""Return a function that takes a signal (connected to MiniboardIO's send())
and emits a string of packet data."""
def f(signal):
packet_data = [cmd["code"] | 0x80]
signal.emit(packet_data)
return f
def build_write_func(cmd):
"""Return a function that takes a signal (connected to MiniboardIO's __send())
and emits a string of packet data."""
fmtcodes = {"u8": "<B", "i8": "<b", "u16": "<H", "i16": "<h", "u32": "<I", "i32": "<i", "i64": "<Q",
"i64": "<q"}
def f(signal, *args):
if len(args) != len(cmd["argument"]):
log.error("Error calling write func for command %s: expected %d arguments, got %d" % (
cmd["name"], len(args), len(cmd["arguments"])))
return
packet_data = [cmd["code"]]
for a, v in zip(cmd["argument"], args):
if a[0] == "*":
packet_data += list(v)
else:
packet_data += list(struct.pack(fmtcodes[a[0]], v))
signal.emit(packet_data)
return f
for c in cmd_table:
if "r" in c["rw"]: module_vars["read_" + docparse.cannon_name(c["name"])] = build_read_func(c)
if "w" in c["rw"]: module_vars["write_" + docparse.cannon_name(c["name"])] = build_write_func(c)
create_funcs(vars(), RoverCmdTable)
def make_signals():
"""Return a string that when eval'd in MiniboardIO
creates signals for received data from the rover.
Each signal has the name data_<canonical command name>,
such as data_battery_voltage.
Each signal emits a dictionary containing string keys of the
command's argument names."""
s = ""
for c in RoverCmdTable:
signame = "data_" + docparse.cannon_name(c["name"])
s += "%s = QtCore.pyqtSignal([dict])\n" % signame
signame = "ack_" + docparse.cannon_name(c["name"])
s += "%s = QtCore.pyqtSignal()\n" % signame
return s
signal_eval_str = make_signals()
class MiniboardIO(QtCore.QThread):
"""Handles reading and writing from the miniboard."""
path = "/dev/ttyUSB0"
# path = "COM21"
baud = 115200
on_kill_threads__slot = QtCore.pyqtSignal()
message_received_signal = QtCore.pyqtSignal()
exec(signal_eval_str)
def __init__(self, main_window):
super().__init__()
self.main_window = main_window
self.logger = logging.getLogger("RoverBaseStation")
os.system("stty -F %s -hupcl" % self.path)
com_name = comports()[0].device
if "COM" in com_name:
com_name = "COM21"
self.logger.debug("Connecting to " + com_name)
# self.tty = serial.Serial(port=com_name,
# baudrate=self.baud,
# parity=serial.PARITY_NONE,
# stopbits=serial.STOPBITS_ONE,
# bytesize=serial.EIGHTBITS)
self.run_thread_flag = True
self.queue = []
def calc_crc(self, body):
remainder = 0xFFFF
for i in range(0, len(body)):
remainder ^= body[i] << 8
remainder &= 0xFFFF
for bit in reversed(range(1, 9)):
if remainder & 0x8000:
remainder = (remainder << 1) ^ 0x1021
remainder &= 0xFFFF
else:
remainder <<= 1
remainder &= 0xFFFF
return remainder
def append(self, body_list):
"""Add a command to the queue."""
self.queue.append(body_list)
def __send(self, body_list):
"""Given a packet body list, turn it into a packet and send it to the rover."""
packet_contents = body_list
plen = len(packet_contents) + 2
crc = self.calc_crc(packet_contents)
packet_contents = [0x01] + [plen] + [crc & 0xFF] + [crc >> 8] + packet_contents
self.tty.write(packet_contents)
self.msleep((4 + len(body_list)) * .001)
def run(self):
"""Read from the serial port, recognize the command, and emit a signal."""
return
self.logger.debug("MiniboardIO Thread Starting...")
self.reply = []
fmtcodes = {"u8": "<B", "i8": "<b", "u16": "<H", "i16": "<h", "u32": "<I", "i32": "<i", "i64": "<Q",
"i64": "<q"}
waiting_for_command_reply = False
start_time = time.time()
while self.run_thread_flag:
if len(self.queue) > 0 and not waiting_for_command_reply:
body = self.queue[0]
self.__send(body)
self.queue = self.queue[1:]
if len(body) < 1:
expected_size = 1000000
else:
if body[0] & 0x80: # read
expected_size = RoverCmdDict[body[0] & 0x7F]["expected_size"]
else:
expected_size = 5
start_time = time.time()
waiting_for_command_reply = True
if waiting_for_command_reply:
if self.tty.inWaiting() >= expected_size:
waiting_for_command_reply = False
self.reply = list(self.tty.read(size=expected_size))
# n = 2
# while n > 0 and len(self.reply) < expected_size:
# self.reply += list(self.tty.read(size=1))
# n -= 1
while len(self.reply) > 0:
while len(self.reply) > 0:
if self.reply[0] != 0x01:
self.reply = self.reply[1:]
else:
break
if len(self.reply) > 0: # Got start byte
if len(self.reply) >= 5: # Got minimum complete packet
if len(self.reply) >= (self.reply[1] + 2): # Got enough bytes for this packet
if self.calc_crc(self.reply[4:]) == struct.unpack("<H", bytes(self.reply[2:4]))[0]: # CRC OK
if self.reply[4] & 0x80:
# self.logger.debug("read")
code = self.reply[4] & 0x7F
cmd = RoverCmdDict[code]
adict = {}
b = 5
vs = []
for a, i in zip(cmd["argument"], range(0, len(cmd["argument"]))):
if a[0] != "*":
s = struct.calcsize(fmtcodes[a[0]])
value = struct.unpack(fmtcodes[a[0]], bytes(self.reply[b:b + s]))[0]
adict[a[1]] = value
vs.append(value)
b += s
else:
s = vs[i - 1]
value = self.reply[b:b + s]
adict[a[1]] = value
b += s
getattr(self, "data_" + docparse.cannon_name(cmd["name"])).emit(adict)
# self.logger.debug(docparse.cannon_name + ": " + adict.__str__())
else:
code = self.reply[4] & 0x7F
cmd = RoverCmdDict[code]
getattr(self, "ack_" + docparse.cannon_name(cmd["name"])).emit()
self.reply = self.reply[(self.reply[1] + 2):]
self.message_received_signal.emit()
#self.logger.debug("MiniIO Loop: " + str(time.time() - start_time))
if time.time() - start_time > 0.3:
self.logger.error("MINIIO: Lost ACK : " + str(time.time() - start_time))
waiting_for_command_reply = False
self.queue = []
self.reply = []
self.tty.flushOutput()
self.tty.flushInput()
self.msleep(1)
self.logger.debug("MiniboardIO Thread Stopping...")
def clear_buffers_and_queues(self):
self.reply = ""
self.queue = []
self.tty.flushInput()
self.tty.flushOutput()
def connect_signals_to_slots__slot(self):
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
def on_kill_threads__slot(self):
self.run_thread_flag = False
class DemoThread(QtCore.QThread):
on_kill_threads__slot = QtCore.pyqtSignal()
send_packet = QtCore.pyqtSignal(list)
def __init__(self, main_window):
super().__init__()
self.main_window = main_window
self.connect_signals_to_slots()
def run(self):
while True:
write_swerve_drive_state(self.send_packet, 3)
read_swerve_drive_state(self.send_packet)
read_callsign(self.send_packet)
self.msleep(1000)
def connect_signals_to_slots(self):
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
self.send_packet.connect(self.main_window.m.append)
self.main_window.m.data_swerve_drive_state.connect(self.handle_swerve_data)
def handle_swerve_data(self, sdict):
pass
# self.logger.debug(sdict)
class DemoWindow(QtWidgets.QMainWindow):
kill_threads_signal = QtCore.pyqtSignal()
def __init__(self):
super().__init__()
self.m = MiniboardIO(self)
self.d = DemoThread(self)
self.m.start()
self.d.start()
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal.SIG_DFL) # This allows the keyboard interrupt kill to work properly
application = QtWidgets.QApplication(sys.argv) # Create the base qt gui application
app_window = DemoWindow() # Make a window in this application
app_window.setWindowTitle("Demo Demo") # Sets the window title
app_window.show() # Show the window in the application
application.exec_() # Execute launching of the application

View File

@@ -0,0 +1,507 @@
"""
This file contains the live logs page sub-class
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore, QtWidgets
import logging
from Framework.MiniBoardIOCore import write_drive_motor_power, read_drive_motor_power, write_pause, \
write_arm_motors, write_swerve_drive_state, write_joystick
import time
#####################################
# Global Variables
#####################################
JOY_MIN = -127
JOY_MAX = 127
DRIVE_MIN = -127
DRIVE_MAX = 127
ARMS_MIN = -127
ARMS_MAX = 127
DEAD_BAND_FRSKY = 20
DEAD_BAND_XBOX = 1500
DRIVE_TIMEOUT = 0.5 # Seconds
ARM_TIMEOUT = 0.25 # Seconds
PAN_TILT_TIMEOUT = 0.25 # Seconds
DRIVE_SWERVE_TIMEOUT = 5 # Seconds
#####################################
# Notes
#####################################
# ##### Xbox State Names
# "left_stick_x_axis"
# "left_stick_y_axis"
# "left_stick_center_pressed"
#
# "right_stick_x_axis"
# "right_stick_y_axis"
# "right_stick_center_pressed"
#
# "left_trigger_z_axis"
# "left_bumper_pressed"
#
# "right_trigger_z_axis"
# "right_bumper_pressed"
#
# "dpad_x"
# "dpad_y"
#
# "select_pressed"
# "start_pressed"
# "home_pressed"
#
# "a_pressed"
# "b_pressed"
# "x_pressed"
# "y_pressed"
# ##### Frsky State Names
# "left_stick_x_axis"
# "left_stick_y_axis"
#
# "right_stick_x_axis"
# "right_stick_y_axis"
#
# "sc_state"
# "sg_state"
# "se_state"
# "sa_state"
#
# "ls_axis"
# "rs_axis"
#
# "s1_axis"
#####################################
# Controller Class Definition
#####################################
class MotionProcessor(QtCore.QThread):
send_miniboard_control_packet = QtCore.pyqtSignal(list)
def __init__(self, main_window):
super(MotionProcessor, self).__init__()
# ########## Reference to top level window ##########
self.main_window = main_window # type: QtWidgets.QMainWindow
# ########## Get the settings instance ##########
self.settings = QtCore.QSettings()
# ########## Get the Pick And Plate instance of the logger ##########
self.logger = logging.getLogger("RoverBaseStation")
# ########## Thread Flags ##########
self.run_thread_flag = True
self.wait_for_drive_response = False
self.wait_for_pan_tilt_response = False
self.wait_for_arm_response = False
self.wait_for_swerve_state_response = False
self.wait_for_passthrough_response = False
# ########## Class Variables ##########
self.xbox_states = self.main_window.xbox_controller_class.controller_states
self.frsky_states = self.main_window.frsky_controller_class.controller_states
self.last_pause_state = 0
self.last_swerve_state = 0
self.last_drive_state = 0
self.last_left_drive_value = 0
self.last_right_drive_value = 0
self.pan_position = -1
self.tilt_position = -1
self.frsky_locked = False
self.xbox_locked = False
def run(self):
self.logger.debug("Motion Processor Thread Starting...")
while self.run_thread_flag:
if self.xbox_states and self.frsky_states:
start_time = time.time()
self.__send_controller_passthrough()
if not self.frsky_states["sa_state"]: # 0 is drive mode
if not self.frsky_states["se_state"]: # 0 is drive mode
# TODO: Change to send faster
pass
else: # 1 is auto mode
# TODO: Change to send slower
pass
# self.logger.debug("Control time: " + str(time.time() - start_time))
self.msleep(1)
self.logger.debug("Motion Processor Thread Stopping...")
# noinspection PyUnresolvedReferences
def connect_signals_to_slots__slot(self):
self.main_window.xbox_controller_class.controller_update_ready_signal.connect(self.on_xbox_states_updated__slot)
self.main_window.frsky_controller_class.controller_update_ready_signal.connect(
self.on_frsky_states_updated__slot)
self.send_miniboard_control_packet.connect(self.main_window.miniboard_class.append)
self.main_window.miniboard_class.data_drive_motor_power.connect(
self.on_drive_motor_power_response_received__slot)
self.main_window.miniboard_class.ack_drive_motor_power.connect(self.on_drive_response_received__slot)
# self.main_window.miniboard_class.data_pan_tilt.connect(self.on_pan_tilt_primary_position_response__slot)
# self.main_window.miniboard_class.data_pan_tilt_secondary.connect(
# self.on_pan_tilt_secondary_position_response__slot)
# self.main_window.miniboard_class.ack_pan_tilt.connect(self.on_primary_pan_tilt_write_acknowledged__slot)
self.main_window.miniboard_class.ack_arm_motors.connect(self.on_arm_motors_write_acknowledged__slot)
self.main_window.miniboard_class.ack_swerve_drive_state.connect(self.on_swerve_state_response_received__slot)
self.main_window.miniboard_class.ack_joystick.connect(self.on_passthrough_response_received__slot)
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
def __send_controller_passthrough(self):
# ##### Frsky #####
fr_left_horiz = int(self.clamp(self.frsky_states["left_stick_x_axis"], JOY_MIN, JOY_MAX))
fr_left_vert = int(self.clamp(self.frsky_states["left_stick_y_axis"], JOY_MIN, JOY_MAX))
fr_right_horiz = int(self.clamp(self.frsky_states["right_stick_x_axis"], JOY_MIN, JOY_MAX))
fr_right_vert = int(self.clamp(self.frsky_states["right_stick_y_axis"], JOY_MIN, JOY_MAX))
fr_left_pot = int(self.clamp(self.frsky_states["s1_axis"], JOY_MIN, JOY_MAX))
fr_right_pot = int(self.clamp(self.frsky_states["s2_axis"], JOY_MIN, JOY_MAX))
fr_left_side_pot = int(self.clamp(self.frsky_states["ls_axis"], JOY_MIN, JOY_MAX))
fr_right_side_pot = int(self.clamp(self.frsky_states["rs_axis"], JOY_MIN, JOY_MAX))
fr_button_a = self.frsky_states["sa_state"]
fr_button_b = self.frsky_states["sb_state"]
fr_button_c = self.frsky_states["sc_state"]
fr_button_d = self.frsky_states["sd_state"]
fr_button_e = self.frsky_states["se_state"]
fr_button_f = self.frsky_states["sf_state"]
fr_button_g = self.frsky_states["sg_state"]
fr_button_h = self.frsky_states["sh_state"]
frsky_buttons_byte = 0
frsky_buttons_byte |= (fr_button_a << 0)
frsky_buttons_byte |= (fr_button_b << 1)
frsky_buttons_byte |= (fr_button_c << 2)
frsky_buttons_byte |= (fr_button_d << 3)
frsky_buttons_byte |= (fr_button_e << 4)
frsky_buttons_byte |= (fr_button_f << 5)
frsky_buttons_byte |= (fr_button_g << 6)
frsky_buttons_byte |= (fr_button_h << 7)
# ##### XBOX #####
xb_left_horiz = int(self.clamp(self.xbox_states["left_stick_x_axis"] // 256, JOY_MIN, JOY_MAX))
xb_left_vert = int(self.clamp(self.xbox_states["left_stick_y_axis"] // 256, JOY_MIN, JOY_MAX))
xb_right_horiz = int(self.clamp(self.xbox_states["right_stick_x_axis"] // 256, JOY_MIN, JOY_MAX))
xb_right_vert = int(self.clamp(self.xbox_states["right_stick_y_axis"] // 256, JOY_MIN, JOY_MAX))
xb_left_trig = int(self.clamp(self.xbox_states["left_trigger_z_axis"] - 127, JOY_MIN, JOY_MAX))
xb_right_trig = int(self.clamp(self.xbox_states["right_trigger_z_axis"] - 127, JOY_MIN, JOY_MAX))
xb_button_a = self.xbox_states["a_pressed"]
xb_button_b = self.xbox_states["b_pressed"]
xb_button_x = self.xbox_states["x_pressed"]
xb_button_y = self.xbox_states["y_pressed"]
xb_button_lb = self.xbox_states["left_bumper_pressed"]
xb_button_rb = self.xbox_states["right_bumper_pressed"]
xb_button_lsc = self.xbox_states["left_stick_center_pressed"]
xb_button_rsc = self.xbox_states["right_stick_center_pressed"]
xb_button_sel = self.xbox_states["select_pressed"]
xb_button_strt = self.xbox_states["start_pressed"]
xb_button_home = self.xbox_states["home_pressed"]
xb_button_dph = self.xbox_states["dpad_x"]
xb_button_dpv = self.xbox_states["dpad_y"]
xbox_buttons_low_byte = 0
xbox_buttons_high_byte = 0
xbox_buttons_low_byte |= (xb_button_a << 0)
xbox_buttons_low_byte |= (xb_button_b << 1)
xbox_buttons_low_byte |= (xb_button_x << 2)
xbox_buttons_low_byte |= (xb_button_y << 3)
xbox_buttons_low_byte |= (xb_button_lb << 4)
xbox_buttons_low_byte |= (xb_button_rb << 5)
xbox_buttons_low_byte |= (xb_button_lsc << 6)
xbox_buttons_low_byte |= (xb_button_rsc << 7)
xbox_buttons_high_byte |= (xb_button_sel << 0)
xbox_buttons_high_byte |= (xb_button_strt << 1)
xbox_buttons_high_byte |= (xb_button_home << 2)
dpad_l_state = 0
dpad_r_state = 0
dpad_u_state = 0
dpad_d_state = 0
if xb_button_dph == -1:
dpad_l_state = 1
elif xb_button_dph == 1:
dpad_r_state = 1
if xb_button_dpv == -1:
dpad_u_state = 1
elif xb_button_dpv == 1:
dpad_d_state = 1
xbox_buttons_high_byte |= (dpad_l_state << 3)
xbox_buttons_high_byte |= (dpad_r_state << 4)
xbox_buttons_high_byte |= (dpad_u_state << 5)
xbox_buttons_high_byte |= (dpad_d_state << 6)
xbox_buttons_high_byte |= (1 << 7)
# self.logger.debug('{0:08b}'.format(xbox_buttons_high_byte) + " : " + '{0:08b}'.format(xbox_buttons_low_byte))
# current_array = [fr_left_horiz, fr_left_vert, fr_right_horiz, fr_right_vert, fr_left_pot, fr_right_pot, fr_left_side_pot, fr_right_side_pot, frsky_buttons_byte, xb_left_horiz, xb_left_vert, xb_right_horiz, xb_right_vert, xb_left_trig, xb_right_trig, xbox_buttons_high_byte, xbox_buttons_low_byte]
# desired_array = [str(number) for number in current_array]
# joined = " : ".join(desired_array)
# self.logger.debug(joined)
self.wait_for_passthrough_response = True
write_joystick(self.send_miniboard_control_packet, fr_left_horiz, fr_left_vert, fr_right_horiz, fr_right_vert, fr_left_pot, fr_right_pot, fr_left_side_pot, fr_right_side_pot, frsky_buttons_byte, xb_left_horiz, xb_left_vert, xb_right_horiz, xb_right_vert, xb_left_trig, xb_right_trig, xbox_buttons_high_byte, xbox_buttons_low_byte)
# ##### Standard timeout block #####
start_time = time.time()
time_elapsed = 0.0
while self.wait_for_passthrough_response and time_elapsed < DRIVE_TIMEOUT: # I'm being explicit here
time_elapsed = time.time() - start_time
self.msleep(1)
if time_elapsed > DRIVE_TIMEOUT:
self.logger.debug("MOTION: TOO SLOW: " + str(time_elapsed))
# ##### End standard timeout block #####
def __set_pause_on_state_change(self):
current_state = self.frsky_states['sf_state']
if current_state != self.last_pause_state:
write_pause(self.send_miniboard_control_packet, current_state)
self.last_pause_state = current_state
def __all_stop_on_arm_control(self):
current_state = self.frsky_states["sa_state"]
if (current_state == 1) and (self.last_drive_state == 0):
self.wait_for_drive_response = True
write_drive_motor_power(self.send_miniboard_control_packet, 0, 0, 0, 0,
0, 0)
# ##### Standard timeout block #####
start_time = time.time()
time_elapsed = 0
while self.wait_for_drive_response and time_elapsed < DRIVE_TIMEOUT: # I'm being explicit here
time_elapsed = time.time() - start_time
self.msleep(1)
# ##### End standard timeout block #####
while self.frsky_states["left_stick_y_axis"] > DEAD_BAND_FRSKY and self.frsky_states["right_stick_y_axis"] > DEAD_BAND_FRSKY:
self.msleep(1)
self.last_drive_state = current_state
elif (current_state == 0) and (self.last_drive_state == 1):
self.last_drive_state = 0
def __set_swerve_state_on_state_change(self):
current_state = self.frsky_states['sg_state']
self.logger.debug(str(current_state) + str(self.last_swerve_state))
if current_state != self.last_swerve_state:
self.logger.debug("State changed")
# #######################################
# ##### Then send the state change #####
# #######################################
if current_state: # 1 indicates turn to swerve, command 2
command = 2
else: # 0 indicates straight, command 1
command = 1
self.wait_for_swerve_state_response = True
write_swerve_drive_state(self.send_miniboard_control_packet, command)
# ##### Standard timeout block #####
start_time = time.time()
time_elapsed = 0
while self.wait_for_swerve_state_response and time_elapsed < DRIVE_SWERVE_TIMEOUT: # I'm being explicit here
time_elapsed = time.time() - start_time
self.msleep(1)
# ##### End standard timeout block #####
self.msleep(DRIVE_SWERVE_TIMEOUT)
self.last_swerve_state = current_state
def __arm_control_manual(self):
OFFSET = 127
max_speed_scale_raw = self.frsky_states["s1_axis"] + OFFSET
base_speed_raw = self.frsky_states["left_stick_x_axis"]
bicep_speed_raw = self.frsky_states["left_stick_y_axis"]
forearm_speed_raw = self.frsky_states["right_stick_y_axis"]
pitch_speed_raw = self.frsky_states["rs_axis"]
wrist_rotation_speed_raw = self.frsky_states["right_stick_x_axis"]
scale_percentage = max_speed_scale_raw / 255
base_speed_scaled = base_speed_raw * scale_percentage
bicep_speed_scaled = bicep_speed_raw * scale_percentage
forearm_speed_scaled = forearm_speed_raw * scale_percentage
pitch_speed_scaled = pitch_speed_raw * scale_percentage
wrist_rotation_speed_scaled = wrist_rotation_speed_raw * scale_percentage
base_speed_scaled = int(self.clamp(base_speed_scaled, ARMS_MIN, ARMS_MAX))
bicep_speed_scaled = int(self.clamp(bicep_speed_scaled, ARMS_MIN, ARMS_MAX))
forearm_speed_scaled = int(self.clamp(forearm_speed_scaled, ARMS_MIN, ARMS_MAX))
pitch_speed_scaled = int(self.clamp(pitch_speed_scaled, ARMS_MIN, ARMS_MAX))
wrist_rotation_speed_scaled = int(self.clamp(wrist_rotation_speed_scaled, ARMS_MIN, ARMS_MAX))
self.wait_for_arm_response = True
write_arm_motors(self.send_miniboard_control_packet,
base_speed_scaled,
bicep_speed_scaled,
forearm_speed_scaled,
pitch_speed_scaled,
wrist_rotation_speed_scaled)
# ##### Standard timeout block #####
start_time = time.time()
time_elapsed = 0
while self.wait_for_arm_response and time_elapsed < ARM_TIMEOUT: # I'm being explicit here
time_elapsed = time.time() - start_time
self.msleep(1)
# ##### End standard timeout block #####
# write_arm_motors(self.send_miniboard_control_packet, )
def __drive_manual(self):
OFFSET = 127
max_speed_scale_raw = self.frsky_states["s1_axis"] + OFFSET # Range (min to max) 0 to 255
left_stick_y_axis_raw = self.frsky_states["left_stick_y_axis"] # Range (min to max) -127 to 127
right_stick_y_axis_raw = self.frsky_states["right_stick_y_axis"] # Range (min to max) -127 to 127
scale_percentage = max_speed_scale_raw / 255
left_scaled = left_stick_y_axis_raw * scale_percentage
right_scaled = right_stick_y_axis_raw * scale_percentage
left_scaled = int(self.clamp(left_scaled, DRIVE_MIN, DRIVE_MAX))
right_scaled = int(self.clamp(right_scaled, DRIVE_MIN, DRIVE_MAX))
# if left_scaled != self.last_left_drive_value or right_scaled != self.last_right_drive_value:
self.wait_for_drive_response = True
write_drive_motor_power(self.send_miniboard_control_packet, left_scaled, left_scaled, left_scaled, right_scaled,
right_scaled, right_scaled)
self.logger.debug(str(left_scaled) + " : " + str(right_scaled))
# ##### Standard timeout block #####
start_time = time.time()
time_elapsed = 0
while self.wait_for_drive_response and time_elapsed < DRIVE_TIMEOUT: # I'm being explicit here
time_elapsed = time.time() - start_time
self.msleep(1)
# ##### End standard timeout block #####
# self.last_left_drive_value = left_scaled
# self.last_right_drive_value = right_scaled
# read_drive_motor_power(self.send_miniboard_control_packet)
def __drive_auto(self):
self.msleep(1)
def __pan_tilt_manual(self):
# X axis (min to max) -32768 to 32768
# Y axis (min to max) 32768 to 32768
controller_pan_raw = self.xbox_states["right_stick_x_axis"]
controller_tilt_raw = self.xbox_states["right_stick_y_axis"]
if abs(controller_pan_raw) < DEAD_BAND_XBOX:
controller_pan_raw = 0
if abs(controller_tilt_raw) < DEAD_BAND_XBOX:
controller_tilt_raw = 0
controller_pan = -controller_pan_raw/2560
controller_tilt = -controller_tilt_raw/2560
new_pan = self.clamp(int(self.pan_position+controller_pan), 0, 65535)
new_tilt = self.clamp(int(self.tilt_position+controller_tilt), 0, 65535)
self.wait_for_pan_tilt_response = True
write_pan_tilt(self.send_miniboard_control_packet, new_pan, new_tilt)
# ##### Standard timeout block #####
start_time = time.time()
time_elapsed = 0
while self.wait_for_pan_tilt_response and time_elapsed < PAN_TILT_TIMEOUT: # I'm being explicit here
time_elapsed = time.time() - start_time
self.msleep(1)
# ##### End standard timeout block #####
self.pan_position = new_pan
self.tilt_position = new_tilt
def on_xbox_states_updated__slot(self):
return
def on_frsky_states_updated__slot(self):
return
def on_primary_pan_tilt_write_acknowledged__slot(self):
self.wait_for_pan_tilt_response = False
def on_secondary_pan_tilt_write_acknowledged__slot(self):
self.wait_for_pan_tilt_response = False #TODO: make secondary
def on_pan_tilt_primary_position_response__slot(self, sdict):
try:
self.pan_position = sdict["pan"]
self.tilt_position = sdict["tilt"]
except:
pass
def on_pan_tilt_secondary_position_response__slot(self, sdict):
pass
#self.logger.debug(sdict)
def on_drive_motor_power_response_received__slot(self, sdict):
pass
#self.logger.debug(sdict)
def on_passthrough_response_received__slot(self):
self.wait_for_passthrough_response = False
def on_drive_response_received__slot(self):
self.wait_for_drive_response = False
def on_arm_motors_write_acknowledged__slot(self):
self.wait_for_arm_response = False
def on_swerve_state_response_received__slot(self):
self.wait_for_swerve_state_response = False
def on_kill_threads__slot(self):
self.run_thread_flag = False
@staticmethod
def clamp(n, minn, maxn):
return max(min(maxn, n), minn)

View File

@@ -0,0 +1,56 @@
"""
This file calls read voltage, and read drive motor power
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore, QtWidgets
import logging
from Framework.MiniBoardIOCore import read_drive_motor_power, read_battery_voltage, read_arm_motors, read_gps_position, read_gps_track
import time
class ReadUpdater(QtCore.QThread):
send_miniboard_control_packet = QtCore.pyqtSignal(list)
def __init__(self, main_window):
super(ReadUpdater, self).__init__()
# ########## Reference to top level window ##########
self.main_window = main_window # type: QtWidgets.QMainWindow
# ########## Get the settings instance ##########
self.settings = QtCore.QSettings()
# ########## Get the Pick And Plate instance of the logger ##########
self.logger = logging.getLogger("RoverBaseStation")
# ########## Some Flags ##########
self.run_thread_flag = True
# ########## Class variables ##########
self.data_last_seen = time.time()
def run(self):
self.logger.debug("Read Updater Thread Starting...")
while self.run_thread_flag:
read_drive_motor_power(self.send_miniboard_control_packet)
#read_battery_voltage(self.send_miniboard_control_packet)
# read_arm_motors(self.send_miniboard_control_packet)
#read_gps_position(self.send_miniboard_control_packet)
#read_gps_track(self.send_miniboard_control_packet)
self.msleep(3000)
self.logger.debug("Read Updater Thread Stopping...")
def connect_signals_to_slots__slot(self):
self.send_miniboard_control_packet.connect(self.main_window.miniboard_class.append)
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
def on_kill_threads__slot(self):
self.run_thread_flag = False

View File

@@ -0,0 +1,56 @@
"""
This file contains the Settings sub-class as part of the Framework Class
This class handles initialization of system settings and handling defaults when no settings are found
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore, QtWidgets
import os
# Custom imports
#####################################
# Global Variables
#####################################
#####################################
# PickAndPlateLogger Definition
#####################################
class Settings(QtCore.QObject):
def __init__(self, main_window):
super(Settings, self).__init__()
# ########## Reference to highest level window ##########
self.main_window = main_window # type: QtWidgets.QMainWindow
# ########## Set up settings for program ##########
self.__setup_settings()
# ########## Create Instance of settings ##########
self.settings = QtCore.QSettings()
# ########## Create Instance of settings ##########
self.__set_hardcoded_settings()
@staticmethod
def __setup_settings():
# noinspection PyCallByClass,PyTypeChecker,PyArgumentList
QtCore.QCoreApplication.setOrganizationName("OSURC")
# noinspection PyCallByClass,PyTypeChecker,PyArgumentList
QtCore.QCoreApplication.setOrganizationDomain("http://osurobotics.club")
# noinspection PyCallByClass,PyTypeChecker,PyArgumentList
QtCore.QCoreApplication.setApplicationName("RoverBaseStation")
def __set_hardcoded_settings(self):
# Set the temporary directory used to store files while processing them
try:
app_data_dir = os.environ["HOME"]
except:
app_data_dir = os.environ["APPDATA"]
folder_name = "basestation"
full_path = app_data_dir + "/" + folder_name + "/settings"
self.settings.setValue("appdata_directory", full_path)

View File

@@ -0,0 +1,158 @@
"""
This file contains the live logs page sub-class
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore, QtWidgets, QtGui
import logging
from inputs import devices, GamePad
import time
#####################################
# Global Variables
#####################################
GAME_CONTROLLER_NAME = "Afterglow Gamepad for Xbox 360"
CONTROLLER_DATA_UPDATE_FREQUENCY = 50 # Times per second
#####################################
# Controller Class Definition
#####################################
class XBOXController(QtCore.QThread):
# ########## Signals ##########
controller_connection_aquired = QtCore.pyqtSignal(bool)
controller_update_ready_signal = QtCore.pyqtSignal()
def __init__(self, main_window):
super(XBOXController, self).__init__()
# ########## Reference to top level window ##########
self.main_window = main_window # type: QtWidgets.QMainWindow
# ########## Get the settings instance ##########
self.settings = QtCore.QSettings()
# ########## Get the Pick And Plate instance of the logger ##########
self.logger = logging.getLogger("RoverBaseStation")
# ########## Thread Flags ##########
self.run_thread_flag = True
self.setup_controller_flag = True
self.data_acquisition_and_broadcast_flag = True
self.controller_aquired = False
# ########## Class Variables ##########
self.gamepad = None # type: GamePad
self.controller_states = {
"left_stick_x_axis": 0,
"left_stick_y_axis": 0,
"left_stick_center_pressed": 0,
"right_stick_x_axis": 0,
"right_stick_y_axis": 0,
"right_stick_center_pressed": 0,
"left_trigger_z_axis": 0,
"left_bumper_pressed": 0,
"right_trigger_z_axis": 0,
"right_bumper_pressed": 0,
"dpad_x": 0,
"dpad_y": 0,
"select_pressed": 0,
"start_pressed": 0,
"home_pressed": 0,
"a_pressed": 0,
"b_pressed": 0,
"x_pressed": 0,
"y_pressed": 0
}
self.raw_mapping_to_class_mapping = {
"ABS_X": "left_stick_x_axis",
"ABS_Y": "left_stick_y_axis",
"BTN_THUMBL": "left_stick_center_pressed",
"ABS_RX": "right_stick_x_axis",
"ABS_RY": "right_stick_y_axis",
"BTN_THUMBR": "right_stick_center_pressed",
"ABS_Z": "left_trigger_z_axis",
"BTN_TL": "left_bumper_pressed",
"ABS_RZ": "right_trigger_z_axis",
"BTN_TR": "right_bumper_pressed",
"ABS_HAT0X": "dpad_x",
"ABS_HAT0Y": "dpad_y",
"BTN_SELECT": "select_pressed",
"BTN_START": "start_pressed",
"BTN_MODE": "home_pressed",
"BTN_SOUTH": "a_pressed",
"BTN_EAST": "b_pressed",
"BTN_NORTH": "x_pressed",
"BTN_WEST": "y_pressed"
}
self.last_time = time.time()
def run(self):
self.logger.debug("Xbox Thread Starting...")
while self.run_thread_flag:
if self.setup_controller_flag:
self.controller_aquired = self.__setup_controller()
self.setup_controller_flag = False
if self.data_acquisition_and_broadcast_flag:
self.__get_controller_data()
self.__broadcast_if_ready()
self.logger.debug("Xbox Thread Stopping...")
# noinspection PyUnresolvedReferences
def connect_signals_to_slots__slot(self):
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
def __setup_controller(self):
for device in devices.gamepads:
if device.name == GAME_CONTROLLER_NAME:
self.gamepad = device
self.controller_connection_aquired.emit(True)
return True
self.logger.info("XBOX Controller Failed to Connect")
self.controller_connection_aquired.emit(False)
return False
def __get_controller_data(self):
if (self.controller_aquired):
events = self.gamepad.read()
for event in events:
if event.code in self.raw_mapping_to_class_mapping:
self.controller_states[self.raw_mapping_to_class_mapping[event.code]] = event.state
# self.logger.debug("XBOX: " + str(self.controller_states))
def __broadcast_if_ready(self):
current_time = time.time()
if (current_time - self.last_time) > (1/CONTROLLER_DATA_UPDATE_FREQUENCY):
self.controller_update_ready_signal.emit()
self.last_time = current_time
def on_kill_threads__slot(self):
self.terminate() # DON'T normally do this!!!!!
self.run_thread_flag = False

View File

@@ -0,0 +1,224 @@
"""
This file contains the data view page sub-class
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore, QtWidgets, QtGui
import logging
from Framework.MiniBoardIOCore import write_drive_motor_power, read_drive_motor_power, write_pause, write_arm_motors, write_soil_measure, write_soil_sensor_recv, read_soil_measurements, read_gps_position, read_gps_track
from datetime import datetime
from time import sleep
#####################################
# Global Variables
#####################################
JOYSTICK_AXIS_MIN = -144
JOYSTICK_AXIS_MAX = 144
#####################################
# About Class Definition
#####################################
class DataView(QtCore.QObject):
send_miniboard_control_packet = QtCore.pyqtSignal(list)
get_data_from_probe_signal = QtCore.pyqtSignal(int)
def __init__(self, main_window):
super(DataView, self).__init__()
# ########## Reference to top level window ##########
self.main_window = main_window # type: QtWidgets.QMainWindow
self.miniboard_class = main_window.miniboard_class
# ########## Get the settings instance ##########
self.settings = QtCore.QSettings()
# ########## Get the Pick And Plate instance of the logger ##########
self.logger = logging.getLogger("RoverBaseStation")
# ########## References to GUI Elements ##########
self.left_y_lb = self.main_window.left_y_lb
self.right_y_lb = self.main_window.right_y_lb
self.pause_mode = self.main_window.pause_mode
self.drive_mode = self.main_window.drive_mode
self.left_motor_power = self.main_window.left_motor
self.right_motor_power = self.main_window.right_motor
self.battery_voltage = self.main_window.battery_voltage
self.base_power = self.main_window.base
self.bicep_power = self.main_window.bicep
self.forearm_power = self.main_window.forearm
self.pitch_power = self.main_window.pitch
self.wrist_power = self.main_window.wrist
self.temp_label = self.main_window.temp_label
self.moisture_label = self.main_window.moisture_label
self.salinity_label = self.main_window.salinity_label
self.get_data_button = self.main_window.science_data_button
self.get_gps_button = self.main_window.gps_data_button
self.last_contact_label = self.main_window.last_contact_label
self.lat_label = self.main_window.gps_lat_label
self.lon_label = self.main_window.gps_lon_label
self.alt_label = self.main_window.gps_altitude_label
self.gps_heading_label = self.main_window.gps_heading_label
self.gps_speed_label = self.main_window.gps_speed_label
# self.time_label = self.main_window.time_label
# self.voltage_label = self.main_window.battery_voltage_label
# ########## Class Variables ##########
self.controller_states = None
self.sensor_read_timer = QtCore.QTimer()
self.sensor_read_timer.setSingleShot(True)
self.gps_valid = None
self.lat = None
self.lon = None
self.alt = None
self.gps_track_valid = None
self.gps_speed = None
self.gps_heading = None
# ########## Make signal/slot connections ##########
self.__connect_signals_to_slots()
# ########## Set flags ##########
self.xbox_connected = False
self.frysky_connected = False
def __connect_signals_to_slots(self):
# These are incorrect as they don't show the scaled versions
# self.main_window.frsky_controller_class.controller_update_ready_signal.connect(self.on_controller_update_ready__slot)
# self.main_window.frsky_controller_class.controller_connection_aquired.connect(self.frysky_connected__slot)
# self.main_window.xbox_controller_class.controller_connection_aquired.connect(self.xbox_connected__slot)
# the data signals pass dictionaries to our slots after read requested
self.main_window.miniboard_class.data_drive_motor_power.connect(self.__update_drive_power)
self.main_window.miniboard_class.data_battery_voltage.connect(self.__update_battery_voltage)
self.main_window.miniboard_class.data_arm_motors.connect(self.__update_arm_power)
self.get_data_button.clicked.connect(self.on_get_science_data_button_pressed__slot)
self.get_data_from_probe_signal.connect(self.sensor_read_timer.start)
self.sensor_read_timer.timeout.connect(self.on_science_measure_response_received__slot)
self.main_window.miniboard_class.data_soil_measurements.connect(self.on_science_data_received__slot)
self.main_window.miniboard_class.data_gps_position.connect(self.on_gps_coordinates_updated__slot)
self.main_window.miniboard_class.data_gps_track.connect(self.on_gps_track_updated__slot)
self.send_miniboard_control_packet.connect(self.main_window.miniboard_class.append)
self.main_window.miniboard_class.message_received_signal.connect(self.on_message_received__slot)
self.get_gps_button.clicked.connect(self.on_get_gps_button_pressed__slot)
def __update_drive_percentages(self):
if self.frysky_connected:
left_percentage = round((self.controller_states["left_stick_y_axis"] / JOYSTICK_AXIS_MAX * 100), 2)
right_percentage = round((self.controller_states["right_stick_y_axis"] / JOYSTICK_AXIS_MAX * 100), 2)
self.left_y_lb.setText(str(left_percentage) + "%")
self.right_y_lb.setText(str(right_percentage) + "%")
def __update_modes(self):
pause_mode = self.controller_states["sf_state"]
drive_mode = self.controller_states["se_state"]
self.pause_mode.setText(str(pause_mode))
self.drive_mode.setText(str(drive_mode))
def __update_drive_power(self, power_dict):
self.left_motor_power.setText("F:%3d M:%3d B:%3d" % (
power_dict["l_f_drive"],
power_dict["l_m_drive"],
power_dict["l_b_drive"],
))
self.right_motor_power.setText("F:%3d M:%3d B:%3d" % (
power_dict["r_f_drive"],
power_dict["r_m_drive"],
power_dict["r_b_drive"],
))
def __update_battery_voltage(self, battery_dict):
battery_visually_accurate = int(battery_dict["battery_voltage"]) / 1000
self.battery_voltage.setText("%0.2f V" % battery_visually_accurate)
def __update_arm_power(self, power_dict):
self.base_power.setText("%d" % power_dict["arm_motor_1"])
self.bicep_power.setText("%d" % power_dict["arm_motor_2"])
self.forearm_power.setText("%d" % power_dict["arm_motor_3"])
self.pitch_power.setText("%d" % power_dict["arm_motor_4"])
self.wrist_power.setText("%d" % power_dict["arm_motor_5"])
def on_update_other_gui_elements__slot(self, voltage, time):
# self.time_label.setText(time)
# self.voltage_label.setText(voltage)
pass
def on_controller_update_ready__slot(self, controller_states):
self.controller_states = controller_states
self.__update_drive_percentages()
self.__update_modes()
def xbox_connected__slot(self, connected):
self.xbox_connected = connected
def frysky_connected__slot(self, connected):
self.frysky_connected = connected
def on_get_science_data_button_pressed__slot(self):
self.temp_label.setText("Reading From Sensor")
self.moisture_label.setText("Reading From Sensor")
self.salinity_label.setText("Reading From Sensor")
write_soil_measure(self.send_miniboard_control_packet, 1) # Request data
self.get_data_from_probe_signal.emit(5000)
def on_science_measure_response_received__slot(self):
read_soil_measurements(self.send_miniboard_control_packet)
def on_science_data_received__slot(self, sdict):
self.temp_label.setText(str(sdict["temperature"]/1000) + " °C")
self.moisture_label.setText(str(sdict["moisture"]/10) + " %")
self.salinity_label.setText(str(sdict["salinity"]/1000) + " g/L")
# self.logger.debug(sdict)
def on_gps_coordinates_updated__slot(self, sdict):
self.gps_valid = sdict["gps_pos_valid"]
self.lat = "{0:.6f}".format((sdict["latitude"] * 10 ** -5) / 60)
self.lon = "{0:.6f}".format((sdict["longitude"] * 10 ** -5) / 60)
self.alt = "{0:.6f}".format((sdict["altitude"] / 10))
if self.gps_valid:
self.lat_label.setText(str(self.lat))
self.lon_label.setText(str(self.lon))
self.alt_label.setText(str(self.alt))
else:
self.lat_label.setText("No GPS Lock")
self.lon_label.setText("No GPS Lock")
self.alt_label.setText("No GPS Lock")
def on_gps_track_updated__slot(self, sdict):
self.gps_track_valid = sdict["gps_track_valid"]
self.gps_speed = sdict["gps_speed"]
self.gps_heading = sdict["gps_heading"]
if self.gps_track_valid:
self.gps_speed_label.setText(str(self.gps_speed))
self.gps_heading_label(str(self.gps_heading))
else:
self.gps_speed_label.setText("No Movement")
self.gps_heading_label.setText("No Movement")
def on_get_gps_button_pressed__slot(self):
read_gps_position(self.send_miniboard_control_packet)
read_gps_track(self.send_miniboard_control_packet)
def on_message_received__slot(self):
self.last_contact_label.setText(datetime.now().strftime("%Y-%m-%d %I:%M:%S %p")
)

View File

@@ -0,0 +1,46 @@
"""
This file contains the interface core sub-class
This instantiates all the high level sub-classes for the interface
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore, QtWidgets, QtWebEngine
# Custom imports
from Interface.LiveLogs.LiveLogsCore import LiveLogs
from PyQt5.QtQuick import QQuickView, QQuickItem
from PyQt5.QtCore import QUrl, QObject
from Interface.Map.MapCore import Map
from Interface.DataView.DataViewCore import DataView
#####################################
# Interface Class Definition
#####################################
class Interface(QtCore.QObject):
def __init__(self, main_window):
super(Interface, self).__init__()
# ########## Reference to top level window ##########
self.main_window = main_window # type: QtWidgets.QMainWindow
QtWebEngine.QtWebEngine.initialize()
# ########## Instantiations of sub-classes ##########
self.live_logs_class = LiveLogs(self.main_window)
# self.maps_class = Map(self.main_window)
self.data_view_class = DataView(self.main_window)
# ########## References to GUI Elements ##########
self.tab_widget = self.main_window.tab_widget # type: QtWidgets.QTabWidget
# ########## Set default interface parameters ##########
# Always open to first tab on launch
self.tab_widget.setCurrentIndex(0)
def qml_clicked__slot(self):
print("UI CLICKED")

View File

@@ -0,0 +1,95 @@
"""
This file contains the live logs page sub-class
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore, QtWidgets, QtGui
import logging
#####################################
# Live Logs Class Definition
#####################################
class LiveLogs(QtCore.QThread):
text_ready_signal = QtCore.pyqtSignal(str)
send_packet = QtCore.pyqtSignal(list)
def __init__(self, main_window):
super(LiveLogs, self).__init__()
# ########## Reference to top level window ##########
self.main_window = main_window # type: QtWidgets.QMainWindow
# ########## Get the settings instance ##########
self.settings = QtCore.QSettings()
# ########## Get the Pick And Plate instance of the logger ##########
self.logger = logging.getLogger("RoverBaseStation")
# ########## Thread Flags ##########
self.run_thread_flag = True
self.open_log_file_flag = True
self.show_log_file_flag = True
# ########## References to GUI Elements ##########
self.live_log_tb = self.main_window.live_log_text_browser # type: QtWidgets.QTextBrowser
# ########## Class Variables ##########
self.log_file_path = None
self.log_file_reader = None
self.log_file_prev_mtime = 0
def run(self):
self.logger.debug("Live Logs Thread Starting...")
while self.run_thread_flag:
if self.open_log_file_flag:
self.__open_log_file()
self.open_log_file_flag = False
elif self.show_log_file_flag:
self.__show_updated_log_file()
self.msleep(250)
self.logger.debug("Live Logs Thread Stopping...")
# noinspection PyUnresolvedReferences
def connect_signals_to_slots__slot(self):
self.text_ready_signal.connect(self.live_log_tb.setText)
self.live_log_tb.textChanged.connect(self.__on_move_cursor_needed__slot)
self.main_window.kill_threads_signal.connect(self.on_kill_threads__slot)
def __open_log_file(self):
# Get the log file path
appdata_base_directory = self.settings.value("appdata_directory", type=str)
log_directory = appdata_base_directory + "/logs"
self.log_file_path = log_directory + "/log.txt"
# Open the class' reader for the file
self.log_file_reader = open(self.log_file_path, 'r')
def __show_updated_log_file(self):
log_browser_string = ""
# Seek back to the beginning of the file and read in the lines
self.log_file_reader.seek(0)
log_lines = self.log_file_reader.readlines()
# Go through line by line and only add lines that are selected to be shown via the checkboxes
for line in log_lines:
log_browser_string += line
# Display the text
self.text_ready_signal.emit(log_browser_string)
def __on_move_cursor_needed__slot(self):
# Move the cursor to the end when the text browser text updates. This essentially scrolls constantly.
self.live_log_tb.moveCursor(QtGui.QTextCursor.End)
def on_kill_threads__slot(self):
self.run_thread_flag = False

View File

@@ -0,0 +1,47 @@
"""
This file contains the map page sub-class
"""
#####################################
# Imports
#####################################
# Python native imports
from PyQt5 import QtCore, QtWidgets, QtGui, QtWebEngine
from PyQt5.QtCore import QUrl
import logging
#####################################
# About Class Definition
#####################################
class Map(QtCore.QObject):
def __init__(self, main_window):
super(Map, self).__init__()
# ########## Reference to top level window ##########
self.main_window = main_window # type: QtWidgets.QMainWindow
# ########## Get the settings instance ##########
self.settings = QtCore.QSettings()
# ########## Get the Pick And Plate instance of the logger ##########
self.logger = logging.getLogger("RoverBaseStation")
# ########## References to GUI Elements ##########
self.maps_view = self.main_window.map_label # type: QtWidgets.QLabel
# ########## Class Variables ##########
# self.map_view_pixmap = QtGui.QPixmap("Resources/Maps/mars_testing_site.png")
#
# self.maps_view.setPixmap(self.map_view_pixmap)
QtWebEngine.QtWebEngine.initialize()
self.map_view = self.main_window.leaflet_map # type: QtWidgets.QQuickView
self.map_view.setSource(QUrl("Resources/UI/map_view.qml"))
# ########## Set defaults on GUI Elements ##########
self.__load_settings()
def __load_settings(self):
self.logger.info("Map Interface Configured")

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

View File

@@ -0,0 +1,202 @@
'''
GooMPy: Google Maps for Python
Copyright (C) 2015 Alec Singer and Simon D. Levy
This code is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This code is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this code. If not, see <http://www.gnu.org/licenses/>.
'''
import math
import PIL.Image
from io import StringIO, BytesIO
import urllib.request
import os
import time
_KEY = '&key=AIzaSyAFjAWMwQauf2Dy5KteOuT8KexfMjOCIS8' # This is Corwin's temp API key
_EARTHPIX = 268435456 # Number of pixels in half the earth's circumference at zoom = 21
_DEGREE_PRECISION = 4 # Number of decimal places for rounding coordinates
_TILESIZE = 640 # Larget tile we can grab without paying
_GRABRATE = 4 # Fastest rate at which we can download tiles without paying
_pixrad = _EARTHPIX / math.pi
def _new_image(width, height):
return PIL.Image.new('RGB', (width, height))
def _roundto(value, digits):
return int(value * 10 ** digits) / 10. ** digits
def _pixels_to_degrees(pixels, zoom):
return pixels * 2 ** (21 - zoom)
def _grab_tile(lat, lon, zoom, maptype, _TILESIZE, sleeptime):
urlbase = 'https://maps.googleapis.com/maps/api/staticmap?center=%f,%f&zoom=%d&maptype=%s&size=%dx%d&format=jpg'
urlbase += _KEY
specs = lat, lon, zoom, maptype, _TILESIZE, _TILESIZE
filename = 'Resources/Maps/' + ('%f_%f_%d_%s_%d_%d' % specs) + '.jpg'
tile = None
if os.path.isfile(filename):
tile = PIL.Image.open(filename)
else:
url = urlbase % specs
result = urllib.request.urlopen(url).read()
tile = PIL.Image.open(BytesIO(result))
if not os.path.exists('Resources/Maps'):
os.mkdir('Resources/Maps')
tile.save(filename)
time.sleep(sleeptime) # Choke back speed to avoid maxing out limit
return tile
def _pix_to_lon(j, lonpix, ntiles, _TILESIZE, zoom):
return math.degrees((lonpix + _pixels_to_degrees(((j) - ntiles / 2) * _TILESIZE, zoom) - _EARTHPIX) / _pixrad)
def _pix_to_lat(k, latpix, ntiles, _TILESIZE, zoom):
return math.degrees(math.pi / 2 - 2 * math.atan(
math.exp(((latpix + _pixels_to_degrees((k - ntiles / 2) * _TILESIZE, zoom)) - _EARTHPIX) / _pixrad)))
def fetchTiles(latitude, longitude, zoom, maptype, radius_meters=None, default_ntiles=4):
'''
Fetches tiles from GoogleMaps at the specified coordinates, zoom level (0-22), and map type ('roadmap',
'terrain', 'satellite', or 'hybrid'). The value of radius_meters deteremines the number of tiles that will be
fetched; if it is unspecified, the number defaults to default_ntiles. Tiles are stored as JPEG images
in the mapscache folder.
'''
latitude = _roundto(latitude, _DEGREE_PRECISION)
longitude = _roundto(longitude, _DEGREE_PRECISION)
# https://groups.google.com/forum/#!topic/google-maps-js-api-v3/hDRO4oHVSeM
pixels_per_meter = 2 ** zoom / (156543.03392 * math.cos(math.radians(latitude)))
# number of tiles required to go from center latitude to desired radius in meters
ntiles = default_ntiles if radius_meters is None else int(
round(2 * pixels_per_meter / (_TILESIZE / 2. / radius_meters)))
lonpix = _EARTHPIX + longitude * math.radians(_pixrad)
sinlat = math.sin(math.radians(latitude))
latpix = _EARTHPIX - _pixrad * math.log((1 + sinlat) / (1 - sinlat)) / 2
bigsize = ntiles * _TILESIZE
bigimage = _new_image(bigsize, bigsize)
for j in range(ntiles):
lon = _pix_to_lon(j, lonpix, ntiles, _TILESIZE, zoom)
for k in range(ntiles):
lat = _pix_to_lat(k, latpix, ntiles, _TILESIZE, zoom)
tile = _grab_tile(lat, lon, zoom, maptype, _TILESIZE, 1. / _GRABRATE)
bigimage.paste(tile, (j * _TILESIZE, k * _TILESIZE))
west = _pix_to_lon(0, lonpix, ntiles, _TILESIZE, zoom)
east = _pix_to_lon(ntiles - 1, lonpix, ntiles, _TILESIZE, zoom)
north = _pix_to_lat(0, latpix, ntiles, _TILESIZE, zoom)
south = _pix_to_lat(ntiles - 1, latpix, ntiles, _TILESIZE, zoom)
return bigimage, (north, west), (south, east)
class GooMPy(object):
def __init__(self, width, height, latitude, longitude, zoom, maptype, radius_meters=None, default_ntiles=4):
'''
Creates a GooMPy object for specified display widthan and height at the specified coordinates,
zoom level (0-22), and map type ('roadmap', 'terrain', 'satellite', or 'hybrid').
The value of radius_meters deteremines the number of tiles that will be used to create
the map image; if it is unspecified, the number defaults to default_ntiles.
'''
self.lat = latitude
self.lon = longitude
self.width = width
self.height = height
self.zoom = zoom
self.maptype = maptype
self.radius_meters = radius_meters
self.winimage = _new_image(self.width, self.height)
self._fetch()
halfsize = self.bigimage.size[0] // 2
self.leftx = halfsize
self.uppery = halfsize
self._update()
def getImage(self):
'''
Returns the current image as a PIL.Image object.
'''
return self.winimage
def move(self, dx, dy):
'''
Moves the view by the specified pixels dx, dy.
'''
self.leftx = self._constrain(self.leftx, dx, self.width)
self.uppery = self._constrain(self.uppery, dy, self.height)
self._update()
def useMaptype(self, maptype):
'''
Uses the specified map type 'roadmap', 'terrain', 'satellite', or 'hybrid'.
Map tiles are fetched as needed.
'''
self.maptype = maptype
self._fetch_and_update()
def useZoom(self, zoom):
'''
Uses the specified zoom level 0 through 22.
Map tiles are fetched as needed.
'''
self.zoom = zoom
self._fetch_and_update()
def _fetch_and_update(self):
self._fetch()
self._update()
def _fetch(self):
self.bigimage, self.northwest, self.southeast = fetchTiles(self.lat, self.lon, self.zoom, self.maptype,
self.radius_meters)
def _update(self):
self.winimage.paste(self.bigimage, (-self.leftx, -self.uppery))
def _constrain(self, oldval, diff, dimsize):
newval = oldval + diff
return newval if newval > 0 and newval < self.bigimage.size[0] - dimsize else oldval

Some files were not shown because too many files have changed in this diff Show More