Files

619 lines
25 KiB
Python

#!/usr/bin/env python3
"""This packet generates C++ files for handling packets on the
side of the base station."""
import re
from collections import OrderedDict
class BasePackets(object):
"""
Autogenerates C++ code to encode and decode packets sent between the base
station and the miniboard.
This class takes in template C++ source and header files and expands them
to include Qt signals emitted when a packet of a specific type is parsed
successfully, as well as slots that allow for sending packets to the
miniboard. The input files should contain comments with an "@" followed by a
command or attributes.
Atttributes are variables defined in the header files that will be used in
constructing autogenerated functions. They are specified with the
attribute name followed by a colon and the name of the variable in the
source file. For example, writing "@datastream: m_datastream" will specify
that the QDataStream class to read and write data from is called
m_datastream. Currently, available attributes are "datastream", designating
the QDataStream to use, the "types_enum", the enum that stores the codes
for the packet types, and the "crc", the name of the CRC function.
Additionally, functions denoted by an "@" in the source provide the location
for expanding the file during the autogeneration process. These functions
match the names in this class. Available functions are:
"packet_types_header", defining where to enumerate the packet codes,
"recieved_signals_header", header functions for the recieved signals,
"write_slots_header", header functions for slots that send packets with
for writing,
"read_slots_header", header functions for slots that read data from the
miniboard,
"parse_packets", the source function that parses a packet, it takes in
a parameter specifying the size of the packet, such as
"@parse_packets(size)",
"write_slots_source", write the source code for write slots, and
"read_slots_source", source code for read slots.
"""
def __init__(self, specification=None, files={}):
"""
Creates an instance of the BasePackets class.
Parameters
----------
specification : string
The path to the packet specification file.
files : dictionary
Dictionary of the input and output files. Input files contain
special commands that the parser will recognize. The autogenerated
files will be written to the output files. The format for the
dictionary is {"input_file": "output_file"}.
"""
# TODO: normalized naming to template file (low priority)
# files {"input_file": "output_file"}
self._specification = specification
self.files = files
# packets generated by the extract_table function
self._packets = None
# parameters extracted from header files, such as "datastream", etc.
self._params = {"start_byte": 0x01}
self._tabsize = 4
# requires removing all whitespace
# TODO: make not use directly
# regular expressions for various parsing and extraction
self._param_extractor = re.compile(r"(?<=:)[a-zA-Z0-9_]+")
# TODO: try (?<=^class) (low priority)
self._class_extractor = re.compile(r"(?<=class )[a-zA-Z]+")
self._function_arg_extractor = re.compile(r"\(.+\)")
self._arg_size_extractor = re.compile(r"(\d|\*)+$")
def packets():
doc = """A list of packets and their properties. The format is
specified in the extrac_table function."""
def fget(self):
return self._packets
return locals()
packets = property(**packets())
def specification():
doc = """The path to the packet specification file."""
def fget(self):
return self._specification
def fset(self, value):
self._specification = value
return locals()
specification = property(**specification())
def files():
doc = """Dictionary of files used in the parsing and autogeneration
process. Files are stored in an ordered dictionary with source
(.cpp) files at the end."""
def fget(self):
return self._files
def fset(self, value):
self._files = OrderedDict(sorted(value.items(),
key=lambda x: x[0].endswith(".cpp")))
return locals()
files = property(**files())
def write_files(self):
"""Parses the input files and outputs the autogenerated files. This is
the function to call after the necessary files have been specified."""
for t, s in self._files.items():
self._write_file(str(t), str(s))
def _write_file(self, template, output):
"""Parses an individual file. Extracts packets from the specification if
not done so already, then looks for matching functions in the template
and calls the respective function.
Parameters
----------
template : string
The path to the template used for autogenerating the output.
output : string
The path to the output file.
"""
if self._specification is None:
raise ValueError("The specification file has not been specified.")
if self._packets is None:
self.extract_table()
with open(template) as f:
file_template = f.read().splitlines()
with open(output, "w+") as f:
for line in file_template:
f.write(line + "\n")
if "class" in line:
if "enum" in line:
continue
self._params.update({"class":
re.search(self._class_extractor, line).group(0)})
if "@" not in line:
continue
elif "@datastream" in line:
self._params.update({"datastream":
self._extract_param(line)})
elif "@types_enum" in line:
self._params.update({"types_enum":
self._extract_param(line)})
elif "@crc" in line:
self._params.update({"crc":
self._extract_param(line)})
elif "@packet_types_header" in line:
f.write(self.packet_types_header(line=line))
elif "@recieved_signals_header" in line:
f.write(self.packet_signals_header(line=line))
elif "@write_slots_header" in line:
f.write(self.write_slots_header(line=line))
elif "@read_slots_header" in line:
f.write(self.read_slots_header(line=line))
elif "@parse_packets" in line:
f.write(self.parse_packets(line=line))
elif "@write_slots_source" in line:
f.write(self.write_slots_source(line=line))
elif "@read_slots_source" in line:
f.write(self.read_slots_source(line=line))
def extract_table(self):
"""Extract the command table from the text of the specification file.
Creates a list of dictionaries with the following members:
"name" : str, name of command
"rw" : str, contains r is command can be read, w if command
can be written, - if neither
"code" : int, command code (0-127)
"argument" : list, list of 0 or more tuples, each with the
following structure:
tuple(format code:str, command format code, u8, i16,
etc, or * to indicate a variable number of bytes.
name:str))
"default" : list of default values of arguments, as strings
"notes" : str, notes on command."""
with open(self._specification, "r") as f:
file_str = f.read()
s = file_str.find("\n| Name | RW | Command Code")
s += 1 # Skip over first newline
e = file_str[s:].find("\n\n")
if e < s:
table_str = file_str[s:]
else:
table_str = file_str[s:s + e]
self._packets = []
for l in table_str.split("\n"):
if len(l) == 0: continue
if l[0] == "|":
l = l[1:]
col = [c.strip() for c in l.split("|")]
if col[0][0:3] == "---": continue
if col[0] == "Name": continue
name = col[0]
rw = col[1].lower()
code = int(col[2], 0)
notes = col[5]
argument = []
for a in [c.strip() for c in col[3].split(",")]:
a = a.split(" ")
if len(a) == 2:
argument.append((a[0], a[1]))
default = []
if len(argument) > 0 and col[4].strip() != "-":
for d in [c.strip() for c in col[4].split(",")]:
default.append(d)
if len(default) != len(argument) and len(default) != 0:
raise RuntimeError(
"Default list length mismatch on command %s" % name
)
self._packets.append({
"name": name,
"rw": rw,
"code": code,
"argument": argument,
"default": default,
"notes": notes
})
# removing redundant packet arguments
for packet in self._packets:
for i, arg in enumerate(packet["argument"]):
if "length" in arg[1]:
packet["argument"].pop(i)
def packet_types_header(self, line=None):
"""
Generates header function for the packet types.
Parameters
----------
line : string, optional
The string line that contains the command. Used for controlling
the amount of trailing whitespace.
"""
string = ""
ws = self._whitespace(line)
for packet in self._packets:
uppercase_name = packet["name"].upper().replace(" ", "_")
string += ws + ("%s = %i,\n" % (uppercase_name, packet["code"]))
return string
def packet_signals_header(self, line=None):
"""
Generates header function for the packet signals.
Parameters
----------
line : string, optional
The string line that contains the command. Used for controlling
the amount of trailing whitespace.
"""
string = ""
ws = self._whitespace(line)
for packet in self._packets:
if "r" in packet["rw"]:
string += (ws + "void " + self._signal_name(packet) + "(" +
self._argument_proto(packet) + ");\n")
return string
def write_slots_header(self, line=None):
string = ""
ws = self._whitespace(line)
for packet in self._packets:
if "w" in packet["rw"]:
string += (ws + "void write" + self._camelcase(packet["name"]) +
"(" + self._argument_proto(packet) + ");\n")
return string
def read_slots_header(self, line=None):
string = ""
ws = self._whitespace(line)
for packet in self._packets:
if "r" in packet["rw"]:
string += (ws + "void read" + self._camelcase(packet["name"]) +
"();\n")
return string
def parse_packets(self, line=None):
# note that parse_packets assumes that the datastream head is at the
# beginning of the crc
ws = self._whitespace(line)
string = ""
# bitshift in the crc, and the packet type
string += ws + "quint16 _read_crc;\n"
string += ws + ("%s >> _read_crc;\n" % self._params["datastream"])
string += ws + "quint8 _packetType;\n"
string += ws + ("%s >> _packetType;\n" % self._params["datastream"])
# index to specify whether to use "if" or "else if"
i = 0
for packet in self._packets:
uppercase_name = packet["name"].upper().replace(" ", "_")
if "r" not in packet["rw"]:
continue
if i is 0:
i += 1
conditional = "if"
else:
conditional = "else if"
string += (ws + "%s(_packetType == (static_cast<quint8>"
"(%s::%s) | 0x80)) {\n" % (
conditional,
self._params["types_enum"],
uppercase_name)
)
packet_size = self._packet_size(packet)
# TODO: what if the packet size is variable? This requires adding
# in a size argument to the parse_packets function, creating a data
# type of that size, and bitshifting in the data.
# Note: this will probably require using readRawBytes or readBytes
# function.
if packet_size is not None:
for arg in packet["argument"]:
# for every argument in the packet, create a new variable
# and bitshift it in from the stream
arg_type = self._expand_argument(arg[0])
string += ws + "\t%s %s;\n" % (arg_type, arg[1])
string += ws + "\t%s >> %s;\n" % (
self._params["datastream"], arg[1]
)
string += ws + '\tqDebug() << "argument: " << "%s" << "; value: " << %s;\n' % (
arg[1], arg[1]
)
string += self._crc_calculation(packet, ws=ws, write=True)
# TODO: checking against crc (high priority)
# then emit
string += ws + "\tif(_crc == _read_crc) {\n"
string += ws + '\t\tqDebug() << "CRC check successful.";\n'
string += ws + "\t\temit " + self._signal_name(packet) + "("
string += "".join(map(lambda x: x[1] + ", ",
packet["argument"])).strip(" ,")
string += ");\n"
string += ws + "\t}\n"
else:
string += ws + "\tchar _data[size];\n" # TODO: remove hard coding
string += ws + "\t%s.readRawData(_data, size);\n" % (
self._params["datastream"],
)
string += ws + "\tQByteArray _byte_array = QByteArray(_data);\n"
string += ws + "\tquint16 _crc = 0xFFFF;\n"
string += ws + "\t_crc = %s(&_packetType, sizeof(quint8), _crc);\n" % (
self._params["crc"],
)
string += ws + "\t_crc = %s(&_byte_array, size, _crc);\n" % (
self._params["crc"]
)
string += ws + '\tqDebug() << "argument: %s; value: "' % (
packet["argument"][-1][1]
)
string += " << _byte_array;\n"
string += ws + "\tif(_crc == _read_crc) {\n"
string += ws + "\t\temit %s(_byte_array);\n" % (
self._signal_name(packet)
)
string += ws + "\t}\n"
# TODO
string += ws + "}\n"
for packet in self._packets:
uppercase_name = packet["name"].upper().replace(" ", "_")
if "w" not in packet["rw"]:
continue
string += (ws + "else if(_packetType == static_cast<quint8>"
"(%s::%s)) {\n") % (
self._params["types_enum"],
uppercase_name
)
string += ws + "\tquint16 _crc = 0xFFFF;\n"
string += ws + "\t_crc = %s(&_packetType, sizeof(quint8), _crc);\n" % (
self._params["crc"],
)
string += ws + "\tif(_crc == _read_crc) {\n"
string += ws + '\t\tqDebug() << "Confirmed packet reception.";\n'
string += ws + "\t}\n"
# TODO: don't do anything yet, but to be added
# TODO: like a qDebug statement
string += ws + "}\n"
return string.expandtabs(self._tabsize)
def write_slots_source(self, line=None):
string = ""
for packet in self._packets:
if "w" not in packet["rw"]:
continue
string += ("void %s::write%s(%s)\n{\n" % (
self._params["class"],
self._camelcase(packet["name"]),
self._argument_proto(packet),
))
string += '\tqDebug() << "Beginning packet write.";\n'
string += "\tquint8 _packetType = static_cast<quint8>(%s::%s);\n" % (
self._params["types_enum"],
self._uppercase_name(packet),
)
string += self._crc_calculation(packet)
string += "\t%s << (quint8)%s;\n" % (
self._params["datastream"],
self._params["start_byte"],
)
if self._packet_size(packet) is not None:
string += "\t%s << (quint8)%s;\n" % (
self._params["datastream"],
self._packet_size(packet)
)
else:
string += "\t%s << static_cast<quint8>(%s.size() + 3);\n" % (
self._params["datastream"],
packet["argument"][-1][1],
)
string += "\t%s << _crc;\n" % self._params["datastream"]
string += "\t%s << _packetType;\n" % (
self._params["datastream"],
)
if self._packet_size(packet) is not None:
for arg in packet["argument"]:
string += '\tqDebug("writing: %s; value: %i",' + '"%s", %s);\n' % (
arg[1], arg[1]
)
string += "\t%s << %s;\n" % (
self._params["datastream"], arg[1]
)
string += '\tqDebug() << "argument:" << "%s" << "; value:" << %s;\n' % (
arg[1], arg[1]
)
else:
string += "\t%s.writeRawData(%s.constData(), %s.size());\n" % (
self._params["datastream"],
packet["argument"][-1][1],
packet["argument"][-1][1],
)
string += "}\n\n"
return string.expandtabs(self._tabsize)
def _crc_calculation(self, packet, ws="", write=True, packet_type="_packetType"):
string = ""
string += ws + "\tquint16 _crc = 0xFFFF;\n"
string += ws + "\t_crc = %s(&%s, sizeof(quint8), _crc);\n" % (
self._params["crc"],
packet_type,
)
if not write:
return string
if self._packet_size(packet) is not None:
for arg in packet["argument"]:
string += ws + "\t_crc = %s(&%s, sizeof(%s), _crc);\n" % (
self._params["crc"],
arg[1],
arg[1], # TODO: fix this, unworking for variable size packets
)
else:
string += ws + "\t_crc = %s(&%s, %s.size(), _crc);\n" % (
self._params["crc"],
packet["argument"][-1][1],
packet["argument"][-1][1],
)
return string
def read_slots_source(self, line=None):
string = ""
for packet in self._packets:
if "r" not in packet["rw"]:
continue
string += "void %s::read%s()\n{\n" % (
self._params["class"],
self._camelcase(packet["name"]),
)
string += '\tqDebug() << "Beginning packet write.";\n'
string += ("\tquint8 _packetType = static_cast<quint8>(%s::%s) "
"| 0x80;\n") % (
self._params["types_enum"],
self._uppercase_name(packet),
)
string += self._crc_calculation(packet, write=False)
string += "\t%s << (quint8)%s;\n" % (
self._params["datastream"],
self._params["start_byte"],
)
string += "\t%s << (quint8)3;\n" % self._params["datastream"]
string += "\t%s << _crc;\n" % self._params["datastream"]
string += "\t%s << _packetType;\n" % (
self._params["datastream"],
)
string += "}\n\n"
return string.expandtabs(self._tabsize)
def _parse_function_args(self, str):
arguments = re.search(self._function_arg_extractor, str).group(0)
if arguments is None:
return None
arguments = arguments.strip("() ")
arguments = list(map(lambda x: x.strip(" ,"), arguments))
return arguments
def _signal_name(self, packet):
return (self._camelcase(packet["name"], capitalize_first_word=False) +
"Received")
def _expand_argument(self, arg):
if arg == "*":
arg_type = "QByteArray"
elif "u" in arg:
arg_type = "q" + arg.replace("u", "uint")
elif "i" in arg:
arg_type = "q" + arg.replace("i", "int")
else:
raise ValueError("Argument type %s id not recognized" % arg)
return arg_type
def _argument_proto(self, packet):
argument_str = ""
for arg in packet["argument"]:
arg_type = self._expand_argument(arg[0])
argument_str += arg_type + " " + arg[1] + ", "
argument_str = argument_str.strip(" ,")
return argument_str
def _camelcase(self, string, capitalize_first_word=True):
"""Takes a string of words and returns a camelcase version.
Parameters
----------
string : string
This is a string of space-separated words to be used in constructing
a camelcase string.
capitalize_first_word : bool, optional
Specifies whether to capitalize the first word.
Returns
-------
words : string
A camelcase version of the input string. For example, "this is a
string" would return "ThisIsAString" or "thisIsAString".
"""
words = string.split(" ")
words = map(lambda x: x.capitalize(), words)
words = "".join(words)
if not capitalize_first_word:
words = words[0].lower() + words[1:]
return words
def _whitespace(self, reference):
if reference is None:
return ""
num_spaces = (len(reference) - len(reference.lstrip(" ")))
return " " * num_spaces
def _packet_size(self, packet):
# in bytes
# add two bytes for CRC, and one byte for packet type
size = 3
for arg in packet["argument"]:
arg_size = re.search(self._arg_size_extractor, arg[0]).group(0)
if arg_size is None:
raise ValueError("Cannot determine size of packet.")
elif arg_size == "*":
return None
arg_size = int(arg_size)
if (arg_size % 8 is not 0):
raise ValueError("Argument sizes must be multiples of 8.")
size += (arg_size // 8)
return size
def _uppercase_name(self, packet):
return packet["name"].upper().replace(" ", "_")
def _extract_param(self, string):
s = string.replace(" ", "")
return re.search(self._param_extractor, s).group(0)
if __name__ == "__main__":
import argparse
import os.path
parser = argparse.ArgumentParser()
parser.add_argument("--src", type=str, default=".")
parser.add_argument("--dest", type=str, default=".")
parser.add_argument("--name", type=str, default="packets")
parser.add_argument("--spec", type=str, default="SPECIFICATION.md")
args = parser.parse_args()
header_template = os.path.join(args.src, args.name) + ".h"
header_output = os.path.join(args.dest, args.name) + ".h"
source_template = os.path.join(args.src, args.name) + ".cpp"
source_output = os.path.join(args.dest, args.name) + ".cpp"
specification = args.spec
b = BasePackets(
specification=specification,
files = {header_template: header_output, source_template: source_output}
)
b.write_files()