具体到这个例子中,有如下过程:
需要估计模型参数:即均值和方差
对模型参数\(\Theta\)做极大似然估计:
先计算隐变量的期望,再估计模型参数:
现在有两个硬币A和B,要估计的参数是它们各自翻正面(head)的概率。观察的过程是先随机选A或者B,然后扔10次。以上步骤重复5次。
具体参考这里:
对于第一次的抛,测试输出是否和论文一致:
# 硬币投掷结果观测序列
observations = np.array([[1, 0, 0, 0, 1, 1, 0, 1, 0, 1],
[1, 1, 1, 1, 0, 1, 1, 1, 1, 1],
[1, 0, 1, 1, 1, 1, 1, 0, 1, 1],
[1, 0, 1, 0, 0, 0, 1, 1, 0, 0],
[0, 1, 1, 1, 0, 1, 1, 1, 0, 1]])
coin_A_pmf_observation_1 = stats.binom.pmf(5,10,0.6)
coin_B_pmf_observation_1 = stats.binom.pmf(5,10,0.5)
normalized_coin_A_pmf_observation_1 = coin_A_pmf_observation_1/(coin_A_pmf_observation_1+coin_B_pmf_observation_1)
# 更新在当前参数下A、B硬币产生的正反面次数
counts['A']['H'] += weight_A * num_heads
counts['A']['T'] += weight_A * num_tails
counts['B']['H'] += weight_B * num_heads
counts['B']['T'] += weight_B * num_tails
# 在初始化的theta下,AB分别产生正反面的次数被估计出来了
# 可以基于结果更新theta了
new_theta_A = counts['A']['H'] / (counts['A']['H'] + counts['A']['T'])
new_theta_B = counts['B']['H'] / (counts['B']['H'] + counts['B']['T'])
# new_theta_A=0.713
# new_theta_B=0.581
# 是和论文一致的
完整的模型:
def em_single(priors, observations):
"""
EM算法单次迭代
Arguments
---------
priors : [theta_A, theta_B]
observations : [m X n matrix]
Returns
--------
new_priors: [new_theta_A, new_theta_B]
:param priors:
:param observations:
:return:
"""
counts = {'A': {'H': 0, 'T': 0}, 'B': {'H': 0, 'T': 0}}
theta_A = priors[0]
theta_B = priors[1]
# E step
for observation in observations:
len_observation = len(observation)
num_heads = observation.sum()
num_tails = len_observation - num_heads
contribution_A = stats.binom.pmf(num_heads, len_observation, theta_A)
contribution_B = stats.binom.pmf(num_heads, len_observation, theta_B) # 两个二项分布
weight_A = contribution_A / (contribution_A + contribution_B)
weight_B = contribution_B / (contribution_A + contribution_B)
# 更新在当前参数下A、B硬币产生的正反面次数
counts['A']['H'] += weight_A * num_heads
counts['A']['T'] += weight_A * num_tails
counts['B']['H'] += weight_B * num_heads
counts['B']['T'] += weight_B * num_tails
# M step
new_theta_A = counts['A']['H'] / (counts['A']['H'] + counts['A']['T'])
new_theta_B = counts['B']['H'] / (counts['B']['H'] + counts['B']['T'])
return [new_theta_A, new_theta_B]
def em(observations, prior, tol=1e-6, iterations=10000):
"""
EM算法
:param observations: 观测数据
:param prior: 模型初值
:param tol: 迭代结束阈值
:param iterations: 最大迭代次数
:return: 局部最优的模型参数
"""
import math
iteration = 0
while iteration < iterations:
new_prior = em_single(prior, observations)
delta_change = np.abs(prior[0] - new_prior[0])
if delta_change < tol:
break
else:
prior = new_prior
iteration += 1
return [new_prior, iteration]
# 调用EM算法
print em(observations, [0.6, 0.5])
# [[0.79678875938310978, 0.51958393567528027], 14] # 这个结果与论文一致