Source code for prfpy.model

import numpy as np
import scipy.signal as signal
from nilearn.glm.first_level.hemodynamic_models import spm_hrf, spm_time_derivative, spm_dispersion_derivative

from .rf import gauss2D_iso_cart, gauss1D_cart # import required RF shapes
from .timecourse import stimulus_through_prf, \
    convolve_stimulus_dm, \
    generate_random_cosine_drifts, \
    generate_arima_noise, \
    filter_predictions
    


[docs]class Model(object): """Model Class that takes care of generating grids for pRF fitting and simulations """ def __init__(self, stimulus): """__init__ constructor for Model, takes stimulus object as argument Parameters ---------- stimulus : PRFStimulus2D or PRFStimulusDD Stimulus object containing information about the stimulus, and the space in which it lives. """ self.stimulus = stimulus
[docs] def create_hrf(self, hrf_params): """ construct single or multiple HRFs Parameters ---------- hrf_params : TYPE, optional DESCRIPTION. The default is [1.0, 1.0, 0.0]. Returns ------- hrf : ndarray the hrf. """ hrf = np.array( [ np.ones_like(hrf_params[1], dtype='float32')*hrf_params[0] * spm_hrf( tr=self.stimulus.TR, oversampling=1, time_length=40)[...,np.newaxis], hrf_params[1] * spm_time_derivative( tr=self.stimulus.TR, oversampling=1, time_length=40)[...,np.newaxis], hrf_params[2] * spm_dispersion_derivative( tr=self.stimulus.TR, oversampling=1, time_length=40)[...,np.newaxis]], dtype='float32').sum( axis=0) return hrf.T
[docs] def convolve_timecourse_hrf(self, tc, hrf): """ Convolve neural timecourses with single or multiple hrfs. Parameters ---------- tc : ndarray, 1D or 2D The timecourse(s) to be convolved. hrf : ndarray, 1D or 2D The HRF. Can be single, or a different one for each timecourse. Returns ------- convolved_tc : ndarray Convolved timecourse. """ #scipy fftconvolve does not have padding options so doing it manually pad_length = 20 pad = np.tile(tc[:,0], (pad_length,1)).T padded_tc = np.hstack((pad,tc)) if hrf.shape[0]>1: assert hrf.shape[0] == tc.shape[0], f"{hrf.shape[0]} HRFs provided vs {tc.shape[0]} timecourses" median_hrf = np.median(hrf, axis=0).reshape(1,-1) if np.all([np.allclose(median_hrf, single_hrf.reshape(1,-1)) for single_hrf in hrf]): convolved_tc = signal.fftconvolve(padded_tc, median_hrf, axes=(-1))[..., pad_length:tc.shape[-1]+pad_length] else: convolved_tc = np.zeros_like(tc) for n_ in range(hrf.shape[0]): convolved_tc[n_,:] = signal.fftconvolve(padded_tc[n_,:],hrf[n_,:])[..., pad_length:tc.shape[-1]+pad_length] else: convolved_tc = signal.fftconvolve(padded_tc, hrf, axes=(-1))[..., pad_length:tc.shape[-1]+pad_length] return convolved_tc
[docs] def create_drifts_and_noise(self, drift_ranges=[[0, 0]], noise_ar=None, noise_ma=(1, 0.0), noise_amplitude=1.0): """add_drifs_and_noise creates noise and drifts of size equal to the predictions Parameters ---------- drift_ranges : list of 2-lists of floats, optional specifies the lower- and upper bounds of the ranges of each of the discrete cosine low-pass components to be generated noise_ar : 2x2 list. argument passed to timecourse.generate_arima_noise (the default is None, for no noise) noise_amplitude : float, optional """ assert hasattr( self, 'predictions'), "please first create the grid to which to add noise" self.random_drifts = generate_random_cosine_drifts( dimensions=self.predictions.shape, amplitude_ranges=drift_ranges) if noise_ar is not None: self.random_noise = generate_arima_noise( ar=noise_ar, ma=noise_ma, dimensions=self.predictions.shape) * noise_amplitude else: self.random_noise = np.zeros_like(self.predictions)
[docs]class Iso2DGaussianModel(Model): """Iso2DGaussianModel To extend please create a setup_XXX_grid function for any new way of defining grids. """ def __init__(self, stimulus, hrf=[1.0, 1.0, 0.0], filter_predictions=False, filter_type='dc', filter_params={}, normalize_RFs=False, **kwargs): """__init__ for Iso2DGaussianModel constructor, sets up stimulus and hrf for this Model Parameters ---------- stimulus : PRFStimulus2D Stimulus object specifying the information about the stimulus, and the space in which it lives. hrf : string, list or numpy.ndarray, optional HRF shape for this Model. Can be 'direct', which implements nothing (for eCoG or later convolution), a list or array of 3, which are multiplied with the three spm HRF basis functions, and an array already sampled on the TR by the user. (the default is None, which implements standard spm HRF) filter_predictions : boolean, optional whether to high-pass filter the predictions, default False filter_type, filter_params : see timecourse.py normalize_RFs : whether or not to normalize the RF volumes (generally not needed). """ super().__init__(stimulus) self.__dict__.update(kwargs) # HRF stuff if isinstance(hrf, str): if hrf == 'direct': # for use with anything like eCoG with instantaneous irf self.hrf = 'direct' #self.stimulus.convolved_design_matrix = np.copy(stimulus.design_matrix) else: # some specific hrf with spm basis set if ((isinstance(hrf, list)) or (isinstance(hrf, np.ndarray))) and len(hrf) == 3: self.hrf_params = np.copy(hrf) if hrf[0] == 1: self.hrf = self.create_hrf(hrf_params=hrf) else: print("WARNING: hrf[0] is not 1. this will confound it with amplitude\ parameters. consider setting it to 1 unless you are absolutely sure of what you are doing.\ this will also prevent you from fitting the HRF.") self.hrf = self.create_hrf(hrf_params=hrf) # some specific hrf already defined at the TR (!) # elif isinstance(hrf, np.ndarray) and len(hrf) > 3: elif isinstance(hrf, np.ndarray) and hrf.shape[0] == 1 and hrf.shape[1] > 3: self.hrf = np.copy(hrf) #self.stimulus.convolved_design_matrix = convolve_stimulus_dm( # stimulus.design_matrix, hrf=self.hrf) # filtering and other stuff self.filter_predictions = filter_predictions self.filter_type = filter_type #settings for filter self.filter_params = filter_params #adding stimulus parameters self.filter_params['task_lengths'] = self.stimulus.task_lengths self.filter_params['task_names'] = self.stimulus.task_names self.filter_params['late_iso_dict'] = self.stimulus.late_iso_dict #normalizing RFs to have volume 1 self.normalize_RFs = normalize_RFs
[docs] def create_grid_predictions(self, mu_x, mu_y, size, hrf_1=None, hrf_2=None): """create_predictions creates predictions for a given set of parameters see return_prediction Parameters ---------- """ n_predictions = len(mu_x) if hrf_1 is not None and hrf_2 is not None: if not hasattr(hrf_1, 'shape') and not hasattr(hrf_2, 'shape'): hrf_1 = hrf_1 * np.ones(n_predictions) hrf_2 = hrf_2 * np.ones(n_predictions) prediction_params = [mu_x, mu_y, size, 1.0*np.ones(n_predictions), 0.0*np.ones(n_predictions), hrf_1, hrf_2] return self.return_prediction(*prediction_params).astype('float32')
[docs] def return_prediction(self, mu_x, mu_y, size, beta, baseline, hrf_1=None, hrf_2=None): """return_prediction returns the prediction for a single set of parameters. As this is to be used during iterative search, it also has arguments beta and baseline. Parameters ---------- mu_x : float x-position of pRF mu_y : float y-position of pRF size : float size of pRF beta : float amplitude of pRF baseline : float baseline of pRF hrf_1, hrf_2 : floats, optional if specified, will be used to create HRF. Returns ------- numpy.ndarray single prediction given the model """ if hrf_1 is None or hrf_2 is None: current_hrf = self.hrf else: current_hrf = self.create_hrf([1.0, hrf_1, hrf_2]) # create the single rf rf = np.rot90(gauss2D_iso_cart(x=self.stimulus.x_coordinates[..., np.newaxis], y=self.stimulus.y_coordinates[..., np.newaxis], mu=(mu_x, mu_y), sigma=size, normalize_RFs=self.normalize_RFs).T, axes=(1,2)) dm = self.stimulus.design_matrix if type(current_hrf) == str: if current_hrf == 'direct': tc = stimulus_through_prf(rf, dm, self.stimulus.dx) else: tc = self.convolve_timecourse_hrf(stimulus_through_prf(rf, dm, self.stimulus.dx), current_hrf) if not self.filter_predictions: return baseline[..., np.newaxis] + beta[..., np.newaxis] * tc else: return baseline[..., np.newaxis] + beta[..., np.newaxis] * filter_predictions( tc, self.filter_type, self.filter_params)
[docs]class CSS_Iso2DGaussianModel(Iso2DGaussianModel):
[docs] def create_grid_predictions(self, gaussian_params, nn, hrf_1=None, hrf_2=None): """create_predictions creates predictions for a given set of parameters [description] Parameters ---------- gaussian_params: ndarray size (3) containing prf position and size. nn: ndarrays containing the range of grid values for other CSS model parameters (exponent) """ n_predictions = len(nn) if hrf_1 is not None and hrf_2 is not None: if not hasattr(hrf_1, 'shape') and not hasattr(hrf_2, 'shape'): hrf_1 = hrf_1 * np.ones(n_predictions) hrf_2 = hrf_2 * np.ones(n_predictions) prediction_params = [gaussian_params[0]*np.ones(n_predictions), gaussian_params[1]*np.ones(n_predictions), gaussian_params[2]*np.ones(n_predictions)* np.sqrt(nn), 1.0*np.ones(n_predictions), 0.0*np.ones(n_predictions), nn, hrf_1, hrf_2] return self.return_prediction(*prediction_params).astype('float32')
[docs] def return_prediction(self, mu_x, mu_y, size, beta, baseline, n, hrf_1=None, hrf_2=None): """return_prediction returns the prediction for a single set of parameters. As this is to be used during iterative search, it also has arguments beta and baseline. Parameters ---------- mu_x : float x-position of pRF mu_y : float y-position of pRF size : float size of pRF beta : float, optional amplitude of pRF (the default is 1) baseline : float, optional baseline of pRF (the default is 0) n : float, optional exponent of pRF (the default is 1, which is a linear Gaussian) hrf_1, hrf_2 : floats, optional if specified, will be used to create HRF. Returns ------- numpy.ndarray single prediction given the model """ if hrf_1 is None or hrf_2 is None: current_hrf = self.hrf else: current_hrf = self.create_hrf([1.0, hrf_1, hrf_2]) # create the single rf rf = np.rot90(gauss2D_iso_cart(x=self.stimulus.x_coordinates[..., np.newaxis], y=self.stimulus.y_coordinates[..., np.newaxis], mu=(mu_x, mu_y), sigma=size, normalize_RFs=self.normalize_RFs).T, axes=(1,2)) dm = self.stimulus.design_matrix if type(current_hrf) == str: if current_hrf == 'direct': tc = stimulus_through_prf(rf, dm, self.stimulus.dx)**n[..., np.newaxis] else: tc = self.convolve_timecourse_hrf(stimulus_through_prf(rf, dm, self.stimulus.dx)**n[..., np.newaxis], current_hrf) if not self.filter_predictions: return baseline[..., np.newaxis] + beta[..., np.newaxis] * tc else: return baseline[..., np.newaxis] + beta[..., np.newaxis] * filter_predictions( tc, self.filter_type, self.filter_params)
[docs]class Norm_Iso2DGaussianModel(Iso2DGaussianModel): """Norm_Iso2DGaussianModel Redefining class for normalization model """
[docs] def create_grid_predictions(self, mu_x,mu_y,size, sa, ss, nb, sb, hrf_1=None, hrf_2=None): """create_predictions creates predictions for a given set of parameters [description] Parameters ---------- gaussian_params: ndarray size (3) containing prf position and size. sa,ss,nb,sb: ndarrays containing the range of grid values for other norm model parameters (surroud amplitude (C), surround size (sigma_2), neural baseline (B), surround baseline (D)) hrf_1, hrf_2 : floats, optional if specified, will be used to create HRF. """ n_predictions = len(sa) if hrf_1 is not None and hrf_2 is not None: if not hasattr(hrf_1, 'shape') and not hasattr(hrf_2, 'shape'): hrf_1 = hrf_1 * np.ones(n_predictions) hrf_2 = hrf_2 * np.ones(n_predictions) prediction_params = [mu_x, mu_y, size, 1.0*np.ones(n_predictions), 0.0*np.ones(n_predictions), sa, ss, nb, sb, hrf_1, hrf_2] return self.return_prediction(*prediction_params)
[docs] def return_prediction(self, mu_x, mu_y, prf_size, prf_amplitude, bold_baseline, srf_amplitude, srf_size, neural_baseline, surround_baseline, hrf_1=None, hrf_2=None ): """return_prediction [summary] returns the prediction for a single set of parameters. Parameters ---------- mu_x : float x position mu_y : float y position prf_size : float sigma_1 prf_amplitude : float Norm Param A bold_baseline : float BOLD baseline (generally kept fixed) neural_baseline : float Norm Param B srf_amplitude : float Norm Param C srf_size : float sigma_2 surround_baseline : float Norm Param D hrf_1, hrf_2 : floats, optional if specified, will be used to create HRF. Returns ------- numpy.ndarray prediction(s) given the model """ if hrf_1 is None or hrf_2 is None: current_hrf = self.hrf else: current_hrf = self.create_hrf([1.0, hrf_1, hrf_2]) # slight memory load improvement to avoid holding all rfs in memory activation_part = stimulus_through_prf(np.rot90(gauss2D_iso_cart(x=self.stimulus.x_coordinates[..., np.newaxis], y=self.stimulus.y_coordinates[..., np.newaxis], mu=(mu_x, mu_y), sigma=prf_size, normalize_RFs=self.normalize_RFs).T, axes=(1,2)), self.stimulus.design_matrix, self.stimulus.dx).astype('float32') # surround receptive field (denominator) normalization_part = stimulus_through_prf(np.rot90(gauss2D_iso_cart(x=self.stimulus.x_coordinates[..., np.newaxis], y=self.stimulus.y_coordinates[..., np.newaxis], mu=(mu_x, mu_y), sigma=srf_size, normalize_RFs=self.normalize_RFs).T, axes=(1,2)), self.stimulus.design_matrix, self.stimulus.dx).astype('float32') # create normalization model timecourse if type(current_hrf) == str: if current_hrf == 'direct': tc = ((prf_amplitude[..., np.newaxis] * activation_part + neural_baseline[..., np.newaxis]) /\ (srf_amplitude[..., np.newaxis] * normalization_part + surround_baseline[..., np.newaxis]) \ - neural_baseline[..., np.newaxis]/surround_baseline[..., np.newaxis]).astype('float32') else: tc = self.convolve_timecourse_hrf(((prf_amplitude[..., np.newaxis] * activation_part + neural_baseline[..., np.newaxis]) /\ (srf_amplitude[..., np.newaxis] * normalization_part + surround_baseline[..., np.newaxis]) \ - neural_baseline[..., np.newaxis]/surround_baseline[..., np.newaxis]).astype('float32') , current_hrf.astype('float32')) if not self.filter_predictions: return (bold_baseline[..., np.newaxis] + tc).astype('float32') else: return (bold_baseline[..., np.newaxis] + filter_predictions( tc, self.filter_type, self.filter_params)).astype('float32')
[docs]class DoG_Iso2DGaussianModel(Iso2DGaussianModel): """redefining class for difference of Gaussians in iterative fit. """
[docs] def create_grid_predictions(self, gaussian_params, sa, ss, hrf_1=None, hrf_2=None): """create_predictions creates predictions for a given set of parameters [description] Parameters ---------- gaussian_params: ndarray size (3) containing prf position and size. sa,ss: ndarrays containing the range of grid values for other DoG model parameters (surroud amplitude, surround size (sigma_2)) hrf_1, hrf_2 : floats, optional if specified, will be used to create HRF. """ n_predictions = len(sa) if hrf_1 is not None and hrf_2 is not None: if not hasattr(hrf_1, 'shape') and not hasattr(hrf_2, 'shape'): hrf_1 = hrf_1 * np.ones(n_predictions) hrf_2 = hrf_2 * np.ones(n_predictions) prediction_params = [gaussian_params[0]*np.ones(n_predictions), gaussian_params[1]*np.ones(n_predictions), gaussian_params[2]*np.ones(n_predictions), 1.0*np.ones(n_predictions), 0.0*np.ones(n_predictions), sa, ss, hrf_1, hrf_2] return self.return_prediction(*prediction_params).astype('float32')
[docs] def return_prediction(self, mu_x, mu_y, prf_size, prf_amplitude, bold_baseline, srf_amplitude, srf_size, hrf_1=None, hrf_2=None ): """return_prediction returns the prediction for a single set of parameters. As this is to be used during iterative search, it also has arguments beta and baseline. Parameters ---------- mu_x : float x-position of pRF mu_y : float y-position of pRF prf_size : float size of pRF prf_amplitude : float Amplitude (scaling) of pRF bold_baseline : float BOLD baseline (generally kept fixed) srf_amplitude : float Surround pRF amplitude srf_size : float Surround pRF size hrf_1, hrf_2 : floats, optional if specified, will be used to create HRF. Returns ------- numpy.ndarray single prediction given the model """ if hrf_1 is None or hrf_2 is None: current_hrf = self.hrf else: current_hrf = self.create_hrf([1.0, hrf_1, hrf_2]) # create the rfs prf = np.rot90(gauss2D_iso_cart(x=self.stimulus.x_coordinates[..., np.newaxis], y=self.stimulus.y_coordinates[..., np.newaxis], mu=(mu_x, mu_y), sigma=prf_size, normalize_RFs=self.normalize_RFs).T, axes=(1,2)) # surround receptive field srf = np.rot90(gauss2D_iso_cart(x=self.stimulus.x_coordinates[..., np.newaxis], y=self.stimulus.y_coordinates[..., np.newaxis], mu=(mu_x, mu_y), sigma=srf_size, normalize_RFs=self.normalize_RFs).T, axes=(1,2)) dm = self.stimulus.design_matrix if type(current_hrf) == str: if current_hrf == 'direct': tc = prf_amplitude[..., np.newaxis] * stimulus_through_prf(prf, dm, self.stimulus.dx) - \ srf_amplitude[..., np.newaxis] * stimulus_through_prf(srf, dm, self.stimulus.dx) else: tc = self.convolve_timecourse_hrf(prf_amplitude[..., np.newaxis] * stimulus_through_prf(prf, dm, self.stimulus.dx) - \ srf_amplitude[..., np.newaxis] * stimulus_through_prf(srf, dm, self.stimulus.dx), current_hrf) if not self.filter_predictions: return bold_baseline[..., np.newaxis] + tc else: return bold_baseline[..., np.newaxis] + filter_predictions( tc, self.filter_type, self.filter_params)
[docs]class CFGaussianModel(): """A class for constructing gaussian connective field models. """ def __init__(self,stimulus): """__init__ Parameters ---------- stimulus: A CFstimulus object. """ self.stimulus=stimulus
[docs] def create_rfs(self): """create_rfs creates rfs for the grid search Returns ---------- grid_rfs: The receptive field profiles for the grid. vert_centres_flat: A vector that defines the vertex centre associated with each rf profile. sigmas_flat: A vector that defines the CF size associated with each rf profile. """ assert hasattr(self, 'sigmas'), "please define a grid of CF sizes first." if self.func=='cart': # Make the receptive fields extend over the distances controlled by each of the sigma. self.grid_rfs = np.array([gauss1D_cart(self.stimulus.distance_matrix, 0, s) for s in self.sigmas]) # Reshape. self.grid_rfs=self.grid_rfs.reshape(-1, self.grid_rfs.shape[-1]) # Flatten out the variables that define the centres and the sigmas. self.vert_centres_flat=np.tile(self.stimulus.subsurface_verts,self.sigmas.shape) self.sigmas_flat=np.repeat(self.sigmas,self.stimulus.distance_matrix.shape[0])
[docs] def stimulus_times_prfs(self): """stimulus_times_prfs creates timecourses for each of the rfs in self.grid_rfs Returns ---------- predictions: The predicted timecourse that is the dot product of the data in the source subsurface and each rf profile. """ assert hasattr(self, 'grid_rfs'), "please create the rfs first" self.predictions = stimulus_through_prf( self.grid_rfs, self.stimulus.design_matrix, 1)
[docs] def create_grid_predictions(self,sigmas,func='cart'): """Creates the grid rfs and rf predictions """ self.sigmas=sigmas self.func=func self.create_rfs() self.stimulus_times_prfs()
[docs] def return_prediction(self,sigma,beta,baseline,vert): """return_prediction Creates a prediction given a sigma, beta, baseline and vertex centre. Returns ---------- A prediction for this parameter combination. """ beta=np.array(beta) baseline=np.array(baseline) # Find the row of the distance matrix that corresponds to that vertex. idx=np.where(self.stimulus.subsurface_verts==vert)[0][0] # We can grab the row of the distance matrix corresponding to this vertex and make the rf. rf=np.array([gauss1D_cart(self.stimulus.distance_matrix[idx], 0, sigma)]) # Dot with the data to make the predictions. neural_tc = stimulus_through_prf(rf, self.stimulus.design_matrix, 1) return baseline[..., np.newaxis] + beta[..., np.newaxis] * neural_tc