# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
"""
Hardware functions for the TPM 1.2 hardware.
This is a transcript of the corresponding class from the pyaavs library,
with code style modified for SKA coding conventions. It depends heavily
on the pyfabil low level software and specific hardware module plugins.
"""
from __future__ import annotations # allow forward references in type hints
__author__ = "Alessio Magro"
import logging
import time
from typing import Any, Optional
from pyfabil.base.definitions import (
Status,
PluginError,
BoardError,
BoardMake,
Device,
firmware,
compatibleboards,
friendlyname,
maxinstances,
)
from pyfabil.plugins.firmwareblock import FirmwareBlock
from time import sleep
__all__ = ["TpmTestFirmware"]
[docs]
class TpmTestFirmware(FirmwareBlock):
"""FirmwareBlock tests class."""
@firmware({"design": "tpm_test", "major": "1", "minor": ">1"})
@compatibleboards(BoardMake.TpmBoard)
@friendlyname("tpm_test_firmware")
@maxinstances(2)
def __init__(self: TpmTestFirmware, board: Any, **kwargs: Any) -> None:
"""
Initialize a new TpmTestFirmware instance.
:param board: Pointer to board instance
:param kwargs: named arguments
:raises PluginError: device argument must be specified
"""
super(TpmTestFirmware, self).__init__(board)
# Device must be specified in kwargs
if kwargs.get("device", None) is None:
raise PluginError("TpmTestFirmware requires device argument")
self._device = kwargs["device"]
if kwargs.get("fsample", None) is None:
logging.info("TpmTestFirmware: Setting default sampling frequency 800 MHz.")
self._fsample = 800e6
else:
self._fsample = float(kwargs["fsample"])
self._dsp_core: Optional[bool] = kwargs.get("dsp_core")
if self._dsp_core is None:
logging.debug(
"TpmTestFirmware: Setting default value True to dsp_core flag."
)
self._dsp_core = True
if not self._dsp_core:
logging.info(
"TpmTestFirmware: dsp_core flag is False."
)
self._device_name = "fpga1" if self._device is Device.FPGA_1 else "fpga2"
# retrieving firmware features from feature register
self.xg_eth = False
self.xg_40g_eth = False
self.tile_beamformer_implemented = False
self.station_beamformer_implemented = False
self.antenna_buffer_implemented = False
self.multiple_channel_tx_implemented = False
self.multiple_channel_tx_nof_channels = 0
if self.board["fpga1.regfile.feature.xg_eth_implemented"] == 1:
self.xg_eth = True
if self.board["fpga1.regfile.feature.xg_eth_40g_implemented"] == 1:
self.xg_40g_eth = True
if self.board["fpga1.dsp_regfile.feature.tile_beamformer_implemented"] == 1:
self.tile_beamformer_implemented = True
if self.board["fpga1.dsp_regfile.feature.station_beamformer_implemented"] == 1:
self.station_beamformer_implemented = True
if self.board["fpga1.dsp_regfile.feature.antenna_buffer_implemented"] == 1:
self.antenna_buffer_implemented = True
if self.board.memory_map.has_register("fpga1.dsp_regfile.feature.multiple_channels_mode_implemented") and\
self.board["fpga1.dsp_regfile.feature.multiple_channels_mode_implemented"] == 1:
self.multiple_channel_tx_implemented = True
self.multiple_channel_tx_nof_channels = self.board["fpga1.dsp_regfile.feature.nof_multiple_channels"]
# plugins
self._jesd1 = None
self._jesd2 = None
self._fpga = None
self._teng = []
self._f2f = []
self._spead_gen = []
self._fortyg = None
self._sysmon = None
self._clock_monitor = None
self._beamf = None
self._testgen = None
self._patterngen = None
self._power_meter = None
self._integrator = None
self._station_beamf = None
self._antenna_buffer = None
self._multiple_channel_tx = None
self.load_plugin()
[docs]
def load_plugin(self: TpmTestFirmware) -> None:
"""Load required plugin."""
self._jesd1 = self.board.load_plugin("TpmJesd", device=self._device, core=0, frame_length=216)
self._jesd2 = self.board.load_plugin("TpmJesd", device=self._device, core=1, frame_length=216)
self._fpga = self.board.load_plugin("TpmFpga", device=self._device)
if self.xg_eth and self.xg_40g_eth:
self._fortyg = self.board.load_plugin(
"TpmFortyGCoreXg", device=self._device, core=0
)
self._f2f = [
self.board.load_plugin("TpmFpga2Fpga", core=0),
self.board.load_plugin("TpmFpga2Fpga", core=1),
]
self._sysmon = self.board.load_plugin("TpmSysmon", device=self._device)
self._clock_monitor = self.board.load_plugin("TpmClockmon", device=self._device)
if self._dsp_core:
if self.tile_beamformer_implemented:
self._beamf = self.board.load_plugin("BeamfFD", device=self._device)
if self.station_beamformer_implemented:
self._station_beamf = self.board.load_plugin(
"StationBeamformer", device=self._device
)
if self.antenna_buffer_implemented:
self._antenna_buffer = self.board.load_plugin(
"AntennaBuffer", device=self._device
)
self._testgen = self.board.load_plugin(
"TpmTestGenerator", device=self._device, fsample=self._fsample
)
self._patterngen = self.board.load_plugin(
"TpmPatternGenerator", device=self._device, fsample=self._fsample
)
self._power_meter = self.board.load_plugin(
"AdcPowerMeter", device=self._device, fsample=self._fsample
)
self._integrator = self.board.load_plugin(
"TpmIntegrator", device=self._device, fsample=self._fsample
)
self._spead_gen = [
self.board.load_plugin("SpeadTxGen", device=self._device, core=0),
self.board.load_plugin("SpeadTxGen", device=self._device, core=1),
self.board.load_plugin("SpeadTxGen", device=self._device, core=2),
self.board.load_plugin("SpeadTxGen", device=self._device, core=3),
]
if self.multiple_channel_tx_implemented:
self._multiple_channel_tx = self.board.load_plugin(
"MultipleChannelTx", device=self._device
)
[docs]
def initialise_firmware(self: TpmTestFirmware) -> None:
"""
Initialise firmware components.
:raises BoardError: cannot configure JESD core
"""
max_retries = 4
retries = 0
while (
self.board[self._device_name + ".jesd204_if.regfile_status"] & 0x1F != 0x1E
and retries < max_retries
):
if retries > 0:
logging.debug(
"Retrying JESD cores configuration of " + self._device_name.upper()
)
# Reset FPGA
self._fpga.fpga_global_reset()
self._fpga.fpga_mmcm_config(self._fsample)
self._fpga.fpga_jesd_gth_config(self._fsample)
self._fpga.fpga_reset()
# Start JESD cores
self._jesd1.jesd_core_start()
self._jesd2.jesd_core_start()
# Initialise FPGAs
# I have no idea what these ranges are
self._fpga.fpga_start(range(16), range(16))
retries += 1
sleep(0.2)
if retries == max_retries:
raise BoardError("TpmTestFirmware: Could not configure JESD cores")
# Initialise DDR
self.start_ddr_initialisation()
# Initialise power meter
self._power_meter.initialise()
# Initialise 10G/40G cores
if self.board["fpga1.regfile.feature.xg_eth_implemented"] == 1:
if self.xg_40g_eth:
self._fortyg.initialise_core()
else:
for teng in self._teng:
teng.initialise_core()
self._patterngen.initialise()
#######################################################################################
[docs]
def check_ddr_voltage(self: TpmTestFirmware) -> None:
"""Check if DDR voltage regulator is enabled, if not enable it. TPM 1.2 only"""
if self.board.memory_map.has_register("board.regfile.ctrl.en_ddr_vdd"):
if self.board["board.regfile.ctrl.en_ddr_vdd"] == 0:
self.board["board.regfile.ctrl.en_ddr_vdd"] = 1
time.sleep(0.5)
[docs]
def start_ddr_initialisation(self: TpmTestFirmware) -> None:
"""Start DDR initialisation."""
self.check_ddr_voltage()
logging.debug(self._device_name + " DDR reset")
self.board[self._device_name + ".regfile.reset.ddr_rst"] = 0x1
self.board[self._device_name + ".regfile.reset.ddr_rst"] = 0x0
# TODO: Move to a DDR plugin
[docs]
def check_ddr_initialisation(self: TpmTestFirmware) -> bool:
"""Check whether DDR has initialised."""
if self.board.memory_map.has_register(
self._device_name + ".regfile.stream_status.ddr_init_done"
):
status = self.board[
self._device_name + ".regfile.stream_status.ddr_init_done"
]
else:
status = self.board[self._device_name + ".regfile.status.ddr_init_done"]
if status == 0x0:
logging.debug("DDR of " + self._device_name.upper() + " is not initialised")
return False
else:
logging.debug("DDR of " + self._device_name.upper() + " initialised!")
return True
# TODO: Move to a DDR plugin
[docs]
def check_ddr_user_reset_counter(self: TpmTestFirmware, show_result=True) -> int:
"""
Return value of DDR user reset counter - increments each falling edge
of the DDR generated user logic reset.
"""
count = self.board[f'{self._device_name}.ddr_if.status.ddr_user_rst_cnt']
if show_result:
logging.info(f'{self._device_name.upper()} error count {count}')
return count
# TODO: Move to a DDR plugin
[docs]
def clear_ddr_user_reset_counter(self: TpmTestFirmware) -> None:
"""Reset value of DDR reset counter"""
self.board[f'{self._device_name}.ddr_if.status.ddr_monitoring_reset'] = 1
[docs]
def initialise_ddr(self: TpmTestFirmware) -> None:
"""Initialise DDR."""
for _n in range(3):
self.start_ddr_initialisation()
for _m in range(5):
time.sleep(0.2)
if self.check_ddr_initialisation():
return
logging.error("Cannot initialise DDR of " + self._device_name.upper())
[docs]
def check_pps_status(self: TpmTestFirmware) -> bool:
"""Check PPS detected and error free"""
pps_detect = self.board[f'{self._device_name}.pps_manager.pps_detected']
pps_error = self.board[f'{self._device_name}.pps_manager.pps_errors.pps_count_error']
return True if pps_detect and not pps_error else False
[docs]
def clear_pps_status(self: TpmTestFirmware) -> None:
"""Clear PPS errors"""
self.board[f'{self._device_name}.pps_manager.pps_errors.pps_errors_rst'] = 1
return
[docs]
def send_raw_data(self: TpmTestFirmware) -> None:
"""Send raw data from the TPM."""
self.board[self._device_name + ".lmc_gen.raw_all_channel_mode_enable"] = 0x0
self.board[self._device_name + ".lmc_gen.request.raw_data"] = 0x1
[docs]
def send_raw_data_synchronised(self: TpmTestFirmware) -> None:
"""Send raw data from the TPM."""
self.board[self._device_name + ".lmc_gen.raw_all_channel_mode_enable"] = 0x1
self.board[self._device_name + ".lmc_gen.request.raw_data"] = 0x1
[docs]
def send_channelised_data(
self: TpmTestFirmware,
number_of_samples: int = 128,
first_channel: int = 0,
last_channel: int = 511,
) -> None:
"""
Send channelized data from the TPM.
:param number_of_samples: contiguous time samples sent per channel
:param first_channel: First channel transmitted
:param last_channel: Last channel transmitted + 1 (python range convention)
"""
# get bitfiled configuration of single_channel_mode register
single_channel_mode_enable_shift = self.board.memory_map.register_list[
self._device_name + ".lmc_gen.channelized_single_channel_mode.enable"].shift
single_channel_mode_last_shift = self.board.memory_map.register_list[
self._device_name + ".lmc_gen.channelized_single_channel_mode.last"].shift
single_channel_mode_last_id = self.board.memory_map.register_list[
self._device_name + ".lmc_gen.channelized_single_channel_mode.id"].shift
# build register value
single_channel_mode_register = (0 << single_channel_mode_enable_shift) | \
(last_channel << single_channel_mode_last_shift) | \
(first_channel << single_channel_mode_last_id)
# write register value into firmware register
self.board[
self._device_name + ".lmc_gen.channelized_single_channel_mode"
] = single_channel_mode_register
self.board[self._device_name + ".lmc_gen.channelized_pkt_length"] = (
number_of_samples - 1
)
if (
len(
self.board.find_register(
self._device_name + ".lmc_gen.channelized_ddc_mode"
)
)
!= 0
):
self.board[self._device_name + ".lmc_gen.channelized_ddc_mode"] = 0x0
self.board[self._device_name + ".lmc_gen.request.channelized_data"] = 0x1
[docs]
def send_channelised_data_continuous(
self: TpmTestFirmware, channel_id: int, number_of_samples: int = 128
) -> None:
"""
Continuously send channelised data from a single channel.
:param channel_id: Channel ID
:param number_of_samples: contiguous time samples sent per channel
"""
# get bitfiled configuration of single_channel_mode register
single_channel_mode_enable_shift = self.board.memory_map.register_list[
self._device_name + ".lmc_gen.channelized_single_channel_mode.enable"].shift
single_channel_mode_last_shift = self.board.memory_map.register_list[
self._device_name + ".lmc_gen.channelized_single_channel_mode.last"].shift
single_channel_mode_last_id = self.board.memory_map.register_list[
self._device_name + ".lmc_gen.channelized_single_channel_mode.id"].shift
# build register value
single_channel_mode_register = (1 << single_channel_mode_enable_shift) | \
(0x1FF << single_channel_mode_last_shift) | \
(channel_id << single_channel_mode_last_id)
# write register value into firmware register
self.board[
self._device_name + ".lmc_gen.channelized_single_channel_mode"
] = single_channel_mode_register
self.board[self._device_name + ".lmc_gen.channelized_pkt_length"] = (
number_of_samples - 1
)
if (
len(
self.board.find_register(
self._device_name + ".lmc_gen.channelized_ddc_mode"
)
)
!= 0
):
self.board[self._device_name + ".lmc_gen.channelized_ddc_mode"] = 0x0
self.board[self._device_name + ".lmc_gen.request.channelized_data"] = 0x1
[docs]
def send_channelised_data_narrowband(
self: TpmTestFirmware,
band_frequency: int,
round_bits: int,
number_of_samples: int = 128,
) -> None:
"""
Continuously send channelised data from a single channel in narrowband mode.
:param band_frequency: central frequency (in Hz) of narrowband
:param round_bits: number of bits rounded after filter
:param number_of_samples: samples per lmc packet
"""
if (
len(
self.board.find_register(
self._device_name + ".lmc_gen.channelized_ddc_mode"
)
)
== 0
):
logging.error(
"Narrowband channelizer is not implemented in current FPGA firmware!"
)
return
channel_spacing = 800e6 / 1024
downsampling_factor = 128
# Number of LO steps in the channel spacing
lo_steps_per_channel = 2.0 ** 24 / 32.0 * 27
if band_frequency < 50e6 or band_frequency > 350e6:
logging.error(
"Invalid frequency for narrowband lmc. Must be between 50e6 and 350e6"
)
return
hw_frequency = band_frequency / channel_spacing
channel_id = int(round(hw_frequency))
lo_frequency = (
int(round((hw_frequency - channel_id) * lo_steps_per_channel)) & 0xFFFFFF
)
# get bitfiled configuration of single_channel_mode register
single_channel_mode_enable_shift = self.board.memory_map.register_list[
self._device_name + ".lmc_gen.channelized_single_channel_mode.enable"].shift
single_channel_mode_last_shift = self.board.memory_map.register_list[
self._device_name + ".lmc_gen.channelized_single_channel_mode.last"].shift
single_channel_mode_last_id = self.board.memory_map.register_list[
self._device_name + ".lmc_gen.channelized_single_channel_mode.id"].shift
# build register value
single_channel_mode_register = (1 << single_channel_mode_enable_shift) | \
(0x1FF << single_channel_mode_last_shift) | \
(channel_id << single_channel_mode_last_id)
# write register value into firmware register
self.board[
self._device_name + ".lmc_gen.channelized_single_channel_mode"
] = single_channel_mode_register
self.board[self._device_name + ".lmc_gen.channelized_pkt_length"] = (
number_of_samples * downsampling_factor - 1
)
if (
len(
self.board.find_register(
self._device_name + ".lmc_gen.channelized_ddc_mode"
)
)
!= 0
):
self.board[self._device_name + ".lmc_gen.channelized_ddc_mode"] = (
0x90000000 | ((round_bits & 0x7) << 24) | lo_frequency
)
self.board[self._device_name + ".lmc_gen.request.channelized_data"] = 0x1
[docs]
def stop_channelised_data_narrowband(self: TpmTestFirmware) -> None:
"""Stop transmission of narrowband channel data."""
self.stop_channelised_data_continuous()
[docs]
def stop_channelised_data_continuous(self: TpmTestFirmware) -> None:
"""Stop transmission of continuous channel data."""
self.board[
self._device_name + ".lmc_gen.channelized_single_channel_mode.enable"
] = 0x0
[docs]
def stop_channelised_data(self: TpmTestFirmware) -> None:
"""Stop sending channelised data."""
self.board[
self._device_name + ".lmc_gen.channelized_single_channel_mode.enable"
] = 0x0
[docs]
def clear_lmc_data_request(self: TpmTestFirmware) -> None:
"""Stop transmission of all LMC data."""
self.board[self._device_name + ".lmc_gen.request"] = 0
[docs]
def send_beam_data(self: TpmTestFirmware) -> None:
"""Send beam data from the TPM."""
self.board[self._device_name + ".lmc_gen.request.beamformed_data"] = 0x1
[docs]
def stop_integrated_channel_data(self: TpmTestFirmware) -> None:
"""Stop receiving integrated beam data from the board."""
self._integrator.stop_integrated_channel_data()
[docs]
def stop_integrated_beam_data(self: TpmTestFirmware) -> None:
"""Stop receiving integrated beam data from the board."""
self._integrator.stop_integrated_beam_data()
[docs]
def stop_integrated_data(self) -> None:
"""Stop transmission of integrated data."""
self._integrator.stop_integrated_data()
# Superclass method implementations
[docs]
def initialise(self: TpmTestFirmware) -> bool:
"""
Initialise TpmTestFirmware.
:return: success status
"""
logging.info("TpmTestFirmware has been initialised")
return True
[docs]
def status_check(self: TpmTestFirmware) -> Any:
"""
Perform status check.
:return: Status
"""
logging.info("TpmTestFirmware : Checking status")
return Status.OK
[docs]
def clean_up(self: TpmTestFirmware) -> bool:
"""
Perform cleanup.
:return: Success
"""
logging.info("TpmTestFirmware : Cleaning up")
return True