Source code for mesh2hrtf.Output2HRTF.output2hrtf

"""
Python tools for Mesh2HRTF including functions to generate SOFA files
containing the HRTF/HRIR data, merge SOFA files containing data for the left
and right ear and generate evaluation grids.
"""
import os
import csv
import warnings
import numpy as np
import json
import sofar as sf
import mesh2hrtf as m2h


[docs]def output2hrtf(folder=None): """ Process NumCalc output and write data to disk. Processing the data is done in the following steps 1. Read project parameter `from parameters.json` 2. use :py:func:`~write_output_report` to parse files in project_folder/NumCalc/source_*/NC*.out, write project report to project_folder/Output2HRTF/report_source_*.csv. Raise a warning if any issues were detected and write report_issues.txt to the same folder 3. Read simulated pressures from project_folder/NumCalc/source_*/be.out. This and the following steps are done, even if an issue was detected in the previous step 4. use :py:func:`~mesh2hrtf.reference_hrtfs` and :py:func:`~mesh2hrtf.compute_hrirs` to save the results to SOFA files Parameters ---------- folder : str, optional The path of the Mesh2HRTF project folder, i.e., the folder containing the subfolders EvaluationsGrids, NumCalc, and ObjectMeshes. The default, ``None`` uses the current working directory. """ # check input if folder is None: folder = os.getcwd() # check and load parameters, required parameters are: # Mesh2HRTF_version, reference, computeHRIRs, speedOfSound, densityOfAir, # numSources, sourceType, sourceCenter, sourceArea, # numFrequencies, frequencies params = os.path.join(folder, "parameters.json") if not os.path.isfile(params): raise ValueError(( f"The folder {folder} is not a valid Mesh2HRTF project. " "It must contain the file 'parameters.json'")) with open(params, "r") as file: params = json.load(file) # output directory if not os.path.exists(os.path.join(folder, 'Output2HRTF')): os.makedirs(os.path.join(folder, 'Output2HRTF')) # write the project report and check for issues print('\n Writing the project report ...') found_issues, report = m2h.write_output_report(folder) if found_issues: warnings.warn(report) # get the evaluation grids print('\n Loading the EvaluationGrids ...') evaluationGrids, _ = _read_nodes_and_elements( os.path.join(folder, 'EvaluationGrids')) # Load EvaluationGrid data if not len(evaluationGrids) == 0: print('\nLoading data for the evaluation grids ...') pressure, _ = _read_numcalc_data( params["numSources"], params["numFrequencies"], folder, 'pEvalGrid') # save to struct cnt = 0 for grid in evaluationGrids: evaluationGrids[grid]["pressure"] = pressure[ cnt:cnt+evaluationGrids[grid]["num_nodes"], :, :] cnt = cnt + evaluationGrids[grid]["num_nodes"] del grid, pressure, cnt # process BEM data for writing HRTFs and HRIRs to SOFA files for grid in evaluationGrids: print(f'\nSaving HRTFs for EvaluationGrid "{grid}" ...\n') # get pressure as SOFA object (all following steps are run on SOFA # objects. This way they are available to other users as well) sofa = _get_sofa_object( evaluationGrids[grid]["pressure"], evaluationGrids[grid]["nodes"][:, 1:4], "cartesian", params["sourceCenter"], "HRTF", params["Mesh2HRTF_Version"], frequencies=params["frequencies"]) # reference to sound pressure at the center of the head if params["reference"]: sofa = m2h.reference_hrtfs( sofa, params["sourceType"], params["sourceArea"], params["speedOfSound"], params["densityOfMedium"], mode="min") # write HRTF data to SOFA file sf.write_sofa(os.path.join( folder, 'Output2HRTF', f'HRTF_{grid}.sofa'), sofa) # calculate and write HRIRs if params["computeHRIRs"]: if not params["reference"]: raise ValueError("Computing HRIRs requires prior referencing") # calculate shift value (equivalent to a 30 cm shift) sampling_rate = round(2*sofa.N[-1]) n_shift = int(np.round( .30 / (1/sampling_rate * params["speedOfSound"]))) sofa = m2h.compute_hrirs(sofa, n_shift) sf.write_sofa(os.path.join( folder, 'Output2HRTF', f'HRIR_{grid}.sofa'), sofa) print('Done\n')
def _read_nodes_and_elements(folder, objects=None): """ Read the nodes and elements of the evaluation grids or object meshes. Parameters ---------- folder : str Folder containing the object. Must end with EvaluationGrids or Object Meshes objects : str, options Name of the object. The default ``None`` reads all objects in folder Returns ------- grids : dict One item per object (with the item name being the object name). Each item has the sub-items `nodes`, `elements`, `num_nodes`, `num_elements` gridsNumNodes : int Number of nodes in all grids """ # check input if os.path.basename(folder) not in ['EvaluationGrids', 'ObjectMeshes']: raise ValueError('folder must be EvaluationGrids or ObjectMeshes!') if objects is None: objects = os.listdir(folder) # discard hidden folders that might occur on Mac OS objects = [o for o in objects if not o.startswith('.')] elif isinstance(objects, str): objects = [objects] grids = {} gridsNumNodes = 0 for grid in objects: tmpNodes = np.loadtxt(os.path.join( folder, grid, 'Nodes.txt'), delimiter=' ', skiprows=1) tmpElements = np.loadtxt(os.path.join( folder, grid, 'Elements.txt'), delimiter=' ', skiprows=1) grids[grid] = { "nodes": tmpNodes, "elements": tmpElements, "num_nodes": tmpNodes.shape[0], "num_elements": tmpElements.shape[0]} gridsNumNodes += grids[grid]['num_nodes'] return grids, gridsNumNodes def _read_numcalc_data(numSources, numFrequencies, folder, data): """Read the sound pressure on the object meshes or evaluation grid.""" pressure = [] if data not in ['pBoundary', 'pEvalGrid', 'vBoundary', 'vEvalGrid']: raise ValueError( 'data must be pBoundary, pEvalGrid, vBoundary, or vEvalGrid') print('\n Loading %s data ...' % data) for source in range(numSources): print('\n Source %d ...' % (source+1)) tmpFilename = os.path.join( folder, 'NumCalc', f'source_{source+1}', 'be.out') tmpPressure, indices = _output_to_hrtf_load( tmpFilename, data, numFrequencies) pressure.append(tmpPressure) pressure = np.transpose(np.array(pressure), (2, 0, 1)) return pressure, indices def _get_sofa_object(data, source_position, source_position_type, receiver_position, mode, Mesh2HRTF_version, frequencies=None, sampling_rate=None): """ Write complex pressure or impulse responses to a SOFA object. Parameters ---------- data : numpy array The data as an array of shape (MRE) evaluation_grid : numpy array The evaluation grid in Cartesian coordinates as an array of shape (MC) receiver_position : numpy array The position of the receivers (ears) in Cartesian coordinates mode : str "HRTF" to save HRTFs, "HRIR" to save HRIRs Mesh2HRTF_version : str frequencies : numpy array The frequencies at which the HRTFs were calculated. Required if mode is "HRTF" sampling_rate : The sampling rate. Required if mode is "HRIR" Returns ------- sofa : sofar.Sofa object """ # get source coordinates in spherical convention if source_position_type == "cartesian": xyz = source_position radius = np.sqrt(xyz[:, 0]**2 + xyz[:, 1]**2 + xyz[:, 2]**2) z_div_r = np.where(radius != 0, xyz[:, 2] / radius, 0) elevation = 90 - np.arccos(z_div_r) / np.pi * 180 azimuth = np.mod(np.arctan2(xyz[:, 1], xyz[:, 0]), 2 * np.pi) \ / np.pi * 180 source_position = np.concatenate( (azimuth[..., np.newaxis], elevation[..., np.newaxis], radius[..., np.newaxis]), axis=-1) # get number of sources from data numSources = data.shape[1] # create empty SOFA object if mode == "HRTF": convention = "SimpleFreeFieldHRTF" if numSources == 2 else "GeneralTF" else: convention = "SimpleFreeFieldHRIR" if numSources == 2 else "GeneralFIR" sofa = sf.Sofa(convention) # write meta data sofa.GLOBAL_ApplicationName = 'Mesh2HRTF' sofa.GLOBAL_ApplicationVersion = Mesh2HRTF_version sofa.GLOBAL_History = "numerically simulated data" # Source and receiver data sofa.SourcePosition = source_position sofa.SourcePosition_Units = "degree, degree, meter" sofa.SourcePosition_Type = "spherical" sofa.ReceiverPosition = receiver_position sofa.ReceiverPosition_Units = "meter" sofa.ReceiverPosition_Type = "cartesian" # HRTF/HRIR data if mode == "HRTF": sofa.Data_Real = np.real(data) sofa.Data_Imag = np.imag(data) sofa.N = np.array(frequencies).flatten() else: sofa.Data_IR = data sofa.Data_SamplingRate = sampling_rate sofa.Data_Delay = np.zeros((1, data.shape[1])) return sofa def _output_to_hrtf_load(foldername, filename, numFrequencies): """ Load results of the BEM calculation. Parameters ---------- foldername : string The folder from which the data is loaded. The data to be read is located in the folder be.out inside NumCalc/source_* filename : string The kind of data that is loaded pBoundary The sound pressure on the object mesh vBoundary The sound velocity on the object mesh pEvalGrid The sound pressure on the evaluation grid vEvalGrid The sound velocity on the evaluation grid numFrequencies : int the number of simulated frequencies Returns ------- data : numpy array Pressure or abs velocity values of shape (numFrequencies, numEntries) """ # ---------------------check number of header and data lines--------------- current_file = os.path.join(foldername, 'be.1', filename) numDatalines = None with open(current_file) as file: line = csv.reader(file, delimiter=' ', skipinitialspace=True) for idx, li in enumerate(line): # read number of data points and head lines if len(li) == 2 and not li[0].startswith("Mesh"): numDatalines = int(li[1]) # read starting index elif numDatalines and len(li) > 2: start_index = int(li[0]) break # ------------------------------load data---------------------------------- dtype = complex if filename.startswith("p") else float data = np.zeros((numFrequencies, numDatalines), dtype=dtype) for ii in range(numFrequencies): tmpData = [] current_file = os.path.join(foldername, 'be.%d' % (ii+1), filename) with open(current_file) as file: line = csv.reader(file, delimiter=' ', skipinitialspace=True) for li in line: # data lines have 3 ore more entries if len(li) < 3 or li[0].startswith("Mesh"): continue if filename.startswith("p"): tmpData.append(complex(float(li[1]), float(li[2]))) elif filename == "vBoundary": tmpData.append(np.abs(complex(float(li[1]), float(li[2])))) elif filename == "vEvalGrid": tmpData.append(np.sqrt( np.abs(complex(float(li[1]), float(li[2])))**2 + np.abs(complex(float(li[3]), float(li[4])))**2 + np.abs(complex(float(li[5]), float(li[6])))**2)) data[ii, :] = tmpData if tmpData else np.nan return data, np.arange(start_index, numDatalines + start_index)