考虑下面的两个数字。
从视觉上看,我可以发现,与曲线的整体形状相比,有几个数据点不合适。有什么好的建议来检测这些点并协调它们的值,使它们与其余的数据点相适应?
a0zr77ik1#
关于这个主题,有一整套变点检测文献。推荐你importruptures。它以这种方式表述这个问题:
import
syqv5f0l2#
首先,我不认为你的问题实际上是检测 * 离群值 *,根据你提供的例子,看起来你是在试图最大限度地减少时间序列中的偏差或尺度变化(我建议更新你的问题标题,以便其他人可以找到这个问题)。所以你问了两个不同的问题如何识别信号中的不连续性,然后如何纠正它。正如在评论中发布的,np.diff将是识别不连续性边缘的一个很好的解决方案。在纠正数据方面,您可以尝试以下几种解决方案,具体取决于您的数据:
np.diff
方法一:如果一个简单的垂直移动就足够了,那么使用相邻的“好”数据的导数来计算“坏”数据垂直移动的距离方法二:如果垂直移位不够精确(可能“坏”数据的斜率不同),则在“坏”数据的两侧找到导数并对其进行插值,然后应用这些插值移位方法三:删除“坏”数据,尝试找到一个充分的多项式拟合,然后用它来估计缺失的数据。注意:对于“坏”数据位于数据集边缘的情况(如上图),只能使用方法1的一个版本
在这个例子中,我用两个移位部分制作了一个简单的信号。您可以在第一个图中看到np.diff中的尖峰如何定位移位部分的边缘。下面是一些示例代码:
import numpy as np import matplotlib.pyplot as plt ## make an arbitrary signal curve x = np.arange(0,20,0.1) f1 = .1 f2 = .12 y = np.sin(2*np.pi*f1*x) + np.sin(np.pi*f2*x) ##------------------ ## add "jumps" in signal y[30:50] = y[30:50] - 0.3 y[120:130] = y[120:130] + np.linspace(1,1.3,10) ## compute the first derivative to identify jumps deriv = np.diff(y) ## find index locations of jumps jump_threshold = 0.2 jump_idxs = np.where(np.abs(deriv)>jump_threshold)[0] ##------------------ ## METHOD 1 ## use adjacent derivatives to rectify data deriv_match1 = deriv[jump_idxs[0]-1] deriv_match2 = deriv[jump_idxs[2]-1] ## find the vertical shift amount required to satisfy the average derivative vert_match1 = y[jump_idxs[0]] + deriv_match1 vert_shift1 = y[jump_idxs[0]+1] - vert_match1 vert_match2 = y[jump_idxs[2]] + deriv_match2 vert_shift2 = y[jump_idxs[2]+1] - vert_match2 ## apply shift to data y_shifted = np.copy(y) y_shifted[jump_idxs[0]+1:jump_idxs[1]+1] = y_shifted[jump_idxs[0]+1:jump_idxs[1]+1] - vert_shift1 y_shifted[jump_idxs[2]+1:jump_idxs[3]+1] = y_shifted[jump_idxs[2]+1:jump_idxs[3]+1] - vert_shift2 ##------------------ ## METHOD 2 ## match each side of the bad data individually, interpolate shifts deriv_match1_1 = deriv[jump_idxs[1]+1] deriv_match2_1 = deriv[jump_idxs[3]+1] ## find vertical shifts on other side of bad data vert_match1_1 = y[jump_idxs[1]+1] + deriv_match1_1 vert_shift1_1 = y[jump_idxs[1]] - vert_match1_1 vert_match2_1 = y[jump_idxs[3]+1] + deriv_match2_1 vert_shift2_1 = y[jump_idxs[3]] - vert_match2_1 ## interpolate shifts y_shifted_1 = np.copy(y) interp_shifts_1 = np.interp(np.arange(jump_idxs[0]+1,jump_idxs[1]+1), xp=[jump_idxs[0]+1,jump_idxs[1]+1], fp=[vert_shift1,vert_shift1_1]) interp_shifts_2 = np.interp(np.arange(jump_idxs[2]+1,jump_idxs[3]+1), xp=[jump_idxs[2]+1,jump_idxs[3]+1], fp=[vert_shift2,vert_shift2_1]) ## apply shift to data y_shifted_1[jump_idxs[0]+1:jump_idxs[1]+1] = y_shifted_1[jump_idxs[0]+1:jump_idxs[1]+1] - interp_shifts_1 y_shifted_1[jump_idxs[2]+1:jump_idxs[3]+1] = y_shifted_1[jump_idxs[2]+1:jump_idxs[3]+1] - interp_shifts_2 ##------------------ ## METHOD 3 ## polynomial fit to data good_data = np.copy(y) good_locs = np.copy(x) good_data = np.delete(good_data, np.arange(jump_idxs[2], jump_idxs[3]+2)) good_data = np.delete(good_data, np.arange(jump_idxs[0], jump_idxs[1]+2)) good_locs = np.delete(good_locs, np.arange(jump_idxs[2], jump_idxs[3]+2)) good_locs = np.delete(good_locs, np.arange(jump_idxs[0], jump_idxs[1]+2)) curve_fit = np.poly1d(np.polyfit(good_locs, good_data, deg=9)) x_fits = np.copy(x) y_fits = [] for i in x_fits: y_fits.append(curve_fit(i)) ##------------------ ## PLOTS fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(8,6)) twinx = ax[0,0].twinx() ax[0,0].scatter(x, y, s=5) ax[0,0].set(ylim=(-1, 3), ylabel="signal / curve", axisbelow=True) twinx.plot(x[:-1], deriv, color='red', label="diff") twinx.set_ylim(-5, 1) twinx.set_ylabel("diff", rotation=270, labelpad=8) ax[0,0].grid("both", alpha=0.2) for idx in jump_idxs: ax[0,0].axvline(x[idx], ls=":", color="green") ax[0,1].scatter(x, y_shifted, s=5, label="rectified data") for idx in jump_idxs: ax[0,1].axvline(x[idx], ls=":", color="green") ax[0,1].grid("both", alpha=0.2) ax[0,1].set_title("Method 1") ax[1,0].scatter(x, y_shifted_1, s=5, label="rectified data") for idx in jump_idxs: ax[1,0].axvline(x[idx], ls=":", color="green") ax[1,0].grid("both", alpha=0.2) ax[1,0].set_title("Method 2") ax[1,1].scatter(good_locs, good_data, s=5, zorder=10) ax[1,1].plot(x_fits, y_fits, color="cyan", label='polynomial fit') for idx in jump_idxs: ax[1,1].axvline(x[idx], ls=":", color="green") ax[1,1].grid("both", alpha=0.2) ax[1,1].set_title("Method 3") twinx.legend() ax[0,1].legend() ax[1,0].legend() ax[1,1].legend() plt.tight_layout() plt.show()
2条答案
按热度按时间a0zr77ik1#
关于这个主题,有一整套变点检测文献。
推荐你
import
ruptures。它以这种方式表述这个问题:
syqv5f0l2#
首先,我不认为你的问题实际上是检测 * 离群值 *,根据你提供的例子,看起来你是在试图最大限度地减少时间序列中的偏差或尺度变化(我建议更新你的问题标题,以便其他人可以找到这个问题)。
所以你问了两个不同的问题如何识别信号中的不连续性,然后如何纠正它。正如在评论中发布的,
np.diff
将是识别不连续性边缘的一个很好的解决方案。在纠正数据方面,您可以尝试以下几种解决方案,具体取决于您的数据:
方法一:如果一个简单的垂直移动就足够了,那么使用相邻的“好”数据的导数来计算“坏”数据垂直移动的距离
方法二:如果垂直移位不够精确(可能“坏”数据的斜率不同),则在“坏”数据的两侧找到导数并对其进行插值,然后应用这些插值移位
方法三:删除“坏”数据,尝试找到一个充分的多项式拟合,然后用它来估计缺失的数据。
注意:对于“坏”数据位于数据集边缘的情况(如上图),只能使用方法1的一个版本
在这个例子中,我用两个移位部分制作了一个简单的信号。您可以在第一个图中看到
np.diff
中的尖峰如何定位移位部分的边缘。下面是一些示例代码: