Source code for prfpy.timecourse

import numpy as np
import scipy as sp
import scipy.signal as signal
from statsmodels.tsa.arima_process import arma_generate_sample


[docs]def convolve_stimulus_dm(stimulus, hrf): """convolve_stimulus_dm convolve_stimulus_dm convolves an N-D (N>=2) stimulus array with an hrf Parameters ---------- stimulus : numpy.ndarray, N-D (N>=2) stimulus experimental design, with the final dimension being time hrf : numpy.ndarray, 1D contains kernel for convolution """ hrf_shape = np.ones(len(stimulus.shape), dtype=np.int) hrf_shape[-1] = hrf.shape[-1] return signal.fftconvolve(stimulus, hrf.reshape(hrf_shape), mode='full', axes=(-1))[..., :stimulus.shape[-1]]
[docs]def stimulus_through_prf(prfs, stimulus, dx, mask=None): """stimulus_through_prf dot the stimulus and the prfs Parameters ---------- prfs : numpy.ndarray the array of prfs. stimulus : numpy.ndarray the stimulus design matrix, either convolved with hrf or not. mask : numpy.ndarray a mask in feature space, of dimensions equal to the spatial dimensions of both stimulus and receptive field """ assert prfs.shape[1:] == stimulus.shape[:-1], \ """prf array dimensions {prfdim} and input stimulus array dimensions {stimdim} must have same dimensions""".format( prfdim=prfs.shape[1:], stimdim=stimulus.shape[:-1]) if mask == None: prf_r = prfs.reshape((prfs.shape[0], -1)) stim_r = stimulus.reshape((-1, stimulus.shape[-1])) else: assert prfs.shape[1:] == mask.shape and mask.shape == stimulus.shape[:-1], \ """mask dimensions {maskdim}, prf array dimensions {prfdim}, and input stimulus array dimensions {stimdim} must have same dimensions""".format( maskdim=mask.shape, prfdim=prfs.shape[1:], stimdim=stimulus.shape[:-1]) prf_r = prfs[:, mask] stim_r = stimulus[mask, :] return prf_r @ stim_r * (dx ** len(stimulus.shape[:-1]))
[docs]def filter_predictions(predictions, filter_type, filter_params): """ Generic filtering function, calling the different types of filters implemented. Parameters ---------- See individual filters for description. Returns ------- numpy.ndarray filtered version of the array """ if filter_type == 'sg': return sgfilter_predictions(predictions, **filter_params) elif filter_type == 'dc': return dcfilter_predictions(predictions, **filter_params) else: print("unknown filter option selected, using unfiltered prediction") return predictions
[docs]def dcfilter_predictions(predictions, first_modes_to_remove=5, last_modes_to_remove_percent=0, add_mean=True, task_lengths=None, task_names=None, late_iso_dict=None, **kwargs): """dcfilter_predictions discrete cosine filter predictions, to conform to data filtering Parameters ---------- predictions : numpy.ndarray array containing predictions, last dimension is time first_modes_to_remove : int, optional Number of low-frequency eigenmodes to remove (highpass) last_modes_to_remove_percent : int, optional Percentage of high-frequency eigenmodes to remove (lowpass) add_mean : bool, optional whether to add the mean of the time-courses back to the signal after filtering (True, default) or not (False) task_lengths : list of ints, optional If there are multiple tasks, specify their lengths in TRs. The default is None. task_names : list of str, optional Task names. The default is None. late_iso_dict : dict, optional Dictionary whose keys correspond to task_names. Entries are ndarrays containing the TR indices used to compute the BOLD baseline for each task. The default is None. Returns ------- numpy.ndarray filtered version of the array """ if task_lengths is None: task_lengths = [predictions.shape[-1]] # first assess that the number and sizes of chunks are compatible with the predictions assert np.sum(task_lengths) == predictions.shape[-1], "Task lengths \ are incompatible with the number of prediction timepoints." baselines = dict() filtered_predictions = np.zeros_like(predictions) start = 0 for i, task_length in enumerate(task_lengths): stop = start+task_length try: coeffs = sp.fft.dct(predictions, norm='ortho', axis=-1) coeffs[:, :first_modes_to_remove] = 0 if last_modes_to_remove_percent>0: last_modes_to_remove = int(task_length*last_modes_to_remove_percent/100) coeffs[:, -last_modes_to_remove:] = 0 filtered_predictions[..., start:stop] = sp.fft.idct(coeffs, norm='ortho', axis=-1) except: print("Error occurred during predictions discrete cosine filtering.\ Using unfiltered prediction instead") filtered_predictions = predictions if add_mean: filtered_predictions[..., start:stop] += np.mean( predictions[..., start:stop], axis=-1)[..., np.newaxis] if late_iso_dict is not None: baselines[task_names[i]] = np.median(filtered_predictions[..., start:stop][...,late_iso_dict[task_names[i]]], axis=-1) start += task_length if late_iso_dict is not None: baseline_full = np.median([baselines[task_name] for task_name in task_names], axis=0) start = 0 for i, task_length in enumerate(task_lengths): stop = start+task_length baseline_diff = baseline_full - baselines[task_names[i]] filtered_predictions[..., start:stop] += baseline_diff[...,np.newaxis] start += task_length filtered_predictions -= baseline_full[...,np.newaxis] return filtered_predictions
[docs]def sgfilter_predictions(predictions, window_length=201, polyorder=3, highpass=True, add_mean=True, task_lengths=None, task_names=None, late_iso_dict=None, **kwargs): """sgfilter_predictions savitzky golay filter predictions, to conform to data filtering Parameters ---------- predictions : numpy.ndarray array containing predictions, last dimension is time window_length : int, optional window length for SG filter (the default is 201, which is ok for prf experiments, and a bit long for event-related experiments) polyorder : int, optional polynomial order for SG filter (the default is 3, which performs well for fMRI signals when the window length is longer than 2 minutes) highpass : bool, optional whether to use the sgfilter as highpass (True, default) or lowpass (False) add_mean : bool, optional whether to add the mean of the time-courses back to the signal after filtering (True, default) or not (False) task_lengths : list of ints, optional If there are multiple tasks, specify their lengths in TRs. The default is None. task_names : list of str, optional Task names. The default is None. late_iso_dict : dict, optional Dictionary whose keys correspond to task_names. Entries are ndarrays containing the TR indices used to compute the BOLD baseline for each task. The default is None. Raises ------ ValueError when window_length is even Returns ------- numpy.ndarray filtered version of the array """ if window_length != 'adaptive': if window_length % 2 != 1: raise ValueError # window_length should be odd if task_lengths is None: task_lengths = [predictions.shape[-1]] # first assess that the number and sizes of chunks are compatible with the predictions assert np.sum(task_lengths) == predictions.shape[-1], "Task lengths \ are incompatible with the number of prediction timepoints." lp_filtered_predictions = np.zeros_like(predictions) if highpass: hp_filtered_predictions = np.zeros_like(predictions) baselines = dict() start = 0 for i, task_length in enumerate(task_lengths): if window_length == 'adaptive': if task_length % 2 != 1: current_window_length = task_length - 1 else: current_window_length = task_length else: current_window_length = window_length stop = start+task_length try: lp_filtered_predictions[..., start:stop] = signal.savgol_filter( predictions[..., start:stop], window_length=current_window_length, polyorder=polyorder) except: print("Error occurred during predictions savgol filtering.\ Using unfiltered prediction instead") if add_mean: if highpass: lp_filtered_predictions[..., start:stop] -= np.mean( predictions[..., start:stop], axis=-1)[..., np.newaxis] else: lp_filtered_predictions[..., start:stop] += np.mean( predictions[..., start:stop], axis=-1)[..., np.newaxis] if highpass: hp_filtered_predictions[..., start:stop] = predictions[..., start:stop]\ - lp_filtered_predictions[..., start:stop] if late_iso_dict is not None: baselines[task_names[i]] = np.median(hp_filtered_predictions[..., start:stop][...,late_iso_dict[task_names[i]]], axis=-1) start += task_length if late_iso_dict is not None and highpass: baseline_full = np.median([baselines[task_name] for task_name in task_names], axis=0) start = 0 for i, task_length in enumerate(task_lengths): stop = start+task_length baseline_diff = baseline_full - baselines[task_names[i]] hp_filtered_predictions[..., start:stop] += baseline_diff[...,np.newaxis] start += task_length hp_filtered_predictions -= baseline_full[...,np.newaxis] if highpass: return hp_filtered_predictions else: return lp_filtered_predictions
[docs]def generate_random_legendre_drifts(dimensions=(1000, 120), amplitude_ranges=[[500, 600], [-50, 50], [-20, 20], [-10, 10], [-5, 5]]): """generate_random_legendre_drifts generate_random_legendre_drifts generates random slow drifts Parameters ---------- dimensions : tuple, optional shape of the desired data, latter dimension = timepoints the default is (1000,120), which creates 1000 timecourses for a brief fMRI run amplitude_ranges : list, optional Amplitudes of each of the components. Ideally, this should follow something like 1/f. the default is [[500,600],[-50,50],[-20,20],[-10,10],[-5,5]] Returns ------- numpy.ndarray legendre poly drifts with dimensions [dimensions] numpy.ndarray random multiplication factors that created the drifts """ nr_polys = len(amplitude_ranges) drifts = np.polynomial.legendre.legval( x=np.arange(dimensions[-1]), c=np.eye(nr_polys)).T drifts = (drifts-drifts.mean(0))/drifts.mean(0) drifts[:, 0] = np.ones(drifts[:, 0].shape) random_factors = np.array([ar[0] + (ar[1]-ar[0])/2.0 + (np.random.rand(dimensions[0])-0.5) * (ar[1]-ar[0]) for ar in amplitude_ranges]) return np.dot(drifts, random_factors), random_factors
[docs]def generate_random_cosine_drifts(dimensions=(1000, 120), amplitude_ranges=[[500, 600], [-50, 50], [-20, 20], [-10, 10], [-5, 5]]): """generate_random_cosine_drifts generate_random_cosine_drifts generates random slow drifts Parameters ---------- dimensions : tuple, optional shape of the desired data, latter dimension = timepoints the default is (1000,120), which creates 1000 timecourses for a brief fMRI run amplitude_ranges : list, optional Amplitudes of each of the components. Ideally, this should follow something like 1/f. the default is [[500,600],[-50,50],[-20,20],[-10,10],[-5,5]] Returns ------- numpy.ndarray discrete cosine drifts with dimensions [dimensions] numpy.ndarray random multiplication factors that created the drifts """ nr_freqs = len(amplitude_ranges) x = np.linspace(0, np.pi, dimensions[-1]) drifts = np.array([np.cos(x*f) for f in range(nr_freqs)]).T random_factors = np.array([ar[0] + (ar[1]-ar[0])/2.0 + (np.random.rand(dimensions[0])-0.5) * (ar[1]-ar[0]) for ar in amplitude_ranges]) return np.dot(drifts, random_factors), random_factors
[docs]def generate_arima_noise(ar=(1, 0.4), ma=(1, 0.0), dimensions=(1000, 120), **kwargs): """generate_arima_noise generate_arima_noise creates temporally correlated noise Parameters ---------- ar : tuple, optional arima autoregression parameters for statsmodels generation of noise (the default is (1,0.4), which should be a reasonable setting for fMRI noise) ma : tuple, optional arima moving average parameters for statsmodels generation of noise (the default is (1,0.0), which should be a reasonable setting for fMRI noise) dimensions : tuple, optional the first dimension is the nr of separate timecourses, the second dimension is the timeseries length. (the default is (1000,120), a reasonable if brief length for an fMRI run) **kwargs are passed on to statsmodels.tsa.arima_process.arma_generate_sample Returns ------- numpy.ndarray noise of requested dimensions and properties """ return np.array([arma_generate_sample(ar, ma, dimensions[1], **kwargs) for _ in range(dimensions[0])])