mirror of
https://github.com/caperren/school_archives.git
synced 2025-11-09 13:41:13 +00:00
619 lines
25 KiB
Python
619 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
|
|
|
|
"""This packet generates C++ files for handling packets on the
|
|
side of the base station."""
|
|
|
|
import re
|
|
from collections import OrderedDict
|
|
|
|
class BasePackets(object):
|
|
"""
|
|
Autogenerates C++ code to encode and decode packets sent between the base
|
|
station and the miniboard.
|
|
|
|
This class takes in template C++ source and header files and expands them
|
|
to include Qt signals emitted when a packet of a specific type is parsed
|
|
successfully, as well as slots that allow for sending packets to the
|
|
miniboard. The input files should contain comments with an "@" followed by a
|
|
command or attributes.
|
|
|
|
Atttributes are variables defined in the header files that will be used in
|
|
constructing autogenerated functions. They are specified with the
|
|
attribute name followed by a colon and the name of the variable in the
|
|
source file. For example, writing "@datastream: m_datastream" will specify
|
|
that the QDataStream class to read and write data from is called
|
|
m_datastream. Currently, available attributes are "datastream", designating
|
|
the QDataStream to use, the "types_enum", the enum that stores the codes
|
|
for the packet types, and the "crc", the name of the CRC function.
|
|
|
|
Additionally, functions denoted by an "@" in the source provide the location
|
|
for expanding the file during the autogeneration process. These functions
|
|
match the names in this class. Available functions are:
|
|
"packet_types_header", defining where to enumerate the packet codes,
|
|
"recieved_signals_header", header functions for the recieved signals,
|
|
"write_slots_header", header functions for slots that send packets with
|
|
for writing,
|
|
"read_slots_header", header functions for slots that read data from the
|
|
miniboard,
|
|
"parse_packets", the source function that parses a packet, it takes in
|
|
a parameter specifying the size of the packet, such as
|
|
"@parse_packets(size)",
|
|
"write_slots_source", write the source code for write slots, and
|
|
"read_slots_source", source code for read slots.
|
|
"""
|
|
def __init__(self, specification=None, files={}):
|
|
"""
|
|
Creates an instance of the BasePackets class.
|
|
|
|
Parameters
|
|
----------
|
|
specification : string
|
|
The path to the packet specification file.
|
|
files : dictionary
|
|
Dictionary of the input and output files. Input files contain
|
|
special commands that the parser will recognize. The autogenerated
|
|
files will be written to the output files. The format for the
|
|
dictionary is {"input_file": "output_file"}.
|
|
"""
|
|
# TODO: normalized naming to template file (low priority)
|
|
# files {"input_file": "output_file"}
|
|
self._specification = specification
|
|
self.files = files
|
|
|
|
# packets generated by the extract_table function
|
|
self._packets = None
|
|
# parameters extracted from header files, such as "datastream", etc.
|
|
self._params = {"start_byte": 0x01}
|
|
self._tabsize = 4
|
|
# requires removing all whitespace
|
|
# TODO: make not use directly
|
|
# regular expressions for various parsing and extraction
|
|
self._param_extractor = re.compile(r"(?<=:)[a-zA-Z0-9_]+")
|
|
# TODO: try (?<=^class) (low priority)
|
|
self._class_extractor = re.compile(r"(?<=class )[a-zA-Z]+")
|
|
self._function_arg_extractor = re.compile(r"\(.+\)")
|
|
self._arg_size_extractor = re.compile(r"(\d|\*)+$")
|
|
|
|
def packets():
|
|
doc = """A list of packets and their properties. The format is
|
|
specified in the extrac_table function."""
|
|
def fget(self):
|
|
return self._packets
|
|
return locals()
|
|
packets = property(**packets())
|
|
|
|
def specification():
|
|
doc = """The path to the packet specification file."""
|
|
def fget(self):
|
|
return self._specification
|
|
def fset(self, value):
|
|
self._specification = value
|
|
return locals()
|
|
specification = property(**specification())
|
|
|
|
def files():
|
|
doc = """Dictionary of files used in the parsing and autogeneration
|
|
process. Files are stored in an ordered dictionary with source
|
|
(.cpp) files at the end."""
|
|
def fget(self):
|
|
return self._files
|
|
def fset(self, value):
|
|
self._files = OrderedDict(sorted(value.items(),
|
|
key=lambda x: x[0].endswith(".cpp")))
|
|
return locals()
|
|
files = property(**files())
|
|
|
|
def write_files(self):
|
|
"""Parses the input files and outputs the autogenerated files. This is
|
|
the function to call after the necessary files have been specified."""
|
|
for t, s in self._files.items():
|
|
self._write_file(str(t), str(s))
|
|
|
|
def _write_file(self, template, output):
|
|
"""Parses an individual file. Extracts packets from the specification if
|
|
not done so already, then looks for matching functions in the template
|
|
and calls the respective function.
|
|
|
|
Parameters
|
|
----------
|
|
template : string
|
|
The path to the template used for autogenerating the output.
|
|
output : string
|
|
The path to the output file.
|
|
"""
|
|
if self._specification is None:
|
|
raise ValueError("The specification file has not been specified.")
|
|
if self._packets is None:
|
|
self.extract_table()
|
|
|
|
with open(template) as f:
|
|
file_template = f.read().splitlines()
|
|
|
|
with open(output, "w+") as f:
|
|
for line in file_template:
|
|
f.write(line + "\n")
|
|
if "class" in line:
|
|
if "enum" in line:
|
|
continue
|
|
self._params.update({"class":
|
|
re.search(self._class_extractor, line).group(0)})
|
|
if "@" not in line:
|
|
continue
|
|
elif "@datastream" in line:
|
|
self._params.update({"datastream":
|
|
self._extract_param(line)})
|
|
elif "@types_enum" in line:
|
|
self._params.update({"types_enum":
|
|
self._extract_param(line)})
|
|
elif "@crc" in line:
|
|
self._params.update({"crc":
|
|
self._extract_param(line)})
|
|
elif "@packet_types_header" in line:
|
|
f.write(self.packet_types_header(line=line))
|
|
elif "@recieved_signals_header" in line:
|
|
f.write(self.packet_signals_header(line=line))
|
|
elif "@write_slots_header" in line:
|
|
f.write(self.write_slots_header(line=line))
|
|
elif "@read_slots_header" in line:
|
|
f.write(self.read_slots_header(line=line))
|
|
elif "@parse_packets" in line:
|
|
f.write(self.parse_packets(line=line))
|
|
elif "@write_slots_source" in line:
|
|
f.write(self.write_slots_source(line=line))
|
|
elif "@read_slots_source" in line:
|
|
f.write(self.read_slots_source(line=line))
|
|
|
|
def extract_table(self):
|
|
"""Extract the command table from the text of the specification file.
|
|
Creates a list of dictionaries with the following members:
|
|
"name" : str, name of command
|
|
"rw" : str, contains r is command can be read, w if command
|
|
can be written, - if neither
|
|
"code" : int, command code (0-127)
|
|
"argument" : list, list of 0 or more tuples, each with the
|
|
following structure:
|
|
tuple(format code:str, command format code, u8, i16,
|
|
etc, or * to indicate a variable number of bytes.
|
|
name:str))
|
|
"default" : list of default values of arguments, as strings
|
|
"notes" : str, notes on command."""
|
|
with open(self._specification, "r") as f:
|
|
file_str = f.read()
|
|
|
|
s = file_str.find("\n| Name | RW | Command Code")
|
|
s += 1 # Skip over first newline
|
|
e = file_str[s:].find("\n\n")
|
|
if e < s:
|
|
table_str = file_str[s:]
|
|
else:
|
|
table_str = file_str[s:s + e]
|
|
self._packets = []
|
|
for l in table_str.split("\n"):
|
|
if len(l) == 0: continue
|
|
if l[0] == "|":
|
|
l = l[1:]
|
|
col = [c.strip() for c in l.split("|")]
|
|
if col[0][0:3] == "---": continue
|
|
if col[0] == "Name": continue
|
|
name = col[0]
|
|
rw = col[1].lower()
|
|
code = int(col[2], 0)
|
|
notes = col[5]
|
|
argument = []
|
|
for a in [c.strip() for c in col[3].split(",")]:
|
|
a = a.split(" ")
|
|
if len(a) == 2:
|
|
argument.append((a[0], a[1]))
|
|
default = []
|
|
if len(argument) > 0 and col[4].strip() != "-":
|
|
for d in [c.strip() for c in col[4].split(",")]:
|
|
default.append(d)
|
|
if len(default) != len(argument) and len(default) != 0:
|
|
raise RuntimeError(
|
|
"Default list length mismatch on command %s" % name
|
|
)
|
|
self._packets.append({
|
|
"name": name,
|
|
"rw": rw,
|
|
"code": code,
|
|
"argument": argument,
|
|
"default": default,
|
|
"notes": notes
|
|
})
|
|
# removing redundant packet arguments
|
|
for packet in self._packets:
|
|
for i, arg in enumerate(packet["argument"]):
|
|
if "length" in arg[1]:
|
|
packet["argument"].pop(i)
|
|
|
|
|
|
def packet_types_header(self, line=None):
|
|
"""
|
|
Generates header function for the packet types.
|
|
|
|
Parameters
|
|
----------
|
|
line : string, optional
|
|
The string line that contains the command. Used for controlling
|
|
the amount of trailing whitespace.
|
|
"""
|
|
string = ""
|
|
ws = self._whitespace(line)
|
|
for packet in self._packets:
|
|
uppercase_name = packet["name"].upper().replace(" ", "_")
|
|
string += ws + ("%s = %i,\n" % (uppercase_name, packet["code"]))
|
|
return string
|
|
|
|
def packet_signals_header(self, line=None):
|
|
"""
|
|
Generates header function for the packet signals.
|
|
|
|
Parameters
|
|
----------
|
|
line : string, optional
|
|
The string line that contains the command. Used for controlling
|
|
the amount of trailing whitespace.
|
|
"""
|
|
string = ""
|
|
ws = self._whitespace(line)
|
|
for packet in self._packets:
|
|
if "r" in packet["rw"]:
|
|
|
|
string += (ws + "void " + self._signal_name(packet) + "(" +
|
|
self._argument_proto(packet) + ");\n")
|
|
return string
|
|
|
|
def write_slots_header(self, line=None):
|
|
string = ""
|
|
ws = self._whitespace(line)
|
|
for packet in self._packets:
|
|
if "w" in packet["rw"]:
|
|
string += (ws + "void write" + self._camelcase(packet["name"]) +
|
|
"(" + self._argument_proto(packet) + ");\n")
|
|
return string
|
|
|
|
def read_slots_header(self, line=None):
|
|
string = ""
|
|
ws = self._whitespace(line)
|
|
for packet in self._packets:
|
|
if "r" in packet["rw"]:
|
|
string += (ws + "void read" + self._camelcase(packet["name"]) +
|
|
"();\n")
|
|
return string
|
|
|
|
def parse_packets(self, line=None):
|
|
# note that parse_packets assumes that the datastream head is at the
|
|
# beginning of the crc
|
|
ws = self._whitespace(line)
|
|
string = ""
|
|
# bitshift in the crc, and the packet type
|
|
string += ws + "quint16 _read_crc;\n"
|
|
string += ws + ("%s >> _read_crc;\n" % self._params["datastream"])
|
|
string += ws + "quint8 _packetType;\n"
|
|
string += ws + ("%s >> _packetType;\n" % self._params["datastream"])
|
|
|
|
# index to specify whether to use "if" or "else if"
|
|
i = 0
|
|
for packet in self._packets:
|
|
uppercase_name = packet["name"].upper().replace(" ", "_")
|
|
if "r" not in packet["rw"]:
|
|
continue
|
|
|
|
if i is 0:
|
|
i += 1
|
|
conditional = "if"
|
|
else:
|
|
conditional = "else if"
|
|
|
|
string += (ws + "%s(_packetType == (static_cast<quint8>"
|
|
"(%s::%s) | 0x80)) {\n" % (
|
|
conditional,
|
|
self._params["types_enum"],
|
|
uppercase_name)
|
|
)
|
|
packet_size = self._packet_size(packet)
|
|
# TODO: what if the packet size is variable? This requires adding
|
|
# in a size argument to the parse_packets function, creating a data
|
|
# type of that size, and bitshifting in the data.
|
|
# Note: this will probably require using readRawBytes or readBytes
|
|
# function.
|
|
if packet_size is not None:
|
|
for arg in packet["argument"]:
|
|
# for every argument in the packet, create a new variable
|
|
# and bitshift it in from the stream
|
|
arg_type = self._expand_argument(arg[0])
|
|
string += ws + "\t%s %s;\n" % (arg_type, arg[1])
|
|
string += ws + "\t%s >> %s;\n" % (
|
|
self._params["datastream"], arg[1]
|
|
)
|
|
string += ws + '\tqDebug() << "argument: " << "%s" << "; value: " << %s;\n' % (
|
|
arg[1], arg[1]
|
|
)
|
|
|
|
string += self._crc_calculation(packet, ws=ws, write=True)
|
|
|
|
# TODO: checking against crc (high priority)
|
|
# then emit
|
|
string += ws + "\tif(_crc == _read_crc) {\n"
|
|
string += ws + '\t\tqDebug() << "CRC check successful.";\n'
|
|
string += ws + "\t\temit " + self._signal_name(packet) + "("
|
|
string += "".join(map(lambda x: x[1] + ", ",
|
|
packet["argument"])).strip(" ,")
|
|
string += ");\n"
|
|
string += ws + "\t}\n"
|
|
|
|
else:
|
|
string += ws + "\tchar _data[size];\n" # TODO: remove hard coding
|
|
string += ws + "\t%s.readRawData(_data, size);\n" % (
|
|
self._params["datastream"],
|
|
)
|
|
string += ws + "\tQByteArray _byte_array = QByteArray(_data);\n"
|
|
|
|
|
|
string += ws + "\tquint16 _crc = 0xFFFF;\n"
|
|
|
|
string += ws + "\t_crc = %s(&_packetType, sizeof(quint8), _crc);\n" % (
|
|
self._params["crc"],
|
|
)
|
|
string += ws + "\t_crc = %s(&_byte_array, size, _crc);\n" % (
|
|
self._params["crc"]
|
|
)
|
|
string += ws + '\tqDebug() << "argument: %s; value: "' % (
|
|
packet["argument"][-1][1]
|
|
)
|
|
string += " << _byte_array;\n"
|
|
string += ws + "\tif(_crc == _read_crc) {\n"
|
|
string += ws + "\t\temit %s(_byte_array);\n" % (
|
|
self._signal_name(packet)
|
|
)
|
|
string += ws + "\t}\n"
|
|
# TODO
|
|
string += ws + "}\n"
|
|
|
|
for packet in self._packets:
|
|
uppercase_name = packet["name"].upper().replace(" ", "_")
|
|
if "w" not in packet["rw"]:
|
|
continue
|
|
string += (ws + "else if(_packetType == static_cast<quint8>"
|
|
"(%s::%s)) {\n") % (
|
|
self._params["types_enum"],
|
|
uppercase_name
|
|
)
|
|
string += ws + "\tquint16 _crc = 0xFFFF;\n"
|
|
string += ws + "\t_crc = %s(&_packetType, sizeof(quint8), _crc);\n" % (
|
|
self._params["crc"],
|
|
)
|
|
string += ws + "\tif(_crc == _read_crc) {\n"
|
|
string += ws + '\t\tqDebug() << "Confirmed packet reception.";\n'
|
|
string += ws + "\t}\n"
|
|
# TODO: don't do anything yet, but to be added
|
|
# TODO: like a qDebug statement
|
|
string += ws + "}\n"
|
|
|
|
|
|
return string.expandtabs(self._tabsize)
|
|
|
|
|
|
def write_slots_source(self, line=None):
|
|
string = ""
|
|
for packet in self._packets:
|
|
if "w" not in packet["rw"]:
|
|
continue
|
|
string += ("void %s::write%s(%s)\n{\n" % (
|
|
self._params["class"],
|
|
self._camelcase(packet["name"]),
|
|
self._argument_proto(packet),
|
|
))
|
|
string += '\tqDebug() << "Beginning packet write.";\n'
|
|
string += "\tquint8 _packetType = static_cast<quint8>(%s::%s);\n" % (
|
|
self._params["types_enum"],
|
|
self._uppercase_name(packet),
|
|
)
|
|
|
|
string += self._crc_calculation(packet)
|
|
|
|
string += "\t%s << (quint8)%s;\n" % (
|
|
self._params["datastream"],
|
|
self._params["start_byte"],
|
|
)
|
|
if self._packet_size(packet) is not None:
|
|
string += "\t%s << (quint8)%s;\n" % (
|
|
self._params["datastream"],
|
|
self._packet_size(packet)
|
|
)
|
|
else:
|
|
string += "\t%s << static_cast<quint8>(%s.size() + 3);\n" % (
|
|
self._params["datastream"],
|
|
packet["argument"][-1][1],
|
|
)
|
|
string += "\t%s << _crc;\n" % self._params["datastream"]
|
|
string += "\t%s << _packetType;\n" % (
|
|
self._params["datastream"],
|
|
)
|
|
if self._packet_size(packet) is not None:
|
|
for arg in packet["argument"]:
|
|
string += '\tqDebug("writing: %s; value: %i",' + '"%s", %s);\n' % (
|
|
arg[1], arg[1]
|
|
)
|
|
string += "\t%s << %s;\n" % (
|
|
self._params["datastream"], arg[1]
|
|
)
|
|
string += '\tqDebug() << "argument:" << "%s" << "; value:" << %s;\n' % (
|
|
arg[1], arg[1]
|
|
)
|
|
else:
|
|
string += "\t%s.writeRawData(%s.constData(), %s.size());\n" % (
|
|
self._params["datastream"],
|
|
packet["argument"][-1][1],
|
|
packet["argument"][-1][1],
|
|
)
|
|
|
|
string += "}\n\n"
|
|
return string.expandtabs(self._tabsize)
|
|
|
|
def _crc_calculation(self, packet, ws="", write=True, packet_type="_packetType"):
|
|
string = ""
|
|
string += ws + "\tquint16 _crc = 0xFFFF;\n"
|
|
string += ws + "\t_crc = %s(&%s, sizeof(quint8), _crc);\n" % (
|
|
self._params["crc"],
|
|
packet_type,
|
|
)
|
|
if not write:
|
|
return string
|
|
|
|
if self._packet_size(packet) is not None:
|
|
for arg in packet["argument"]:
|
|
string += ws + "\t_crc = %s(&%s, sizeof(%s), _crc);\n" % (
|
|
self._params["crc"],
|
|
arg[1],
|
|
arg[1], # TODO: fix this, unworking for variable size packets
|
|
)
|
|
else:
|
|
string += ws + "\t_crc = %s(&%s, %s.size(), _crc);\n" % (
|
|
self._params["crc"],
|
|
packet["argument"][-1][1],
|
|
packet["argument"][-1][1],
|
|
)
|
|
return string
|
|
|
|
def read_slots_source(self, line=None):
|
|
string = ""
|
|
for packet in self._packets:
|
|
if "r" not in packet["rw"]:
|
|
continue
|
|
string += "void %s::read%s()\n{\n" % (
|
|
self._params["class"],
|
|
self._camelcase(packet["name"]),
|
|
)
|
|
string += '\tqDebug() << "Beginning packet write.";\n'
|
|
string += ("\tquint8 _packetType = static_cast<quint8>(%s::%s) "
|
|
"| 0x80;\n") % (
|
|
self._params["types_enum"],
|
|
self._uppercase_name(packet),
|
|
)
|
|
|
|
string += self._crc_calculation(packet, write=False)
|
|
string += "\t%s << (quint8)%s;\n" % (
|
|
self._params["datastream"],
|
|
self._params["start_byte"],
|
|
)
|
|
|
|
string += "\t%s << (quint8)3;\n" % self._params["datastream"]
|
|
string += "\t%s << _crc;\n" % self._params["datastream"]
|
|
string += "\t%s << _packetType;\n" % (
|
|
self._params["datastream"],
|
|
)
|
|
|
|
string += "}\n\n"
|
|
return string.expandtabs(self._tabsize)
|
|
|
|
|
|
def _parse_function_args(self, str):
|
|
arguments = re.search(self._function_arg_extractor, str).group(0)
|
|
if arguments is None:
|
|
return None
|
|
arguments = arguments.strip("() ")
|
|
arguments = list(map(lambda x: x.strip(" ,"), arguments))
|
|
return arguments
|
|
|
|
def _signal_name(self, packet):
|
|
return (self._camelcase(packet["name"], capitalize_first_word=False) +
|
|
"Received")
|
|
|
|
def _expand_argument(self, arg):
|
|
if arg == "*":
|
|
arg_type = "QByteArray"
|
|
elif "u" in arg:
|
|
arg_type = "q" + arg.replace("u", "uint")
|
|
elif "i" in arg:
|
|
arg_type = "q" + arg.replace("i", "int")
|
|
else:
|
|
raise ValueError("Argument type %s id not recognized" % arg)
|
|
return arg_type
|
|
|
|
def _argument_proto(self, packet):
|
|
argument_str = ""
|
|
for arg in packet["argument"]:
|
|
arg_type = self._expand_argument(arg[0])
|
|
argument_str += arg_type + " " + arg[1] + ", "
|
|
argument_str = argument_str.strip(" ,")
|
|
return argument_str
|
|
|
|
def _camelcase(self, string, capitalize_first_word=True):
|
|
"""Takes a string of words and returns a camelcase version.
|
|
|
|
Parameters
|
|
----------
|
|
string : string
|
|
This is a string of space-separated words to be used in constructing
|
|
a camelcase string.
|
|
capitalize_first_word : bool, optional
|
|
Specifies whether to capitalize the first word.
|
|
|
|
Returns
|
|
-------
|
|
words : string
|
|
A camelcase version of the input string. For example, "this is a
|
|
string" would return "ThisIsAString" or "thisIsAString".
|
|
"""
|
|
words = string.split(" ")
|
|
words = map(lambda x: x.capitalize(), words)
|
|
words = "".join(words)
|
|
if not capitalize_first_word:
|
|
words = words[0].lower() + words[1:]
|
|
return words
|
|
|
|
def _whitespace(self, reference):
|
|
if reference is None:
|
|
return ""
|
|
num_spaces = (len(reference) - len(reference.lstrip(" ")))
|
|
return " " * num_spaces
|
|
|
|
def _packet_size(self, packet):
|
|
# in bytes
|
|
# add two bytes for CRC, and one byte for packet type
|
|
size = 3
|
|
for arg in packet["argument"]:
|
|
arg_size = re.search(self._arg_size_extractor, arg[0]).group(0)
|
|
if arg_size is None:
|
|
raise ValueError("Cannot determine size of packet.")
|
|
elif arg_size == "*":
|
|
return None
|
|
arg_size = int(arg_size)
|
|
if (arg_size % 8 is not 0):
|
|
|
|
raise ValueError("Argument sizes must be multiples of 8.")
|
|
size += (arg_size // 8)
|
|
|
|
return size
|
|
|
|
def _uppercase_name(self, packet):
|
|
return packet["name"].upper().replace(" ", "_")
|
|
|
|
def _extract_param(self, string):
|
|
s = string.replace(" ", "")
|
|
return re.search(self._param_extractor, s).group(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
import os.path
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--src", type=str, default=".")
|
|
parser.add_argument("--dest", type=str, default=".")
|
|
parser.add_argument("--name", type=str, default="packets")
|
|
parser.add_argument("--spec", type=str, default="SPECIFICATION.md")
|
|
args = parser.parse_args()
|
|
header_template = os.path.join(args.src, args.name) + ".h"
|
|
header_output = os.path.join(args.dest, args.name) + ".h"
|
|
source_template = os.path.join(args.src, args.name) + ".cpp"
|
|
source_output = os.path.join(args.dest, args.name) + ".cpp"
|
|
specification = args.spec
|
|
b = BasePackets(
|
|
specification=specification,
|
|
files = {header_template: header_output, source_template: source_output}
|
|
)
|
|
b.write_files()
|