python 从曲线中检测异常值的算法

c9qzyr3d  于 2023-06-28  发布在  Python
关注(0)|答案(2)|浏览(167)

考虑下面的两个数字。

从视觉上看,我可以发现,与曲线的整体形状相比,有几个数据点不合适。有什么好的建议来检测这些点并协调它们的值,使它们与其余的数据点相适应?

a0zr77ik

a0zr77ik1#

关于这个主题,有一整套变点检测文献。
推荐你importruptures
它以这种方式表述这个问题:

  • 世界上有一个生成过程,使用某些模型参数。
  • 在某个时刻,参数发生了变化,因此过程输出也发生了变化。
  • 围绕变化 Package 成本函数,最能解释我们观察到的变化的最小变化点集是什么?
syqv5f0l

syqv5f0l2#

首先,我不认为你的问题实际上是检测 * 离群值 *,根据你提供的例子,看起来你是在试图最大限度地减少时间序列中的偏差或尺度变化(我建议更新你的问题标题,以便其他人可以找到这个问题)。
所以你问了两个不同的问题如何识别信号中的不连续性,然后如何纠正它。正如在评论中发布的,np.diff将是识别不连续性边缘的一个很好的解决方案。
在纠正数据方面,您可以尝试以下几种解决方案,具体取决于您的数据:

方法一:如果一个简单的垂直移动就足够了,那么使用相邻的“好”数据的导数来计算“坏”数据垂直移动的距离
方法二:如果垂直移位不够精确(可能“坏”数据的斜率不同),则在“坏”数据的两侧找到导数并对其进行插值,然后应用这些插值移位
方法三:删除“坏”数据,尝试找到一个充分的多项式拟合,然后用它来估计缺失的数据。
注意:对于“坏”数据位于数据集边缘的情况(如上图),只能使用方法1的一个版本

在这个例子中,我用两个移位部分制作了一个简单的信号。您可以在第一个图中看到np.diff中的尖峰如何定位移位部分的边缘。下面是一些示例代码:

import numpy as np
import matplotlib.pyplot as plt

## make an arbitrary signal curve
x = np.arange(0,20,0.1)
f1 = .1
f2 = .12
y = np.sin(2*np.pi*f1*x) + np.sin(np.pi*f2*x)

##------------------
## add "jumps" in signal
y[30:50] = y[30:50] - 0.3
y[120:130] = y[120:130] + np.linspace(1,1.3,10)

## compute the first derivative to identify jumps
deriv = np.diff(y)

## find index locations of jumps
jump_threshold = 0.2
jump_idxs = np.where(np.abs(deriv)>jump_threshold)[0]


##------------------
## METHOD 1
## use adjacent derivatives to rectify data
deriv_match1 = deriv[jump_idxs[0]-1]
deriv_match2 = deriv[jump_idxs[2]-1]

## find the vertical shift amount required to satisfy the average derivative
vert_match1 = y[jump_idxs[0]] + deriv_match1
vert_shift1 = y[jump_idxs[0]+1] - vert_match1

vert_match2 = y[jump_idxs[2]] + deriv_match2
vert_shift2 = y[jump_idxs[2]+1] - vert_match2

## apply shift to data
y_shifted = np.copy(y)
y_shifted[jump_idxs[0]+1:jump_idxs[1]+1] = y_shifted[jump_idxs[0]+1:jump_idxs[1]+1] - vert_shift1
y_shifted[jump_idxs[2]+1:jump_idxs[3]+1] = y_shifted[jump_idxs[2]+1:jump_idxs[3]+1] - vert_shift2

##------------------
## METHOD 2
## match each side of the bad data individually, interpolate shifts
deriv_match1_1 = deriv[jump_idxs[1]+1]
deriv_match2_1 = deriv[jump_idxs[3]+1]

## find vertical shifts on other side of bad data
vert_match1_1 = y[jump_idxs[1]+1] + deriv_match1_1
vert_shift1_1 = y[jump_idxs[1]] - vert_match1_1

vert_match2_1 = y[jump_idxs[3]+1] + deriv_match2_1
vert_shift2_1 = y[jump_idxs[3]] - vert_match2_1

## interpolate shifts
y_shifted_1 = np.copy(y)
interp_shifts_1 = np.interp(np.arange(jump_idxs[0]+1,jump_idxs[1]+1), 
                            xp=[jump_idxs[0]+1,jump_idxs[1]+1],
                            fp=[vert_shift1,vert_shift1_1])
interp_shifts_2 = np.interp(np.arange(jump_idxs[2]+1,jump_idxs[3]+1), 
                            xp=[jump_idxs[2]+1,jump_idxs[3]+1],
                            fp=[vert_shift2,vert_shift2_1])
## apply shift to data
y_shifted_1[jump_idxs[0]+1:jump_idxs[1]+1] = y_shifted_1[jump_idxs[0]+1:jump_idxs[1]+1] - interp_shifts_1
y_shifted_1[jump_idxs[2]+1:jump_idxs[3]+1] = y_shifted_1[jump_idxs[2]+1:jump_idxs[3]+1] - interp_shifts_2

##------------------
## METHOD 3
## polynomial fit to data
good_data = np.copy(y)
good_locs = np.copy(x)
good_data = np.delete(good_data, np.arange(jump_idxs[2], jump_idxs[3]+2))
good_data = np.delete(good_data, np.arange(jump_idxs[0], jump_idxs[1]+2))
good_locs = np.delete(good_locs, np.arange(jump_idxs[2], jump_idxs[3]+2))
good_locs = np.delete(good_locs, np.arange(jump_idxs[0], jump_idxs[1]+2))

curve_fit = np.poly1d(np.polyfit(good_locs, good_data, deg=9))
x_fits = np.copy(x)
y_fits = []
for i in x_fits:
    y_fits.append(curve_fit(i))

##------------------
## PLOTS
fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(8,6))
twinx = ax[0,0].twinx()
ax[0,0].scatter(x, y, s=5)
ax[0,0].set(ylim=(-1, 3), ylabel="signal / curve", axisbelow=True)
twinx.plot(x[:-1], deriv, color='red', label="diff")
twinx.set_ylim(-5, 1)
twinx.set_ylabel("diff", rotation=270, labelpad=8)
ax[0,0].grid("both", alpha=0.2)
for idx in jump_idxs:
    ax[0,0].axvline(x[idx], ls=":", color="green")
    
    
ax[0,1].scatter(x, y_shifted, s=5, label="rectified data")
for idx in jump_idxs:
    ax[0,1].axvline(x[idx], ls=":", color="green")
ax[0,1].grid("both", alpha=0.2)
ax[0,1].set_title("Method 1")

ax[1,0].scatter(x, y_shifted_1, s=5, label="rectified data")
for idx in jump_idxs:
    ax[1,0].axvline(x[idx], ls=":", color="green")
ax[1,0].grid("both", alpha=0.2)
ax[1,0].set_title("Method 2")

ax[1,1].scatter(good_locs, good_data, s=5, zorder=10)
ax[1,1].plot(x_fits, y_fits, color="cyan", label='polynomial fit')
for idx in jump_idxs:
    ax[1,1].axvline(x[idx], ls=":", color="green")
ax[1,1].grid("both", alpha=0.2)
ax[1,1].set_title("Method 3")

twinx.legend()
ax[0,1].legend()
ax[1,0].legend()
ax[1,1].legend()
plt.tight_layout()
plt.show()

相关问题