fprime/Autocoders/Python/bin/tlmLayout.py
2019-03-04 16:38:38 -08:00

724 lines
22 KiB
Python

#!/usr/bin/python
#===============================================================================
# NAME: TlmLayout.py
#
# DESCRIPTION: Takes as input a telemetry layout in tab-delimited format and
# generates .hpp of the packet layout(s) and a table that can be
# used to form the packet(s)
#
# USAGE:
#
# AUTHOR: Len Day
# EMAIL: len.day@jpl.nasa.gov
# DATE CREATED : 09/28/15
#
# Copyright 2015, California Institute of Technology.
# ALL RIGHTS RESERVED. U.S. Government Sponsorship acknowledged.
#===============================================================================
#
# Python standard modules
#
import logging
import os
import sys
import time
import argparse
import csv
import datetime
import copy
from Cheetah.Template import Template
from tkinter.constants import NONE
#
# Python extention modules and custom interfaces
#
#
# Universal globals used within module go here.
# (DO NOT USE MANY!)
#
tlm_input_line_num = 0
tlm_max_packet_bytes = 0
tlm_packet_list = []
tlm_duration = None
tlm_period = None
tlm_packet_id_schedule_list = []
tlm_packet_max_ids_list = []
tlm_packet_to_chan_list = []
#===============================================================================
#
class HeaderItem:
def __init__(self):
self.m_type = ""
self.m_id = 0
self.m_bit_start = 0
self.m_bits = 0
self.m_comment = ""
class Item:
def __init__(self):
self.m_is_reserve = False
self.m_is_constant = False
self.m_name = ""
self.m_id = 0
self.m_data_type = ""
self.m_constant_value = 0
self.m_bit_start = 0
self.m_bits = 0
self.m_comment = ""
# The information for a telemetry packet is accumulated in a Packet class
# instance, then the packet_complete method is called when it is complete
class Packet:
def init(self):
self.m_name = ""
self.m_id = 0
self.m_id_comment = ""
self.m_bit_index = 0
self.m_bytes = 0
self.m_num_fixed_fields = 0
self.m_num_variable_fields = 0
self.m_max_field_bits = 0;
self.num_header_general_fields = 0
self.m_header_list = []
self.m_item_list = []
# Additional packet information needed
# for scheduling and multiplexing to an address
#
self.m_chan = None
#
self.m_freq = None
self.m_offset = None
def __init__(self):
self.init()
# errors encountered parsing the file use err_msg to output the
# error and exit.
def err_msg(self, msg):
global tlm_input_line_num
print(msg, "at input line ", tlm_input_line_num)
exit(1)
def duration(self, line):
global tlm_duration
tlm_duration = float(line[1])
def run_period(self, line):
global tlm_period
tlm_period = float(line[1])
def frequency(self, line):
"""
Frequency of packet is always in Hz. so no units given.
"""
self.m_freq = float(line[1])
def offset(self,line):
"""
Offset of packet is essentially it's phase within duration.
If no units detected just count cycle slots.
If "s" for seconds is detected then compute cycle slots here.
NOTE: One and only one offset per ID can currently be specified.
"""
if "s" in line[1].lower():
self.m_offset = int((float(line[1].split()[0].strip()) / (1.0/tlm_period))+0.5)
else:
self.m_offset = int(line[1])
def channel(self,line):
"""
Channel number. This is a channel number assigned to
the packet of type ID. It is typically used for mapping
of packet ID's types to 1553 sub-addresses.
"""
self.m_chan = int(line[1])
# Comment records are ignored
def comment(self, line):
pass # Ignored
# Process and save a packet record. This completed processing of the
# previous packet (if any)
def packet(self, line):
self.packet_complete()
self.init()
# Process and save a header record
def header(self, line):
if (len(line) < 2):
self.err_msg("Missing parameter for header")
he = HeaderItem();
he.m_bit_start = self.m_bit_index
kw = line[1].lower()
if (kw == "id"):
if (len(line) < 5):
self.err_msg("Missing parameter for header ID")
he.m_type = "HEADER_FIELD_ID";
s = line[4].strip();
if (not s.isdigit()):
self.err_msg("Illegal value for number of bits: '" + s + "'")
he.m_bits = int(s);
self.m_bit_index += he.m_bits
elif (kw == "time"):
he.m_type = "HEADER_FIELD_TIME"
he.m_bits = 48
self.m_bit_index += he.m_bits
elif(kw == "sequence"):
if (len(line) < 6):
self.err_msg("Missing parameter for header sequence")
he.m_type = "HEADER_FIELD_SEQUENCE";
s = line[4].strip();
if (not s.isdigit()):
self.err_msg("Illegal value for number of bits: '" + s + "'")
he.m_bits = int(s);
self.m_bit_index += he.m_bits
elif(kw == "field"):
if (len(line) < 4):
self.err_msg("Missing parameter for header field")
he.m_type = "HEADER_FIELD_FIELD";
s = line[2].strip();
if (not s.isdigit()):
self.err_msg("Illegal value for ID: '" + s + "'")
he.m_id = int(s);
s = line[4].strip();
if (not s.isdigit()):
self.err_msg("Illegal value for number of bits: '" + s + "'")
he.m_bits = int(s);
self.m_bit_index += he.m_bits
self.num_header_general_fields += 1
else:
self.err_msg("Illegal heyword for header: '" + line[1] + "'")
if (len(line) > 5):
he.m_comment = line[5]
self.m_header_list.append(he)
# Process and save the packet identifier record
def identifier(self, line):
if (len(line) < 3):
self.err_msg("Missing parameter(s) for identifier")
self.m_name = line[1].strip()
if (not len(self.m_name)):
self.err_msg("Name cannot be blank")
if (len(self.m_name.split()) > 1):
self.err_msg("Name must not contain spaces")
if (self.m_name.find("/") >= 0):
self.err_msg("Name must not contain '/'")
if (self.m_name.find("\\") >= 0):
self.err_msg("Name must not contain '\\'")
s = line[2].strip()
if (not s.isdigit()):
self.err_msg("Illegal value for identifier: '" + s + "'")
self.m_id = int(s)
if (self.m_id < 0):
self.err_msg("Negative value for identifier: '" + line[2] + "'")
if (len(line) > 5):
self.m_id_comment = line[5]
# Process and save an item record
def item(self, line):
self.m_num_variable_fields += 1
it = Item()
it.m_bit_start = self.m_bit_index
if (len(line) < 5):
self.err_msg("Missing parameter(s) for item")
it.m_name = line[1].strip()
if (not len(it.m_name)):
self.err_msg("Name cannot be blank")
if (len(it.m_name.split()) > 1):
self.err_msg("Name must not contain spaces")
s = line[2].strip()
if (not s.isdigit()):
self.err_msg("Illegal value for identifier: '" + s + "'")
it.m_id = int(s)
it.m_data_type = line[3].strip()
s = line[4].strip()
if (not s.isdigit()):
self.err_msg("Illegal value for number of bits: '" + s + "'")
it.m_bits = int(s)
self.m_bit_index += it.m_bits
if (len(line) > 5):
it.m_comment = line[5]
self.m_item_list.append(it)
# Process and save a reserve record
def reserve(self, line):
it = Item()
it.m_bit_start = self.m_bit_index
if (len(line) < 5):
self.err_msg("Missing parameter for reserve")
it.m_is_reserve = True
s = line[4].strip()
if (not s.isdigit()):
self.err_msg("Illegal value for number of bits: '" + s + "'")
it.m_bits = int(s)
if (it.m_bits < 1):
self.err_msg("Illegal value for number of bits: '" + s + "'")
self.m_bit_index += it.m_bits
if (len(line) > 5):
it.m_comment = line[5]
self.m_item_list.append(it)
# Process and save an align record
def align(self, line):
it = Item()
it.m_bit_start = self.m_bit_index
if (len(line) < 2):
self.err_msg("Missing parameter for align")
it.m_is_reserve = True
s = line[1].strip()
if (not s.isdigit()):
self.err_msg("Illegal value for number of bits: '" + s + "'")
bits = int(s)
if (bits != 8 and bits != 16 and bits != 32 and bits != 64):
self.err_msg("Illegal value for number of bits: '" + s + "'")
if (self.m_bit_index % bits):
bits = bits - (self.m_bit_index % bits)
else:
return # Already aligned
it.m_bits = bits
self.m_bit_index += it.m_bits
if (len(line) > 5):
it.m_comment = line[5]
self.m_item_list.append(it)
# Process and save a constant record
def constant(self, line):
self.m_num_fixed_fields += 1
it = Item()
it.m_bit_start = self.m_bit_index
if (len(line) < 5):
self.err_msg("Missing parameter for constant")
it.m_is_constant = True
it.m_name = line[1].strip()
if (not len(it.m_name)):
self.err_msg("Name cannot be blank")
if (len(it.m_name.split()) > 1):
self.err_msg("Name must not contain spaces")
it.m_data_type = line[2].strip()
it.m_data_type = it.m_data_type.lower()
if (it.m_data_type != "integer" and
it.m_data_type != "float" and
it.m_data_type != "text"):
self.err_msg("Invalid date type: '" + it.data_tyoe + "'")
it.m_constant_value = line[3]
if (it.m_data_type == "integer"):
it.m_constant_value = it.m_constant_value.strip()
if (not it.m_constant_value.isdigit()):
self.err_msg("Invalid numeric value: '" + it.m_constant_value + "'")
it.m_constant_value = int(it.m_constant_value)
elif (it.m_data_type == "float"):
it.m_constant_value = it.m_constant_value.strip()
try:
f = float(it.m_constant_value)
except ValueError:
self.err_msg("Invalid floating point value: '" + it.m_constant_value + "'")
it.m_constant_value = f
s = line[4].strip()
if (not s.isdigit()):
self.err_msg("Illegal value for number of bits: '" + s + "'")
bits = int(s)
if (bits < 1):
self.err_msg("Illegal value for number of bits: '" + s + "'")
it.m_bits = bits
self.m_bit_index += it.m_bits
if (len(line) > 5):
it.m_comment = line[5]
self.m_item_list.append(it)
# packet_complete() is called when a packet definition is complete to save
# the packet
def packet_complete(self):
global tlm_input_line_num
global tlm_packet_list
global tlm_max_packet_bytes
global tlm_duration
global tlm_period
global verbose
if (not len(self.m_header_list) and not len(self.m_item_list)):
return
self.m_bytes = (self.m_bit_index + 7) / 8
if (self.m_bytes > tlm_max_packet_bytes):
tlm_max_packet_bytes = self.m_bytes
for field in self.m_header_list:
if (field.m_bits > self.m_max_field_bits):
self.m_max_field_bits = field.m_bits
for item in self.m_item_list:
if (not item.m_is_reserve and not item.m_is_constant):
if (item.m_bits > self.m_max_field_bits):
self.m_max_field_bits = item.m_bits
if (verbose):
if tlm_duration is not None:
print("Duration (in seconds only): %f" % tlm_duration)
if tlm_period is not None:
print("Run or Sample Period (in hz. only): %f" % tlm_period)
if self.m_freq is not None:
print("Packet frequency (Hz.): " + str(self.m_freq))
if self.m_offset is not None:
print("Packet offset: " + str(self.m_offset))
print("packet size in bits: " + str(self.m_bit_index) + " (" + str(self.m_bit_index / 8) + " bytes)")
print("Number of fixed-value fields:" + str(self.m_num_fixed_fields) + ", variable fields: " + str(self.m_num_variable_fields))
print("name: ", self.m_name)
print("packet ID: ", self.m_id, ", comment: ", self.m_id_comment)
print("Number of items in packet header list: ", len(self.m_header_list))
print("Number of general items in packet header list: ", self.num_header_general_fields)
print("Maximum field size in bits: ", self.m_max_field_bits)
for field in self.m_header_list:
print("\tType: ", field.m_type)
print("\tID: ", field.m_id)
print("\tStart bit: ", field.m_bit_start)
print("\tSize in bits: ", field.m_bits)
print("\tComment: ", field.m_comment)
print("")
print("Number of items in packet item list: ", len(self.m_item_list))
i = 0
for item in self.m_item_list:
i += 1
print("Item # ", i)
print("\tis_reserve: ", item.m_is_reserve)
print("\tis_constant: ", item.m_is_constant)
print("\tname: ", item.m_name)
print("\tid: ", item.m_id)
print("\tdata type: ", item.m_data_type)
print("\tconstant value: ", item.m_constant_value)
print("\tstart bit: ", item.m_bit_start)
print("\tbits: ", item.m_bits)
print("\tcomment: ", item.m_comment)
print()
if (self.m_name == ""):
self.err_msg("Preceeding packet has no name")
p = copy.deepcopy(self)
tlm_packet_list.append(p)
# The CsvLine class parses a single line and saves the info in the
# packet member. When a packet record is found the previously defined
# packet is saved.
class CsvLine:
def __init__(self):
global tlm_duration
self.packet = Packet()
self.keywords = {"duration" : self.packet.duration,
"runperiod" : self.packet.run_period,
"frequency" : self.packet.frequency,
"offset" : self.packet.offset,
"channel" : self.packet.channel,
"comment" : self.packet.comment,
"packet" : self.packet.packet,
"header" : self.packet.header,
"identifier" : self.packet.identifier,
"item" : self.packet.item,
"reserve" : self.packet.reserve,
"align" : self.packet.align,
"constant" : self.packet.constant}
# process() is called with a single line already tokenized as a list
def process(self, line):
global tlm_input_line_num
if (not len(line)):
return
nonblank = False
for i in range(len(line)):
s = line[i].strip()
if (len(s)):
nonblank = True
break
if (not nonblank):
return
kw = line[0].lower()
if (kw in self.keywords):
self.keywords[kw](line)
else:
print("Invalid keyword '" + line[0] + "' at line ", tlm_input_line_num)
exit(1)
def finish(self):
self.packet.packet_complete()
# CsvFile reads the input file and processes each line
class CsvFile:
def __init__(self):
self.line = CsvLine()
def process(self, name):
global tlm_input_line_num
m_file_name = name
try:
m_fp = open(name, "rU")
except:
print(("Error opening " + name))
exit()
m_reader = csv.reader(m_fp, dialect = 'excel')
for line in m_reader:
tlm_input_line_num += 1
self.line.process(line)
self.line.finish()
m_fp.close()
def sched_cycle_ids_max(max_cycle):
"""
Return a list of the maximum times an ID is assigned to a cycle slot.
"""
cycle_max_list = [0 for _ in range(max_cycle)]
cycle_offset = None
i = 0
if tlm_period == None:
return []
for p in tlm_packet_list:
if (p.m_freq is not None) and (p.m_offset is not None):
cycle_offset = int(((1.0/p.m_freq) / (1.0/tlm_period))+0.5)
offset = p.m_offset
else:
cycle_offset = None
if cycle_offset is not None:
for i in range(max_cycle):
if i % cycle_offset == 0:
if (i+offset) < max_cycle:
cycle_max_list[i + offset] += 1
#print p.m_id, p.m_freq, p.m_offset
# print cycle_offset
#print cycle_max_list
return cycle_max_list
def sched_cycle_ids(max_cycle):
"""
Return a list of cycle slots where each slot is a list of IDs assigned in that slot.
Note the list returned by sched_cycle_ids_max is the number of IDs per cycle slot.
"""
cycle_id_list = [list() for _ in range(max_cycle)]
cycle_offset = None
if tlm_period == None:
return []
for p in tlm_packet_list:
if (p.m_freq is not None) and (p.m_offset is not None):
cycle_offset = int(((1.0/p.m_freq) / (1.0/tlm_period))+0.5)
offset = p.m_offset
else:
cycle_offset = None
if cycle_offset is not None:
for i in range(max_cycle):
if i % cycle_offset == 0:
if (i+offset) < max_cycle:
cycle_id_list[i + offset].append(p.m_id)
#print p.m_id, p.m_freq, p.m_offset
for id in cycle_id_list:
if len(id) == 0:
id.append(-1)
#j = 0
#for i in cycle_id_list:
# print "%d: %s" % (j, i)
# j += 1
return cycle_id_list
def sched_id_arr_size(cycle_id_list):
"""
Return total size for the scheduled ID array.
"""
id_list_size = 0
for l in cycle_id_list:
s = len(l)
if s == 0:
id_list_size += 1
else:
id_list_size += s
return id_list_size
def id_to_channel_map():
"""
Return an association list of ID to channel number assignments.
NOTE: the channel number is intended to be 1553 sub-address.
"""
pass
def output_cpp(output_file, template_file):
global tlm_duration
global tlm_period
global tlm_packet_list
global tlm_max_packet_bytes
t = Template(file=template_file)
t.tlm_packet_list = tlm_packet_list
# Generate schedule code if both duration and period defined
if( (tlm_duration is not None) and (tlm_period is not None) ):
t.tlm_max_cycles = int(tlm_duration*tlm_period)
else:
t.tlm_max_cycles = None
t.tlm_cycle_max_list = sched_cycle_ids_max(t.tlm_max_cycles)
t.tlm_cycle_id_list = sched_cycle_ids(t.tlm_max_cycles)
# Create ID to channel mapping
t.tlm_cycle_id_arr_size = sched_id_arr_size(t.tlm_cycle_id_list)
t.tlm_max_packet_bytes = tlm_max_packet_bytes
t.tlm_max_num_chan = sum([pkt.m_chan is not None for pkt in tlm_packet_list])
# Create duration and period values
t.tlm_max_num_freq = sum([pkt.m_freq is not None for pkt in tlm_packet_list])
if tlm_duration is not None:
t.tlm_duration = tlm_duration
else:
t.tlm_duration = -1.0
#
if tlm_period is not None:
t.tlm_period = tlm_period
else:
t.tlm_period = -1
f = open(output_file, "w")
print(t, file=f)
def output_html():
global tlm_packet_list
pass
def output_dict():
global tlm_packet_list
pass
def output(cpp_output_file, template_file):
output_cpp(cpp_output_file, template_file)
output_html()
output_dict()
def main():
global verbose
parser = argparse.ArgumentParser(description='Process a telemetry definition file')
parser.add_argument('-v', '--verbose',
action='store_true',
help='Enable verbose mode')
parser.add_argument('--cpp',
action = 'store',
required = True,
help = 'Required. Fully-qualified output cpp file name')
parser.add_argument('--template',
action = 'store',
required = True,
help = 'Required. Fully-qualified file name of the cheetah template used by this script (normally in Autocoders/Python/src/utils/TlmPacketDefBase.tmpl)')
parser.add_argument('input_file',
nargs='+',
help = 'One or more input tab-delimited csv file names')
args = parser.parse_args()
verbose = args.verbose
cpp_output_file = args.cpp
template_file = args.template
f = CsvFile()
for file in args.input_file:
f.process(file)
output(cpp_output_file, template_file)
if (__name__ == '__main__'):
main()