Håkon Vågsether ee6fc4a419 grc: Add GRC Qt
Signed-off-by: Håkon Vågsether <hakon.vagsether@gmail.com>
2024-03-15 15:11:27 -04:00

791 lines
27 KiB
Python

"""
Copyright 2008-2020 Free Software Foundation, Inc.
Copyright 2021 GNU Radio contributors
This file is part of GNU Radio
SPDX-License-Identifier: GPL-2.0-or-later
"""
import collections
import itertools
import copy
import re
import ast
import typing
from ._templates import MakoTemplates, no_quotes
from ._flags import Flags
from ..base import Element
from ..params import Param
from ..utils.descriptors import lazy_property
def _get_elem(iterable, key):
items = list(iterable)
for item in items:
if item.key == key:
return item
return ValueError('Key "{}" not found in {}.'.format(key, items))
class Block(Element):
is_block = True
STATE_LABELS = ['disabled', 'enabled', 'bypassed']
key = ''
label = ''
category = []
vtype = '' # This is only used for variables when we want C++ output
flags = Flags('')
documentation = {'': ''}
doc_url = ''
value = None
asserts = []
templates = MakoTemplates()
parameters_data = []
inputs_data = []
outputs_data = []
extra_data = {}
loaded_from = '(unknown)'
def __init__(self, parent):
"""Make a new block from nested data."""
super(Block, self).__init__(parent)
param_factory = self.parent_platform.make_param
port_factory = self.parent_platform.make_port
self.params: typing.OrderedDict[str, Param] = collections.OrderedDict(
(data['id'], param_factory(parent=self, **data)) for data in self.parameters_data)
if self.key == 'options':
self.params['id'].hide = 'part'
self.sinks = [port_factory(parent=self, **params)
for params in self.inputs_data]
self.sources = [port_factory(parent=self, **params)
for params in self.outputs_data]
self.active_sources = [] # on rewrite
self.active_sinks = [] # on rewrite
self.states = {'state': 'enabled', 'bus_source': False,
'bus_sink': False, 'bus_structure': None}
self.block_namespace = {}
self.deprecated = self.is_deprecated()
if Flags.HAS_CPP in self.flags and self.enabled and not (self.is_virtual_source() or self.is_virtual_sink()):
# This is a workaround to allow embedded python blocks/modules to load as there is
# currently 'cpp' in the flags by default caused by the other built-in blocks
if hasattr(self, 'cpp_templates'):
# The original template, in case we have to edit it when transpiling to C++
self.orig_cpp_templates = self.cpp_templates
self.current_bus_structure = {'source': None, 'sink': None}
def get_bus_structure(self, direction):
if direction == 'source':
bus_structure = self.bus_structure_source
else:
bus_structure = self.bus_structure_sink
if not bus_structure:
return None
try:
clean_bus_structure = self.evaluate(bus_structure)
return clean_bus_structure
except Exception:
return None
# region Rewrite_and_Validation
def rewrite(self):
"""
Add and remove ports to adjust for the nports.
"""
Element.rewrite(self)
def rekey(ports):
"""Renumber non-message/message ports"""
domain_specific_port_index = collections.defaultdict(int)
for port in ports:
if not port.key.isdigit():
continue
domain = port.domain
port.key = str(domain_specific_port_index[domain])
domain_specific_port_index[domain] += 1
# Adjust nports
for ports in (self.sources, self.sinks):
self._rewrite_nports(ports)
rekey(ports)
self.update_bus_logic()
# disconnect hidden ports
self.parent_flowgraph.disconnect(
*[p for p in self.ports() if p.hidden])
self.active_sources = [p for p in self.sources if not p.hidden]
self.active_sinks = [p for p in self.sinks if not p.hidden]
# namespaces may have changed, update them
self.block_namespace.clear()
imports = ""
try:
imports = self.templates.render('imports')
exec(imports, self.block_namespace)
except ImportError:
# We do not have a good way right now to determine if an import is for a
# hier block, these imports will fail as they are not in the search path
# this is ok behavior, unfortunately we could be hiding other import bugs
pass
except Exception:
self.add_error_message(
f'Failed to evaluate import expression {imports!r}')
def update_bus_logic(self):
###############################
# Bus Logic
###############################
for direc in {'source', 'sink'}:
if direc == 'source':
ports = self.sources
ports_gui = self.filter_bus_port(self.sources)
bus_state = self.bus_source
else:
ports = self.sinks
ports_gui = self.filter_bus_port(self.sinks)
bus_state = self.bus_sink
# Remove the bus ports
removed_bus_ports = []
removed_bus_connections = []
if 'bus' in map(lambda a: a.dtype, ports):
for port in ports_gui:
for c in self.parent_flowgraph.connections:
if port is c.source_port or port is c.sink_port:
removed_bus_ports.append(port)
removed_bus_connections.append(c)
ports.remove(port)
if (bus_state):
struct = self.form_bus_structure(direc)
self.current_bus_structure[direc] = struct
# Hide ports that are not part of the bus structure
# TODO: Blocks where it is desired to only have a subset
# of ports included in the bus still has some issues
for idx, port in enumerate(ports):
if any([idx in bus for bus in self.current_bus_structure[direc]]):
if (port.stored_hidden_state is None):
port.stored_hidden_state = port.hidden
port.hidden = True
# Add the Bus Ports to the list of ports
for i in range(len(struct)):
# self.sinks = [port_factory(parent=self, **params) for params in self.inputs_data]
port = self.parent.parent.make_port(self, direction=direc, id=str(
len(ports)), label='bus', dtype='bus', bus_struct=struct[i])
ports.append(port)
for (saved_port, connection) in zip(removed_bus_ports, removed_bus_connections):
if port.key == saved_port.key:
self.parent_flowgraph.connections.remove(
connection)
if saved_port.is_source:
connection.source_port = port
if saved_port.is_sink:
connection.sink_port = port
self.parent_flowgraph.connections.add(connection)
else:
self.current_bus_structure[direc] = None
# Re-enable the hidden property of the ports
for port in ports:
if (port.stored_hidden_state is not None):
port.hidden = port.stored_hidden_state
port.stored_hidden_state = None
def _rewrite_nports(self, ports):
for port in ports:
if hasattr(port, 'master_port'): # Not a master port and no left-over clones
port.dtype = port.master_port.dtype
port.vlen = port.master_port.vlen
continue
nports = port.multiplicity
for clone in port.clones[nports - 1:]:
# Remove excess connections
self.parent_flowgraph.disconnect(clone)
port.remove_clone(clone)
ports.remove(clone)
# Add more cloned ports
for j in range(1 + len(port.clones), nports):
clone = port.add_clone()
ports.insert(ports.index(port) + j, clone)
def validate(self):
"""
Validate this block.
Call the base class validate.
Evaluate the checks: each check must evaluate to True.
"""
Element.validate(self)
self._run_asserts()
self._validate_generate_mode_compat()
self._validate_output_language_compat()
self._validate_var_value()
def _run_asserts(self):
"""Evaluate the checks"""
for expr in self.asserts:
try:
if not self.evaluate(expr):
self.add_error_message(
'Assertion "{}" failed.'.format(expr))
except Exception:
self.add_error_message(
'Assertion "{}" did not evaluate.'.format(expr))
def _validate_generate_mode_compat(self):
"""check if this is a GUI block and matches the selected generate option"""
current_generate_option = self.parent.get_option('generate_options')
def check_generate_mode(label, flag, valid_options):
block_requires_mode = (
flag in self.flags or self.label.upper().startswith(label)
)
if block_requires_mode and current_generate_option not in valid_options:
self.add_error_message("Can't generate this block in mode: {} ".format(
repr(current_generate_option)))
check_generate_mode('QT GUI', Flags.NEED_QT_GUI,
('qt_gui', 'hb_qt_gui'))
def _validate_output_language_compat(self):
"""check if this block supports the selected output language"""
current_output_language = self.parent.get_option('output_language')
if current_output_language == 'cpp':
if 'cpp' not in self.flags:
self.add_error_message(
"This block does not support C++ output.")
if self.key == 'parameter':
if not self.params['type'].value:
self.add_error_message(
"C++ output requires you to choose a parameter type.")
def _validate_var_value(self):
"""or variables check the value (only if var_value is used)"""
if self.is_variable and self.value != 'value':
try:
self.parent_flowgraph.evaluate(
self.value, local_namespace=self.namespace)
except Exception as err:
self.add_error_message(
'Value "{}" cannot be evaluated:\n{}'.format(self.value, err))
# endregion
# region Properties
def __str__(self):
return 'Block - {} - {}({})'.format(self.name, self.label, self.key)
def __repr__(self):
try:
name = self.name
except Exception:
name = self.key
return 'block[' + name + ']'
@property
def name(self):
return self.params['id'].value
@lazy_property
def is_virtual_or_pad(self):
return self.key in ("virtual_source", "virtual_sink", "pad_source", "pad_sink")
@lazy_property
def is_variable(self):
return bool(self.value)
@lazy_property
def is_import(self):
return self.key == 'import'
@lazy_property
def is_snippet(self):
return self.key == 'snippet'
@property
def comment(self):
return self.params['comment'].value
@property
def state(self):
"""Gets the block's current state."""
state = self.states['state']
return state if state in self.STATE_LABELS else 'enabled'
@state.setter
def state(self, value):
"""Sets the state for the block."""
self.states['state'] = value
# Enable/Disable Aliases
@property
def enabled(self):
"""Get the enabled state of the block"""
return self.state != 'disabled'
@property
def bus_sink(self):
"""Gets the block's current Toggle Bus Sink state."""
return self.states['bus_sink']
@bus_sink.setter
def bus_sink(self, value):
"""Sets the Toggle Bus Sink state for the block."""
self.states['bus_sink'] = value
@property
def bus_source(self):
"""Gets the block's current Toggle Bus Sink state."""
return self.states['bus_source']
@bus_source.setter
def bus_source(self, value):
"""Sets the Toggle Bus Source state for the block."""
self.states['bus_source'] = value
@property
def bus_structure_source(self):
"""Gets the block's current source bus structure."""
try:
bus_structure = self.params['bus_structure_source'].value or None
except Exception:
bus_structure = None
return bus_structure
@property
def bus_structure_sink(self):
"""Gets the block's current source bus structure."""
try:
bus_structure = self.params['bus_structure_sink'].value or None
except Exception:
bus_structure = None
return bus_structure
# endregion
##############################################
# Getters (old)
##############################################
def get_var_make(self):
return self.templates.render('var_make')
def get_cpp_var_make(self):
return self.cpp_templates.render('var_make')
def get_var_value(self):
return self.templates.render('var_value')
def get_callbacks(self):
"""
Get a list of function callbacks for this block.
Returns:
a list of strings
"""
def make_callback(callback):
if 'self.' in callback:
return callback
return 'self.{}.{}'.format(self.name, callback)
return [make_callback(c) for c in self.templates.render('callbacks')]
def get_cpp_callbacks(self):
"""
Get a list of C++ function callbacks for this block.
Returns:
a list of strings
"""
def make_callback(callback):
if self.is_variable:
return callback
if 'this->' in callback:
return callback
return 'this->{}->{}'.format(self.name, callback)
return [make_callback(c) for c in self.cpp_templates.render('callbacks')]
def format_expr(self, py_type):
"""
Evaluate the value of the variable block and decide its type.
Returns:
None
"""
value = self.params['value'].value
self.cpp_templates = copy.copy(self.orig_cpp_templates)
# Determine the lvalue type
def get_type(element: str, vtype: typing.Optional[type] = None) -> str:
evaluated = None
try:
evaluated = ast.literal_eval(element)
if vtype is None:
vtype = type(evaluated)
except ValueError or SyntaxError as excp:
if vtype is None:
print(excp)
simple_types = {int: "long", float: "double", bool: "bool", complex: "gr_complex", str: "std::string"}
if vtype in simple_types:
return simple_types[vtype]
elif vtype == list:
try:
# For container types we must also determine the type of the template parameter(s)
return f"std::vector<{get_type(str(evaluated[0]), type(evaluated[0]))}>"
except IndexError: # empty list
return 'std::vector<std::string>'
elif vtype == dict:
try:
# For container types we must also determine the type of the template parameter(s)
key, val = next(iter(evaluated.entries()))
return f"std::map<{get_type(str(key), type(key))}, {get_type(str(val), type(val))}>"
except IndexError: # empty dict
return 'std::map<std::string, std::string>'
# Get the lvalue type
self.vtype = get_type(value, py_type)
# The r-value for these types must be transformed to create legal C++ syntax.
if self.vtype in ['bool', 'gr_complex'] or 'std::map' in self.vtype or 'std::vector' in self.vtype:
evaluated = ast.literal_eval(value)
self.cpp_templates['var_make'] = self.cpp_templates['var_make'].replace(
'${value}', self.get_cpp_value(evaluated))
if 'string' in self.vtype:
self.cpp_templates['includes'].append('#include <string>')
def get_cpp_value(self, pyval, vtype: typing.Optional[type] = None) -> str:
"""
Convert an evaluated variable value from Python to C++ with a defined type.
Returns:
string representation of the C++ value
"""
if vtype is None:
vtype = type(pyval)
else:
assert vtype == type(pyval)
if vtype == int or vtype == float:
val_str = str(pyval)
# Check for PI and replace with C++ constant
pi_re = r'^(math|numpy|np|scipy|sp)\.pi$'
if re.match(pi_re, str(pyval)):
val_str = re.sub(
pi_re, 'boost::math::constants::pi<double>()', val_str)
self.cpp_templates['includes'].append(
'#include <boost/math/constants/constants.hpp>')
return str(pyval)
elif vtype == bool:
return str(pyval).lower()
elif vtype == complex:
self.cpp_templates['includes'].append(
'#include <gnuradio/gr_complex.h>')
evaluated = ast.literal_eval(str(pyval).strip())
return '{' + str(evaluated.real) + ', ' + str(evaluated.imag) + '}'
elif vtype == list:
self.cpp_templates['includes'].append('#include <vector>')
if len(pyval) == 0:
return '{}'
item_type = type(pyval[0])
elements = [str(self.get_cpp_value(element, item_type)) for element in pyval]
return '{' + ', '.join(elements) + '}'
elif vtype == dict:
self.cpp_templates['includes'].append('#include <map>')
key_type, val_type = next(iter(pyval.entries()))
key_type, val_type = type(key_type), type(val_type)
entries = ['{' + self.get_cpp_value(key, key_type) + ', ' + self.get_cpp_value(val, val_type) + '}'
for key, val in pyval.entries()]
return '{' + ', '.join(entries) + '}'
elif vtype == str:
self.cpp_templates['includes'].append('#include <string>')
value = pyval.strip()
if value in ['""', "''"]:
return '""'
return f'"{no_quotes(value)}"'
raise TypeError(f"Unsupported C++ vtype: {vtype}")
def is_virtual_sink(self):
return self.key == 'virtual_sink'
def is_virtual_source(self):
return self.key == 'virtual_source'
def is_deprecated(self):
"""
Check whether the block is deprecated.
For now, we just check the category name for presence of "deprecated".
As it might be desirable in the future to have such "tags" be stored
explicitly, we're taking the detour of introducing a property.
"""
if not self.category:
return False
try:
return (self.flags.deprecated or
any("deprecated".casefold() in cat.casefold()
for cat in self.category))
except Exception as exception:
print(exception.message)
return False
# Block bypassing
def get_bypassed(self):
"""
Check if the block is bypassed
"""
return self.state == 'bypassed'
def set_bypassed(self):
"""
Bypass the block
Returns:
True if block changes state
"""
if self.state != 'bypassed' and self.can_bypass():
self.state = 'bypassed'
return True
return False
def can_bypass(self):
"""
Check the number of sinks and sources and see if this block can be bypassed
"""
# Check to make sure this is a single path block
# Could possibly support 1 to many blocks
if len(self.sources) != 1 or len(self.sinks) != 1:
return False
if not (self.sources[0].dtype == self.sinks[0].dtype):
return False
if self.flags.disable_bypass:
return False
return True
def ports(self):
return itertools.chain(self.sources, self.sinks)
def active_ports(self):
return itertools.chain(self.active_sources, self.active_sinks)
def children(self):
return itertools.chain(self.params.values(), self.ports())
def connections(self):
block_connections = []
for port in self.ports():
block_connections = block_connections + list(port.connections())
return block_connections
##############################################
# Access
##############################################
def get_sink(self, key):
return _get_elem(self.sinks, key)
def get_source(self, key):
return _get_elem(self.sources, key)
##############################################
# Resolve
##############################################
@property
def namespace(self):
# update block namespace
self.block_namespace.update({key: param.get_evaluated() for key, param in self.params.items()})
return self.block_namespace
@property
def namespace_templates(self):
return {key: param.template_arg for key, param in self.params.items()}
def evaluate(self, expr):
return self.parent_flowgraph.evaluate(expr, self.namespace)
##############################################
# Import/Export Methods
##############################################
def export_data(self):
"""
Export this block's params to nested data.
Returns:
a nested data odict
"""
data = collections.OrderedDict()
if self.key != 'options':
data['name'] = self.name
data['id'] = self.key
data['parameters'] = collections.OrderedDict(sorted(
(param_id, param.value)
for param_id, param in self.params.items()
if (param_id != 'id' or self.key == 'options')
))
data['states'] = collections.OrderedDict(sorted(self.states.items()))
return data
def import_data(self, name, states, parameters, **_):
"""
Import this block's params from nested data.
Any param keys that do not exist will be ignored.
Since params can be dynamically created based another param,
call rewrite, and repeat the load until the params stick.
"""
self.params['id'].value = name
self.states.update(states)
def get_hash():
return hash(tuple(hash(v) for v in self.params.values()))
pre_rewrite_hash = -1
while pre_rewrite_hash != get_hash():
for key, value in parameters.items():
try:
self.params[key].set_value(value)
except KeyError:
continue
# Store hash and call rewrite
pre_rewrite_hash = get_hash()
self.rewrite()
##############################################
# Controller Modify
##############################################
def filter_bus_port(self, ports):
buslist = [p for p in ports if p.dtype == 'bus']
return buslist or ports
def type_controller_modify(self, direction):
"""
Change the type controller.
Args:
direction: +1 or -1
Returns:
true for change
"""
changed = False
type_param = None
for param in filter(lambda p: p.is_enum(), self.get_params()):
children = self.get_ports() + self.get_params()
# Priority to the type controller
if param.get_key() in ' '.join(map(lambda p: p._type, children)):
type_param = param
# Use param if type param is unset
if not type_param:
type_param = param
if type_param:
# Try to increment the enum by direction
try:
keys = type_param.get_option_keys()
old_index = keys.index(type_param.get_value())
new_index = (old_index + direction + len(keys)) % len(keys)
type_param.set_value(keys[new_index])
changed = True
except Exception:
pass
return changed
def form_bus_structure(self, direc):
if direc == 'source':
ports = self.sources
bus_structure = self.get_bus_structure('source')
else:
ports = self.sinks
bus_structure = self.get_bus_structure('sink')
struct = [range(len(ports))]
# struct = list(range(len(ports)))
# TODO for more complicated port structures, this code is needed but not working yet
if any([p.multiplicity for p in ports]):
structlet = []
last = 0
# group the ports with > n inputs together on the bus
cnt = 0
idx = 0
for p in ports:
if p.domain == 'message':
continue
if cnt > 0:
cnt -= 1
continue
if p.multiplicity > 1:
cnt = p.multiplicity - 1
structlet.append([idx + j for j in range(p.multiplicity)])
else:
structlet.append([idx])
struct = structlet
if bus_structure:
struct = bus_structure
self.current_bus_structure[direc] = struct
return struct
def bussify(self, direc):
if direc == 'source':
ports = self.sources
if ports:
ports_gui = self.filter_bus_port(self.sources)
self.bus_structure = self.get_bus_structure('source')
self.bus_source = not self.bus_source
else:
ports = self.sinks
if ports:
ports_gui = self.filter_bus_port(self.sinks)
self.bus_structure = self.get_bus_structure('sink')
self.bus_sink = not self.bus_sink
# Disconnect all the connections when toggling the bus state
for port in ports:
l_connections = list(port.connections())
for connect in l_connections:
self.parent.remove_element(connect)
self.update_bus_logic()