A simple neural network hand-written with numpy in 150 lines of code

hplar · 2019-07-12 14:22:54 +08:00 · 1991 views
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import numpy as np
    from numpy import linalg as LA
    
    class Activation:
        def f(self, x, **args):
            raise NotImplementedError("Should have implemented this")
    
        def grad(self, y, dy):
            raise NotImplementedError("Should have implemented this")
    
    class Linear(Activation):
        def f(self, x):
            return x
    
        def grad(self, y, dy):
            return dy
    
    class Sigmoid(Activation):
        def f(self, x):
            return 1/(1+np.exp(-x))
    
        def grad(self, y, dy):
            return y*(1-y)*dy
    
    class Relu(Activation):
        def f(self, x):
            return x*(x>0)
    
        def grad(self, y, dy):
            return dy*(y>0)
    
    class Softmax(Activation):
        def f(self, x, axis=1):
            x = x-np.max(x, axis=axis, keepdims=True)
            return np.exp(x)/np.sum(np.exp(x), axis=axis, keepdims=True)
    
        def grad(self, y, dy):
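            # assumes one-hot targets and that dy is the categorical cross-entropy
            # gradient -(1/m)*(y_true/y); substituting gives the usual
            # softmax + cross-entropy gradient (y - y_true)/m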
            return y/(y.shape[0])+y*dy
    
    class Dense:
        activation_map = {
            'relu': Relu,
            'softmax': Softmax,
            'sigmoid': Sigmoid,
            'linear': Linear,
        }
        def __init__(self, output_dim, input_dim=0, activation='relu'):
            self.output_dim = output_dim
            self.input_dim = input_dim
            if activation in self.activation_map:
                self.activation = self.activation_map[activation]()
            else:
                raise Exception('activation %s not implemented' % activation)
    
        def initialize_parameter(self):
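            # Glorot-style scaling: normal weights scaled by sqrt(6/(fan_in+fan_out)), zero biases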
            self.w = np.random.randn(self.input_dim, self.output_dim)*np.sqrt(6/(self.input_dim+self.output_dim))
            self.b = np.zeros((1, self.output_dim))
    
        def initialize_optimizer(self, optimizer, l_rate):
            self.optimizer = optimizer
            self.l_rate = l_rate
            if self.optimizer == 'adam':
                self.t, self.s_w, self.r_w, self.s_b, self.r_b = 0, 0, 0, 0, 0
                self.rho1, self.rho2, self.delta = 0.9, 0.999, 1e-8
            else:
                raise Exception('optimizer %s not implemented' % self.optimizer)
    
        def forward(self, x):
            self.x = x
            self.h = np.dot(self.x, self.w)+self.b
            self.a = self.activation.f(self.h)
            return self.a
    
        def backward(self, da):
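            # push the gradient back through the activation, then form this layer's
            # weight, bias, and input gradients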
            self.da = da
            self.dh = self.activation.grad(self.a, self.da)
            self.dw = np.dot(self.x.T, self.dh)
            self.db = np.sum(self.dh, axis=0, keepdims=True)  # dh already carries the 1/m factor from the loss gradient
            self.dx = np.dot(self.dh, self.w.T)
            return self.dx
    
        def update_parameter(self):
            if self.optimizer == 'adam':
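                # Adam: exponential moving averages of the gradient (s) and its square (r),
                # bias-corrected by 1/(1-rho**t), then an elementwise-scaled step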
                self.t = self.t+1
                self.s_w = self.rho1*self.s_w+(1-self.rho1)*self.dw
                self.r_w = self.rho2*self.r_w+(1-self.rho2)*(self.dw**2)
                s_w_ = self.s_w/(1-self.rho1**self.t)
                r_w_ = self.r_w/(1-self.rho2**self.t)
                self.w = self.w-self.l_rate*s_w_/(np.sqrt(r_w_)+self.delta)
                self.s_b = self.rho1*self.s_b+(1-self.rho1)*self.db
                self.r_b = self.rho2*self.r_b+(1-self.rho2)*(self.db**2)
                s_b_ = self.s_b/(1-self.rho1**self.t)
                r_b_ = self.r_b/(1-self.rho2**self.t)
                self.b = self.b-self.l_rate*s_b_/(np.sqrt(r_b_)+self.delta)
            else:
                raise Exception('optimizer %s not implemented' % self.optimizer)
    
    class Sequential:
        def __init__(self):
            self.layers = []
            self.loss = 'categorical_crossentropy'
            self.optimizer = 'adam'
    
        def add(self, layer):
            self.layers.append(layer)
    
        def compile(self, loss='categorical_crossentropy', optimizer='adam', l_rate=0.001):
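            # record the loss/optimizer, chain each layer's input_dim to the previous
            # layer's output_dim, and set up per-layer optimizer state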
            self.loss = loss
            self.optimizer = optimizer
            for idx in range(len(self.layers)-1):
                self.layers[idx+1].input_dim = self.layers[idx].output_dim
            for layer in self.layers:
                layer.initialize_optimizer(optimizer, l_rate)
    
        def forward_propagation(self, x, y):
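            # run x through every layer, then evaluate the chosen loss against y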
            for layer in self.layers:
                a = layer.forward(x)
                x = a
            if self.loss == 'categorical_crossentropy':
                loss = -(1/y.shape[0])*np.sum(np.log(a)*y)
            elif self.loss == 'mse':
                loss = 0.5*(1/y.shape[0])*np.square(LA.norm(a-y))
            else:
                raise Exception('loss %s not implemented' % self.loss)
            return a, loss
    
        def backward_propagation(self, a, y):
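            # gradient of the loss w.r.t. the network output, then walk the layers in
            # reverse, backpropagating and updating parameters as we go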
            if self.loss == 'categorical_crossentropy':
                da = -(1/y.shape[0])*(y/a)
            elif self.loss == 'mse':
                da = (1/y.shape[0])*(a-y)
            else:
                raise Exception('loss %s not implemented' % self.loss)
            for layer in self.layers[::-1]:
                da = layer.backward(da)
                layer.update_parameter()
    
        def fit(self, x, y, epochs=10, batch_size=200):
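            # (re)initialize all parameters, then train on sequential mini-batches;
            # samples beyond batch_count*batch_size in each epoch are simply skipped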
            for layer in self.layers:
                layer.initialize_parameter()
            batch_count = int(x.shape[0]/batch_size)
            for i in range(epochs):
                for j in range(batch_count):
                    start, end = j*batch_size, (j+1)*batch_size
                    a, _ = self.forward_propagation(x[start:end], y[start:end])
                    self.backward_propagation(a, y[start:end])
                _, loss = self.forward_propagation(x, y)
                print("epoch %d/%d: loss %f" % (i+1, epochs, loss))
    
        def print_parameters(self):
            for idx, layer in enumerate(self.layers):
                print('layer %d parameters:' % (idx+1))
                print(layer.w, layer.b)
    
    # A simple linear regression demo
    if __name__ == '__main__':
        w, b = np.array([[1.0], [2.0], [3.0]]), 5
        x = np.random.randn(300, 3)*100
        noise = np.random.randn(300, 1)*0.1
        y = np.dot(x, w)+noise+b
        model = Sequential()
        model.add(Dense(1, input_dim=3, activation='linear'))
        model.compile(loss='mse', optimizer='adam')
        model.fit(x, y, epochs=5000, batch_size=100)
        model.print_parameters()
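
The demo above only exercises the mse/linear path. As a rough usage sketch of the classification path (the Gaussian-blob data, layer sizes, learning rate, and epoch count below are my own assumptions, not part of the original post), the same API can also drive a relu hidden layer into a softmax output trained with categorical_crossentropy:

    # hedged classification sketch; assumes the classes above are already defined in this module
    np.random.seed(0)
    centers = np.array([[0.0, 0.0], [4.0, 0.0], [0.0, 4.0]])
    labels = np.random.randint(0, 3, 300)
    x_cls = centers[labels] + np.random.randn(300, 2)   # three Gaussian blobs
    y_cls = np.eye(3)[labels]                            # one-hot targets
    clf = Sequential()
    clf.add(Dense(16, input_dim=2, activation='relu'))
    clf.add(Dense(3, activation='softmax'))
    clf.compile(loss='categorical_crossentropy', optimizer='adam', l_rate=0.01)
    clf.fit(x_cls, y_cls, epochs=50, batch_size=100)
    pred = np.argmax(clf.forward_propagation(x_cls, y_cls)[0], axis=1)
    print('train accuracy:', np.mean(pred == labels))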
    
4 replies · last reply 2019-07-12 21:51:53 +08:00

#1 2pen · 2019-07-12 14:29:41 +08:00
Can't understand it, bye.

#2 shyrock · 2019-07-12 15:35:32 +08:00
Nice. Well suited for teaching.

#3 owenliang · 2019-07-12 16:06:41 +08:00 · ❤️ 1
It's not too bad. There is a thin book that explains this very clearly; after reading it you can implement one yourself. See my blog:
yuerblog.cc/2019/04/14/%e6%89%8b%e5%86%99python%e7%a5%9e%e7%bb%8f%e7%bd%91%e7%bb%9c/

#4 bwael · 2019-07-12 21:51:53 +08:00
@owenliang Great book, seconded.