引言
最近实验室老师让我去预测景区内代步车辆的投放量,于是乎,本着“一心一意地输出年富力强的劳动力”这份初心,我就屁颠屁颠地去找资料,然后发现了Holt-Winters模型 , 感觉这个模型可以有,于是就去研究一番,并总结成这篇博客了。
原理分析
移动平均(The simple moving average (SMA))
直观上,最简单的平滑时间序列的方法是实现一个无权重的移动平均,目前已知的方法是用窗口函数,平滑统计量 St就是最近k个观察值的均值。公式如下:
这样的方法存在明显的缺陷,当k比较小时,预测的数据平滑效果不明显,而且突出反映了数据最近的变化;当k较大时,虽然有较好的平滑效果,但是预测的数据存在延迟。而且最少需要k个值(窗口有限)。
加权移动平均
一种稍微复杂的方法是先选择一组权重因子来计算加权移动平均
在实践中,通常在选择权重因子时,赋予时间序列中的最新数据更大的权重,并减少对旧数据的权重。这个方法也需要最少k个值,并且计算复杂。
简单指数平滑法
幸运地是有一种方法可以避免上述问题,它叫做指数平滑法。最简单的指数平滑法如下:
)
为什么被称为“指数”平滑法
从它的递推公式就能发现:
若要预测Xt后m天,公式如下:
三次指数平滑
三次指数平滑将时间序列的季节性这一特征也考虑进去了。
季节性被定义为时间序列数据的趋势,它表现出每一个周期重复自身的行为,就像任何周期函数一样。“季节”这个词用来表示行为每隔时间段L就开始自我重复。在自然界中有不同类型的季节性“累加性”(additive)和“累乘性“(multiplicative),就像加法和乘法是数学的基本运算。
如果每个12月都比每个11月多卖出1000套公寓,我们就说这样的季节趋势是“累加性”的。可以用绝对增长来表示。如果我们在夏季比冬季多卖出10%的公寓,那么季节趋势在自然中是“累乘性”的。
累乘性公式如下:
累加性公式如下:
对三次指数平滑法而言,我们必须初始化一个完整的“季节”Ci的值,不过我们可以简单地设置为全1(针对累乘式)或全0(针对累加式)。只有当序列的长度较短时,我们才需要慎重考虑初始值的选取。
我们这里讲的Holt-Winters模型就是三次指数平滑法。哇,终于切入正题了。
所有的指数平滑法都要更新上一时间步长的计算结果,并使用当前时间步长的数据中包含的新信息。它们通过“混合”新信息和旧信息来实现,而相关的新旧信息的权重由一个可调整的拌和参数来控制。各种方法的不同之处在于它们跟踪的量的个数和对应的拌和参数的个数。三次指数平滑法,功能最强大,既能体现趋势性又能体现季节性,所以三次指数平滑法的参数最多,有三个。
python代码实现
我们知道HoltWinters模型有三个可调参数,我们的目的就是训练出有效的α,β, γ。我们有两种方法,一种就是自己取值来试试,一种就是采用数值优化的思想,比如前面我们提到的最小二乘来最小化误差来求参数(注意不一定能全局收敛!这个问题实在是让人头痛。。。)我们就采用最小二乘法(L-BFGS)。
RMSE的实现
from __future__ import division
from sys import exit
from math import sqrt
from numpy import array
from scipy.optimize import fmin_l_bfgs_b
def RMSE(params, *args):
Y = args[0]
type = args[1]
rmse = 0
if type == 'linear':
alpha, beta = params
a = [Y[0]]
b = [Y[1] - Y[0]]
y = [a[0] + b[0]]
for i in range(len(Y)):
a.append(alpha * Y[i] + (1 - alpha) * (a[i] + b[i]))
b.append(beta * (a[i + 1] - a[i]) + (1 - beta) * b[i])
y.append(a[i + 1] + b[i + 1])
else:
alpha, beta, gamma = params
m = args[2]
a = [sum(Y[0:m]) / float(m)]
b = [(sum(Y[m:2 * m]) - sum(Y[0:m])) / m ** 2]
if type == 'additive':
s = [Y[i] - a[0] for i in range(m)]
y = [a[0] + b[0] + s[0]]
for i in range(len(Y)):
a.append(alpha * (Y[i] - s[i]) + (1 - alpha) * (a[i] + b[i]))
b.append(beta * (a[i + 1] - a[i]) + (1 - beta) * b[i])
s.append(gamma * (Y[i] - a[i] - b[i]) + (1 - gamma) * s[i])
y.append(a[i + 1] + b[i + 1] + s[i + 1])
elif type == 'multiplicative':
s = [Y[i] / a[0] for i in range(m)]
y = [(a[0] + b[0]) * s[0]]
for i in range(len(Y)):
a.append(alpha * (Y[i] / s[i]) + (1 - alpha) * (a[i] + b[i]))
b.append(beta * (a[i + 1] - a[i]) + (1 - beta) * b[i])
s.append(gamma * (Y[i] / (a[i] + b[i])) + (1 - gamma) * s[i])
y.append((a[i + 1] + b[i + 1]) * s[i + 1])
else:
exit('Type must be either linear, additive or multiplicative')
rmse = sqrt(sum([(m - n) ** 2 for m, n in zip(Y, y[:-1])]) / len(Y))
return rmse
线性实现
def linear(x, fc, alpha = None, beta = None):
Y = x[:]
if (alpha == None or beta == None):
initial_values = array([0.3, 0.1])
boundaries = [(0, 1), (0, 1)]
type = 'linear'
parameters = fmin_l_bfgs_b(RMSE, x0 = initial_values, args = (Y, type), bounds = boundaries, approx_grad = True)
alpha, beta = parameters[0]
a = [Y[0]]
b = [Y[1] - Y[0]]
y = [a[0] + b[0]]
rmse = 0
for i in range(len(Y) + fc):
if i == len(Y):
Y.append(a[-1] + b[-1])
a.append(alpha * Y[i] + (1 - alpha) * (a[i] + b[i]))
b.append(beta * (a[i + 1] - a[i]) + (1 - beta) * b[i])
y.append(a[i + 1] + b[i + 1])
rmse = sqrt(sum([(m - n) ** 2 for m, n in zip(Y[:-fc], y[:-fc - 1])]) / len(Y[:-fc]))
return Y[-fc:], alpha, beta, rmse
累加性
def additive(x, m, fc, alpha = None, beta = None, gamma = None):
Y = x[:]
if (alpha == None or beta == None or gamma == None):
initial_values = array([0.3, 0.1, 0.1])
boundaries = [(0, 1), (0, 1), (0, 1)]
type = 'additive'
parameters = fmin_l_bfgs_b(RMSE, x0 = initial_values, args = (Y, type, m), bounds = boundaries, approx_grad = True)
alpha, beta, gamma = parameters[0]
a = [sum(Y[0:m]) / float(m)]
b = [(sum(Y[m:2 * m]) - sum(Y[0:m])) / m ** 2]
s = [Y[i] - a[0] for i in range(m)]
y = [a[0] + b[0] + s[0]]
rmse = 0
for i in range(len(Y) + fc):
if i == len(Y):
Y.append(a[-1] + b[-1] + s[-m])
a.append(alpha * (Y[i] - s[i]) + (1 - alpha) * (a[i] + b[i]))
b.append(beta * (a[i + 1] - a[i]) + (1 - beta) * b[i])
s.append(gamma * (Y[i] - a[i] - b[i]) + (1 - gamma) * s[i])
y.append(a[i + 1] + b[i + 1] + s[i + 1])
rmse = sqrt(sum([(m - n) ** 2 for m, n in zip(Y[:-fc], y[:-fc - 1])]) / len(Y[:-fc]))
return Y[-fc:], alpha, beta, gamma, rmse
累乘性
def multiplicative(x, m, fc, alpha = None, beta = None, gamma = None):
Y = x[:]
if (alpha == None or beta == None or gamma == None):
initial_values = array([0.0, 1.0, 0.0])
boundaries = [(0, 1), (0, 1), (0, 1)]
type = 'multiplicative'
parameters = fmin_l_bfgs_b(RMSE, x0 = initial_values, args = (Y, type, m), bounds = boundaries, approx_grad = True)
alpha, beta, gamma = parameters[0]
a = [sum(Y[0:m]) / float(m)]
b = [(sum(Y[m:2 * m]) - sum(Y[0:m])) / m ** 2]
s = [Y[i] / a[0] for i in range(m)]
y = [(a[0] + b[0]) * s[0]]
rmse = 0
for i in range(len(Y) + fc):
if i == len(Y):
Y.append((a[-1] + b[-1]) * s[-m])
a.append(alpha * (Y[i] / s[i]) + (1 - alpha) * (a[i] + b[i]))
b.append(beta * (a[i + 1] - a[i]) + (1 - beta) * b[i])
s.append(gamma * (Y[i] / (a[i] + b[i])) + (1 - gamma) * s[i])
y.append((a[i + 1] + b[i + 1]) * s[i + 1])
rmse = sqrt(sum([(m - n) ** 2 for m, n in zip(Y[:-fc], y[:-fc - 1])]) / len(Y[:-fc]))
return Y[-fc:], alpha, beta, gamma, rmse
实验结果
直接构造了个类sin的函数,时间段为[-5,5],预测时间段[5-10](样本数据比较粗糙,多多担待。。。)