TensorFlow - 线性回归

来自CloudWiki
跳转至: 导航搜索

前期准备

TensorFlow 相关 API 可以到在TensorFlow:概念和语法、*TensorFlow: 张量的运算中学习。

建立模型

如下为我们进行某项实验获得的一些实验数据:

输入 	输出
1 	4.8
2 	8.5
3 	10.4
6 	21
8 	25.3
...

我们将这些数据放到一个二维图上可以看的更直观一些,如下,这些数据在图中表现为一些离散的点:

Tf2-1.png

我们需要根据现有的这些数据归纳出一个通用模型,通过这个模型我们可以预测其他的输入值产生的输出值。

如下图,我们选择的模型既可以是红线表示的鬼都看不懂的曲线模型,也可以是蓝线表示的线性模型,在概率统计理论的分析中,这两种模型符合真实模型的概率是一样的。

Tf2-2.png

根据 “奥卡姆剃刀原则-若有多个假设与观察一致,则选最简单的那个,蓝线表示的线性模型更符合我们的直观预期。 如果用 x 表示输入, y

表示输出,线性模型可以用下面的方程表示:

y=W×x+b

即使我们选择了直线模型,可以选择的模型也会有很多,如下图的三条直线都像是一种比较合理的模型,只是W和b参数不同。这时我们需要设计一个损失模型(loss model),来评估一下哪个模型更合理一些,并找到一个最准确的模型。 损失函数越小,说明模型与真实的数据最越接近,我们就说这个模型越准确。

Tf2-3.png

如下图每条黄线代表线性模型计算出来的值与实际输出值之间的差值:

Tf2-4.png

我们用y′表示实验得到的实际输出,用下面的方程表示我们的损失模型:

loss=∑n=1N(yn−y′n)2

显然,损失模型里得到的loss越小,说明我们的线性模型越准确。

所以,我们的计算目标,就是通过不断的迭代计算,使得模型里的损失函数越来越小,即构建的模型越来越接近于真实的模型。

用TensorFlow实现模型

创建源文件

创建源文件linear_regression_model.py,并在源文件中定义类linearRegressionModel:

#!/usr/bin/python
# -*- coding: utf-8 -*
import tensorflow as tf
import numpy as np

class linearRegressionModel:
  
  #对类中的变量进行初始化
  def __init__(self,x_dimen):
    self.x_dimen = x_dimen #变量x的维数
    self._index_in_epoch = 0#访问数据集的指针
    self.constructModel()#创建模型
    self.sess = tf.Session()#创建一个会话
    self.sess.run(tf.global_variables_initializer())

  #权重初始化
  def weight_variable(self,shape):
    initial = tf.truncated_normal(shape,stddev = 0.1)
    return tf.Variable(initial)

  #偏置项初始化
  def bias_variable(self,shape):
    initial = tf.constant(0.1,shape = shape)
    return tf.Variable(initial)

构建模型

在类中新建构造函数constructModel:

 def constructModel(self):#构建模型
    self.x = tf.placeholder(tf.float32, [None,self.x_dimen]) #横坐标值,用占位符x表示
    self.y = tf.placeholder(tf.float32,[None,1])#纵坐标值,用占位符y表示 y= w * x+ b
    self.w = self.weight_variable([self.x_dimen,1])#权重值w  
    self.b = self.bias_variable([1])#偏移值 b
    self.y_prec = tf.nn.bias_add(tf.matmul(self.x, self.w), self.b) #y= w * x+ b   

定义损失函数(loss)的优化器

在构造函数constructModel中继续添加:

   mse = tf.reduce_mean(tf.squared_difference(self.y_prec, self.y))#求真实值和预测值的差平方
    l2 = tf.reduce_mean(tf.square(self.w))#求权重的平均值
    self.loss = mse + 0.15*l2 #损失函数
    self.train_step = tf.train.AdamOptimizer(0.1).minimize(self.loss) # 创建一个优化器,学习率为0.1,对数据进行分步训练

抓取数据

每次选取100个样本,如果选完,重新打乱:

 def next_batch(self,batch_size):
    start = self._index_in_epoch
    self._index_in_epoch += batch_size
    if self._index_in_epoch > self._num_datas:
        perm = np.arange(self._num_datas)
        np.random.shuffle(perm)
        self._datas = self._datas[perm]
        self._labels = self._labels[perm]
        start = 0
        self._index_in_epoch = batch_size
        assert batch_size <= self._num_datas
    end = self._index_in_epoch
    return self._datas[start:end],self._labels[start:end]

训练模型

以使损失函数逐渐减小的方式训练模型,逐渐使模型达到最优:

 def train(self,x_train,y_train,x_test,y_test):
    self._datas = x_train
    self._labels = y_train
    self._num_datas = x_train.shape[0]
    for i in range(5000):
        batch = self.next_batch(100)#每次选取100个样本
        self.sess.run(self.train_step,feed_dict={self.x:batch[0],self.y:batch[1]})
        if i%10 == 0:
            train_loss = self.sess.run(self.loss,feed_dict={self.x:batch[0],self.y:batch[1]})#开始训练,每隔10步,打印一下现在的损失函数
            print('step %d,test_loss %f' % (i,train_loss))

用模型来测试新数据

 def predict_batch(self,arr,batch_size):
    for i in range(0,len(arr),batch_size):
        yield arr[i:i + batch_size]#一次性取batch_size个数据

  def predict(self, x_predict):
    pred_list = []
    for x_test_batch in self.predict_batch(x_predict,100):
      pred = self.sess.run(self.y_prec, {self.x:x_test_batch})#对测试数组进行批量预测
      pred_list.append(pred)#将预测的值添加到列表中
    return np.vstack(pred_list)#返回列表

测试模型并与sklearn库做对比

示例代码:/home/ubuntu/run.py

#!/usr/bin/python
# -*- coding: utf-8 -*

from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
from sklearn.datasets import make_regression
from sklearn.linear_model import LinearRegression
from linear_regression_model import linearRegressionModel as lrm

if __name__ == '__main__':
    x, y = make_regression(7000) # X为样本特征,y为样本输出, 共7000个样本,每个样本1个特征
    x_train,x_test,y_train, y_test = train_test_split(x, y, test_size=0.5) #将样本分为测试数据和训练数据
    y_lrm_train = y_train.reshape(-1, 1)#调整矩阵的维数为1列矩阵
    y_lrm_test = y_test.reshape(-1, 1)#调整矩阵的维数为1列矩阵

    linear = lrm(x.shape[1])
    linear.train(x_train, y_lrm_train,x_test,y_lrm_test)#调用刚刚编写的tensorFlow的线性回归模型进行训练
    y_predict = linear.predict(x_test)#对测试数据进行测试,求出预测值
    print("Tensorflow R2: ", r2_score(y_predict.ravel(), y_lrm_test.ravel()))#将预测值与真实值做一个比较,求出正确率

    lr = LinearRegression()
    y_predict = lr.fit(x_train, y_train).predict(x_test)#用python自带的sklearn库进行训练
    print("Sklearn R2: ", r2_score(y_predict, y_test)) #采用r2_score评分函数

完整代码

linear_regression_model.py

现在您可以在 /home/ubuntu 目录下创建源文件 linear_regression_model.py,内容可参考:

示例代码:/home/ubuntu/linear_regression_model.py

#!/usr/bin/python
# -*- coding: utf-8 -*
import tensorflow as tf
import numpy as np

class linearRegressionModel:
  
  #对类中的变量进行初始化
  def __init__(self,x_dimen):
    self.x_dimen = x_dimen #变量x的维数
    self._index_in_epoch = 0#访问数据集的指针
    self.constructModel()
    self.sess = tf.Session()#创建一个会话
    self.sess.run(tf.global_variables_initializer())

  #权重初始化
  def weight_variable(self,shape):
    initial = tf.truncated_normal(shape,stddev = 0.1)
    return tf.Variable(initial)

  #偏置项初始化
  def bias_variable(self,shape):
    initial = tf.constant(0.1,shape = shape)
    return tf.Variable(initial)

  #每次选取100个样本,如果选完,重新打乱
  def next_batch(self,batch_size):
    start = self._index_in_epoch
    self._index_in_epoch += batch_size
    if self._index_in_epoch > self._num_datas:
        perm = np.arange(self._num_datas)
        np.random.shuffle(perm)
        self._datas = self._datas[perm]
        self._labels = self._labels[perm]
        start = 0
        self._index_in_epoch = batch_size
        assert batch_size <= self._num_datas
    end = self._index_in_epoch
    return self._datas[start:end],self._labels[start:end]

  def constructModel(self):#构建模型
    self.x = tf.placeholder(tf.float32, [None,self.x_dimen]) #横坐标值,用占位符x表示
    self.y = tf.placeholder(tf.float32,[None,1])#纵坐标值,用占位符y表示 y= w * x+ b
    self.w = self.weight_variable([self.x_dimen,1])#权重值w  
    self.b = self.bias_variable([1])#偏移值 b
    self.y_prec = tf.nn.bias_add(tf.matmul(self.x, self.w), self.b) #y= w * x+ b

    mse = tf.reduce_mean(tf.squared_difference(self.y_prec, self.y))#求真实值和预测值的差平方
    l2 = tf.reduce_mean(tf.square(self.w))#求权重的平均值
    self.loss = mse + 0.15*l2 #损失函数
    self.train_step = tf.train.AdamOptimizer(0.1).minimize(self.loss) # 创建一个优化器,学习率为0.1,对数据进行分步训练

  def train(self,x_train,y_train,x_test,y_test):
    self._datas = x_train
    self._labels = y_train
    self._num_datas = x_train.shape[0]
    for i in range(5000):
        batch = self.next_batch(100)#每次选取100个样本
        self.sess.run(self.train_step,feed_dict={self.x:batch[0],self.y:batch[1]})
        if i%10 == 0:
            train_loss = self.sess.run(self.loss,feed_dict={self.x:batch[0],self.y:batch[1]})#开始训练,每隔10步,打印一下现在的损失函数
            print('step %d,test_loss %f' % (i,train_loss))

  def predict_batch(self,arr,batch_size):
    for i in range(0,len(arr),batch_size):
        yield arr[i:i + batch_size]#一次性取batch_size个数据

  def predict(self, x_predict):
    pred_list = []
    for x_test_batch in self.predict_batch(x_predict,100):
      pred = self.sess.run(self.y_prec, {self.x:x_test_batch})#对测试数组进行批量预测
      pred_list.append(pred)#将预测的值添加到列表中
    return np.vstack(pred_list)#返回列表

训练模型并和 sklearn 库线性回归模型对比

示例代码:

现在您可以在 /home/ubuntu 目录下创建源文件 run.py,内容可参考: 示例代码:/home/ubuntu/run.py

#!/usr/bin/python
# -*- coding: utf-8 -*

from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
from sklearn.datasets import make_regression
from sklearn.linear_model import LinearRegression
from linear_regression_model import linearRegressionModel as lrm

if __name__ == '__main__':
    x, y = make_regression(7000) # X为样本特征,y为样本输出, 共7000个样本,每个样本1个特征
    x_train,x_test,y_train, y_test = train_test_split(x, y, test_size=0.5) #将样本分为测试数据和训练数据
    y_lrm_train = y_train.reshape(-1, 1)#调整矩阵的维数为1列矩阵
    y_lrm_test = y_test.reshape(-1, 1)#调整矩阵的维数为1列矩阵

    linear = lrm(x.shape[1])
    linear.train(x_train, y_lrm_train,x_test,y_lrm_test)#调用刚刚编写的tensorFlow的线性回归模型进行训练
    y_predict = linear.predict(x_test)#对测试数据进行测试,求出预测值
    print("Tensorflow R2: ", r2_score(y_predict.ravel(), y_lrm_test.ravel()))#将预测值与真实值做一个比较,求出正确率

    lr = LinearRegression()
    y_predict = lr.fit(x_train, y_train).predict(x_test)#用python自带的sklearn库进行训练
    print("Sklearn R2: ", r2_score(y_predict, y_test)) #采用r2_score评分函数

然后执行:

cd /home/ubuntu;
python run.py

执行结果:

step 2410,test_loss 26.531937
step 2420,test_loss 26.542793
step 2430,test_loss 26.533974
step 2440,test_loss 26.530540
step 2450,test_loss 26.551474
step 2460,test_loss 26.541542
step 2470,test_loss 26.560783
step 2480,test_loss 26.538080
step 2490,test_loss 26.535666
('Tensorflow R2: ', 0.99999612588302389)
('Sklearn R2: ', 1.0)

完成

   任务时间:时间未知

恭喜,您已完成本实验内容

参考文档:https://blog.csdn.net/geyunfei_/article/details/78782804

https://cloud.tencent.com/developer/labs/lab/10191