Source code for pyaavs.tile_1_6

# type: ignore
# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
"""
Hardware functions for the TPM 1.6 hardware.
"""
import functools
import logging
import time
import os

from pyfabil.base.definitions import Device, LibraryError, BoardError
from pyfabil.base.utils import ip2long
from pyfabil.boards.tpm_1_6 import TPM_1_6
from pyaavs.tile import Tile


# Helper to disallow certain function calls on unconnected tiles
[docs] def connected(f): """ Helper to disallow certain function calls on unconnected tiles. :param f: the method wrapped by this helper :type f: callable :return: the wrapped method :rtype: callable """ @functools.wraps(f) def wrapper(self, *args, **kwargs): """ Wrapper that checks the TPM is connected before allowing the wrapped method to proceed. :param self: the method called :type self: object :param args: positional arguments to the wrapped method :type args: list :param kwargs: keyword arguments to the wrapped method :type kwargs: dict :raises LibraryError: if the TPM is not connected :return: whatever the wrapped method returns :rtype: object """ if self.tpm is None: self.logger.warning( "Cannot call function " + f.__name__ + " on unconnected TPM" ) raise LibraryError( "Cannot call function " + f.__name__ + " on unconnected TPM" ) else: return f(self, *args, **kwargs) return wrapper
[docs] class Tile_1_6(Tile): """ Tile hardware interface library. Methods specific for TPM 1.6. """ def __init__( self, ip="10.0.10.2", port=10000, lmc_ip="10.0.10.1", lmc_port=4660, sampling_rate=800e6, logger=None, ): """ Iniitalise a new Tile16 instance. :param logger: the logger to be used by this Command. If not provided, then a default module logger will be used. :type logger: :py:class:`logging.Logger` :param ip: IP address of the hardware :type ip: str :param port: UCP Port address of the hardware port :type port: int :param lmc_ip: IP address of the MCCS DAQ recevier :type lmc_ip: str :param lmc_port: UCP Port address of the MCCS DAQ receiver :type lmc_port: int :param sampling_rate: ADC sampling rate :type sampling_rate: float """ super(Tile_1_6, self).__init__( ip, port, lmc_ip, lmc_port, sampling_rate, logger ) # Override inherited preadu signal map attribute for TPM 1.6 preadu self.preadu_signal_map = {0: {'preadu_id': 1, 'channel': 0}, 1: {'preadu_id': 1, 'channel': 1}, 2: {'preadu_id': 1, 'channel': 2}, 3: {'preadu_id': 1, 'channel': 3}, 4: {'preadu_id': 1, 'channel': 4}, 5: {'preadu_id': 1, 'channel': 5}, 6: {'preadu_id': 1, 'channel': 6}, 7: {'preadu_id': 1, 'channel': 7}, 8: {'preadu_id': 0, 'channel': 14}, 9: {'preadu_id': 0, 'channel': 15}, 10: {'preadu_id': 0, 'channel': 12}, 11: {'preadu_id': 0, 'channel': 13}, 12: {'preadu_id': 0, 'channel': 10}, 13: {'preadu_id': 0, 'channel': 11}, 14: {'preadu_id': 0, 'channel': 8}, 15: {'preadu_id': 0, 'channel': 9}, 16: {'preadu_id': 1, 'channel': 8}, 17: {'preadu_id': 1, 'channel': 9}, 18: {'preadu_id': 1, 'channel': 10}, 19: {'preadu_id': 1, 'channel': 11}, 20: {'preadu_id': 1, 'channel': 12}, 21: {'preadu_id': 1, 'channel': 13}, 22: {'preadu_id': 1, 'channel': 14}, 23: {'preadu_id': 1, 'channel': 15}, 24: {'preadu_id': 0, 'channel': 6}, 25: {'preadu_id': 0, 'channel': 7}, 26: {'preadu_id': 0, 'channel': 4}, 27: {'preadu_id': 0, 'channel': 5}, 28: {'preadu_id': 0, 'channel': 2}, 29: {'preadu_id': 0, 'channel': 3}, 30: {'preadu_id': 0, 'channel': 0}, 31: {'preadu_id': 0, 'channel': 1}} # Main functions ------------------------------------
[docs] def tpm_version(self): """ Determine whether this is a TPM V1.2 or TPM V1.6 :return: TPM hardware version :rtype: string """ return "tpm_v1_6"
[docs] def connect( self, initialise=False, load_plugin=True, enable_ada=False, enable_adc=True, dsp_core=True, adc_mono_channel_14_bit=False, adc_mono_channel_sel=0, ): """ Connect to the hardware and loads initial configuration. :param initialise: Initialises the TPM object :type initialise: bool :param load_plugin: loads software plugins :type load_plugin: bool :param enable_ada: Enable ADC amplifier (usually not present) :type enable_ada: bool :param enable_adc: Enable ADC :type enable_adc: bool :param dsp_core: Enable loading of DSP core plugins :type dsp_core: bool :param adc_mono_channel_14_bit: Enable ADC mono channel 14bit mode :type adc_mono_channel_14_bit: bool :param adc_mono_channel_sel: Select channel in mono channel mode (0=A, 1=B) :type adc_mono_channel_sel: int """ # Try to connect to board, if it fails then set tpm to None self.tpm = TPM_1_6() # Add plugin directory (load module locally) tf = __import__("pyaavs.plugins.tpm_1_6.tpm_test_firmware", fromlist=[None]) self.tpm.add_plugin_directory(os.path.dirname(tf.__file__)) # Connect using tpm object. # simulator parameter is used not to load the TPM specific plugins, # no actual simulation is performed. try: self.tpm.connect( ip=self._ip, port=self._port, initialise=initialise, simulator=not load_plugin, enable_ada=enable_ada, enable_adc=enable_adc, fsample=self._sampling_rate, mono_channel_14_bit=adc_mono_channel_14_bit, mono_channel_sel=adc_mono_channel_sel, ) except (BoardError, LibraryError): self.tpm = None self.logger.error("Failed to connect to board at " + self._ip) return # Load tpm test firmware for both FPGAs (no need to load in simulation) if load_plugin and self.tpm.is_programmed(): for device in [Device.FPGA_1, Device.FPGA_2]: self.tpm.load_plugin( "Tpm_1_6_TestFirmware", device=device, fsample=self._sampling_rate, dsp_core=dsp_core, logger=self.logger, ) elif not self.tpm.is_programmed(): self.logger.warning("TPM is not programmed! No plugins loaded")
[docs] def initialise(self, station_id=0, tile_id=0, lmc_use_40g=False, lmc_dst_ip=None, lmc_dst_port=4660, lmc_integrated_use_40g=False, src_ip_fpga1=None, src_ip_fpga2=None, dst_ip_fpga1=None, dst_ip_fpga2=None, src_port=4661, dst_port=4660, dst_port_single_port_mode=4662, rx_port_single_port_mode=4662, netmask_40g=None, gateway_ip_40g=None, active_40g_ports_setting="port1-only", enable_adc=True, enable_ada=False, enable_test=False, use_internal_pps=False, pps_delay=0, time_delays=0, is_first_tile=False, is_last_tile=False, qsfp_detection="auto", adc_mono_channel_14_bit=False, adc_mono_channel_sel=0): """ Connect and initialise. :param enable_ada: enable adc amplifier, Not present in most TPM versions :type enable_ada: bool :param enable_test: setup internal test signal generator instead of ADC :param enable_adc: Enable ADC :type enable_adc: bool :type enable_test: bool :param use_internal_pps: use internal PPS generator synchronised across FPGAs :type use_internal_pps: bool :param qsfp_detection: "auto" detects QSFP cables automatically, "qsfp1", force QSFP1 cable detected, QSFP2 cable not detected "qsfp2", force QSFP1 cable not detected, QSFP2 cable detected "all", force QSFP1 and QSFP2 cable detected "none", force no cable not detected :type qsfp_detection: str :param adc_mono_channel_14_bit: Enable ADC mono channel 14bit mode :type adc_mono_channel_14_bit: bool :param adc_mono_channel_sel: Select channel in mono channel mode (0=A, 1=B) :type adc_mono_channel_sel: int """ if use_internal_pps: logging.error("Cannot initialise board - use_internal_pps = True not supported") return # Connect to board self.connect(initialise=True, enable_ada=enable_ada, enable_adc=enable_adc, adc_mono_channel_14_bit=adc_mono_channel_14_bit, adc_mono_channel_sel=adc_mono_channel_sel) # Hack to reset MCU # self.tpm[0x30000120] = 0 # Before initialing, check if TPM is programmed if not self.tpm.is_programmed(): self.logger.error("Cannot initialise board which is not programmed") return # Disable debug UDP header self["board.regfile.ena_header"] = 0x1 # write PPS delay correction variable into the FPGAs if pps_delay < -128 or pps_delay > 127: self.logger.error("PPS delay out of range [-128, 127]") return self["fpga1.pps_manager.sync_tc.cnt_2"] = pps_delay & 0xFF self["fpga2.pps_manager.sync_tc.cnt_2"] = pps_delay & 0xFF # Initialise firmware plugin for firmware in self.tpm.tpm_test_firmware: firmware.initialise_firmware() # Set station and tile IDs self.set_station_id(station_id, tile_id) # Set LMC IP self.tpm.set_lmc_ip(self._lmc_ip, self._lmc_port) # Enable C2C streaming self.tpm["board.regfile.ena_stream"] = 0x1 # self.tpm['board.regfile.ethernet_pause'] = 10000 self.set_c2c_burst() # Display Temperature during initialisation logging.info(f"Board Temperature - {round(self.get_temperature(), 1)} C") # Switch off both PREADUs self.tpm.tpm_preadu[0].switch_off() self.tpm.tpm_preadu[1].switch_off() # Switch on preadu for preadu in self.tpm.tpm_preadu: preadu.switch_on() time.sleep(1) preadu.read_configuration() # Synchronise FPGAs self.sync_fpga_time(use_internal_pps=False) # Initialize f2f link for f2f in self.tpm.tpm_f2f: f2f.assert_reset() for f2f in self.tpm.tpm_f2f: f2f.deassert_reset() # AAVS-only - swap polarisations due to remapping performed by preadu self.tpm["fpga1.jesd204_if.regfile_pol_switch"] = 0b11110000 self.tpm["fpga2.jesd204_if.regfile_pol_switch"] = 0b11110000 # Reset test pattern generator for _test_generator in self.tpm.test_generator: _test_generator.channel_select(0x0000) _test_generator.disable_prdg() # Use test_generator plugin instead! if enable_test: # Test pattern. Tones on channels 72 & 75 + pseudo-random noise self.logger.info("Enabling test pattern") for generator in self.tpm.test_generator: generator.set_tone(0, 72 * self._sampling_rate / 1024, 0.0) generator.enable_prdg(0.4) generator.channel_select(0xFFFF) # Configure Active 40G ports self.configure_active_40g_ports(active_40g_ports_setting) # Set destination and source IP/MAC/ports for 10G cores # This will create a loopback between the two FPGAs self.set_default_eth_configuration( src_ip_fpga1=src_ip_fpga1, src_ip_fpga2=src_ip_fpga2, dst_ip_fpga1=dst_ip_fpga1, dst_ip_fpga2=dst_ip_fpga2, src_port=src_port, dst_port=dst_port, channel2_dst_port=dst_port_single_port_mode, channel2_rx_port=rx_port_single_port_mode, netmask_40g=netmask_40g, gateway_ip_40g=gateway_ip_40g, qsfp_detection=qsfp_detection) for firmware in self.tpm.tpm_test_firmware: if not firmware.check_ddr_initialisation(): firmware.initialise_ddr() # Configure standard data streams if lmc_use_40g: logging.info("Using 10G for LMC traffic") self.set_lmc_download("10g", 8192, dst_ip=lmc_dst_ip, dst_port=lmc_dst_port, netmask_40g=netmask_40g, gateway_ip_40g=gateway_ip_40g) else: logging.info("Using 1G for LMC traffic") self.set_lmc_download("1g") # Configure integrated data streams if lmc_integrated_use_40g: logging.info("Using 10G for integrated LMC traffic") self.set_lmc_integrated_download("10g", 1024, 2048, dst_ip=lmc_dst_ip, dst_port=lmc_dst_port, netmask_40g=netmask_40g, gateway_ip_40g=gateway_ip_40g) else: logging.info("Using 1G for integrated LMC traffic") self.set_lmc_integrated_download("1g", 1024, 2048) # Set time delays self.set_time_delays(time_delays) # set first/last tile flag for _station_beamf in self.tpm.station_beamf: _station_beamf.set_first_last_tile(is_first_tile, is_last_tile) # Clear Health Monitoring Following Initialisation # Clears any false errors detected from bring-up # Bitfiles sbf410 and older do not support health monitoring # Check for existance of moved pps register if self.tpm.has_register('fpga1.pps_manager.pps_errors'): self.enable_health_monitoring() self.clear_health_status()
[docs] def f2f_aurora_test_start(self): """Start test on Aurora f2f link.""" for f2f in self.tpm.tpm_f2f: f2f.start_tx_test() for f2f in self.tpm.tpm_f2f: f2f.start_rx_test()
[docs] def f2f_aurora_test_check(self): """Get test results for Aurora f2f link Tests printed on stdout.""" for f2f in self.tpm.tpm_f2f: f2f.get_test_result()
[docs] def f2f_aurora_test_stop(self): """Stop test on Aurora f2f link.""" for f2f in self.tpm.tpm_f2f: f2f.stop_test()
[docs] def is_qsfp_module_plugged(self, qsfp_id=0): """ Initialise firmware components. :return: True when cable is detected """ qsfp_status = self.tpm.tpm_qsfp_adapter[qsfp_id].get('ModPrsL') if qsfp_status == 0: return True else: return False