考虑下面的两个数字。
从视觉上看,我可以发现,与曲线的整体形状相比,有几个数据点不合适。有什么好的建议来检测这些点并协调它们的值,使它们与其余的数据点相适应?
a0zr77ik1#
关于这个主题,有一整套变点检测文献。推荐你importruptures。它以这种方式表述这个问题:
import
syqv5f0l2#
首先,我不认为你的问题实际上是检测 * 离群值 *,根据你提供的例子,看起来你是在试图最大限度地减少时间序列中的偏差或尺度变化(我建议更新你的问题标题,以便其他人可以找到这个问题)。所以你问了两个不同的问题如何识别信号中的不连续性,然后如何纠正它。正如在评论中发布的,np.diff将是识别不连续性边缘的一个很好的解决方案。在纠正数据方面,您可以尝试以下几种解决方案,具体取决于您的数据:
np.diff
方法一:如果一个简单的垂直移动就足够了,那么使用相邻的“好”数据的导数来计算“坏”数据垂直移动的距离方法二:如果垂直移位不够精确(可能“坏”数据的斜率不同),则在“坏”数据的两侧找到导数并对其进行插值,然后应用这些插值移位方法三:删除“坏”数据,尝试找到一个充分的多项式拟合,然后用它来估计缺失的数据。注意:对于“坏”数据位于数据集边缘的情况(如上图),只能使用方法1的一个版本
在这个例子中,我用两个移位部分制作了一个简单的信号。您可以在第一个图中看到np.diff中的尖峰如何定位移位部分的边缘。下面是一些示例代码:
import numpy as npimport matplotlib.pyplot as plt## make an arbitrary signal curvex = np.arange(0,20,0.1)f1 = .1f2 = .12y = np.sin(2*np.pi*f1*x) + np.sin(np.pi*f2*x)##------------------## add "jumps" in signaly[30:50] = y[30:50] - 0.3y[120:130] = y[120:130] + np.linspace(1,1.3,10)## compute the first derivative to identify jumpsderiv = np.diff(y)## find index locations of jumpsjump_threshold = 0.2jump_idxs = np.where(np.abs(deriv)>jump_threshold)[0]##------------------## METHOD 1## use adjacent derivatives to rectify dataderiv_match1 = deriv[jump_idxs[0]-1]deriv_match2 = deriv[jump_idxs[2]-1]## find the vertical shift amount required to satisfy the average derivativevert_match1 = y[jump_idxs[0]] + deriv_match1vert_shift1 = y[jump_idxs[0]+1] - vert_match1vert_match2 = y[jump_idxs[2]] + deriv_match2vert_shift2 = y[jump_idxs[2]+1] - vert_match2## apply shift to datay_shifted = np.copy(y)y_shifted[jump_idxs[0]+1:jump_idxs[1]+1] = y_shifted[jump_idxs[0]+1:jump_idxs[1]+1] - vert_shift1y_shifted[jump_idxs[2]+1:jump_idxs[3]+1] = y_shifted[jump_idxs[2]+1:jump_idxs[3]+1] - vert_shift2##------------------## METHOD 2## match each side of the bad data individually, interpolate shiftsderiv_match1_1 = deriv[jump_idxs[1]+1]deriv_match2_1 = deriv[jump_idxs[3]+1]## find vertical shifts on other side of bad datavert_match1_1 = y[jump_idxs[1]+1] + deriv_match1_1vert_shift1_1 = y[jump_idxs[1]] - vert_match1_1vert_match2_1 = y[jump_idxs[3]+1] + deriv_match2_1vert_shift2_1 = y[jump_idxs[3]] - vert_match2_1## interpolate shiftsy_shifted_1 = np.copy(y)interp_shifts_1 = np.interp(np.arange(jump_idxs[0]+1,jump_idxs[1]+1), xp=[jump_idxs[0]+1,jump_idxs[1]+1], fp=[vert_shift1,vert_shift1_1])interp_shifts_2 = np.interp(np.arange(jump_idxs[2]+1,jump_idxs[3]+1), xp=[jump_idxs[2]+1,jump_idxs[3]+1], fp=[vert_shift2,vert_shift2_1])## apply shift to datay_shifted_1[jump_idxs[0]+1:jump_idxs[1]+1] = y_shifted_1[jump_idxs[0]+1:jump_idxs[1]+1] - interp_shifts_1y_shifted_1[jump_idxs[2]+1:jump_idxs[3]+1] = y_shifted_1[jump_idxs[2]+1:jump_idxs[3]+1] - interp_shifts_2##------------------## METHOD 3## polynomial fit to datagood_data = np.copy(y)good_locs = np.copy(x)good_data = np.delete(good_data, np.arange(jump_idxs[2], jump_idxs[3]+2))good_data = np.delete(good_data, np.arange(jump_idxs[0], jump_idxs[1]+2))good_locs = np.delete(good_locs, np.arange(jump_idxs[2], jump_idxs[3]+2))good_locs = np.delete(good_locs, np.arange(jump_idxs[0], jump_idxs[1]+2))curve_fit = np.poly1d(np.polyfit(good_locs, good_data, deg=9))x_fits = np.copy(x)y_fits = []for i in x_fits: y_fits.append(curve_fit(i))##------------------## PLOTSfig, ax = plt.subplots(nrows=2, ncols=2, figsize=(8,6))twinx = ax[0,0].twinx()ax[0,0].scatter(x, y, s=5)ax[0,0].set(ylim=(-1, 3), ylabel="signal / curve", axisbelow=True)twinx.plot(x[:-1], deriv, color='red', label="diff")twinx.set_ylim(-5, 1)twinx.set_ylabel("diff", rotation=270, labelpad=8)ax[0,0].grid("both", alpha=0.2)for idx in jump_idxs: ax[0,0].axvline(x[idx], ls=":", color="green") ax[0,1].scatter(x, y_shifted, s=5, label="rectified data")for idx in jump_idxs: ax[0,1].axvline(x[idx], ls=":", color="green")ax[0,1].grid("both", alpha=0.2)ax[0,1].set_title("Method 1")ax[1,0].scatter(x, y_shifted_1, s=5, label="rectified data")for idx in jump_idxs: ax[1,0].axvline(x[idx], ls=":", color="green")ax[1,0].grid("both", alpha=0.2)ax[1,0].set_title("Method 2")ax[1,1].scatter(good_locs, good_data, s=5, zorder=10)ax[1,1].plot(x_fits, y_fits, color="cyan", label='polynomial fit')for idx in jump_idxs: ax[1,1].axvline(x[idx], ls=":", color="green")ax[1,1].grid("both", alpha=0.2)ax[1,1].set_title("Method 3")twinx.legend()ax[0,1].legend()ax[1,0].legend()ax[1,1].legend()plt.tight_layout()plt.show()
import numpy as np
import matplotlib.pyplot as plt
## make an arbitrary signal curve
x = np.arange(0,20,0.1)
f1 = .1
f2 = .12
y = np.sin(2*np.pi*f1*x) + np.sin(np.pi*f2*x)
##------------------
## add "jumps" in signal
y[30:50] = y[30:50] - 0.3
y[120:130] = y[120:130] + np.linspace(1,1.3,10)
## compute the first derivative to identify jumps
deriv = np.diff(y)
## find index locations of jumps
jump_threshold = 0.2
jump_idxs = np.where(np.abs(deriv)>jump_threshold)[0]
## METHOD 1
## use adjacent derivatives to rectify data
deriv_match1 = deriv[jump_idxs[0]-1]
deriv_match2 = deriv[jump_idxs[2]-1]
## find the vertical shift amount required to satisfy the average derivative
vert_match1 = y[jump_idxs[0]] + deriv_match1
vert_shift1 = y[jump_idxs[0]+1] - vert_match1
vert_match2 = y[jump_idxs[2]] + deriv_match2
vert_shift2 = y[jump_idxs[2]+1] - vert_match2
## apply shift to data
y_shifted = np.copy(y)
y_shifted[jump_idxs[0]+1:jump_idxs[1]+1] = y_shifted[jump_idxs[0]+1:jump_idxs[1]+1] - vert_shift1
y_shifted[jump_idxs[2]+1:jump_idxs[3]+1] = y_shifted[jump_idxs[2]+1:jump_idxs[3]+1] - vert_shift2
## METHOD 2
## match each side of the bad data individually, interpolate shifts
deriv_match1_1 = deriv[jump_idxs[1]+1]
deriv_match2_1 = deriv[jump_idxs[3]+1]
## find vertical shifts on other side of bad data
vert_match1_1 = y[jump_idxs[1]+1] + deriv_match1_1
vert_shift1_1 = y[jump_idxs[1]] - vert_match1_1
vert_match2_1 = y[jump_idxs[3]+1] + deriv_match2_1
vert_shift2_1 = y[jump_idxs[3]] - vert_match2_1
## interpolate shifts
y_shifted_1 = np.copy(y)
interp_shifts_1 = np.interp(np.arange(jump_idxs[0]+1,jump_idxs[1]+1),
xp=[jump_idxs[0]+1,jump_idxs[1]+1],
fp=[vert_shift1,vert_shift1_1])
interp_shifts_2 = np.interp(np.arange(jump_idxs[2]+1,jump_idxs[3]+1),
xp=[jump_idxs[2]+1,jump_idxs[3]+1],
fp=[vert_shift2,vert_shift2_1])
y_shifted_1[jump_idxs[0]+1:jump_idxs[1]+1] = y_shifted_1[jump_idxs[0]+1:jump_idxs[1]+1] - interp_shifts_1
y_shifted_1[jump_idxs[2]+1:jump_idxs[3]+1] = y_shifted_1[jump_idxs[2]+1:jump_idxs[3]+1] - interp_shifts_2
## METHOD 3
## polynomial fit to data
good_data = np.copy(y)
good_locs = np.copy(x)
good_data = np.delete(good_data, np.arange(jump_idxs[2], jump_idxs[3]+2))
good_data = np.delete(good_data, np.arange(jump_idxs[0], jump_idxs[1]+2))
good_locs = np.delete(good_locs, np.arange(jump_idxs[2], jump_idxs[3]+2))
good_locs = np.delete(good_locs, np.arange(jump_idxs[0], jump_idxs[1]+2))
curve_fit = np.poly1d(np.polyfit(good_locs, good_data, deg=9))
x_fits = np.copy(x)
y_fits = []
for i in x_fits:
y_fits.append(curve_fit(i))
## PLOTS
fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(8,6))
twinx = ax[0,0].twinx()
ax[0,0].scatter(x, y, s=5)
ax[0,0].set(ylim=(-1, 3), ylabel="signal / curve", axisbelow=True)
twinx.plot(x[:-1], deriv, color='red', label="diff")
twinx.set_ylim(-5, 1)
twinx.set_ylabel("diff", rotation=270, labelpad=8)
ax[0,0].grid("both", alpha=0.2)
for idx in jump_idxs:
ax[0,0].axvline(x[idx], ls=":", color="green")
ax[0,1].scatter(x, y_shifted, s=5, label="rectified data")
ax[0,1].axvline(x[idx], ls=":", color="green")
ax[0,1].grid("both", alpha=0.2)
ax[0,1].set_title("Method 1")
ax[1,0].scatter(x, y_shifted_1, s=5, label="rectified data")
ax[1,0].axvline(x[idx], ls=":", color="green")
ax[1,0].grid("both", alpha=0.2)
ax[1,0].set_title("Method 2")
ax[1,1].scatter(good_locs, good_data, s=5, zorder=10)
ax[1,1].plot(x_fits, y_fits, color="cyan", label='polynomial fit')
ax[1,1].axvline(x[idx], ls=":", color="green")
ax[1,1].grid("both", alpha=0.2)
ax[1,1].set_title("Method 3")
twinx.legend()
ax[0,1].legend()
ax[1,0].legend()
ax[1,1].legend()
plt.tight_layout()
plt.show()
2条答案
按热度按时间a0zr77ik1#
关于这个主题,有一整套变点检测文献。
推荐你
import
ruptures。它以这种方式表述这个问题:
syqv5f0l2#
首先,我不认为你的问题实际上是检测 * 离群值 *,根据你提供的例子,看起来你是在试图最大限度地减少时间序列中的偏差或尺度变化(我建议更新你的问题标题,以便其他人可以找到这个问题)。
所以你问了两个不同的问题如何识别信号中的不连续性,然后如何纠正它。正如在评论中发布的,
np.diff
将是识别不连续性边缘的一个很好的解决方案。在纠正数据方面,您可以尝试以下几种解决方案,具体取决于您的数据:
方法一:如果一个简单的垂直移动就足够了,那么使用相邻的“好”数据的导数来计算“坏”数据垂直移动的距离
方法二:如果垂直移位不够精确(可能“坏”数据的斜率不同),则在“坏”数据的两侧找到导数并对其进行插值,然后应用这些插值移位
方法三:删除“坏”数据,尝试找到一个充分的多项式拟合,然后用它来估计缺失的数据。
注意:对于“坏”数据位于数据集边缘的情况(如上图),只能使用方法1的一个版本
在这个例子中,我用两个移位部分制作了一个简单的信号。您可以在第一个图中看到
np.diff
中的尖峰如何定位移位部分的边缘。下面是一些示例代码: