目前的需求是,我需要监视数据库中某个字段的值,如果值为0,则重复查询,值为1后继续后面的操作,目前的一个问题是,如果我用unittest或者pytest来做,用例会阻塞(因为要重复查询等待值为1),有没有什么办法异步访问数据库中的值,值为0的时候挂起,重复查询,同时执行后面的测试用例,等到值为1的时候在回过头执行当前的用例?求一个代码例子,感激不尽
目前的需求是,我需要监视数据库中某个字段的值,如果值为0,则重复查询,值为1后继续后面的操作,目前的一个问题是,如果我用unittest或者pytest来做,用例会阻塞(因为要重复查询等待值为1),有没有什么办法异步访问数据库中的值,值为0的时候挂起,重复查询,同时执行后面的测试用例,等到值为1的时候在回过头执行当前的用例?求一个代码例子,感激不尽
同步的话,可以使用 pytest-xdist 支持并发执行测试用例。
异步的话 unittest IsolatedAsyncioTestCase 支持异步测试(requires version 3.8+), see
https://docs.python.org/3.8/l...
补充回答
题主也是够懒的,链接都发你了看都不看么
def pprint(message):
# make pytest-xdist writing to stdout/stderr
print(message, file=sys.__stderr__)
def query_db():
with open('/tmp/db') as f:
db_result = f.read()
pprint(f'query db result: {db_result}')
return db_result
def test_01():
pprint(f'{datetime.now()}: run test_01')
while not (result := query_db()):
time.sleep(1)
assert result == 'Insert into db'
pprint(f'{datetime.now()}: done test_01')
def test_02():
pprint(f'{datetime.now()}: run test_02')
time.sleep(3)
with open('/tmp/db', 'w') as f:
f.write('Insert into db')
pprint(f'{datetime.now()}: done test_02')
启动2个worker运行, 可以看到 test_01
是在 test_02
修改了 db 后完成的
pytest -s -n 2 test.py
=========================================================================================================== test session starts ===========================================================================================================
platform darwin -- Python 3.8.2, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
rootdir: /Users/yang/Workspace/Repos/f2pool-web
plugins: xdist-2.2.1, forked-1.3.0
gw0 [2] / gw1 [2]
2021-05-27 11:11:32.046926: run test_02
2021-05-27 11:11:32.047211: run test_01
query db result:
query db result:
query db result:
2021-05-27 11:11:35.049686: done test_02
.query db result: Insert into db
2021-05-27 11:11:35.051555: done test_01
.
============================================================================================================ 2 passed in 3.36s ============================================================================================================
1 回答9.4k 阅读✓ 已解决
2 回答5.1k 阅读✓ 已解决
2 回答3.4k 阅读✓ 已解决
3 回答4.3k 阅读
2 回答2.4k 阅读✓ 已解决
3 回答1.5k 阅读✓ 已解决
2 回答1.4k 阅读✓ 已解决
经过几天的研究,觉得采用unittest测试框架,我上面提的问题可以延伸为,如果有大量测试用例,每个用例需要花费大量时间,unittest如何利用协程异步去运行所有的测试用例?经过一番探索我找到了答案,已经有unittest异步执行测试用例的轮子了,这是参考链接
安装
使用
使用方法与unittest库一致,仅需将import unittest改为import fastunit as unittest即可。
顺便一提,我问的问题真的不难,,,我google了这个问题只需要几分钟,而我放到论坛里的问题挂了几天也没解决,,,哭了
详细说明文档