#!/usr/bin/env python3
"""This script generates C++ files for handling packets on the side of the
base station."""

import re
from collections import OrderedDict


class BasePackets(object):
    """
    Autogenerates C++ code to encode and decode packets sent between the base
    station and the miniboard.

    This class takes in template C++ source and header files and expands them
    to include Qt signals emitted when a packet of a specific type is parsed
    successfully, as well as slots that allow for sending packets to the
    miniboard. The input files should contain comments with an "@" followed
    by a command or attributes.

    Attributes are variables defined in the header files that will be used in
    constructing autogenerated functions. They are specified with the
    attribute name followed by a colon and the name of the variable in the
    source file. For example, writing "@datastream: m_datastream" will specify
    that the QDataStream class to read and write data from is called
    m_datastream. Currently, available attributes are "datastream",
    designating the QDataStream to use, the "types_enum", the enum that
    stores the codes for the packet types, and the "crc", the name of the CRC
    function.

    Additionally, functions denoted by an "@" in the source provide the
    location for expanding the file during the autogeneration process.
    Available functions are (spelled exactly as the parser matches them):
        "packet_types_header", defining where to enumerate the packet codes,
        "recieved_signals_header", header functions for the received signals,
        "write_slots_header", header functions for slots that send packets
            for writing,
        "read_slots_header", header functions for slots that read data from
            the miniboard,
        "parse_packets", the source function that parses a packet; the
            generated code for variable-size packets refers to a C++ variable
            named "size", e.g. "@parse_packets(size)",
        "write_slots_source", write the source code for write slots, and
        "read_slots_source", source code for read slots.
    """

    def __init__(self, specification=None, files=None):
        """
        Creates an instance of the BasePackets class.

        Parameters
        ----------
        specification : string
            The path to the packet specification file.
        files : dictionary, optional
            Dictionary of the input and output files. Input files contain
            special commands that the parser will recognize. The
            autogenerated files will be written to the output files. The
            format for the dictionary is {"input_file": "output_file"}.
            Defaults to an empty mapping.
        """
        # TODO: normalized naming to template file (low priority)
        self._specification = specification
        # None instead of a shared mutable ``{}`` default; the property
        # setter below stores a sorted copy either way.
        self.files = {} if files is None else files
        # packets generated by the extract_table function
        self._packets = None
        # parameters extracted from header files, such as "datastream", etc.
        self._params = {"start_byte": 0x01}
        self._tabsize = 4

        # Regular expressions for various parsing and extraction.
        # _param_extractor requires all whitespace to be removed first.
        # TODO: make not use directly
        self._param_extractor = re.compile(r"(?<=:)[a-zA-Z0-9_]+")
        # TODO: try (?<=^class) (low priority)
        self._class_extractor = re.compile(r"(?<=class )[a-zA-Z]+")
        self._function_arg_extractor = re.compile(r"\(.+\)")
        self._arg_size_extractor = re.compile(r"(\d|\*)+$")

    @property
    def packets(self):
        """A list of packets and their properties. The format is specified in
        the extract_table function."""
        return self._packets

    @property
    def specification(self):
        """The path to the packet specification file."""
        return self._specification

    @specification.setter
    def specification(self, value):
        self._specification = value

    @property
    def files(self):
        """Dictionary of files used in the parsing and autogeneration process.

        Files are stored in an ordered dictionary with source (.cpp) files at
        the end."""
        return self._files

    @files.setter
    def files(self, value):
        # Sorting on the boolean key is stable and puts ".cpp" templates
        # last, so attributes declared in headers are collected before the
        # source templates that use them are expanded.
        self._files = OrderedDict(
            sorted(value.items(), key=lambda item: item[0].endswith(".cpp"))
        )

    def write_files(self):
        """Parses the input files and outputs the autogenerated files. This
        is the function to call after the necessary files have been
        specified."""
        for template, output in self._files.items():
            self._write_file(str(template), str(output))

    def _write_file(self, template, output):
        """Parses an individual file.

        Extracts packets from the specification if not done so already, then
        copies the template to the output line by line. Lines carrying an
        "@" attribute update the stored parameters; lines carrying an "@"
        function command are followed by the generated code.

        Parameters
        ----------
        template : string
            The path to the template used for autogenerating the output.
        output : string
            The path to the output file.

        Raises
        ------
        ValueError
            If no specification file has been set.
        """
        if self._specification is None:
            raise ValueError("The specification file has not been specified.")
        if self._packets is None:
            self.extract_table()

        # Checked in this order; the first command found in a line wins.
        attribute_commands = ("datastream", "types_enum", "crc")
        generator_commands = (
            ("@packet_types_header", self.packet_types_header),
            ("@recieved_signals_header", self.packet_signals_header),
            ("@write_slots_header", self.write_slots_header),
            ("@read_slots_header", self.read_slots_header),
            ("@parse_packets", self.parse_packets),
            ("@write_slots_source", self.write_slots_source),
            ("@read_slots_source", self.read_slots_source),
        )

        # The template is read completely before the output is opened, so
        # template and output may be the same path.
        with open(template) as f:
            file_template = f.read().splitlines()

        with open(output, "w+") as f:
            for line in file_template:
                f.write(line + "\n")

                if "class" in line:
                    if "enum" in line:
                        continue
                    class_match = re.search(self._class_extractor, line)
                    # A line may mention "class" without declaring one
                    # (e.g. in a comment); only record real matches.
                    if class_match is not None:
                        self._params["class"] = class_match.group(0)

                if "@" not in line:
                    continue

                for attribute in attribute_commands:
                    if "@" + attribute in line:
                        self._params[attribute] = self._extract_param(line)
                        break
                else:
                    for command, generator in generator_commands:
                        if command in line:
                            f.write(generator(line=line))
                            break

    def extract_table(self):
        """Extract the command table from the text of the specification file.

        Creates a list of dictionaries with the following members:
            "name" : str, name of command
            "rw" : str, contains r if command can be read, w if command can
                be written, - if neither
            "code" : int, command code (0-127)
            "argument" : list, list of 0 or more tuples, each with the
                following structure:
                    tuple(format code:str, command format code, u8, i16, etc,
                          or * to indicate a variable number of bytes.
                          name:str)
                Arguments whose name contains "length" are dropped as
                redundant.
            "default" : list of default values of arguments, as strings
            "notes" : str, notes on command.

        Raises
        ------
        RuntimeError
            If a row lists defaults whose count differs from its arguments.
        """
        with open(self._specification, "r") as f:
            file_str = f.read()

        # Start just after the newline preceding the table header. If the
        # header is not found, find() returns -1 and parsing starts at 0.
        start = file_str.find("\n| Name | RW | Command Code") + 1
        # The table ends at the first blank line after it, or at end of file.
        # Searching from ``start`` keeps both offsets absolute.
        end = file_str.find("\n\n", start)
        table_str = file_str[start:] if end == -1 else file_str[start:end]

        self._packets = []
        for row in table_str.split("\n"):
            if len(row) == 0:
                continue
            if row[0] == "|":
                row = row[1:]
            col = [c.strip() for c in row.split("|")]
            # Skip the separator row and the header row.
            if col[0][0:3] == "---":
                continue
            if col[0] == "Name":
                continue

            name = col[0]
            rw = col[1].lower()
            code = int(col[2], 0)
            notes = col[5]

            argument = []
            for entry in [c.strip() for c in col[3].split(",")]:
                pieces = entry.split(" ")
                if len(pieces) == 2:
                    argument.append((pieces[0], pieces[1]))

            default = []
            if len(argument) > 0 and col[4].strip() != "-":
                default = [d.strip() for d in col[4].split(",")]
            if len(default) != len(argument) and len(default) != 0:
                raise RuntimeError(
                    "Default list length mismatch on command %s" % name
                )

            # Remove redundant length arguments. Filtering into a new list
            # avoids skipping entries, which popping during iteration did.
            argument = [arg for arg in argument if "length" not in arg[1]]

            self._packets.append({
                "name": name,
                "rw": rw,
                "code": code,
                "argument": argument,
                "default": default,
                "notes": notes,
            })

    def packet_types_header(self, line=None):
        """
        Generates the enumerator list for the packet types.

        Parameters
        ----------
        line : string, optional
            The string line that contains the command. Its leading spaces
            are reused as the indentation of the generated lines.

        Returns
        -------
        string
            One "NAME = code," line per packet.
        """
        ws = self._whitespace(line)
        return "".join(
            ws + ("%s = %i,\n" % (self._uppercase_name(packet), packet["code"]))
            for packet in self._packets
        )

    def packet_signals_header(self, line=None):
        """
        Generates the signal declarations for readable packets.

        Parameters
        ----------
        line : string, optional
            The string line that contains the command. Its leading spaces
            are reused as the indentation of the generated lines.

        Returns
        -------
        string
            One "void <name>Received(<args>);" line per readable packet.
        """
        ws = self._whitespace(line)
        return "".join(
            ws + "void " + self._signal_name(packet)
            + "(" + self._argument_proto(packet) + ");\n"
            for packet in self._packets
            if "r" in packet["rw"]
        )

    def write_slots_header(self, line=None):
        """
        Generates the write slot declarations for writable packets.

        Parameters
        ----------
        line : string, optional
            The string line that contains the command. Its leading spaces
            are reused as the indentation of the generated lines.

        Returns
        -------
        string
            One "void write<Name>(<args>);" line per writable packet.
        """
        ws = self._whitespace(line)
        return "".join(
            ws + "void write" + self._camelcase(packet["name"])
            + "(" + self._argument_proto(packet) + ");\n"
            for packet in self._packets
            if "w" in packet["rw"]
        )

    def read_slots_header(self, line=None):
        """
        Generates the read slot declarations for readable packets.

        Parameters
        ----------
        line : string, optional
            The string line that contains the command. Its leading spaces
            are reused as the indentation of the generated lines.

        Returns
        -------
        string
            One "void read<Name>();" line per readable packet.
        """
        ws = self._whitespace(line)
        return "".join(
            ws + "void read" + self._camelcase(packet["name"]) + "();\n"
            for packet in self._packets
            if "r" in packet["rw"]
        )

    def parse_packets(self, line=None):
        """
        Generates the body of the C++ function that parses a packet.

        The generated code assumes that the datastream head is at the
        beginning of the CRC. It reads the CRC and the packet type, then
        emits one branch per readable packet (type code with bit 0x80 set)
        and one branch per writable packet (plain type code).

        Parameters
        ----------
        line : string, optional
            The string line that contains the command. Its leading spaces
            are reused as the indentation of the generated lines.

        Returns
        -------
        string
            The generated code, with tabs expanded to the configured size.
        """
        ws = self._whitespace(line)
        datastream = self._params["datastream"]

        # Read in the crc and the packet type.
        parts = [
            ws + "quint16 _read_crc;\n",
            ws + ("%s >> _read_crc;\n" % datastream),
            ws + "quint8 _packetType;\n",
            ws + ("%s >> _packetType;\n" % datastream),
        ]

        # The very first branch is an "if", every later one an "else if".
        # The flag is shared by both loops so the chain stays valid even
        # when there are no readable packets.
        first_branch = True

        for packet in self._packets:
            if "r" not in packet["rw"]:
                continue
            conditional = "if" if first_branch else "else if"
            first_branch = False
            # static_cast needs an explicit target type; _packetType is a
            # quint8, so the enum value is cast to quint8.
            parts.append(
                ws + "%s(_packetType == (static_cast<quint8>"
                "(%s::%s) | 0x80)) {\n" % (
                    conditional,
                    self._params["types_enum"],
                    self._uppercase_name(packet),
                )
            )
            if self._packet_size(packet) is not None:
                parts.append(self._parse_fixed_packet(packet, ws))
            else:
                # TODO: remove hard coding of the "size" variable name.
                parts.append(self._parse_variable_packet(packet, ws))
            parts.append(ws + "}\n")

        for packet in self._packets:
            if "w" not in packet["rw"]:
                continue
            conditional = "if" if first_branch else "else if"
            first_branch = False
            parts.append(
                ws + "%s(_packetType == static_cast<quint8>"
                "(%s::%s)) {\n" % (
                    conditional,
                    self._params["types_enum"],
                    self._uppercase_name(packet),
                )
            )
            parts.append(ws + "\tquint16 _crc = 0xFFFF;\n")
            parts.append(
                ws + "\t_crc = %s(&_packetType, sizeof(quint8), _crc);\n" % (
                    self._params["crc"],
                )
            )
            parts.append(ws + "\tif(_crc == _read_crc) {\n")
            parts.append(
                ws + '\t\tqDebug() << "Confirmed packet reception.";\n'
            )
            parts.append(ws + "\t}\n")
            parts.append(ws + "}\n")

        return "".join(parts).expandtabs(self._tabsize)

    def _parse_fixed_packet(self, packet, ws):
        """Generates the parse branch body for a fixed-size packet."""
        datastream = self._params["datastream"]
        string = ""
        for arg in packet["argument"]:
            # For every argument in the packet, declare a new variable and
            # read it in from the stream.
            arg_type = self._expand_argument(arg[0])
            string += ws + "\t%s %s;\n" % (arg_type, arg[1])
            string += ws + "\t%s >> %s;\n" % (datastream, arg[1])
            string += (
                ws + '\tqDebug() << "argument: " << "%s" << "; value: " '
                "<< %s;\n" % (arg[1], arg[1])
            )
        string += self._crc_calculation(packet, ws=ws, write=True)
        # Emit the signal only when the computed CRC matches the one read.
        string += ws + "\tif(_crc == _read_crc) {\n"
        string += ws + '\t\tqDebug() << "CRC check successful.";\n'
        string += ws + "\t\temit " + self._signal_name(packet) + "("
        string += ", ".join(arg[1] for arg in packet["argument"])
        string += ");\n"
        string += ws + "\t}\n"
        return string

    def _parse_variable_packet(self, packet, ws):
        """Generates the parse branch body for a variable-size packet."""
        crc = self._params["crc"]
        string = ws + "\tchar _data[size];\n"
        string += ws + "\t%s.readRawData(_data, size);\n" % (
            self._params["datastream"],
        )
        string += ws + "\tQByteArray _byte_array = QByteArray(_data);\n"
        string += ws + "\tquint16 _crc = 0xFFFF;\n"
        string += ws + "\t_crc = %s(&_packetType, sizeof(quint8), _crc);\n" % (
            crc,
        )
        string += ws + "\t_crc = %s(&_byte_array, size, _crc);\n" % crc
        string += ws + '\tqDebug() << "argument: %s; value: "' % (
            packet["argument"][-1][1]
        )
        string += " << _byte_array;\n"
        string += ws + "\tif(_crc == _read_crc) {\n"
        string += ws + "\t\temit %s(_byte_array);\n" % (
            self._signal_name(packet)
        )
        string += ws + "\t}\n"
        return string

    def write_slots_source(self, line=None):
        """
        Generates the C++ definitions of the write slots.

        Each slot writes the start byte, the packet size, the CRC, the
        packet type and then the arguments to the datastream.

        Parameters
        ----------
        line : string, optional
            Accepted for symmetry with the other generators; unused.

        Returns
        -------
        string
            The generated definitions, with tabs expanded.
        """
        datastream = self._params["datastream"]
        string = ""
        for packet in self._packets:
            if "w" not in packet["rw"]:
                continue
            packet_size = self._packet_size(packet)

            string += "void %s::write%s(%s)\n{\n" % (
                self._params["class"],
                self._camelcase(packet["name"]),
                self._argument_proto(packet),
            )
            string += '\tqDebug() << "Beginning packet write.";\n'
            string += (
                "\tquint8 _packetType = static_cast<quint8>(%s::%s);\n" % (
                    self._params["types_enum"],
                    self._uppercase_name(packet),
                )
            )
            string += self._crc_calculation(packet)
            string += "\t%s << (quint8)%s;\n" % (
                datastream,
                self._params["start_byte"],
            )
            if packet_size is not None:
                string += "\t%s << (quint8)%s;\n" % (datastream, packet_size)
            else:
                # Variable-size payload plus CRC (2 bytes) and type (1 byte).
                string += "\t%s << static_cast<quint8>(%s.size() + 3);\n" % (
                    datastream,
                    packet["argument"][-1][1],
                )
            string += "\t%s << _crc;\n" % datastream
            string += "\t%s << _packetType;\n" % datastream

            if packet_size is not None:
                for arg in packet["argument"]:
                    # "%%s" / "%%i" are printf specifiers for the generated
                    # qDebug call, not Python placeholders.
                    string += (
                        '\tqDebug("writing: %%s; value: %%i","%s", %s);\n' % (
                            arg[1], arg[1],
                        )
                    )
                    string += "\t%s << %s;\n" % (datastream, arg[1])
                    string += (
                        '\tqDebug() << "argument:" << "%s" << "; value:" '
                        "<< %s;\n" % (arg[1], arg[1])
                    )
            else:
                string += "\t%s.writeRawData(%s.constData(), %s.size());\n" % (
                    datastream,
                    packet["argument"][-1][1],
                    packet["argument"][-1][1],
                )
            string += "}\n\n"
        return string.expandtabs(self._tabsize)

    def _crc_calculation(self, packet, ws="", write=True,
                         packet_type="_packetType"):
        """
        Generates the C++ statements that compute "_crc" for a packet.

        Parameters
        ----------
        packet : dictionary
            A packet as produced by extract_table.
        ws : string, optional
            Indentation prefixed to every generated line.
        write : bool, optional
            If False, only the packet type is folded into the CRC; otherwise
            the packet arguments are included as well.
        packet_type : string, optional
            Name of the C++ variable holding the packet type.

        Returns
        -------
        string
            The generated statements (tabs are not expanded here).
        """
        crc = self._params["crc"]
        string = ws + "\tquint16 _crc = 0xFFFF;\n"
        string += ws + "\t_crc = %s(&%s, sizeof(quint8), _crc);\n" % (
            crc,
            packet_type,
        )
        if not write:
            return string
        if self._packet_size(packet) is not None:
            for arg in packet["argument"]:
                string += ws + "\t_crc = %s(&%s, sizeof(%s), _crc);\n" % (
                    crc,
                    arg[1],
                    arg[1],
                )
        else:
            # TODO: fix this, unworking for variable size packets
            string += ws + "\t_crc = %s(&%s, %s.size(), _crc);\n" % (
                crc,
                packet["argument"][-1][1],
                packet["argument"][-1][1],
            )
        return string

    def read_slots_source(self, line=None):
        """
        Generates the C++ definitions of the read slots.

        Each slot writes a read request: start byte, a size of 3, the CRC
        of the packet type, and the packet type with bit 0x80 set.

        Parameters
        ----------
        line : string, optional
            Accepted for symmetry with the other generators; unused.

        Returns
        -------
        string
            The generated definitions, with tabs expanded.
        """
        datastream = self._params["datastream"]
        string = ""
        for packet in self._packets:
            if "r" not in packet["rw"]:
                continue
            string += "void %s::read%s()\n{\n" % (
                self._params["class"],
                self._camelcase(packet["name"]),
            )
            string += '\tqDebug() << "Beginning packet write.";\n'
            string += (
                "\tquint8 _packetType = static_cast<quint8>(%s::%s) "
                "| 0x80;\n"
            ) % (
                self._params["types_enum"],
                self._uppercase_name(packet),
            )
            string += self._crc_calculation(packet, write=False)
            string += "\t%s << (quint8)%s;\n" % (
                datastream,
                self._params["start_byte"],
            )
            string += "\t%s << (quint8)3;\n" % datastream
            string += "\t%s << _crc;\n" % datastream
            string += "\t%s << _packetType;\n" % datastream
            string += "}\n\n"
        return string.expandtabs(self._tabsize)

    def _parse_function_args(self, str):
        """
        Extracts the argument names from a command such as "@cmd(a, b)".

        Parameters
        ----------
        str : string
            The line containing the command. (The name shadows the builtin
            but is kept for interface compatibility.)

        Returns
        -------
        list of string or None
            The comma-separated arguments with surrounding spaces removed,
            or None if the line has no non-empty parenthesised section.
        """
        match = re.search(self._function_arg_extractor, str)
        if match is None:
            return None
        arguments = match.group(0).strip("() ")
        return [argument.strip(" ,") for argument in arguments.split(",")]

    def _signal_name(self, packet):
        """Returns the name of the signal emitted when `packet` is received,
        i.e. the lower camelcase packet name followed by "Received"."""
        return (self._camelcase(packet["name"], capitalize_first_word=False)
                + "Received")

    def _expand_argument(self, arg):
        """
        Converts a specification format code into a Qt type name.

        "*" becomes "QByteArray"; codes containing "u" or "i" get a "q"
        prefix with "u" -> "uint" or "i" -> "int" (e.g. "u8" -> "quint8").

        Raises
        ------
        ValueError
            If the format code is not recognized.
        """
        if arg == "*":
            return "QByteArray"
        if "u" in arg:
            return "q" + arg.replace("u", "uint")
        if "i" in arg:
            return "q" + arg.replace("i", "int")
        raise ValueError("Argument type %s id not recognized" % arg)

    def _argument_proto(self, packet):
        """Returns the C++ parameter list ("type name, type name, ...") for
        the arguments of `packet`; empty if it has none."""
        return ", ".join(
            self._expand_argument(arg[0]) + " " + arg[1]
            for arg in packet["argument"]
        ).strip(" ,")

    def _camelcase(self, string, capitalize_first_word=True):
        """Takes a string of words and returns a camelcase version.

        Parameters
        ----------
        string : string
            This is a string of space-separated words to be used in
            constructing a camelcase string.
        capitalize_first_word : bool, optional
            Specifies whether to capitalize the first word.

        Returns
        -------
        words : string
            A camelcase version of the input string. For example,
            "this is a string" would return "ThisIsAString" or
            "thisIsAString". An empty input yields an empty string.
        """
        words = "".join(word.capitalize() for word in string.split(" "))
        # Guard against empty input before indexing the first character.
        if words and not capitalize_first_word:
            words = words[0].lower() + words[1:]
        return words

    def _whitespace(self, reference):
        """Returns a run of spaces as long as the leading spaces of
        `reference` (tabs are not counted), or "" if `reference` is None."""
        if reference is None:
            return ""
        num_spaces = len(reference) - len(reference.lstrip(" "))
        return " " * num_spaces

    def _packet_size(self, packet):
        """
        Computes the size of a packet in bytes.

        The size includes two bytes for the CRC and one byte for the packet
        type, plus the width of each argument taken from the trailing digits
        of its format code (in bits).

        Returns
        -------
        int or None
            The size in bytes, or None if the packet contains a
            variable-size ("*") argument.

        Raises
        ------
        ValueError
            If an argument's size cannot be determined or is not a multiple
            of 8 bits.
        """
        size = 3
        for arg in packet["argument"]:
            match = re.search(self._arg_size_extractor, arg[0])
            if match is None:
                raise ValueError("Cannot determine size of packet.")
            arg_size = match.group(0)
            if arg_size == "*":
                return None
            arg_size = int(arg_size)
            if arg_size % 8 != 0:
                raise ValueError("Argument sizes must be multiples of 8.")
            size += arg_size // 8
        return size

    def _uppercase_name(self, packet):
        """Returns the packet name in upper case with spaces replaced by
        underscores, as used for the enumerator names."""
        return packet["name"].upper().replace(" ", "_")

    def _extract_param(self, string):
        """
        Extracts the value of an "@attribute: value" line.

        Spaces are removed first; the value is the run of letters, digits
        and underscores following the first colon that has one.

        Raises
        ------
        ValueError
            If the line contains no such value.
        """
        match = re.search(self._param_extractor, string.replace(" ", ""))
        if match is None:
            raise ValueError(
                "No attribute value found in line: %s" % string
            )
        return match.group(0)


if __name__ == "__main__":
    import argparse
    import os.path

    parser = argparse.ArgumentParser()
    parser.add_argument("--src", type=str, default=".")
    parser.add_argument("--dest", type=str, default=".")
    parser.add_argument("--name", type=str, default="packets")
    parser.add_argument("--spec", type=str, default="SPECIFICATION.md")
    args = parser.parse_args()

    header_template = os.path.join(args.src, args.name) + ".h"
    header_output = os.path.join(args.dest, args.name) + ".h"
    source_template = os.path.join(args.src, args.name) + ".cpp"
    source_output = os.path.join(args.dest, args.name) + ".cpp"
    specification = args.spec

    b = BasePackets(
        specification=specification,
        files={header_template: header_output,
               source_template: source_output},
    )
    b.write_files()