Source code for nussl.separation.factorization.rpca

import numpy as np

from .. import MaskSeparationBase
from ..benchmark import HighLowPassFilter


class RPCA(MaskSeparationBase):
    """
    Implements foreground/background separation using RPCA.

    Huang, Po-Sen, et al. "Singing-voice separation from monaural recordings
    using robust principal component analysis." Acoustics, Speech and Signal
    Processing (ICASSP), 2012 IEEE International Conference on. IEEE, 2012.

    Args:
        input_audio_signal (AudioSignal): The AudioSignal object that has the
          audio data that RPCA will be run on.
        high_pass_cutoff (float, optional): Value (in Hz) for the high pass
          cutoff filter. Defaults to 100.
        num_iterations (int, optional): how many iterations to run RPCA for.
          Defaults to 100.
        epsilon (float, optional): Stopping criterion for RPCA convergence.
          Defaults to 1e-7.
        mask_type (str, optional): Type of mask to use. Defaults to 'soft'.
        mask_threshold (float, optional): Threshold for mask. Defaults to 0.5.

    Attributes:
        gain (float): Multiplier applied to the sparse component before it is
          compared against the low-rank component when building the
          background mask. Set to 1 by the constructor.
        error (float): Relative reconstruction error of the most recent call
          to ``decompose``; ``None`` until ``decompose`` has been called.
        magnitude_spectrogram (np.ndarray): ``np.abs(self.stft)`` as computed
          by the most recent call to ``run``; ``None`` before that.
    """

    def __init__(self, input_audio_signal, high_pass_cutoff=100,
                 num_iterations=100, epsilon=1e-7, mask_type='soft',
                 mask_threshold=0.5):
        super().__init__(
            input_audio_signal=input_audio_signal,
            mask_type=mask_type,
            mask_threshold=mask_threshold)
        self.high_pass_cutoff = high_pass_cutoff

        self.epsilon = epsilon
        self.num_iterations = num_iterations
        self.gain = 1
        self.error = None
        self.magnitude_spectrogram = None

    def run(self):
        """
        Compute the background and foreground masks for every channel.

        Each channel of the magnitude spectrogram is decomposed separately.
        The background mask (index 0) is then combined with
        ``high_pass_masks[0]`` via an element-wise maximum and the foreground
        mask (index 1) with ``high_pass_masks[1]`` via an element-wise
        minimum.

        Returns:
            list: ``self.result_masks``, holding the background mask followed
            by the foreground mask, each wrapped in ``self.mask_type``.
        """
        # NOTE(review): presumably index 0 of the filter output is the
        # low-pass mask and index 1 the high-pass mask, so everything below
        # the cutoff is forced into the background -- confirm against
        # HighLowPassFilter.
        high_low = HighLowPassFilter(self.audio_signal, self.high_pass_cutoff)
        high_pass_masks = high_low.run()

        # NOTE(review): self.stft is not defined in this class; it is assumed
        # to be supplied by the base class -- confirm.
        self.magnitude_spectrogram = np.abs(self.stft)

        # The last axis of the spectrogram is indexed by channel.
        background_masks = np.stack(
            [
                self._compute_rpca_mask(self.magnitude_spectrogram[..., ch])
                for ch in range(self.audio_signal.num_channels)
            ],
            axis=-1,
        )
        foreground_masks = 1 - background_masks

        # Trailing axis enumerates the sources: 0 = background, 1 = foreground.
        _masks = np.stack([background_masks, foreground_masks], axis=-1)

        is_binary = self.mask_type == self.MASKS['binary']
        # Loop-invariant: the per-bin maximum over sources, only needed when
        # binarizing.
        strongest = np.max(_masks, axis=-1) if is_binary else None

        self.result_masks = []
        for i in range(_masks.shape[-1]):
            mask_data = _masks[..., i]
            if is_binary:
                mask_data = mask_data == strongest

            if i == 0:
                mask_data = np.maximum(mask_data, high_pass_masks[i].mask)
            elif i == 1:
                mask_data = np.minimum(mask_data, high_pass_masks[i].mask)

            self.result_masks.append(self.mask_type(mask_data))

        return self.result_masks

    def _compute_rpca_mask(self, magnitude_spectrogram):
        """
        Build the background mask for a single 2-D magnitude spectrogram.

        A bin belongs to the background when the gain-scaled magnitude of the
        sparse component does not exceed the magnitude of the low-rank
        component.

        Args:
            magnitude_spectrogram (np.ndarray): 2-D matrix to decompose.

        Returns:
            np.ndarray: Float array of zeros and ones with the same shape as
            the input (1.0 marks background).
        """
        low_rank, sparse_matrix = self.decompose(magnitude_spectrogram)
        bg_mask = self.gain * np.abs(sparse_matrix) <= np.abs(low_rank)
        return bg_mask.astype(float)

    def decompose(self, magnitude_spectrogram):
        """
        Split a matrix into a low-rank part and a sparse part.

        Alternates singular-value thresholding (low-rank update) with
        element-wise soft thresholding (sparse update), followed by a
        multiplier update, until the relative Frobenius-norm reconstruction
        error drops below ``self.epsilon`` or ``self.num_iterations``
        iterations have run. The final error is stored in ``self.error``.

        Args:
            magnitude_spectrogram (np.ndarray): 2-D matrix to decompose.

        Returns:
            tuple: ``(low_rank, sparse_matrix)``, both with the same shape as
            the input.
        """
        # initialize low rank and sparse matrices to all zeros
        low_rank = np.zeros(magnitude_spectrogram.shape)
        sparse_matrix = np.zeros(magnitude_spectrogram.shape)

        # A silent (all-zero) or empty input has a zero spectral norm, which
        # would make every normalisation below a division by zero and feed
        # NaNs into the SVD. The exact decomposition of a zero matrix is two
        # zero matrices, so return that directly.
        if not np.any(magnitude_spectrogram):
            self.error = 0.0
            return low_rank, sparse_matrix

        # compute rule of thumb values of lagrange multiplier and svd-threshold
        _lambda = 1 / np.sqrt(np.max(magnitude_spectrogram.shape))

        # largest singular value of magnitude_spectrogram
        two_norm = np.linalg.svd(
            magnitude_spectrogram, full_matrices=False, compute_uv=False)[0]
        inf_norm = np.linalg.norm(
            magnitude_spectrogram.flatten(), np.inf) / _lambda
        dual_norm = np.max([two_norm, inf_norm])
        residuals = magnitude_spectrogram / dual_norm

        # tunable parameters
        mu = 1.25 / two_norm
        mu_bar = mu * 1e7
        rho = 1.5

        # Loop-invariant denominator of the relative error.
        input_norm = np.linalg.norm(magnitude_spectrogram, ord='fro')

        error = np.inf
        converged = False
        num_iteration = 0

        while not converged and num_iteration < self.num_iterations:
            num_iteration += 1
            low_rank = self.svd_threshold(
                magnitude_spectrogram - sparse_matrix + residuals / mu,
                1 / mu)
            sparse_matrix = self.shrink(
                magnitude_spectrogram - low_rank + residuals / mu,
                _lambda / mu)

            reconstruction_gap = magnitude_spectrogram - low_rank - sparse_matrix
            residuals += mu * reconstruction_gap
            # Grow the penalty geometrically, capped at mu_bar.
            mu = min(mu * rho, mu_bar)

            error = np.linalg.norm(reconstruction_gap, ord='fro') / input_norm
            if error < self.epsilon:
                converged = True

        self.error = error
        return low_rank, sparse_matrix

    @staticmethod
    def shrink(matrix, tau):
        """
        Element-wise soft thresholding.

        Args:
            matrix (np.ndarray): Values to shrink.
            tau (float): Amount by which every magnitude is reduced;
              magnitudes below ``tau`` become zero.

        Returns:
            np.ndarray: ``sign(matrix) * max(|matrix| - tau, 0)``.
        """
        return np.sign(matrix) * np.maximum(np.abs(matrix) - tau, 0)

    def svd_threshold(self, matrix, tau):
        """
        Soft-threshold the singular values of ``matrix`` and rebuild it.

        Args:
            matrix (np.ndarray): 2-D matrix to threshold.
            tau (float): Threshold passed to ``shrink`` for the singular
              values.

        Returns:
            np.ndarray: Matrix reconstructed from the shrunken singular
            values; same shape as ``matrix``.
        """
        u, sigma, v = np.linalg.svd(matrix, full_matrices=False)
        shrunk = self.shrink(sigma, tau)
        # Scaling the rows of v is equivalent to multiplying by diag(shrunk)
        # but avoids building the dense diagonal matrix.
        thresholded_singular_values = np.dot(u, shrunk[:, np.newaxis] * v)
        return thresholded_singular_values