import numpy as np
from scipy import signal
from .. import MaskSeparationBase
from ...core import utils
from ...core import constants
[docs]class Duet(MaskSeparationBase):
"""
The DUET algorithm was originally proposed by S.Rickard and F.Dietrich for DOA
estimation and further developed for BSS and demixing by A. Jourjine, S.Rickard,
and O. Yilmaz.
DUET extracts sources using the symmetric attenuation and relative delay between
two channels. The symmetric attenuation is calculated from the ratio of the two
channels' stft amplitudes, and the delay is the arrival delay between the two
sensors used to record the audio signal. These two values are clustered as peaks on
a histogram to determine where each source occurs. This implementation of DUET
creates and returns Mask objects after the run() function, which can then be
applied to the original audio signal to extract each individual source.
References:
[1] Rickard, Scott. "The DUET blind source separation algorithm."
Blind Speech Separation. Springer Netherlands, 2007. 217-241.
[2] Yilmaz, Ozgur, and Scott Rickard. "Blind separation of speech mixtures
via time-frequency masking."
Signal Processing, IEEE transactions on 52.7 (2004): 1830-1847.
Args:
input_audio_signal (np.array): a 2-row Numpy matrix containing samples of the
two-channel mixture.
num_sources (int): Number of sources to find.
attenuation_min (int): Minimum distance in utils.find_peak_indices, change if
not enough peaks are identified.
attenuation_max (int): Used for creating a histogram without outliers.
num_attenuation_bins (int): Number of bins for attenuation.
delay_min (int): Lower bound on delay, used as minimum distance in
utils.find_peak_indices.
delay_max (int): Upper bound on delay, used for creating a histogram without
outliers.
num_delay_bins (int): Number of bins for delay.
peak_threshold (float): Value in [0, 1] for peak picking.
attenuation_min_distance (int): Minimum distance between peaks wrt attenuation.
delay_min_distance (int): Minimum distance between peaks wrt delay.
p (int): Weight the histogram with the symmetric attenuation estimator.
q (int): Weight the histogram with the delay estimato
Notes:
On page 8 of his paper, Rickard recommends p=1 and q=0 as a default starting
point and p=.5, q=0 if one source is more dominant.
Attributes:
stft_ch0 (np.array): A Numpy matrix containing the stft data of channel 0.
stft_ch1 (np.array): A Numpy matrix containing the stft data of channel 1.
frequency_matrix (np.array): A Numpy matrix containing the frequencies of
analysis.
symmetric_atn (np.array): A Numpy matrix containing the symmetric attenuation
between the two channels.
delay (np.array): A Numpy matrix containing the delay between the two channels.
num_time_bins (np.array): The number of time bins for the frequency matrix and
mask arrays.
num_frequency_bins (int): The number of frequency bins for the mask arrays.
attenuation_bins (int): A Numpy array containing the attenuation bins for the
histogram.
delay_bins (np.array): A Numpy array containing the delay bins for the histogram.
normalized_attenuation_delay_histogram (np.array): A normalized Numpy matrix
containing the attenuation delay histogram, which has peaks for each source.
attenuation_delay_histogram (np.array): A non-normalized Numpy matrix containing
the attenuation delay histogram, which has peaks for each source.
peak_indices (np.array): A Numpy array containing the indices of the peaks for
the histogram.
separated_sources (np.array): A Numpy array of arrays containing each
separated source.
"""
def __init__(self, input_audio_signal, num_sources,
attenuation_min=-3, attenuation_max=3, num_attenuation_bins=50,
delay_min=-3, delay_max=3, num_delay_bins=50,
peak_threshold=0.0, attenuation_min_distance=5, delay_min_distance=5,
p=1, q=0, mask_type='binary'):
super().__init__(
input_audio_signal=input_audio_signal,
mask_type=mask_type)
self.num_sources = num_sources
self.attenuation_min = attenuation_min
self.attenuation_max = attenuation_max
self.num_attenuation_bins = num_attenuation_bins
self.delay_min = delay_min
self.delay_max = delay_max
self.num_delay_bins = num_delay_bins
self.peak_threshold = peak_threshold
self.attenuation_min_distance = attenuation_min_distance
self.delay_min_distance = delay_min_distance
self.p = p
self.q = q
self.stft_ch0 = None
self.stft_ch1 = None
self.frequency_matrix = None
self.symmetric_atn = None
self.delay = None
self.num_time_bins = None
self.num_frequency_bins = None
self.attenuation_bins = None
self.delay_bins = None
self.normalized_attenuation_delay_histogram = None
self.attenuation_delay_histogram = None
self.peak_indices = None
self.delay_peak = None
self.atn_peak = None
self.separated_sources = None
def run(self):
""" Extracts N sources from a given stereo audio mixture (N sources captured via 2 sensors)
Returns:
computed_masks (np.array): A list of binary mask objects that can be used to extract the sources
Example:
.. code-block:: python
:linenos:
#Import input audio signal
input_file_name = '../Input/dev1_female3_inst_mix.wav'
signal = AudioSignal(path_to_input_file=input_file_name)
# Set up and run Duet
duet = Duet(signal, a_min=-3, a_max=3, a_num=50, d_min=-3, d_max=3, d_num=50, threshold=0.2,
a_min_distance=5, d_min_distance=5, num_sources=3)
duet.run()
# plot histogram results
duet.plot(os.path.join('..', 'Output', 'duet_2d.png'))
duet.plot(os.path.join('..', 'Output', 'duet_3d.png'), three_d_plot=True)
# Create output file for each source found
output_name_stem = os.path.join('..', 'Output', 'duet_source')
i = 1
for s in duet.make_audio_signals():
output_file_name = f"{output_name_stem}{i}.wav"
s.write_audio_to_file(output_file_name)
i += 1
"""
self.result_masks = []
# Calculate the stft of both channels and create the frequency matrix (the matrix containing the
# frequencies of analysis of the Fourier transform)
self.stft_ch0, self.stft_ch1, self.frequency_matrix = self._compute_spectrogram(
self.sample_rate)
# Calculate the symmetric attenuation (alpha) and delay (delta) for each
# time-freq. point and return a matrix for each
self.symmetric_atn, self.delay = self._compute_atn_delay(
self.stft_ch0, self.stft_ch1, self.frequency_matrix)
# Make histogram of attenuation-delay values and get the center values for the bins in this histogram
self.normalized_attenuation_delay_histogram, self.attenuation_bins, self.delay_bins = (
self._make_histogram()
)
# Find the location of peaks in the attenuation-delay plane
self.peak_indices = utils.find_peak_indices(
self.normalized_attenuation_delay_histogram, self.num_sources,
threshold=self.peak_threshold,
min_dist=[self.attenuation_min_distance, self.delay_min_distance])
# compute delay_peak, attenuation peak, and attenuation/delay estimates
self.delay_peak, atn_delay_est, self.atn_peak = self._convert_peaks(
self.peak_indices)
# compute masks for separation
computed_masks = self._compute_masks()
return computed_masks
def _compute_spectrogram(self, sample_rate):
""" Creates the STFT matrices for channel 0 and 1, and computes the frequency matrix.
Parameter:
sample_rate (integer): sample rate
Returns:
stft_ch0 (np.matrix): a 2D Numpy matrix containing the stft of channel 0
stft_ch1 (np.matrix): a 2D Numpy matrix containing the stft of channel 1
wmat (np.matrix): a 2D Numpy matrix containing the frequencies of analysis of the Fourier transform
"""
# Compute the stft of the two channel mixtures
self.audio_signal.stft_params = self.stft_params
self.audio_signal.stft()
stft_ch0 = self.audio_signal.get_stft_channel(0)
stft_ch1 = self.audio_signal.get_stft_channel(1)
# Compute the freq. matrix for later use in phase calculations
n_time_bins = len(self.audio_signal.time_bins_vector)
wmat = np.array(np.tile(np.mat(
self.audio_signal.freq_vector).T, (1, n_time_bins))) * (
2 * np.pi / sample_rate)
wmat += constants.EPSILON
return stft_ch0, stft_ch1, wmat
@staticmethod
def _compute_atn_delay(stft_ch0, stft_ch1, frequency_matrix):
# Calculate the symmetric attenuation (alpha) and delay (delta) for each
# time-freq. point
inter_channel_ratio = (stft_ch1 + constants.EPSILON) / (stft_ch0 + constants.EPSILON)
attenuation = np.abs(inter_channel_ratio) # relative attenuation between the two channels
symmetric_attenuation = attenuation - 1 / attenuation # symmetric attenuation
relative_delay = -np.imag(np.log(inter_channel_ratio)) / (2 * np.pi * frequency_matrix) # relative delay
return symmetric_attenuation, relative_delay
def _make_histogram(self):
"""Receives the stft of the two channel mixtures and the frequency matrix to a create
a smooth and normalized histogram.
Parameters:
stft_ch0 (complex np.array): a 2D Numpy matrix containing the stft of channel 0
stft_ch1 (complex np.array): a 2D Numpy matrix containing the stft of channel 1
symmetric_atn (np.array): the symmetric attenuation between two channels
delay (np.array): the time delay between 2 channels
wmat(np.array): a 2D Numpy matrix containing the frequency matrix of the signal
Returns:
histogram (np.array): a smooth and normalized histogram
atn_bins (np.array): The range of attenuation values distributed into bins
delay_bins (np.array): The range of delay values distributed into bins
"""
# calculate the weighted histogram
time_frequency_weights = (np.abs(self.stft_ch0) * np.abs(self.stft_ch1)) ** self.p * \
(np.abs(self.frequency_matrix)) ** self.q
# only consider time-freq. points yielding estimates in bounds
attenuation_premask = np.logical_and(self.attenuation_min < self.symmetric_atn,
self.symmetric_atn < self.attenuation_max)
delay_premask = np.logical_and(self.delay_min < self.delay, self.delay < self.delay_max)
attenuation_delay_premask = np.logical_and(attenuation_premask, delay_premask)
nonzero_premask = np.nonzero(attenuation_delay_premask)
symmetric_attenuation_vector = self.symmetric_atn[nonzero_premask]
delay_vector = self.delay[nonzero_premask]
time_frequency_weights_vector = time_frequency_weights[nonzero_premask]
bins_array = np.array([self.num_attenuation_bins, self.num_delay_bins])
range_array = np.array([[self.attenuation_min, self.attenuation_max], [self.delay_min, self.delay_max]])
# compute the histogram
histogram, atn_bins, delay_bins = np.histogram2d(symmetric_attenuation_vector, delay_vector,
bins=bins_array, range=range_array,
weights=time_frequency_weights_vector)
# Save non-normalized as an option for plotting later
self.attenuation_delay_histogram = histogram
# Scale histogram from 0 to 1
histogram /= histogram.max()
# smooth the normalized histogram - local average 3-by-3 neighboring bins
histogram = self._smooth_matrix(histogram, np.array([3]))
return histogram, atn_bins, delay_bins
def _convert_peaks(self, peak_indices):
"""Receives the attenuation and delay bins and computes the delay/attenuation
peaks based on the peak finder indices.
Returns:
delay_peak(np.array): The delay peaks determined from the histogram
atn_delay_est (np.array): The estimated symmetric attenuation and delay values
atn_peak (np.array): Attenuation converted from symmetric attenuation
"""
atn_indices = [x[0] for x in peak_indices]
delay_indices = [x[1] for x in peak_indices]
symmetric_atn_peak = self.attenuation_bins[atn_indices]
delay_peak = self.delay_bins[delay_indices]
atn_delay_est = np.column_stack((symmetric_atn_peak, delay_peak))
# convert symmetric_atn to atn_peak using formula from Rickard
atn_peak = (symmetric_atn_peak + np.sqrt(symmetric_atn_peak ** 2 + 4)) / 2
return delay_peak, atn_delay_est, atn_peak
def _compute_masks(self):
"""Receives the attenuation and delay peaks and computes a mask to be applied to the signal for source
separation.
"""
# compute masks for separation
best_so_far = np.inf * np.ones_like(self.stft_ch0, dtype=float)
for i in range(0, self.num_sources):
mask_array = np.zeros_like(self.stft_ch0, dtype=bool)
phase = np.exp(-1j * self.frequency_matrix * self.delay_peak[i])
score = np.abs(self.atn_peak[i] * phase * self.stft_ch0 - self.stft_ch1) ** 2 / (1 + self.atn_peak[i] ** 2)
mask = (score < best_so_far)
mask_array[mask] = True
background_mask = self.mask_type(np.array(mask_array))
self.result_masks.append(background_mask)
self.result_masks[0].mask = np.logical_xor(self.result_masks[i].mask, self.result_masks[0].mask)
best_so_far[mask] = score[mask]
# Compute first mask based on what the other masks left remaining
self.result_masks[0].mask = np.logical_not(self.result_masks[0].mask)
return self.result_masks
@staticmethod
def _smooth_matrix(matrix, kernel):
"""Performs two-dimensional convolution in order to smooth the values of matrix elements.
(similar to low-pass filtering)
Parameters:
matrix (np.array): a 2D Numpy matrix to be smoothed
kernel (np.array): a 2D Numpy matrix containing kernel values
Note:
if Kernel is of size 1 by 1 (scalar), a Kernel by Kernel matrix of 1/Kernel**2 will be used as the matrix
averaging kernel
Output:
smoothed_matrix (np.array): a 2D Numpy matrix containing a smoothed version of Mat (same size as Mat)
"""
# check the dimensions of the Kernel matrix and set the values of the averaging
# matrix, kernel_matrix
kernel_matrix = np.ones((kernel[0], kernel[0])) / kernel[0] ** 2
krow, kcol = np.shape(kernel_matrix)
# adjust the matrix dimension for convolution
copy_row = int(np.floor(krow / 2)) # number of rows to copy on top and bottom
copy_col = int(np.floor(kcol / 2)) # number of columns to copy on either side
# TODO: This is very ugly. Make this readable.
# form the augmented matrix (rows and columns added to top, bottom, and sides)
matrix = np.mat(matrix) # make sure Mat is a Numpy matrix
augmented_matrix = np.vstack(
[
np.hstack(
[matrix[0, 0] * np.ones((copy_row, copy_col)),
np.ones((copy_row, 1)) * matrix[0, :],
matrix[0, -1] * np.ones((copy_row, copy_col))
]),
np.hstack(
[matrix[:, 0] * np.ones((1, copy_col)),
matrix,
matrix[:, -1] * np.ones((1, copy_col))]),
np.hstack(
[matrix[-1, 1] * np.ones((copy_row, copy_col)),
np.ones((copy_row, 1)) * matrix[-1, :],
matrix[-1, -1] * np.ones((copy_row, copy_col))
]
)
]
)
# perform two-dimensional convolution between the input matrix and the kernel
smooted_matrix = signal.convolve2d(augmented_matrix, kernel_matrix[::-1, ::-1], mode='valid')
return smooted_matrix