江湖十年

江湖十年 查看完整档案

杭州编辑  |  填写毕业院校  |  填写所在公司/组织 jianghushinian.cn/ 编辑
编辑

野生程序员·博客地址:https://jianghushinian.cn/

个人动态

江湖十年 发布了文章 · 4月9日

Python WSGI 规范

作为 Python Web 开发者来说,在开发程序阶段一般是不会接触到 WSGI 这个名词的,但当程序开发完成,考虑上线部署的时候,WSGI 规范是一个绕不开的话题,本文将介绍何为 WSGI。

WSGI 全拼 Web Server Gateway Interface,是为 Python 语言定义的 Web 服务器和 Web 应用程序(或框架)之间的一种通用编程接口。翻译成白话就是说 WSGI 是一个协议,就像 HTTP 协议定义了客户端和服务端数据传输的规范,WSGI 协议定义了 Web 服务器和 Web 应用程序之间协同工作的规范。

Python Web 应用部署方案

FlaskDjango 等 Web 框架都提供了内置的 Web Server,本地开发阶段可以使用 flask runpython manage.py runserver 来分别启动 FlaskDjango 内置的 Server。

在生产环境部署应用时,通常不会使用框架内置的 Server,而是使用 GunicornuWSGI 来部署,以获得更好的性能。部署过 Python Web 应用的同学应该对如下部署架构有所了解,左侧是浏览器,右侧是服务器。在服务器内部,首先通过 Nginx 来监听 80/443 端口,当接收到来自客户端的请求时,Nginx 会将请求转发到监听 5000 端口的 Gunicorn/uWSGI Server,接着请求会通过 WSGI 协议被传递到 Flask/Django 框架,在框架内部处理请求逻辑后,会将响应信息按照原路返回。
Python Web Server Deploy

你可能会问,Nginx 性能很高,为什么不将应用直接部署到 Nginx 上,而是中间通过 Gunicorn/uWSGI 做一层转发呢?因为 Nginx 没有遵循 WSGI 规范,并不能像 Gunicorn/uWSGI 这样很容易的与 Flask/Django 框架结合起来。

WSGI 规范

根据 Python Web 应用部署架构,我们知道了 WSGI 所处的位置,接下来看下 WSGI 规范具体定义了哪些内容。

如同 HTTP 协议有一个客户端和一个服务端,WSGI 协议有一个 Application 端和一个 Server 端,其中 Application 就是指 FlaskDjango 这些 Web 框架,而 Server 就是指 GunicornuWSGI 等 Web 服务器。

WSGI 协议规定 Application 端需要实现成一个可调用对象(函数、类等),其接口如下:

def simple_app(environ, start_response):
    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return ['Hello world!\n']

simple_app 就是一个最简单的 Application,它需要接收两个参数,environ 是一个 dict,其中保存了所有 HTTP 请求相关的信息,由 Server 端提供,start_response 是一个可调用对象,同样由 Server 端提供,simple_app内部需要调用一次 start_response,并将 状态码响应头 当作参数传递给它,simple_app 最终会返回一个可迭代对象作为 HTTP Body 内容返回给客户端。

我们已经知道了 Application 端接口,接下来看下一个符合 WSGI 协议的 Server 端实现:

import os


def wsgi_server(application):
    environ = dict(os.environ.items())

    def start_response(status, response_headers):
        print(f'status: {status}')
        print(f'response_headers: {response_headers}')

    result = application(environ, start_response)
    for data in result:
        print(f'response_body: {data}')

示例中 Server 端同样使用函数来实现,wsgi_server 接收一个 application 作为参数,在其内部构造了 environstart_response 两个对象,这里使用环境变量信息来模拟 HTTP 请求信息构造 environ 字典,start_response 同样被定义为一个函数,供 application 在内部对其进行调用,wsgi_server 函数最后会调用 application 并对其进行打印。

现在有了 Application 端和 Server 端,我们可以来测试一下这个简单的 WSGI 程序示例。只需要将 simple_app 作为参数传递给 wsgi_server 并调用 wsgi_server 即可:

wsgi_server(simple_app)

执行以上代码,将得到如下打印:

status: 200 OK
response_headers: [('Content-type', 'text/plain')]
response_body: Hello world!

以上,我们分别实现了符合 WSGI 规范的 Application 端和 Server 端,虽然程序看起来比较简陋,但不论多么复杂的 Python Web 框架和 Server 都同样遵循此规范。

WSGI 实际应用

学习了 WSGI 规范,我们可以来验证下平时使用的 Python Web 框架是否真的遵循此规范,这里以 Flask 框架源码为例,可以在 https://github.com/pallets/flask/blob/master/src/flask/app.py 查看 Flask 的定义:

class Flask(Scaffold):
    ...

    def __call__(self, environ, start_response):
        """The WSGI server calls the Flask application object as the
        WSGI application. This calls :meth:`wsgi_app`, which can be
        wrapped to apply middleware.
        """
        return self.wsgi_app(environ, start_response)

Flask 类内部通过实现 __call__ 方法,使得 Flask 实例对象成为一个可调用对象,其接口实现同样符合 WSGI Application 规范。

首发地址:https://jianghushinian.cn/

查看原文

赞 0 收藏 0 评论 0

江湖十年 发布了文章 · 2月22日

Python 可变类型作为函数默认参数时的副作用

在 Python 中定义函数时,可以为其指定 默认参数,这样就不必在每次调用函数时都传递参数进去,并且可以简化我们的代码。

在定义函数时,如果使用了 可变类型 来作为函数的 默认参数,往往会产生一些副作用。来看下面一段代码。

def foo(li=[]):
    li.append(1)
    print(li)

foo()
foo()
foo()

你可能想得到如下的结果:

[1]
[1]
[1]

但实际上,结果却是:

[1]
[1, 1]
[1, 1, 1]

根据结果来看,似乎每次的函数调用,li 列表都记录了上一次的调用结果,而并没有使用默认参数 []。但是当我们在每次调用函数时,都传递一个空列表 [] 进去,就不会出现副作用,能够正确得到我们想要的结果。

def foo(li=[]):
    li.append(1)
    print(li)

foo([])
foo([])
foo([])
[1]
[1]
[1]

为什么会产生这样的结果呢?其实这个问题的原因是 Python 中函数的特性所决定的。由于函数也是 对象,而 对象 可以有自己的属性,所以函数也有自己的属性。

foo 函数被创建时,它的 默认参数 就已经被保存在它的 __defaults__ 属性中了。而函数只会被创建一次,以后每次执行 foo() 的时候,都只会调用函数,并不会去重新创建一个函数。所以函数的 默认参数 也只会计算一次,无论之后被调用多少次,默认参数 始终都是同一个 对象

我们用 id() 函数打印出默认参数的内存地址就能够看出来。

def foo(li=[]):
    li.append(1)
    print(li, id(li))

foo()
foo()
foo()
[1] 48904632
[1, 1] 48904632
[1, 1, 1] 48904632

可以看出,三次调用函数后,其内部打印的 li 地址是相同的,所以它们其实是同一个 对象

知道了问题的原因,那么如果解决它呢?我们可以用 None 来作为函数的 默认参数,在调用 foo 函数时,在函数体内部可以通过判断 li 是否为 None 来决定是否需要使用 默认值。这样,当调用 foo() 函数并且没有传递参数时,再给 li 赋一个 默认值 即可。

def foo(li=None):
    if li is None:
        li = []
    li.append(1)
    print(li)

foo()
foo()
foo()
[1]
[1]
[1]

这样既不用担心 可变类型 作为函数 默认参数 时的副作用,也不用在每次调用 foo 函数时都传递一个参数进去,能够很好的解决这个问题。

所以,当我们给函数定义 默认参数 时,应该尽量使用 不可变类型 以免产生意想不到的副作用。当然,除非你明确知道你需要用 可变类型 的特性来达到某些目的。

查看原文

赞 0 收藏 0 评论 0

江湖十年 发布了文章 · 2月18日

Python 运算符 + 与 +=

Python 中如果需要对一个变量进行增量运算通常有两种写法,a = a + ba += b。虽然两种方法能够得到同样的结果,但两者却并不完全等价。

加法运算符 +

对于 a = a + b 这条语句来说,实际上执行了两步操作,首先执行 a + b 求出两者的 ,然后再将结果通过赋值运算符 = 赋值给变量 a

来看下面的示例:

>>> a = 100
>>> b = 200
>>> id(a)
1796532544
>>> a = a + b
>>> a
300
>>> id(a)
17756816
>>> a = [11, 22, 33]
>>> b = [44, 55, 66]
>>> id(a)
54675856
>>> a = a + b
>>> a
[11, 22, 33, 44, 55, 66]
>>> id(a)
54676416

以上分别使用 Python 中的 不可变类型可变类型 各列举了一个示例,并且在执行 a = a + b 语句的前后分别打印了变量 aid,可以看到无论对于 不可变类型 还是 可变类型,最终变量 aid 值都会改变,说明变量 a 在执行 a = a + b 以后指向了一片新的内存地址。这也比较好理解,因为 a 已经被重新赋值了。

增量赋值运算符 +=

对于运算符 +=,我们通常管它叫作 增量赋值运算符,因为它即完成了 相加 操作,又完成了 赋值 操作。

同样,我们还是分别用 intlist 两种数据类型来做演示:

>>> a = 100
>>> b = 200
>>> id(a)
1796532544
>>> a += b
>>> a
300
>>> id(a)
17756784
>>> a = [11, 22, 33]
>>> b = [44, 55, 66]
>>> id(a)
48777616
>>> a += b
>>> a
[11, 22, 33, 44, 55, 66]
>>> id(a)
48777616

对于 不可变类型 的操作,a += b 表现出来的结果和 a = a + b 相同。而对于 可变类型 的操作却并不完全一样。虽然最终变量 a 相同,但执行 += 操作后,变量 a 的内存地址并没有改变,也就是说 += 操作对于 可变类型 来说实际上是 就地更改。对于 list 的操作,实际上 a += b 等价于 a.extend(b)

__add____iadd__ 方法

实际上,在 Python 中,加法运算符 + 对应着对象的 __add__ 方法,增量赋值运算符 += 对应着对象的 __iadd__ 方法。

无论对于 不可变类型 还是 可变类型,当执行 a = a + b 时,都会调用 a__add__ 方法。而对于 a += b 的操作来说,当 a不可变类型 时同样会调用 a__add__ 方法,当 a可变类型 时会调用 a__iadd__ 方法进行 就地更改,如果 a 没有实现 __iadd__ 方法,那么才调用 a__add__ 方法。

我们可以用 Python 内置的 hasattr 函数来验证上面的说法。

>>> hasattr(int, '__add__')
True
>>> hasattr(int, '__iadd__')
False
>>> hasattr(list, '__add__')
True
>>> hasattr(list, '__iadd__')
True

对于 不可变类型 来说,因为对象本身不可变,如果做相加操作必然会创建新的对象,所以也就没有 __iadd__ 方法。而对于 可变类型 来说,对象本身可变,所以实现了 __iadd__ 方法。

在我们自己定义的类型中如果需要实现以上两个方法,也要遵循 Python 现有的规范,一定要注意自己实现的类型是否可变,根据类型再来确定是否需要实现 __iadd__ 方法。

查看原文

赞 0 收藏 0 评论 0

江湖十年 发布了文章 · 2月11日

Python 中对象的比较操作 == 与 is

Python 中对象的比较有两种方式 ==is。两种方式都能判断操作符两侧的变量值是否相等,那么它们的区别是什么呢?通过下面的介绍我们来一探究竟。

比较操作符通常用于条件语句,如下示例:

if a == b:
    pass
if a is False:
    pass

==is 的区别

== 操作符比较对象的值是否相等。小明有一块 劳力士 手表,小李也有一块同款 劳力士 手表,这时我们就认为这两块手表相等。

小明的手表 = 劳力士
小李的手表 = 劳力士
小明的手表 == 小李的手表

is 操作符比较对象的身份标识是否相等,即对象在内存中的地址是否相同,如果两个对象的身份标识相等,就说明它们是同一个对象。小明的爸爸称呼小明叫 儿子,小明的女朋友称呼小明叫 老公,但这两个称呼都代表 小明 这个人,即为同一个对象。

爸爸的儿子 = 小明
女朋友的老公 = 小明
爸爸的儿子 is 女朋友的老公

接下来就用代码来展示一下 ==is 的区别:

>>> a = [1, 2, 3]
>>> b = [1, 2, 3]
>>> a == b
True
>>> a is b
False
>>> id(a)
24603664
>>> id(b)
24603144
>>> a = [1, 2, 3]
>>> b = a
>>> a == b
True
>>> a is b
True
>>> id(a)
24604144
>>> id(b)
24604144

在 Python 中 id 函数接收一个对象作为参数,并返回该对象在内存中的地址。
由以上代码可以分析出:== 操作符只比较两个对象的值是否相等,但不比较两个对象是否为同一个对象;而 is 操作符并不是比较两个对象的值是否相等,而是会确认两个对象是否为同一个对象,如果为同一个对象,那么它们的值自然相等。

Python 小整数对象池

以上两段代码已经能够体现出 ==is 的区别,不过 Python 中也有一些特殊情况,来看下面例子:

>>> a = 5
>>> b = 5
>>> a == b
True
>>> a is b
True
>>> id(a)
1730274128
>>> id(b)
1730274128
>>> a = 257
>>> b = 257
>>> a == b
True
>>> a is b
False
>>> id(a)
48558688
>>> id(b)
48558720

以上代码看起来就很怪异了,同样的比较操作,只是换了一个数字结果就不同了。
其实出现以上结果的原因在于 Python 自身。Python 出于性能上的考虑,在解释器启动的时候就已经将 -5256 的整数创建到内存中了。而当我们需要创建值在 -5256int 数字的时候,Python 并不会新开辟一块内存去创建数字,而是直接将已存在的对象返回。
但是如果新创建的数字不在这个范围,Python 就会为每个变量单独开辟自己的内存空间。

Python intern 机制

再来看下面关于字符串比较的例子:

>>> a = 'hello world'
>>> b = 'hello world'
>>> a == b
True
>>> a is b
False
>>> id(a)
49465408
>>> id(b)
49465448
>>> a = 'hello'
>>> b = 'hello'
>>> a == b
True
>>> a is b
True
>>> id(a)
49429152
>>> id(b)
49429152

想必根据之前数字比较的例子,你大概也能猜测到以上代码结果不同的原因了。事实上,以上结果同样是 Python 出于对性能的考虑,不过这次 Python 并没有预先将 hello 字符串创建到内存中,而是使用了一种叫 intern 的机制。
关于 intern 机制在这里我们不去深究,以后有机会专门写一篇博客来介绍。总之你需要知道在某些场景下,Python 会对字符串开启 intern 机制来提高性能,从而导致出现上面示例代码的结果。

==is 各自的适用场景

什么时候用 ==、什么时候用 is 呢?
当我们需要比较一个变量与一个 单例 的时候,应该使用 is,其他情况通常使用 ==
例如拿一个变量去跟 TrueFalse 进行比较的时候就应该使用 is,因为用 is 的比较的速度要比用 == 更快。
is 比较对象的时候,只需要判断它们是否处于同一块内存地址即可,而用 == 比较更慢的原因在于当用 == 去比较对象的时候会调用对象的 __eq__() 方法,而 __eq__() 方法通常会被重载,执行其内部逻辑往往会多花一些时间。

以下就是一个重载对象 __eq__() 方法的例子:

class MyList(object):
    def __init__(self, *args):
        self._list = [*args]

    def __eq__(self, other):
        result = False
        for i in self._list:
            for j in other._list:
                if i == j:
                    break
            else:
                break
        else:
            result = True
        return result

li_1 = MyList(1, 2, 3)
li_2 = MyList(1, 2, 3)
print(li_1 == li_2)  # True

你可以自行尝试修改 __eq__() 方法内部的逻辑来观察其结果。

查看原文

赞 0 收藏 0 评论 0

江湖十年 发布了文章 · 2月9日

Python for...else... 语句

条件语句在编程语言中再寻常不过了,只要你需要编写程序,那么几乎一定会用到 if ... else ... 这样的条件语句进行条件判断。所以通用编程语言都会提供 if ... else ... 这样的语法。Python 也不例外,不过 Python 不只有 if ... else ... 这样的语法,它还提供了 for ... else ...、try ... except ... else ... 等这样的语法。今天我们只讨论 for ... else ... 语法,来看看 Python 的 else 语句跟在 for 后面与跟在 if 后面有什么异同。

先看一段代码示例:

for i in [1, 2, 3]:
    print(i)
else:
    print('done')

执行结果:

1
2
3
done

如果不执行这段代码,只看这段代码的写法,也许你会猜测它的执行结果应该只有 1、2、3 三个数,而没有 done。因为在 if ... else ... 的逻辑里,如果 if 的条件为真,那么就不会执行 else 语句下的代码块。
但根据打印结果来看,显然不符合我们的预期。带着疑问,我们再看下面一段示例代码:

for i in []:
    print(i)
else:
    print('done')

执行结果:

done

这次的结果只有一个 done,貌似这段代码看起来更合理些。因为 for 遍历的列表为空,所以条件不成立,那么就该执行 else 的逻辑,真的是这样吗?我们接着看下面示例。
我们知道 for 语句块中可以出现 continuebreak 两个关键字,那么我们先来测试一下,如果 for 语句块中包含 continue 会是什么结果。

for i in [1, 2, 3]:
    if i == 2:
        continue
    print(i)
else:
    print('done')

执行结果:

1
3
done

这个结果和第一段代码示例差不多,只不过在 i == 2 的时候,跳过了本次循环,继续执行下一次循环。
最后我们再来看看如果 for 语句块中包含 break 会出现什么结果。

for i in [1, 2, 3]:
    if i == 2:
        break
    print(i)
else:
    print('done')

执行结果:

1

根据上面这段代码的打印结果,我想你也许已经明白了 Python 中 for ... else ... 语句的执行特点。没错,实际上如果按照 if ... else ... 是互斥的逻辑来理解 for ... else ...,那么其实 for 语句块内部的 break 才真正跟 else 是一对条件语句。
如果 for 循环中执行了 break 语句,那么就不会再执行 else 语句块内部的代码了。

知道了 Python 的 for ... else ... 语句的语法,但是它有什么用呢?我最开始接触 for ... else ... 语句的时候觉得它完全没用,以至于我写了很久的 Python 代码都没有用到过,甚至几乎已经忘记了 Python 提供的这个语法。直到我开始写 ERP 项目的时候,我才发现 for ... else ... 的真正用途。因为写 ERP 系统会遇到各种复杂的业务需求,如果能够用 for ... else ... 解决问题,代码逻辑看起来就会清晰不少,这大大增加了项目的可维护性。

假如用 for 循环遍历一个可迭代对象,如果可迭代对象内部有一个元素符合条件,那么就执行一个逻辑,然后 break 掉,如果全部都不符合,就执行 else 的语句去执行另一个逻辑,示例代码如下:

for i in 可迭代对象:
    if 条件判断:
        执行一个逻辑
        break
else:
    执行另一个逻辑

这种代码显然比下面这种不用 for ... else ... 的写法可读性更强,并且更加符合 Pythonic 的写法。

flag = False
for i in 可迭代对象:
    if 条件判断:
        执行一个逻辑
        flag = True
        break

if not flag:
    执行另一个逻辑

以上,就是我对 for ... else ... 的理解以及使用体会,希望在你遇到同样的问题时,能够对你有所帮助。

查看原文

赞 0 收藏 0 评论 0

江湖十年 发布了文章 · 2月4日

用 Python 实现一个简易版 HTTP 客户端

此文为《用 Python 撸一个 Web 服务器》系列教程的一个补充,这个系列教程介绍了如何使用 Python 内置的 socket 库实现一个简易版的 Web 服务器。

之所以写这篇文章,是因为我发现很多人并不清楚 HTTP 客户端的概念,以为只有浏览器才叫 HTTP 客户端。事实上并非如此,我们在 Web 开发中常见的 Postman爬虫程序curl 命令行工具 等,这些都可以称为 HTTP 客户端。

服务器程序示例

这里以一个 Hello World 程序来作为示例服务器,实现如下:

# server.py

import socket
import threading


def process_connection(client):
    """处理客户端连接"""
    # 接收客户端发来的数据
    data = b''
    while True:
        chunk = client.recv(1024)
        data += chunk
        if len(chunk) < 1024:
            break

    # 打印从客户端接收的数据
    print(f'data: {data}')
    # 给客户端发送响应数据
    client.sendall(b'HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n<h1>Hello World</h1>')

    # 关闭客户端连接对象
    client.close()


def main():
    # 创建 socket 对象
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 允许端口复用
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # 绑定 IP 和端口
    sock.bind(('127.0.0.1', 8000))
    # 开始监听
    sock.listen(5)

    while True:
        # 等待客户端请求
        client, addr = sock.accept()
        print(f'client type: {type(client)}\naddr: {addr}')

        # 创建新的线程来处理客户端连接
        t = threading.Thread(target=process_connection, args=(client,))
        t.start()


if __name__ == '__main__':
    main()

服务器端程序不做过多解释,如有不明白的地方可以参考 用 Python 撸一个 Web 服务器-第2章:Hello-World 一节。

极简客户端

知道了如何用 socket 库实现服务器端程序,那么理解客户端程序的实现就非常容易了。客户端程序代码实现如下:

# client.py

import socket

# 创建 socket 对象
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 指定服务器 IP 和端口,进行连接
sock.connect(('127.0.0.1', 8000))
# 向 URL "/" 发送 GET 请求
sock.send(b'GET / HTTP/1.1\r\nHost: 127.0.0.1:8000\r\n\r\n')

# 接收服务端响应数据
data = b''
while True:
    chunk = sock.recv(1024)
    data += chunk
    if len(chunk) < 1024:
        break
# 打印响应数据
print(data)

# 关闭连接
sock.close()

相对来说客户端程序要简单一些,创建 socket 对象的代码与服务器端程序并无差别,客户端 socket 对象根据 IP 和端口来连接指定的服务器,建立好连接后就可以发送数据了,这里根据 HTTP 协议构造了一个针对 / URL 路径的 GET 请求,为了简单起见,请求头中仅携带了 HTTP 协议规定的必传字段 Host,请求发送成功后便可以接收服务器端响应,最后别忘了关闭 socket连接。

仅用几行代码,我们就实现了一个极简的 HTTP 客户端程序,接下来对其进行测试。

首先在终端中使用 Python 运行服务器端程序:python3 server.py。然后在另一个终端中使用 Python 运行客户端程序:python3 client.py

可以看到客户端打印结果如下:

b'HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n<h1>Hello World</h1>'

以上,我们实现了一个极简的 HTTP 客户端。

参考 requests 实现客户端

用 Python 写过爬虫的同学,一定听说或使用过 requests 库,以下是使用 requests 访问 Hello World 服务端程序的示例代码:

# demo_requests.py

import requests

response = requests.request('GET', 'http://127.0.0.1:8000/')
print(response.status_code)  # 响应状态码
print('--------------------')
print(response.headers)  # 响应头
print('--------------------')
print(response.text)  # 响应体

在终端中使用 python3 demo_requests.py 运行此程序,将打印如下结果:

200
--------------------
{'Content-Type': 'text/html'}
--------------------
<h1>Hello World</h1>

接下来修改我们上面实现的极简 HTTP 客户端程序,使其能够支持 response.status_coderesponse.headersresponse.text功能。

# client.py

import socket
from urllib.parse import urlparse


class HTTPClient(object):
    """HTTP 客户端"""

    def __init__(self):
        # 创建 socket 对象
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 初始化数据
        self.status_code = 200
        self.headers = {}
        self.text = ''

    def __del__(self):
        # 关闭连接
        self.sock.close()

    def connect(self, ip, port):
        """建立连接"""
        self.sock.connect((ip, port))

    def request(self, method, url):
        """请求"""
        # URL 解析
        parse_result = urlparse(url)
        ip = parse_result.hostname
        port = parse_result.port or 80
        host = parse_result.netloc
        path = parse_result.path
        # 建立连接
        self.connect(ip, port)
        # 构造请求数据
        send_data = f'{method} {path} HTTP/1.1\r\nHost: {host}\r\n\r\n'.encode('utf-8')
        # 发送请求
        self.sock.send(send_data)
        # 接收服务端响应的数据
        data = self.recv_data()
        # 解析响应数据
        self.parse_data(data)

    def recv_data(self):
        """接收数据"""
        data = b''
        while True:
            chunk = self.sock.recv(1024)
            data += chunk
            if len(chunk) < 1024:
                break
        return data.decode('utf-8')

    def parse_data(self, data):
        """解析数据"""
        header, self.text = data.split('\r\n\r\n', 1)
        status_line, header = header.split('\r\n', 1)
        for item in header.split('\r\n'):
            k, v = item.split(': ')
            self.headers[k] = v
        self.status_code = status_line.split(' ')[1]


if __name__ == '__main__':
    client = HTTPClient()
    client.request('GET', 'http://127.0.0.1:8000/')
    print(client.status_code)
    print('--------------------')
    print(client.headers)
    print('--------------------')
    print(client.text)

代码实现比较简单,我写了较为详细的注释,相信你能够看懂。其中使用了内置函数 urlparse ,此函数能够根据 URL 格式规则将 URL 拆分成多个部分。

在终端中使用 python3 client.py 运行此程序,打印结果与使用 requests 的结果完全相同。

200
--------------------
{'Content-Type': 'text/html'}
--------------------
<h1>Hello World</h1>

仅用几十行代码,我们就实现了一个简易版的 HTTP 客户端程序,并且还实现了类似 requests 库的功能。

接下来你可以尝试用它去访问现实世界中真实的 URL,比如访问 http://httpbin.org/get,看看打印结果如何。

P.S.

Web 开发本质是围绕着 HTTP 协议进行的,HTTP 协议是 Web 开发的基石。所以对于何为 HTTP 服务端、何为 HTTP 客户端的概念不够清晰的话,实际上都是对 HTTP 协议不够理解。

最后,给大家留一道作业题,实现 requests 库的 response.json() 方法。

联系我:

查看原文

赞 0 收藏 0 评论 0

江湖十年 发布了文章 · 2月3日

用 Python 撸一个 Web 服务器-第9章:项目总结

项目总结

本教程带大家一起实现了一个 Todo List 程序,包含基础的增删改查功能,和用户登录认证。这也是 Web 开发中最常见的需求。

我画了一张思维导图,帮助你从宏观的角度来概览 Todo List 程序,加深你对 Web 开发的理解。

项目整体思路参考 MVC 设计模式。有意设计 utils/ 目录专门用来存储一些工具方法,这些工具方法通常比较独立,尽量不从外部模块导入依赖,其他模块需要某个工具方法时从 utils/ 目录下导入即可使用,这样就避免了循环导入的问题。

诚然,Todo List 程序还有很多待完善的地方,它不符合 Python 界的 WSGI 规范,也没有考虑 Web 安全防范、性能等方面的问题,项目可能看起来比较 Low。但这足以说明 Web 开发的本质,无论多么复杂的 Web 程序,都离不开这些底层原理。

结束 or 开始

九篇文章,带你撸了一个 Web 服务器,希望让读者能够通过这个微小的 Web 程序原型明白 Web 服务器的工作原理。只有明白基本原理,再去学习一些概念、框架才会得心应手。

教程到这里就结束了,但对于 Web 开发才刚刚开始,Web 开发还有非常多的知识等着我们去学习探索。每一段旅程的结束都是下一段旅程的开始,愿你在学习技术的道路上不再孤独。

后记

作者编程水平有限,教程中可能有表达不够准确或错误的地方,欢迎读者能提出宝贵意见。

希望读者读完此系列文章能够有所收获。

联系我:

查看原文

赞 0 收藏 0 评论 0

江湖十年 发布了文章 · 2月2日

用 Python 撸一个 Web 服务器-第8章:用户管理

用户登录原理

用户登录与注册功能几乎已成为 Web 应用的标配。所以我们有必要给 Todo List 程序增加一个用户管理模块,以此来学习用户登录原理。

HTTP 协议是无状态的,这意味着每个完整的 HTTP 请求 —— 响应过程都是相对独立的,Web 服务器无法分辨前后两次连续请求是否为同一个用户(客户端)发送过来的。为了让服务器能够记住用户,就有了一种叫作 Cookie 的技术。我画了一张图来描述 Cookie 的工作过程:

首先浏览器向服务器 / 路径发送了一个 GET 请求,服务器返回响应给浏览器时,在响应头中设置了两个键为 Set-Cookie 的响应首部字段,接下来浏览器收到 Set-Cookie 响应首部字段后会自动将其保存下来,在之后的所有请求中浏览器都会自动加上 Cookie 请求首部字段。

这就是 Cookie 的大致工作原理,总结起来 Cookie 有如下几个特点:

  • Cookie 需要服务器和浏览器共同配合使用,服务器通过 Set-Cookie 响应首部字段来设置 Cookie,并且可以同时设置多个 Set-Cookie 响应首部字段。浏览器收到 Set-Cookie 响应首部字段时,会自动保存 Cookie 内容,在之后的请求中就会通过自动加上 Cookie 请求首部字段来携带 Cookie 内容到服务器。
  • Cookie 总是以键值对的形式存在,如 a=123b=456,多个 Cookie 使用 ; 分隔。
  • Cookie 并不限制请求方法,任何 HTTP 请求方法都可以使用 Cookie
  • 出于安全考虑,Cookie 有域名限制。任何厂商的浏览器在保存 Cookie 时都会记录这个 Cookie 是属于哪个域名的,在给服务器发送请求时,只有这个域名下的 Cookie 才会被携带。如访问 127.0.0.1:8000 域名时浏览器就只会携带这个域名下的 Cookie,如果此时浏览器也保存了 www.jd.com 域名下的 Cookie,是不会被携带到 127.0.0.1:8000 服务器的。
  • Cookie 可以设置过期时间,像上图这样只设置了 Cookie 的键值为 a=123 这种情况,浏览器默认的处理机制是当浏览器关闭时自动删除这个 Cookie。如果需要设置过期时间则可以这样设置 Set-Cookie: a=123; Max-Age=60,此时 a=123 这条 Cookie 的过期时间就是 60 秒,60 秒后浏览器会自动将保存的这条 Cookie 删除。

如何在 Chrome 浏览器中查看 Cookie 呢?打开 Chrome 浏览器开发者工具,选择 Application 选项卡,点击 Cookies 就能看到当前域名下记录的所有 Cookie 信息。

如果利用 Cookie 实现用户登录,那么流程大致如下:

  1. 浏览器访问 Web 应用的登录页面。
  2. 用户在登录页面输入用户名、密码,点击登录按钮进行登录。
  3. 服务器端收到浏览器传输过来的用户名、密码,然后到数据库中查找是否存在这个用户,身份验证一旦通过,就在返回给浏览器的响应中加入 Set-Cookie: username=zhangsan 响应首部字段。
  4. 浏览器在收到带有 Set-Cookie: username=zhangsan 响应首部字段的响应后,会将 username=zhangsan 保存下来。
  5. 当浏览器再次请求 Web 应用的某个页面时,就会自动携带 Cookie: username=zhangsan 请求首部字段。
  6. 此时服务器端收到浏览器请求,通过解析 Cookie 请求首部字段,服务器就知道这个请求是 zhangsan 发送过来的了。

虽然使用 Cookie 能够实现用户登录,但是这种直接将用户信息 username=zhangsan 存储到 Cookie 的方式并不可靠。因为用户名很容易暴露,也很容易被猜到。假如网站的攻击者知道了我们系统中有 zhangsan 这个用户存在,那么他并不需要知道 zhangsan 这个用户的密码,只需要在浏览器中将 username=zhangsan 这个 Cookie 添加到 127.0.0.1:8000 域名下(在 Chorme 开发者工具中可以手动更改 Cookie),浏览器在下次请求 127.0.0.1:8000 服务器时就会自动携带这个 Cookie,服务器收到请求后就会认为请求是 zhangsan 这个用户发送过来的,这样网站攻击者就骗过了服务器的登录机制。故此,为了解决这个问题,又有人提出了一个叫作 Session 的概念。

Session 并不是一个具体的技术实现,而是一种思想。在利用 Cookie 实现用户登录时,是直接将用户信息 username=zhangsan 以明文的形式存储到浏览器 Cookie 中的。而采用 Session 机制后,则可以将用户信息保存到服务器端(可以是任何存储介质),例如可以保存成 JSON 对象放到文件中:

{
    "6d78289e237c48719201640736226e39": "zhangsan",
    "3cdb8c488abb4eccad2c7cc8c3c9d065": "lisi"
}

对象的键是一个随机字符串,叫作 session_id,值为用户名。这样在服务器返回响应时,不再直接将用户名以明文的方式发送给浏览器,而是将 session_id 放到响应首部字段中 Set-Cookie: session_id=6d78289e237c48719201640736226e39 发送给浏览器。这种改变对于浏览器端来说并没有任何本质变化,浏览器依旧将这个 Cookie 保存到本地,下次发送请求时自动携带。但 Session 机制的加入,使得用户登录机制变得更加安全。因为 session_id 是一个随机生成的字符串,恶意用户即使知道用户名也无法伪造这个 session_id

不过加入了 Session 机制以后,服务器验证用户 Cookie 的机制就要稍作修改。以前服务器只需要解析浏览器传过来的 Cookie,就能得到用户名 zhangsan,但现在解析 Cookie 后得到的是 session_id,服务器还需要再到存储了所有 SessionJSON 对象中查找这个 session_id 键所对应的值,这样就得到了当前登录的用户名。

加入 Session 机制以后,登录流程大致如下:

  1. 浏览器访问 Web 应用的登录页面。
  2. 用户在登录页面输入用户名、密码,点击登录按钮进行登录。
  3. 服务器端收到浏览器传输过来的用户名、密码,然后到数据库中查找是否存在这个用户,身份验证通过以后,为 zhangsan 这个用户生成一个随机的 session_id,然后将 session_id 和用户名以键值对的形式存储到 JSON 文件中,接着在返回给浏览器的响应中加入 Set-Cookie: session_id=6d78289e237c48719201640736226e39 响应首部字段。
  4. 浏览器在收到带有 Set-Cookie: session_id=6d78289e237c48719201640736226e39 响应首部字段的响应后,会将 session_id=6d78289e237c48719201640736226e39 保存下来。
  5. 当浏览器再次请求 Web 应用的某个页面时,就会自动携带 Cookie: session_id=6d78289e237c48719201640736226e39 请求首部字段。
  6. 此时服务器端收到浏览器请求,通过解析 Cookie 请求首部字段得到 session_id,然后再到存储了所有 SessionJSON 文件中查找 6d78289e237c48719201640736226e39 这个 session_id 所对应的用户名为 zhangsan ,服务器就知道这个请求是 zhangsan 发送过来的了。

以上就是最常见的采用 Session + Cookie 的方式实现用户登录的机制,Session 只是一种思想,也可以不搭配 Cookie 来使用,而用户登录的实现方式也有多种,有兴趣的读者可以根据自己的需求自行探索。

用户管理功能设计

知道了用户登录原理,我们再来分析下要为 Todo List 程序增加用户管理功能,应该如何实现:

  • 对于模型层,需要新增 UserSession 两个模型类分别处理用户和 Session
  • 对于视图层,需要新增 register.html(注册)、login.html(登录)两个 HTML 页面。
  • 对于控制器层,需要实现 register(注册)、login(登录)两个视图函数。

除了对 MVC 模式中每一层需要增加的功能部分外。我们还需要增加 user.jsonsession.json两个 JSON 文件,来分别存储用户信息和 Session 信息。

另外,Todo List 程序在加入用户管理功能后,只有已登录用户才可查看管理自己的 todo。所以还要对 todo 管理部分现有的视图函数做些修改。

用户管理功能编码实现

根据以上对用户管理功能的分析,设计当前的Todo List 程序目录结构如下:

todo_list
├── server.py
├── tests
│   ├── test_controllers.py
└── todo
    ├── __init__.py
    ├── config.py
    ├── controllers
    │   ├── __init__.py
    │   ├── auth.py
    │   ├── static.py
    │   └── todo.py
    ├── db
    │   ├── session.json
    │   ├── todo.json
    │   └── user.json
    ├── logs
    │   └── todo.log
    ├── models
    │   ├── __init__.py
    │   ├── session.py
    │   ├── todo.py
    │   └── user.py
    ├── static
    │   ├── css
    │   │   └── style.css
    │   └── favicon.ico
    ├── templates
    │   ├── auth
    │   │   ├── login.html
    │   │   └── register.html
    │   └── todo
    │       ├── edit.html
    │       └── index.html
    └── utils
        ├── __init__.py
        ├── error.py
        ├── http.py
        ├── logging.py
        └── templating.py

Session 模型类编写在 models/session.py 文件中:

# todo_list/todo/models/session.py

import uuid
import datetime

from . import Model


class Session(Model):
    """
    Session 模型类
    """

    def __init__(self, **kwargs):
        # 为了安全起见,Session id 不使用自增数字,而使用 uuid
        self.id = kwargs.get('id')
        if self.id is None:
            self.id = uuid.uuid4().hex

        self.user_id = kwargs.get('user_id', -1)
        self.expire_in = kwargs.get('expire_in')
        
        if self.expire_in is None:
            now = datetime.datetime.now()
            expire_in = now + datetime.timedelta(days=1)
            self.expire_in = expire_in.strftime('%Y-%m-%d %H:%M:%S')

    def is_expired(self):
        """判断 Session 是否过期"""
        now = datetime.datetime.now()
        return datetime.datetime.strptime(self.expire_in, '%Y-%m-%d %H:%M:%S') <= now

    def save(self):
        """覆写父类的 save 方法,保存时过滤掉已经过期的 Session"""
        models = [model.__dict__ for model in self.all()
                  if model.id != self.id and not model.is_expired()]
        if not self.is_expired():
            models.append(self.__dict__)
        self._save_db(models)

Session 模型类继承自 Model。与 Todo 模型类只实现了 __init__ 方法不同,Session 模型类还实现了 is_expiredsave 两个方法。is_expired 方法用来判断当前 Session 是否过期,因为通常来说用户的登录时间都是有期限的。save 方法在保存当前 Session 对象时过滤掉已经过期的 Session

我设计了如下 JSON 对象用来存储用户登录的 Session 数据:

{
    "id": "6d78289e237c48719201640736226e39",
    "user_id": 2,
    "expire_in": "2020-05-31 22:27:55"
}

id 即为 session_iduser_id 对应当前登录用户的 id,这样就能通过这个 Session 对象查找到对应用户,expire_in 表示这条 Session 的过期时间。

为了实现随机的 session_id,在 Session 模型的 __init__ 方法中, 通过 uuid 来获取一个随机字符串。

User 模型类编写在 models/user.py 文件中:

# todo/models/user.py

import hashlib

from . import Model
from todo.config import SECRET


class User(Model):
    """
    User 模型类
    """

    def __init__(self, **kwargs):
        self.id = kwargs.get('id')
        self.username = kwargs.get('username', '')
        self.password = kwargs.get('password', '')

    @classmethod
    def generate_password(cls, raw_password):
        """生成密码"""
        md5 = hashlib.md5()
        md5.update(SECRET.encode('utf-8'))
        md5.update(raw_password.encode('utf-8'))
        return md5.hexdigest()

    @classmethod
    def validate_password(cls, raw_password, password):
        """验证密码"""
        md5 = hashlib.md5()
        md5.update(SECRET.encode('utf-8'))
        md5.update(raw_password.encode('utf-8'))
        return md5.hexdigest() == password

出于安全考虑,密码通常不会以明文的形式存储到数据库中。这样假使我们的 Web 应用被拖库,非明文密码能减小用户被撞库的风险。

由于密码不使用明文存储到文件中,所以 User 模型中实现了生成密码和检查密码两个方法。调用 generate_password 方法传入原始密码,得到的是加密后的字符串,可以将其存储到文件中,这个字符串无法解密。验证时,将原始密码(用户输入的密码)和加密后的字符串一同传入 validate_password 方法,即可验证原始密码是否正确。

这里采用了 md5 算法对用户密码进行加密。 md5 算法是一种被广泛使用的散列算法,但其有被碰撞的风险,所以代码中对其进行了 加盐 处理,这样就能够大大降低碰撞概率。

在进行密码验证时,并不需要对文件中存储的加密密码进行解密,只要对原始密码使用同样的方法进行加密,然后比较两个加密后的字符串是否相等即可。因为 md5 算法对于相同的输入一定会得到相同的输出,也就是说对同样的数据每次加密结果一致。

严格来讲,md5 并不属于加密算法,而是散列算法。因为通过加密算法加密后的数据是可以解密的,而通过散列算法得到的是一个信息摘要,不能通过这个摘要反向得到原始数据。但很多人都习惯把 md5 算法称作加密算法,故此,我这里也采用了加密算法来的叫法来介绍它。更多的关于 md5 算法的知识读者可自行搜索相关资料进行学习。

用户信息将会存储到 db/user.json 文件中,格式如下:

{
    "id": 1,
    "username": "user",
    "password": "7fff062fcb96c6f041df7dbc3fa0dcaf"
}

创建好了 SessionUser 两个模型,我们接下来实现用户注册功能。

注册页面的 HTML 如下:

<!-- todo_list/todo/templates/auth/register.html -->

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Todo List | Register</title>
    <link rel="stylesheet" href="/static/css/style.css">
</head>
<body>
<h1 class="container">Register</h1>
<div class="container">
    <form class="register" action="/register" method="post">
        <input type="text" name="username" placeholder="Username">
        <input type="password" name="password" placeholder="Password">
        <button>注册</button>
    </form>
</div>
</body>
</html>

将注册页面的 CSS 代码追加到 style.css 文件中:

/* todo_list/todo/static/css/style.css */

.register {
    width: 100%;
    max-width: 600px;
    text-align: center;
}

.register input {
    width: 100%;
    height: 40px;
    padding: 0 4px;
}

.register button {
    margin-top: 20px;
}

在控制器层,编写一个 register 视图函数用来处理用户注册的业务逻辑:

# todo_list/todo/controllers/auth.py

def register(request):
    """注册视图函数"""
    if request.method == 'POST':
        # 获取表单中的用户名和密码
        form = request.form
        logger(f'form: {form}')
        username = form.get('username')
        raw_password = form.get('password')

        # 验证用户名和密码是否合法
        if not username or not raw_password:
            return '无效的用户名或密码'.encode('utf-8')
        user = User.find_by(username=username, ensure_one=True)
        if user:
            return '用户名已存在'.encode('utf-8')

        # 对密码进行散列计算,创建并保存用户信息
        password = User.generate_password(raw_password)
        user = User(username=username, password=password)
        user.save()
        # 注册成功后重定向到登录页面
        return redirect('/login')

    return render_template('auth/register.html')

注册视图函数可以接收两种请求,GETPOST。如果为 GET 请求,则说明用户要访问注册页面,直接返回注册页面对应的 HTML。如果为 POST 请求,则说明用户点击了注册页面的注册按钮,需要处理注册逻辑。

用户注册成功后,会被重定向到登录页面,所以接下来我们要实现用户登录功能。

登录页面的 HTML 如下:

<!-- todo_list/todo/templates/auth/login.html -->

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Todo List | Login</title>
    <link rel="stylesheet" href="/static/css/style.css">
</head>
<body>
<h1 class="container">Login</h1>
<div class="container">
    <form class="login" action="/login" method="post">
        <input type="text" name="username" placeholder="Username">
        <input type="password" name="password" placeholder="Password">
        <button>登录</button>
    </form>
</div>
</body>
</html>

将登录页面的 CSS 代码追加到 style.css 文件中:

/* todo_list/todo/static/css/style.css */

.login {
    width: 100%;
    max-width: 600px;
    text-align: center;
}

.login input {
    width: 100%;
    height: 40px;
    padding: 0 4px;
}

.login button {
    margin-top: 20px;
}

在控制器层,编写一个 login 视图函数用来处理用户登录的业务逻辑:

# todo_list/todo/controllers/auth.py

def login(request):
    """登录视图函数"""
    # 如果用户已经登录,直接重定向到首页
    if current_user(request):
        return redirect('/index')

    if request.method == 'POST':
        message = '用户名或密码不正确'.encode('utf-8')

        # 获取表单中的用户名和密码
        form = request.form
        logger(f'form: {form}')
        username = form.get('username')
        raw_password = form.get('password')
        
        # 验证用户名和密码是否正确
        if not username or not raw_password:
            return message
        user = User.find_by(username=username, ensure_one=True)
        if not user:
            return message
        password = user.password
        if not User.validate_password(raw_password, password):
            return message

        # 创建 Session 并将 session_id 写入 Cookie 实现登录
        session = Session(user_id=user.id)
        session.save()
        cookies = {
            'session_id': session.id,
        }
        return redirect('/index', cookies=cookies)

    return render_template('auth/login.html')

登录视图函数同样可以接收 GETPOST 两种请求方式。如果为 GET 请求,则说明用户要访问登录页面,直接返回登录页面对应的 HTML。如果为 POST 请求,则说明用户点击了登录页面的登录按钮,需要处理登录逻辑。

登录视图函数里调用了 current_user 函数来判断用户当前是否已经登录,current_user 函数实现如下:

# todo_list/todo/utils/auth.py

def current_user(request):
    """获取当前登录用户"""
    # 从 Cookie 中获取 session_id
    cookies = request.cookies
    logger(f'cookies: {cookies}')
    session_id = cookies.get('session_id')

    # 查找 Session 并验证其是否过期
    session = Session.get(session_id)
    if not session:
        return None
    if session.is_expired():
        session.delete()
        return None

    # 查找当前登录用户
    user = User.get(session.user_id)
    if not user:
        return None
    return user

current_user 函数中通过 request 对象的 cookies 属性获取当前请求中携带的 Cookie 信息,对于 Request 类如何解析请求中携带的 Cookie 信息部分相关的代码可以到本章节的源码仓库进行查看。

为了实现用户登录,需要根据 user_id 创建一个 Session 对象,并将 Session 对象的 session_id 写入浏览器 Cookie 。因此对 redirect 函数和 Response 类做如下修改:

# todo_list/todo/utils/http.py

def redirect(url, status=302, cookies=None):
    """重定向"""
    headers = {
        'Location': url,
    }
    body = ''
    return Response(body, headers=headers, status=status, cookies=cookies)


class Response(object):
    """响应类"""

    # 根据状态码获取原因短语
    reason_phrase = {
        200: 'OK',
        302: 'FOUND',
        405: 'METHOD NOT ALLOWED',
    }

    def __init__(self, body, headers=None, status=200, cookies=None):
        # 默认响应头,指定响应内容的类型为 HTML
        _headers = {
            'Content-Type': 'text/html; charset=utf-8',
        }

        if headers is not None:
            _headers.update(headers)
        self.headers = _headers  # 响应头
        self.body = body  # 响应体
        self.status = status  # 状态码
        self.cookies = cookies  # Cookie

    def __bytes__(self):
        """构造响应报文"""
        # 状态行 'HTTP/1.1 200 OK\r\n'
        header = f'HTTP/1.1 {self.status} {self.reason_phrase.get(self.status, "")}\r\n'
        # 响应首部
        header += ''.join(f'{k}: {v}\r\n' for k, v in self.headers.items())
        # Cookie
        if self.cookies:
            header += 'Set-Cookie: ' + \
                      '; '.join(f'{k}={v}' for k, v in self.cookies.items())
        # 空行
        blank_line = '\r\n'
        # 响应体
        body = self.body

        # body 支持 str 或 bytes 类型
        if isinstance(body, str):
            body = body.encode('utf-8')
        response_message = (header + blank_line).encode('utf-8') + body
        return response_message

这样,当 login 视图函数处理完登录逻辑,执行到最后一行 return redirect('/index', cookies=cookies) 时,就能够实现重定向到首页并完成登录。

现在可以到浏览器中测试注册、登录功能:

用户登录成功后,会被重定向到首页,展示当前用户所有 todo。

目前 todo 和用户还没有做关联,为了使两者联系起来,还需要更改 Todo 模型和存储 todo 的 JSON 对象格式。

Todo 模型的 __init__ 方法需要能够接收 user_id

# todo_list/todo/models/todo.py

from . import Model


class Todo(Model):
    """
    Todo 模型类
    """

    def __init__(self, **kwargs):
        self.id = kwargs.get('id')
        self.user_id = kwargs.get('user_id', -1)
        self.content = kwargs.get('content', '')

存储 todo 的 JSON 对象中需要保存 user_id

{
    "id": 4,
    "user_id": 1,
    "content": "hello world"
}

这样就能通过当前用户的 user_id 查询出与之关联的所有 todo 了。

修改 Todo List 程序首页视图函数,获取当前登录用户,查询与之关联的所有 todo。

# todo_list/todo/controllers/todo.py

def index(request):
    """首页视图函数"""
    user = current_user(request)
    todo_list = Todo.find_by(user_id=user.id, sort=True, reverse=True)
    context = {
        'todo_list': todo_list,
    }
    return render_template('todo/index.html', **context)

登录成功后再次访问程序首页将显示当前用户的 todo:

你可以再注册一个账号,在另一个浏览器中打开 Todo List 程序,试着给另一个用户添加几条 todo,看看效果。

todo 关联了用户以后,对 todo 的新增操作,需要验证用户是否登录。而对于 todo 的删除、修改、查询操作,只有 todo 的创建者才有权限,所以不仅要验证用户是否登录,还要验证操作的 todo 是否属于当前登录用户。

我们当然可以将验证操作都放到视图函数中,但仔细观察,你会发现对 todo 的所有操作有一个共同点,都需要验证用户是否登录。所以更优雅的做法是写一个验证登录的装饰器,这样所有需要验证用户登录的视图函数都只需要打上这个装饰器即可。

# todo_list/todo/utils/auth.py

def login_required(func):
    """验证登录装饰器"""

    @functools.wraps(func)
    def wrapper(request):
        user = current_user(request)
        if not user:
            return redirect('/login')
        result = func(request)
        return result

    return wrapper

以处理 Todo List 程序首页视图函数为例,验证登录装饰器的使用方法如下:

# todo_list/todo/controllers/auth.py

@login_required
def index(request):
    """首页视图函数"""
    user = current_user(request)
    todo_list = Todo.find_by(user_id=user.id, sort=True, reverse=True)
    context = {
        'todo_list': todo_list,
    }
    return render_template('todo/index.html', **context)

你不需要对 index 视图函数内部的代码做任何修改,只需要在函数定义处打上 @login_required 装饰即可。

在视图函数内部通过给 Todo 模型的 find_by 方法传入 user_id 关键字参数来查询只属于当前登录用户的 todo。

与 todo 相关的其他视图函数代码如下,这里不再一一讲解。

# todo_list/todo/controllers/auth.py

@login_required
def new(request):
    """新建 todo 视图函数"""
    form = request.form
    logger(f'form: {form}')

    content = form.get('content')
    if content:
        user = current_user(request)
        if user:
            todo = Todo(content=content, user_id=user.id)
            todo.save()
    return redirect('/index')


@login_required
def edit(request):
    """编辑 todo 视图函数"""
    if request.method == 'POST':
        form = request.form
        logger(f'form: {form}')

        id = int(form.get('id', -1))
        content = form.get('content')

        if id != -1 and content:
            user = current_user(request)
            if user:
                todo = Todo.find_by(id=id, user_id=user.id, ensure_one=True)
                if todo:
                    todo.content = content
                    todo.save()
        return redirect('/index')

    args = request.args
    logger(f'args: {args}')

    id = int(args.get('id', -1))
    if id == -1:
        return redirect('/index')

    user = current_user(request)
    if not user:
        return redirect('/index')

    todo = Todo.find_by(id=id, user_id=user.id, ensure_one=True)
    if not todo:
        return redirect('/index')

    context = {
        'todo': todo,
    }
    return render_template('todo/edit.html', **context)


@login_required
def delete(request):
    """删除 todo 视图函数"""
    form = request.form
    logger(f'form: {form}')

    id = int(form.get('id', -1))
    if id != -1:
        user = current_user(request)
        if user:
            todo = Todo.find_by(id=id, user_id=user.id, ensure_one=True)
            if todo:
                todo.delete()
    return redirect('/index')

完善项目

一个比较完整的 Web 项目应该加入全局异常处理的机制,因为你无法在程序的每个函数中枚举全部可能出现的异常。没有被捕获的异常一旦直接暴露给用户,很可能会泄漏程序的重要信息。

这里我设计了两个异常页面,404 页面用来告诉用户所访问的页面不存在,500 页面用来告诉用户服务器出现了未知错误。

404 页面的 HTML 代码如下:

<!-- todo_list/todo/templates/error/404.html -->

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Todo List | Page Not Found</title>
</head>
<body>
<p>Page Not Found</p>
</body>
</html>

500 页面的 HTML 代码如下:

<!-- todo_list/todo/templates/error/500.html -->

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Todo List | Internal Server Error</title>
</head>
<body>
<p>Internal Server Error</p>
</body>
</html>

templates 模板目录下新建 error/ 目录用来存放 404500 两个全局异常页面。

以下是 404 页面和 500 页面的处理函数:

# todo_list/todo/utils/error.py

from todo.utils.templating import render_template
from utils.http import Response


def page_not_found():
    """处理 400 异常"""
    body = render_template('error/404.html')
    return Response(body, status=400)


def internal_server_error():
    """处理 500 异常"""
    body = render_template('error/500.html')
    return Response(body, status=500)


errors = {
    404: page_not_found,
    500: internal_server_error,
}

因为 server.py 是服务器程序的入口和出口,所以全局捕获异常的代码适合写在此文件。

# todo_list/server.py

def process_connection(client):
    """处理客户端请求"""
    request_bytes = b''
    while True:
        chunk = client.recv(BUFFER_SIZE)
        request_bytes += chunk
        if len(chunk) < BUFFER_SIZE:
            break

    # 请求报文
    request_message = request_bytes.decode('utf-8')
    logger(f'request_message: {request_message}')

    # 解析请求报文
    request = Request(request_message)
    try:
        # 根据请求报文构造响应报文
        response_bytes = make_response(request)
    except Exception as e:
        logger(e)
        # 返回给用户 500 页面
        response_bytes = bytes(errors[500]())
    # 返回响应
    client.sendall(response_bytes)

    # 关闭连接
    client.close()


def make_response(request, headers=None):
    """构造响应报文"""
    # 默认状态码为 200
    status = 200
    # 处理静态资源请求
    if request.path.startswith('/static'):
        route, methods = routes.get('/static')
    else:
        try:
            route, methods = routes.get(request.path)
        except TypeError:
            # 返回给用户 404 页面
            return bytes(errors[404]())

    # 如果请求方法不被允许返回 405 状态码
    if request.method not in methods:
        status = 405
        data = 'Method Not Allowed'
    else:
        # 请求首页时 route 实际上就是我们在 controllers.py 中定义的 index 视图函数
        data = route(request)

    # 如果返回结果为 Response 对象,直接获取响应报文
    if isinstance(data, Response):
        response_bytes = bytes(data)
    else:
        # 返回结果为字符串,需要先构造 Response 对象,然后再获取响应报文
        response = Response(data, headers=headers, status=status)
        response_bytes = bytes(response)

    logger(f'response_bytes: {response_bytes}')
    return response_bytes

当用户访问的 URL 路径没有匹配的视图函数时,可以返回给用户 404 页面。当返回响应之前出现未捕获的异常时,会被 process_connection 函数中的全局异常处理所捕获,可以返回给用户 500 页面。记得将真正的异常信息写入日志,方便排查。

以下是遇到 404 异常或 500 异常时的页面截图:

至此,Todo List 程序的功能就全部开发完成了。最后给读者留一个作业,可以试着实现用户登出功能。

本章源码:chapter8

联系我:

查看原文

赞 0 收藏 0 评论 0

江湖十年 发布了文章 · 2月1日

用 Python 撸一个 Web 服务器-第7章:重构——更好的组织代码

通过前几章的学习,我们完成了 Todo List 程序的 todo 管理部分,实现了对 todo 的增、删、改、查基本操作,这也是几乎所有 Web 程序都具备的功能。我们当然可以按照目前的思路继续来实现用户管理部分,在 models.py 中编写用户相关的模型,在 templates/ 目录下新建用户相关 HTML,在 controllers.py 中编写用户相关的视图函数。但是,随着新功能的加入,把不同功能的代码都写在相同的文件中必然会引起代码的混乱。为实现易维护、易扩展的代码,我们需要对项目的目录结构进行重构。

项目重构

目前为止,我们实现的 Todo List 程序目录结构如下:

todo_list
├── server.py
└── todo
    ├── __init__.py
    ├── config.py
    ├── controllers.py
    ├── db
    │   └── todo.json
    ├── models.py
    ├── static
    │   ├── css
    │   │   └── style.css
    │   └── favicon.ico
    ├── templates
    │   ├── edit.html
    │   └── index.html
    └── utils.py

重构后的目录结构如下:

todo_list
├── server.py
└── todo
    ├── __init__.py
    ├── config.py
    ├── controllers
    │   ├── __init__.py
    │   ├── static.py
    │   └── todo.py
    ├── db
    │   └── todo.json
    ├── models
    │   ├── __init__.py
    │   └── todo.py
    ├── static
    │   ├── css
    │   │   └── style.css
    │   └── favicon.ico
    ├── templates
    │   └── todo
    │       ├── edit.html
    │       └── index.html
    └── utils
        ├── __init__.py
        ├── http.py
        └── templating.py

首先,将原来的 controllers.py 文件换成了 controllers/ 包,在 controllers/ 目录下将视图函数按照功能分别放到不同的文件中,并在 controllers/__init__.py 中将这些视图函数汇集到一起。将读取静态资源的视图函数 static 和读取网页 ICO 图标的视图函数 favicon 都放到 controllers/static.py 中,将 todo 相关的视图函数都放到 controllers/todo.py 中。

同样的,将 models.py 文件换成 models/ 包,将原来的 Todo 模型类放到 models/todo.py 中。不过这里不只是简单的将原来的 Todo 模型代码迁移过来,还对其进行了重构,抽象出一个模型基类 Model 将其放到 models/__init__.py 中,然后 Todo 继承自 Model 模型基类。这样做的好处是等我们编写用户模型时,查找、保存等方法就不需要在用户模型中再写一遍了,只需要让用户模型也继承 Model 模型基类即可。

# todo_list/todo/models/__init__.py

import os
import json

from todo.config import BASE_DIR


class Model(object):
    """
    Model 模型类
    """

    @classmethod
    def _db_path(cls):
        """获取存储模型对象数据的文件的绝对路径"""
        class_name = cls.__name__
        file_name = f'{class_name.lower()}.json'
        path = os.path.join(BASE_DIR, 'db', file_name)
        return path

    @classmethod
    def _load_db(cls):
        """加载 JSON 文件中所有模型对象数据"""
        path = cls._db_path()
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @classmethod
    def _save_db(cls, data):
        """将模型对象数据保存到 JSON 文件"""
        path = cls._db_path()
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

    @classmethod
    def all(cls, sort=False, reverse=False):
        """查询全部模型对象"""
        # 这一步用来将所有从 JSON 文件中读取的 model 数据转换为 Model 实例化对象,方便后续操作
        models = [cls(**model) for model in cls._load_db()]
        # 对数据按照 id 排序
        if sort:
            models = sorted(models, key=lambda x: x.id, reverse=reverse)
        return models

    @classmethod
    def find_by(cls, limit=-1, ensure_one=False, sort=False, reverse=False, **kwargs):
        """根据传入条件查询模型对象"""
        result = []
        models = [model.__dict__ for model in cls.all(sort=sort, reverse=reverse)]
        for model in models:
            # 根据关键字参数查询 model
            for k, v in kwargs.items():
                if model.get(k) != v:
                    break
            else:
                result.append(cls(**model))

        # 查询给定条数的数据
        if 0 < limit < len(result):
            result = result[:limit]
        # 查询结果集中的第一条数据
        if ensure_one:
            result = result[0] if len(result) > 0 else None

        return result

    @classmethod
    def get(cls, id):
        """通过 id 查询模型对象"""
        result = cls.find_by(id=id, ensure_one=True)
        return result

    def save(self):
        """保存模型对象"""
        # 查找出除 self 以外所有 model
        # model.__dict__ 是保存了所有实例属性的字典
        models = [model.__dict__ for model in self.all(sort=True) if model.id != self.id]

        # 自增 id
        if self.id is None:
            # 如果 model_list 大于 0 说明不是第一条 model,取最后一条 model 的 id 加 1
            if len(models) > 0:
                self.id = models[-1]['id'] + 1
            # 否则说明是第一条 model,id 为 1
            else:
                self.id = 1

        # 将当前 model 追加到 model_list
        models.append(self.__dict__)
        # 将所有 model 保存到文件
        self._save_db(models)

    def delete(self):
        """删除模型对象"""
        model_list = [model.__dict__ for model in self.all() if model.id != self.id]
        self._save_db(model_list)
# todo_list/todo/models/todo.py

from . import Model


class Todo(Model):
    """
    Todo 模型类
    """

    def __init__(self, **kwargs):
        self.id = kwargs.get('id')
        self.content = kwargs.get('content', '')

存放 HTML 模板的 templates/ 目录中又增加了一层目录结构,todo/ 目录用来存放 todo 相关 HTML 模板。

原来的工具集 utils.py 文件也换成了一个 Python 包,根据工具代码的不同类型将其分别放入不同文件。请求类 Request、响应类 Response 和重定向函数 redirect 都放到 utils/http.py 中。模板引擎类 Template、渲染模板函数 render_template 放到 utils/templating.py 中。

至此,项目目录结构重构完成。在这里大部分是采用将原来的单文件改成 Python 包的形式,这样能更好的组织代码结构。

在编写用户管理功能之前,我们先来介绍下 Web 开发过程中两个重要的部分,日志和测试。日志和测试是保证生产环境项目稳定运行的重要保障,日志可以记录程序的异常信息和对程序的运行状况进行监控、分析等,测试则能够有效降低生产环境中程序出现 BUG 的概率。

日志

Todo List 程序之前记录日志的方式是通过 print 函数来实现的,在前几章的代码中可以找到很多 print 语句。不过 print 函数默认将结果输出到屏幕,而生产环境中通常需要将日志输出到文件中保存下来,方便后续对日志进行分析。我们可以通过给 print 函数指定 file 参数(一个文件对象)将其输出内容写入文件。

utils/ 目录下新建 logging.py 用来编写日志记录函数:

# todo_list/utils/logging.py

import os
import datetime

from todo.config import BASE_DIR

path = os.path.join(BASE_DIR, 'logs/todo.log')


def logger(*args, **kwargs):
    """记录日志"""
    now = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
    with open(path, 'a') as f:
        # 将日志输出到屏幕,方便调试,上线时可关掉
        print(now, '-', *args, **kwargs)
        # 将日志输出到文件
        print(now, '-', *args, **kwargs, file=f)

接着在 todo_list/ 目录下新建一个 logs/ 目录用来存放日志。最后将代码中所有 print 语句全部换成 logger 即可。这样日志能够同时输出到屏幕和文件,如果在生产环境则可以只输出到文件。

测试

测试在程序开发过程中占有举足轻重的地位,但很多团队和开发者却对其视而不见,以各种理由忽略测试。尤其是越小的团队越以开发周期短、时间紧为由省略开发程序的测试过程。但我一直认为这是得不偿失的做法,短期内可能加快了程序开发的进度,但长远来看,后期投入的开发维护精力、成本等将会大大增加。并且生产环境一旦出现严重漏洞,将带来不可挽回的损失。

尽管 Todo List 程序非常微小,但还是要对其加入测试。程序测试方法有很多,如单元测试、功能测试、集成测试等。这里着重介绍下单元测试,单元测试是指对软件中的最小可测试单元进行检查和验证。其中所谓的最小可测单元可以是一个函数、一个类等。

todo_list/ 目录下新建 tests/ 目录用来存放所有测试文件,测试代码根据被测代码类型的不同分别放到不同文件中,如测试视图函数的代码全部放到名为 test_controllers.py 的文件中,测试模型的代码全部放到名为 test_models.py 的文件中。一个约定俗成的做法是让所有的测试文件名都以 test_ 开头。

接下来我以测试首页视图函数和新增 todo 视图函数为例,讲解测试代码的编写:

# todo_list/tests/test_controllers.py

import os
import sys

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from todo.utils.http import Request
from todo.controllers import routes
from todo.models.todo import Todo


def test_index():
    """测试首页"""
    request_message = 'GET / HTTP/1.1\r\nHost: 127.0.0.1:8000\r\n\r\n'
    request = Request(request_message)
    route, method = routes.get(request.path)
    r = route(request)

    assert b'Todo List' in bytes(r, encoding='utf-8')
    assert b'/new' in bytes(r, encoding='utf-8')


def test_new():
    """测试新增 todo"""
    # 生成随机 todo 内容
    content = uuid.uuid4().hex
    request_message = f'POST /new HTTP/1.1\r\nHost: 127.0.0.1:8000\r\n\r\ncontent={content}'
    request = Request(request_message)
    route, method = routes.get(request.path)
    r = route(request)
    t = Todo.find_by(content=content, ensure_one=True)
    t.delete()

    assert b'302 FOUND' in bytes(r)
    assert b'/index' in bytes(r)
    assert t.content == content


def main():
    test_index()
    test_new()


if __name__ == '__main__':
    main()

test_index 函数用来测试首页视图函数,为了简化测试代码,测试函数中并没有通过请求 Web Server 来获取响应。首先将请求消息报文 request_message 传递给 Resquest 类构造了一个请求对象,然后根据请求路径 request.path 获取处理该请求的视图函数,接着调用视图函数来获取响应报文。这样做的好处是不需要编写发起请求的客户端程序,但测试覆盖率肯定会有所下降。这是一个选择性的问题,需要考虑时间成本、投入产出比等。测试函数的最后通过断言语句,来断言响应报文中必然包含的内容。

test_new 函数用来测试新增 todo 视图函数,大体逻辑与 test_index 差不多。在生成测试 todo 内容时使用了 UUID,目的是为了生成足够随机的字符串避免与 db/todo.json 中已存在的 todo 内存重复,这样通过 Todo.find_by() 方法查找 todo 时能够确保查询结果正确。还需要注意的一点是在新增 todo 成功后又将其删除了,这样做的目的是为了让测试代码不对原有数据产生影响。理论上,测试代码每次执行的结果都应该相同,并且不应该破坏程序原有的数据。

使用 Python 运行测试文件 python3 test_controllers.py,如果测试代码执行完成后没有任何输出,就说明全部测试通过。测试代码遵循 Linux 设计哲学,没有消息就是最好的消息。如果测试代码执行过程中抛出 AssertionError 异常,则说明测试未通过,要么是被测代码有问题,要么是测试代码本身有问题。

由于篇幅所限,对 Todo List 程序的测试部分讲解就到这里,其他部分的测试代码可以访问本章节源码进行查看。

本章源码:chapter7

联系我:

查看原文

赞 0 收藏 0 评论 0

江湖十年 发布了文章 · 1月31日

用 Python 撸一个 Web 服务器-第6章:完善 Todo List 应用

这一章,我们来完成 todo 管理功能的剩余部分:新增、修改和删除功能。

新增 todo

首先实现 Todo List 程序的新增功能。新增 todo 的逻辑如下:

  1. 在首页顶部的输入框中输入 todo 内容。
  2. 然后点击新建按钮。
  3. 将输入框中的 todo 内容通过 POST 请求传递到服务器端。
  4. 服务器端解析请求中的 todo 内容并存储到文件。
  5. 重新返回到程序首页。

接下来对这些步骤进行具体实现。

首页 HTML 中添加新增 todo 的输入框和新建按钮:

<!-- todo_list/todo/templates/index.html -->

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Todo List</title>
    <!-- 引入外部 CSS 样式 -->
    <link rel="stylesheet" href="/static/css/style.css">
</head>
<body>
<h1 class="container">Todo List</h1>
<div class="container">
    <ul>
        <li>
            <form class="new" action="/new" method="post">
                <input type="text" name="content">
                <button class="save">新建</button>
            </form>
        </li>
        <br>
        {% for todo in todo_list %}
            <li>
                <div>{{ todo.content }}</div>
            </li>
        {% endfor %}
    </ul>
</div>
</body>
</html>

代码中增加了一个 form 标签,用来新增 todo。请求路径地址为 /new,请求方法为 post

将首页的 CSS 代码追加到 style.css 文件中:

/* todo_list/todo/static/css/style.css */

.container ul li:first-child {
    background-color: #ffffff;
    padding: 0;
}

.container button {
    width: 40px;
    height: 28px;
    padding: 4px;
    cursor: pointer;
}

.new {
    width: 100%;
    max-width: 600px;
    height: 40px;
    display: flex;
    justify-content: space-between;
    align-items: center;
}

form input {
    width: 90%;
    height: 100%;
}

此时首页效果如下:

修改 Request 类,使其能够解析 GETPOST 请求中携带的参数:

# todo_list/todo/utils.py

from urllib.parse import unquote_plus

...

class Request(object):
    """请求类"""

    def __init__(self, request_message):
        method, path, headers, args, form = self.parse_data(request_message)
        self.method = method  # 请求方法 GET、POST
        self.path = path  # 请求路径 /index
        self.headers = headers  # 请求头 {'Host': '127.0.0.1:8000'}
        self.args = args  # 查询参数
        self.form = form  # 请求体

    def parse_data(self, data):
        """解析请求数据"""
        # 用请求报文中的第一个 '\r\n\r\n' 做分割,将得到请求头和请求体
        header, body = data.split('\r\n\r\n', 1)
        method, path, headers, args = self._parse_header(header)
        form = self._path_body(body)

        return method, path, headers, args, form

    def _parse_header(self, data):
        """解析请求头"""
        # 拆分请求行和请求首部
        request_line, request_header = data.split('\r\n', 1)

        # 请求行拆包 'GET /index HTTP/1.1' -> ['GET', '/index', 'HTTP/1.1']
        # 因为 HTTP 版本号没什么用,所以用一个下划线 _ 变量来接收
        method, path_query, _ = request_line.split()
        path, args = self._parse_path(path_query)

        # 解析请求首部所有的键值对,组装成字典
        headers = {}
        for header in request_header.split('\r\n'):
            k, v = header.split(': ', 1)
            headers[k] = v

        return method, path, headers, args

    @staticmethod
    def _parse_path(data):
        """解析请求路径、请求参数"""
        args = {}
        # 请求路径和 GET 请求参数格式: /index?edit=1&content=text
        if '?' not in data:
            path, query = data, ''
        else:
            path, query = data.split('?', 1)
            for q in query.split('&'):
                k, v = q.split('=', 1)
                args[k] = v
        return path, args

    @staticmethod
    def _path_body(data):
        """解析请求体"""
        form = {}
        if data:
            # POST 请求体参数格式: username=zhangsan&password=mima
            for b in data.split('&'):
                k, v = b.split('=', 1)
                # 前端页面中通过 form 表单提交过来的数据会被自动编码,使用 unquote_plus 来解码
                form[k] = unquote_plus(v)
        return form

Request 类新增了两个静态方法,_parse_path 方法用来拆分请求路径和请求参数。_path_body 用来提取请求体中的参数并转换为 dict

在服务器端将新增的 todo 内存保存到文件后,程序需要重新回到首页。所以我们需要定义一个重定向函数:

# todo_list/todo/utils.py

...

def redirect(url, status=302):
    """重定向"""
    headers = {
        'Location': url,
    }
    body = ''
    return Response(body, headers=headers, status=status)

重定向原理大致如下:

浏览器收到服务端的响应,如果状态码为 3XX,则表示重定向,浏览器会解析出响应头中的 Location 首部字段的值,然后通过 GET 请求方式访问这个 URL 地址。

重定向状态码最常见的两个就是 301302301 代表永久重定向,302 代表临时重定向。我们项目适合使用临时重定向,所以 redirect 函数的 status 参数默认值为 302

同时还需要修改 Response 响应类,使其能够处理 302 状态码:

# todo_list/todo/utils.py

class Response(object):
    """响应类"""

    # 根据状态码获取原因短语
    reason_phrase = {
        200: 'OK',
        302: 'FOUND',
        405: 'METHOD NOT ALLOWED',
    }
    
    ...

在模型层,给 Todo 模型类新增一个 save 实例方法用来保存 todo 对象到文件中:

# todo_list/todo/models.py

import os
import json

from todo.config import BASE_DIR


class Todo(object):
    """
    Todo 模型类
    """

    def __init__(self, **kwargs):
        self.id = kwargs.get('id')
        self.content = kwargs.get('content', '')

    @classmethod
    def _db_path(cls):
        """获取存储 todo 数据文件的绝对路径"""
        # 返回 'todo_list/todo/db/todo.json' 文件的绝对路径
        path = os.path.join(BASE_DIR, 'db/todo.json')
        return path

    @classmethod
    def _load_db(cls):
        """加载 JSON 文件中所有 todo 数据"""
        path = cls._db_path()
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @classmethod
    def _save_db(cls, data):
        """将 todo 数据保存到 JSON 文件"""
        path = cls._db_path()
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

    @classmethod
    def all(cls, sort=False, reverse=False):
        """获取全部 todo"""
        # 这一步用来将所有从 JSON 文件中读取的 todo 数据转换为 Todo 实例化对象,方便后续操作
        todo_list = [cls(**todo_dict) for todo_dict in cls._load_db()]
        # 对数据按照 id 排序
        if sort:
            todo_list = sorted(todo_list, key=lambda x: x.id, reverse=reverse)
        return todo_list

    def save(self):
        """保存 todo"""
        # 查找出除 self 以外所有 todo
        # todo.__dict__ 是保存了所有实例属性的字典
        todo_list = [todo.__dict__ for todo in self.all(sort=True) if todo.id != self.id]

        # 自增 id
        if self.id is None:
            # 如果 todo_list 长度大于 0 说明不是第一条 todo,取最后一条 todo 的 id 加 1
            if len(todo_list) > 0:
                self.id = todo_list[-1]['id'] + 1
            # 否则说明是第一条 todo,id 为 1
            else:
                self.id = 1

        # 将当前 todo 追加到 todo_list
        todo_list.append(self.__dict__)
        # 将所有 todo 保存到文件
        self._save_db(todo_list)

在控制器层,编写一个 new 视图函数用来处理新增 todo 的业务逻辑:

# todo_list/todo/controllers.py

def new(request):
    """新建 todo 视图函数"""
    form = request.form
    print(f'form: {form}')

    content = form.get('content')
    # 这里判断前端传递过来的参数是否有内容,如果为空则说明不是一个有效的 todo,直接重定向到首页
    if content:
        todo = Todo(content=content)
        todo.save()
    return redirect('/index')

new 视图函数中判断如果前端传递过来的 todo 有效,就将其保存到文件,然后重定向到首页。

其实对于检查 todo 是否有效的逻辑前端也可以处理,只需要在输入 todo 的 input 标签加上 required 属性即可。这样当用户未输入任何内容就直接点击新建按钮时,浏览器会自动给出 请填写此字段。 的提示,在浏览器端做校验的好处是可以在请求未发送给后台服务器之前,由浏览器自动完成输入校验,这样能够减少发送请求的次数,避免无效的请求发送到服务器后台造成资源的浪费。

<form class="new" action="/new" method="post">
  <input type="text" name="content" required>
  <button class="save">新建</button>
</form>

现在就可以使用 Todo List 程序的新增功能来添加 todo 了:

修改 todo

修改 todo 的逻辑如下:

  1. 点击首页中想要修改的 todo 右侧的编辑按钮。
  2. 页面跳转到编辑页面。
  3. 修改这条 todo 内容,点击保存按钮。
  4. 将输入框中的 todo 内容通过 POST 请求传递到服务器端。
  5. 服务器端将修改后的 todo 保存到文件。
  6. 重新返回到程序首页。

需要注意的是,修改 todo 依然采用了 POST 请求方式来提交数据。在前面介绍 HTTP 请求方法时,我提到过 HTTP 常见请求方法有四个:POSTDELETEPUTGET 分别对应增、删、改、查四个操作。如果采用 RESTful 风格来开发 Web API 最好完全遵照 HTTP 请求方法和操作的对应关系,以便开发出更加语义化的接口。这里为了简单起见,Todo List 程序只使用了 GETPOST 两种请求方式,除了获取数据时采用 GET 请求,新增、修改、删除操作都采用 POST 请求方式。

  • todo 首页新增编辑按钮

编辑页面的 HTML 如下:

<!-- todo_list/todo/templates/edit.html -->

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Todo List | Edit</title>
    <link rel="stylesheet" href="/static/css/style.css">
</head>
<body>
<h1 class="container">Edit</h1>
<div class="container">
    <form class="edit" action="/edit" method="post">
        <input type="hidden" name="id" value="{{ todo.id }}">
        <input type="text" name="content" value="{{ todo.content }}">
        <button>保存</button>
    </form>
</div>
</body>
</html>

将编辑页面的 CSS 代码追加到 style.css 文件中:

/* todo_list/todo/static/css/style.css */

.new, .edit {
    width: 100%;
    max-width: 600px;
    height: 40px;
    display: flex;
    justify-content: space-between;
    align-items: center;
}

在模型层,给 Todo 模型类新增两个查询 todo 的方法:

# todo_list/todo/models.py

class Todo(object):
    """
    Todo 模型类
    """

    ...
    
    @classmethod
    def find_by(cls, limit=-1, ensure_one=False, sort=False, reverse=False, **kwargs):
        """查询 todo"""
        result = []
        todo_list = [todo.__dict__ for todo in cls.all(sort=sort, reverse=reverse)]
        for todo in todo_list:
            # 根据关键字参数查询 todo
            for k, v in kwargs.items():
                if todo.get(k) != v:
                    break
            else:
                result.append(cls(**todo))

        # 查询给定条数的数据
        if 0 < limit < len(result):
            result = result[:limit]
        # 查询结果集中的第一条数据
        if ensure_one:
            result = result[0] if len(result) > 0 else None

        return result

    @classmethod
    def get(cls, id):
        """通过 id 查询 todo"""
        result = cls.find_by(id=id, ensure_one=True)
        return result

find_by 方法用来根据给定条件查询 todo,get 方法内部调用的还是 find_by 方法,只根据 id 来搜索,查询结果为单条数据,这两个方法均为类方法。

在控制器层,编写一个 edit 视图函数用来处理编辑 todo 的业务逻辑:

# todo_list/todo/controllers.py

def edit(request):
    """编辑 todo 视图函数"""
    # 处理 POST 请求
    if request.method == 'POST':
        form = request.form
        print(f'form: {form}')

        id = int(form.get('id', -1))
        content = form.get('content')

        if id != -1 and content:
            todo = Todo.get(id)
            if todo:
                todo.content = content
                todo.save()
        return redirect('/index')

    # 处理 GET 请求
    args = request.args
    print(f'args: {args}')

    id = int(args.get('id', -1))
    if id == -1:
        return redirect('/index')

    todo = Todo.get(id)
    if not todo:
        return redirect('/index')

    context = {
        'todo': todo,
    }
    return render_template('edit.html', **context)


routes = {
    ...
    '/edit': (edit, ['GET', 'POST']),
}

edit 视图函数跟之前编写的其他视图函数有些不同,它同时支持两种请求方法,GET 请求返回 HTML 页面,POST 请求用来处理修改 todo 的逻辑,并在修改完成后重定向到首页。

现在就可以使用 Todo List 程序的编辑功能来修改 todo 了:

删除 todo

删除 todo 的逻辑如下:

  1. 点击首页中想要删除的 todo 右侧的删除按钮。
  2. 将这条 todo 的 id 通过 POST 请求传递到服务器端。
  3. 服务器端通过得到的 id 查询并删除对应的 todo。
  4. 重新返回到程序首页。

首页 HTML 中添加删除按钮:

<!-- todo_list/todo/templates/index.html -->

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Todo List</title>
    <!-- 引入外部 CSS 样式 -->
    <link rel="stylesheet" href="/static/css/style.css">
</head>
<body>
<h1 class="container">Todo List</h1>
<div class="container">
    <ul>
        <li>
            <form class="new" action="/new" method="post">
                <input type="text" name="content" required>
                <button class="save">新建</button>
            </form>
        </li>
        <br>
        {% for todo in todo_list %}
            <li>
                <div>{{ todo.content }}</div>
                <div>
                    <a href="/edit?id={{ todo.id }}">
                        <button>编辑</button>
                    </a>
                    <form class="delete" action="/delete" method="post">
                        <input type="hidden" name="id" value="{{ todo.id }}">
                        <button>删除</button>
                    </form>
                </div>
            </li>
        {% endfor %}
    </ul>
</div>
</body>
</html>

将删除按钮的 CSS 代码追加到 style.css 文件中:

/* todo_list/todo/static/css/style.css */

.delete {
    display: inline-block;
}

在模型层,给 Todo 模型类新增一个 delete 实例方法用来从文件中删除 todo 对象:

# todo_list/todo/models.py

class Todo(object):
    """
    Todo 模型类
    """

   ...

    def delete(self):
        """删除 todo"""
        todo_list = [todo.__dict__ for todo in self.all() if todo.id != self.id]
        self._save_db(todo_list)

在控制器层,编写一个 delete 视图函数用来处理删除 todo 的业务逻辑:

# todo_list/todo/controllers.py

def delete(request):
    """删除 todo 视图函数"""
    form = request.form
    print(f'form: {form}')

    id = int(form.get('id', -1))
    if id != -1:
        todo = Todo.get(id)
        if todo:
            todo.delete()
    return redirect('/index')


routes = {
    ...
    '/delete': (delete, ['POST']),
}

现在就可以使用 Todo List 程序的删除功能来删除 todo 了:

至此,todo 的管理部分代码编写完成。

本章源码:chapter6

联系我:

查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 1 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2020-09-23
个人主页被 838 人浏览