南方仔

南方仔 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

南方仔 提出了问题 · 11月3日

请问antd如何一次提交多个相同的form表单?

父组件通过遍历数组,引用子组件生成多个表单,现在想一键提交这些表单,该怎么做啊?

关注 2 回答 1

南方仔 回答了问题 · 3月11日

请问nginx的配置文件nginx.conf在哪里?

/usr/local下的才是你正在使用的配置文件,/etc/nginx那个只是默认的配置, nginx -t 如果没有加-c指定配置文件,则测试的是默认配置文件

关注 9 回答 9

南方仔 收藏了文章 · 2月25日

自动为Flask写的API生成帮助文档

Flask是Python一个非常轻量的库,可以让你毫不费力地写一个简单的网站。如果你需要写一些后台API或者准备自动化测试数据时,Flask是一个非常不错的选择。

一个API例子

举个例子,我们可以这样写几个API,具体实现暂时略过:

# views/api.py

api = Blueprint('api', __name__)

@api.route('/get_todo', methods=['GET'])
def get_todo():
    """Get all todo tasks."""
    pass


@api.route('/add_todo', methods=['POST'])
def add_todo():
    """
    Add a todo task,  please post data in json format, e.g.

    data = {
              "name":"the title",
              "task":"the detail"
            }
    """
    pass


@api.route('/delete_todo', methods=['GET', 'POST'])
def delete_todo():
    """Delete a todo task."""
    pass

一旦你的API完成,你可能需要和调用方沟通调用的细节,最好给一些例子。明明你已经在代码里给所有方法都写了注释,难道还要再把这些注释拿出来重新组织排版一下?

我猜你和我一样,听过这么一句话。

read the fucking manual!

可是谁会去翻代码去看你的注释呢,何况你的代码他们还不一定能看到。如果能自动生成一个帮助页面那就好了。

自动化API帮助文档

假设我们的API都是以 http://127.0.0.1/api/* 的形式书写的,那么最好把API的完整列表就放在根目录下面,比如这样:

api-demo-home

view 方法的实现主要依靠 app.url_map 来获取Flask中所有的API:

# views/api.py

def get_api_map():
    """Search API from rules, if match the pattern then we said it is API."""
    for rule in get_app().url_map.iter_rules():
        if re.search(r'/api/.+', str(rule)):
            yield str(rule), rule.endpoint


@api.route('/', methods=['GET'])
def index():
    """List all API to this page, api_map contains each api url + endpoint."""
    api_map = sorted(list(get_api_map()))
    index_url = url_for('main.index', _external=True)
    api_map = [(index_url + x[0][1:], x[1]) for x in api_map]
    return render_template('api_index.html', api_map=api_map)

模板的实现:

# templates/api_index.html

{% extends "./layout.html" %}

{% block title %}API Root{% endblock %}

{% block breadcrumb_nav %}
    <li><a href="{{ url_for('api.index') }}">Api Root</a></li>
{% endblock %}

{% block page_header %}
    <h1>Api Root</h1>
{% endblock %}

{% block content_area %}
<pre>{
{% for i in api_map %}    "<a href="/docs/{{ i[1] }}">{{ i[0] }}</a>"{{ ",\n" if not loop.last }}{% endfor %}
}</pre>
{% endblock %}

接下来我们来文档化每个具体的API方法,最终的展示结果会是这样的。

api-demo-full

view 方法的实现思路其实也很明确,我们可以通过 app.view_functions 这个字典找到每个API 的endpoint所绑定的方法,然后访问方法的名字和文档即可。

# views/main.py

main = Blueprint('main', __name__)


@main.route('/', methods=['GET'])
def index():
    """Redirect home page to docs page."""
    return redirect(url_for('api.index'))


@main.route('/docs/<endpoint>', methods=['GET'])
def docs(endpoint):
    """Document page for an endpoint."""
    api = {
        'endpoint': endpoint,
        'methods': [],
        'doc': '',
        'url': '',
        'name': ''
    }

    try:
        func = get_app().view_functions[endpoint]

        api['name'] = _get_api_name(func)
        api['doc'] = _get_api_doc(func)

        for rule in get_app().url_map.iter_rules():
            if rule.endpoint == endpoint:
                api['methods'] = ','.join(rule.methods)
                api['url'] = str(rule)

    except:
        api['doc'] = 'Invalid api endpoint: "{}"!'.format(endpoint)

    return render_template('api_docs.html', api=api)


def _get_api_name(func):
    """e.g. Convert 'do_work' to 'Do Work'"""
    words = func.__name__.split('_')
    words = [w.capitalize() for w in words]
    return ' '.join(words)


def _get_api_doc(func):
    if func.__doc__:
        return func.__doc__
    else:
        return 'No doc found for this API!'

模板的实现:

{% extends "./layout.html" %}

{% block title %}API - {{ api['name'] }}{% endblock %}

{% block breadcrumb_nav %}
    <li><a href="{{ url_for('api.index') }}">Api Root</a></li>
    <li><a href="{{ api['url'] }}">{{ api['name'] }}</a></li>
{% endblock %}

{% block page_header %}
    <h1>{{ api['name'] | upper }}</h1>
{% endblock %}

{% block content_area %}
<pre>
<b>Target:</b><span><a href="{{ api['url'] }}">{{ api['url'] }}</a></span>
<b>Allow :</b> <span>{{ api['methods'] }}</span>
<b>Usage :</b> <span>{{ api['doc'] }}</span>
</pre>
{% endblock %}

GitHub项目地址

如果你想看完整的例子,可以到我的GitHub去拉一份代码。

https://github.com/tobyqin/fl...

只需要三步就可以在你的机器上运行Demo:

cd /path/to/flask_api/doc
pip install -r requirements.txt
python main.py

如果你觉得Demo不错,欢迎给个Star。有建议或者想法也可以拿来讨论。

关于作者:

Toby Qin, Python 技术爱好者,目前从事测试开发相关工作,转载请注明原文出处。

欢迎关注我的博客 https://betacat.online,你可以到我的公众号中去当吃瓜群众。

Betacat.online

查看原文

南方仔 赞了文章 · 2月25日

自动为Flask写的API生成帮助文档

Flask是Python一个非常轻量的库,可以让你毫不费力地写一个简单的网站。如果你需要写一些后台API或者准备自动化测试数据时,Flask是一个非常不错的选择。

一个API例子

举个例子,我们可以这样写几个API,具体实现暂时略过:

# views/api.py

api = Blueprint('api', __name__)

@api.route('/get_todo', methods=['GET'])
def get_todo():
    """Get all todo tasks."""
    pass


@api.route('/add_todo', methods=['POST'])
def add_todo():
    """
    Add a todo task,  please post data in json format, e.g.

    data = {
              "name":"the title",
              "task":"the detail"
            }
    """
    pass


@api.route('/delete_todo', methods=['GET', 'POST'])
def delete_todo():
    """Delete a todo task."""
    pass

一旦你的API完成,你可能需要和调用方沟通调用的细节,最好给一些例子。明明你已经在代码里给所有方法都写了注释,难道还要再把这些注释拿出来重新组织排版一下?

我猜你和我一样,听过这么一句话。

read the fucking manual!

可是谁会去翻代码去看你的注释呢,何况你的代码他们还不一定能看到。如果能自动生成一个帮助页面那就好了。

自动化API帮助文档

假设我们的API都是以 http://127.0.0.1/api/* 的形式书写的,那么最好把API的完整列表就放在根目录下面,比如这样:

api-demo-home

view 方法的实现主要依靠 app.url_map 来获取Flask中所有的API:

# views/api.py

def get_api_map():
    """Search API from rules, if match the pattern then we said it is API."""
    for rule in get_app().url_map.iter_rules():
        if re.search(r'/api/.+', str(rule)):
            yield str(rule), rule.endpoint


@api.route('/', methods=['GET'])
def index():
    """List all API to this page, api_map contains each api url + endpoint."""
    api_map = sorted(list(get_api_map()))
    index_url = url_for('main.index', _external=True)
    api_map = [(index_url + x[0][1:], x[1]) for x in api_map]
    return render_template('api_index.html', api_map=api_map)

模板的实现:

# templates/api_index.html

{% extends "./layout.html" %}

{% block title %}API Root{% endblock %}

{% block breadcrumb_nav %}
    <li><a href="{{ url_for('api.index') }}">Api Root</a></li>
{% endblock %}

{% block page_header %}
    <h1>Api Root</h1>
{% endblock %}

{% block content_area %}
<pre>{
{% for i in api_map %}    "<a href="/docs/{{ i[1] }}">{{ i[0] }}</a>"{{ ",\n" if not loop.last }}{% endfor %}
}</pre>
{% endblock %}

接下来我们来文档化每个具体的API方法,最终的展示结果会是这样的。

api-demo-full

view 方法的实现思路其实也很明确,我们可以通过 app.view_functions 这个字典找到每个API 的endpoint所绑定的方法,然后访问方法的名字和文档即可。

# views/main.py

main = Blueprint('main', __name__)


@main.route('/', methods=['GET'])
def index():
    """Redirect home page to docs page."""
    return redirect(url_for('api.index'))


@main.route('/docs/<endpoint>', methods=['GET'])
def docs(endpoint):
    """Document page for an endpoint."""
    api = {
        'endpoint': endpoint,
        'methods': [],
        'doc': '',
        'url': '',
        'name': ''
    }

    try:
        func = get_app().view_functions[endpoint]

        api['name'] = _get_api_name(func)
        api['doc'] = _get_api_doc(func)

        for rule in get_app().url_map.iter_rules():
            if rule.endpoint == endpoint:
                api['methods'] = ','.join(rule.methods)
                api['url'] = str(rule)

    except:
        api['doc'] = 'Invalid api endpoint: "{}"!'.format(endpoint)

    return render_template('api_docs.html', api=api)


def _get_api_name(func):
    """e.g. Convert 'do_work' to 'Do Work'"""
    words = func.__name__.split('_')
    words = [w.capitalize() for w in words]
    return ' '.join(words)


def _get_api_doc(func):
    if func.__doc__:
        return func.__doc__
    else:
        return 'No doc found for this API!'

模板的实现:

{% extends "./layout.html" %}

{% block title %}API - {{ api['name'] }}{% endblock %}

{% block breadcrumb_nav %}
    <li><a href="{{ url_for('api.index') }}">Api Root</a></li>
    <li><a href="{{ api['url'] }}">{{ api['name'] }}</a></li>
{% endblock %}

{% block page_header %}
    <h1>{{ api['name'] | upper }}</h1>
{% endblock %}

{% block content_area %}
<pre>
<b>Target:</b><span><a href="{{ api['url'] }}">{{ api['url'] }}</a></span>
<b>Allow :</b> <span>{{ api['methods'] }}</span>
<b>Usage :</b> <span>{{ api['doc'] }}</span>
</pre>
{% endblock %}

GitHub项目地址

如果你想看完整的例子,可以到我的GitHub去拉一份代码。

https://github.com/tobyqin/fl...

只需要三步就可以在你的机器上运行Demo:

cd /path/to/flask_api/doc
pip install -r requirements.txt
python main.py

如果你觉得Demo不错,欢迎给个Star。有建议或者想法也可以拿来讨论。

关于作者:

Toby Qin, Python 技术爱好者,目前从事测试开发相关工作,转载请注明原文出处。

欢迎关注我的博客 https://betacat.online,你可以到我的公众号中去当吃瓜群众。

Betacat.online

查看原文

赞 3 收藏 6 评论 0

南方仔 回答了问题 · 2月24日

解决flask celery work报错KeyError: 'app.tasks.send_async_email'

问题解决了,我使用的工厂模式,要在create_app前导入执行函数,添加这一句
celery.conf['imports'] = ['app.tasks', ]

CELERY_BROKER_URL = "redis://:*****@localhost:6379/0"  
celery = Celery(__name_, broker=CELERY_BROKER_URL)  
# 在此处导入执行函数
celery.conf['imports'] = ['app.tasks', ]  
  
  
def create_app(env=None):  
    app = Flask(__name__)  
    app.config.from_object(config\[env\])  
  
    register_logging(app)  
    register_sentry(app)  
    register_routes(app)  
  
    register_command(app)  
  
    register_celery(app)  
  
    return app

关注 1 回答 1

南方仔 关注了问题 · 2月23日

解决flask celery work报错KeyError: 'app.tasks.send_async_email'

这个问题小弟看了好多天了,在网上搜索了各种答案,尝试了很多次依旧没解决,还请劳烦各位大佬帮忙看看。

依赖包:
celery==4.4.0
Flask==1.1.1

项目结构如下:
image.png

route.py 中提交异步任务
tasks.py 中存放异步函数
image.png

结果报错如下:

## celery -A app:celery worker -B -E --loglevel=INFO 
 
 -------------- celery@felixdeMacBook-Pro.local v4.4.0 (cliffs)
--- ***** ----- 
-- ******* ---- Darwin-19.3.0-x86_64-i386-64bit 2020-02-23 18:31:40
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         app:0x105bf0518
- ** ---------- .> transport:   redis://:**@localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: ON
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]


[2020-02-23 18:31:41,293: INFO/Beat] beat: Starting...
[2020-02-23 18:31:41,294: INFO/MainProcess] Connected to redis://:**@localhost:6379/0
[2020-02-23 18:31:41,323: INFO/MainProcess] mingle: searching for neighbors
[2020-02-23 18:31:42,377: INFO/MainProcess] mingle: all alone
[2020-02-23 18:31:42,431: INFO/MainProcess] celery@felixdeMacBook-Pro.local ready.
[2020-02-23 18:32:53,110: ERROR/MainProcess] Received unregistered task of type 'app.tasks.send_async_email'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[10, 20], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (83b)
Traceback (most recent call last):
  File "/Users/felixweek/venvs/py3_venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
    strategy = strategies[type_]
KeyError: 'app.tasks.send_async_email'

关注 1 回答 1

南方仔 提出了问题 · 2月23日

解决flask celery work报错KeyError: 'app.tasks.send_async_email'

这个问题小弟看了好多天了,在网上搜索了各种答案,尝试了很多次依旧没解决,还请劳烦各位大佬帮忙看看。

依赖包:
celery==4.4.0
Flask==1.1.1

项目结构如下:
image.png

route.py 中提交异步任务
tasks.py 中存放异步函数
image.png

结果报错如下:

## celery -A app:celery worker -B -E --loglevel=INFO 
 
 -------------- celery@felixdeMacBook-Pro.local v4.4.0 (cliffs)
--- ***** ----- 
-- ******* ---- Darwin-19.3.0-x86_64-i386-64bit 2020-02-23 18:31:40
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         app:0x105bf0518
- ** ---------- .> transport:   redis://:**@localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: ON
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]


[2020-02-23 18:31:41,293: INFO/Beat] beat: Starting...
[2020-02-23 18:31:41,294: INFO/MainProcess] Connected to redis://:**@localhost:6379/0
[2020-02-23 18:31:41,323: INFO/MainProcess] mingle: searching for neighbors
[2020-02-23 18:31:42,377: INFO/MainProcess] mingle: all alone
[2020-02-23 18:31:42,431: INFO/MainProcess] celery@felixdeMacBook-Pro.local ready.
[2020-02-23 18:32:53,110: ERROR/MainProcess] Received unregistered task of type 'app.tasks.send_async_email'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[10, 20], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (83b)
Traceback (most recent call last):
  File "/Users/felixweek/venvs/py3_venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
    strategy = strategies[type_]
KeyError: 'app.tasks.send_async_email'

关注 1 回答 1

南方仔 关注了用户 · 2月20日

麦基普拉斯 @maggiefor

反派角色

关注 2

南方仔 赞了文章 · 2月17日

运维工具体系

运维流程管理工具

发布变更流程管理工具:做为系统接口与其他角色的工作衔接。并提供审批环节控制发布变更的风险。流程管理工具并不负责具体的业务操作的执行,只是作为单据系统跟踪流程和确保闭环。

告警和突发管理工具:体现业务受损的告警自动建单管理。人工确认之后升级为突发单。通过建单管理告警和突发确保流程的闭环,以及每次故障都能够总结出经验,并未度量业务的可用性提供KPI。

运维发布变更工具

版本管理工具(数据库):所有的发布应该以版本管理为起点。研发给的版本包先入版本管理工具,再从版本管理工具分发到现网发布。杜绝 rsync 一台服务器发布另外一台的做法。

配置管理工具(数据库):版本加配置等于现网每台机器的状态。最粗粒度的配置管理是到 IP 级别,相当于对机器做资产管理,分组到不同的业务,模块和大区等业务概念上。细粒度一点会管理到进程以及进程的相关的配置。

配置和版本下发工具:把指定的版本,结合配置好的配置下发到现网的机器上。不同的版本和配置方式需要完全不同的下发方式。以 ssh/fabric 为代表的下发方式是以脚本为中心的。以 puppet/chef 为代表的下发方式是以配置为中心的。

现网状态同步工具:为了规避现网状态漂移,与管理工具内的记录不一致。需要有一个工具定时上报现网的实际状况。

服务调度工具:发布变更经常需要一个串行的流程,先做A模块,再做B模块。很多机器的时候,需要把能并发的操作并发执行,不能并发的操作确保串行执行。同时很多发布变更流程需要操作管理范围外的服务,比如云端的DNS服务器记录等。这就需要有一个服务调度工具统一调度配置和版本下发工具,流程单据工具,以及其他系统的API接口共同组装成一个流程。

资源管理和隔离工具:以xen/kvm为代表的工具让运维可以更灵活的切割资源。比如虚拟机的快速起停,ip在idc内的漂移等。以 lxc/docker 为代表的工具让运维可以进一步的切割资源到进程级别。资源隔离代理的细粒度的资源控制可以获得更好的资源利用率,以及更容易进行可伸缩的资源配置。

发布变更统一界面:包装所有的下层工具,提供简单的界面完成标准化的发布变更操作。

运维监控告警工具

采集工具:一般是采集日志文件,也可以是定时轮询 DB 或者其他系统的接口。流行的开源方案是 logstash。

收集工具:采集工具上报给收集工具。或者由开发直接修改代码上报指标给收集工具。流程的开源方案还是 logstash。

统计入库工具:上报可能是每次调用就上报一次,统计工具负责统计出一分钟内的次数。上报也可能是每5秒上报一次数值,统计工具负责统计出一分钟内的最大值。统计工具的存在是为了上报的方便。流行的开源方案是 statsd,也有大公司基于 storm 来做二次开发的。

时间序列数据库:所有定时指标会落地到数据库里。监控告警所需要的数据库需要能够支撑非常大的数据量,但是并没有很严格的 ACID 要求。

运维事件数据库:记录所有的告警。包括从其他系统获得告警,以及对现网的所有变更操作记录。这些数据用于支撑告警的原因定位。

指标异常检测工具:基于数学模型发现指标是否与过去的稳定模式背离,而推测出现网状态的变化。

拨测工具:定时 PING 或者 HTTP GET,模拟实际用户发现服务是否中断,产生告警。同时也产生指标上报给收集系统。拨测又分为本地拨测,和远程拨测。本地拨测可以用于发现磁盘只读等本机告警。远程拨测可以模拟用户的地理分布,把网络的链路状况也包含在拨测覆盖的范围内。

告警收敛工具:综合所有来源的告警,进行频率收敛,根源分析。统一汇总成报告催促人工修复。

告警自动修复工具:接受告警进行自动化的处理。帮运维完成固定的故障机下架退库等操作。或者在业务本身没有做高可用的情况下,做故障机替换,ip漂移等现网修复操作,一定程度地提高业务可用性。

告警通知工具:重要的告警需要升级为电话。需要有高可用的电话,短信,微信等通知接口。

监控告警统一界面:屏蔽下层各种工具,提供统一的agent安装,指标采集设置,指标曲线展示,告警查询的界面。一个地方知道现网的所有的问题。

查看原文

赞 5 收藏 28 评论 0

南方仔 赞了文章 · 2月17日

支持联机对战的五子棋小游戏

原文链接

https://mp.weixin.qq.com/s/79...

效果展示

1.png
2.png
3.png

原理简介

每次都写单机游戏自嗨好像没啥意思,这次我们来写个支持联机对战的游戏吧,省的有人在issue里说:

好吧,联机和对手比赛输了总不能怪我了吧

OK,跑题了,这明明是一个学习用的公众号。因为我之前也没写过可以联机对战的游戏,所以先整个简单的游戏试试吧,支持局域网联机对战的五子棋小游戏。废话不多说,让我们愉快地开始吧~

这里简单介绍下原理吧,代码主要用PyQt5写的,pygame只用来播放一些音效。首先,设计并实现个游戏主界面:

代码实现如下:

'''游戏开始界面'''
class gameStartUI(QWidget):
  def __init__(self, parent=None, **kwargs):
    super(gameStartUI, self).__init__(parent)
    self.setFixedSize(760, 650)
    self.setWindowTitle('五子棋-微信公众号: Charles的皮卡丘')
    self.setWindowIcon(QIcon(cfg.ICON_FILEPATH))
    # 背景图片
    palette = QPalette()
    palette.setBrush(self.backgroundRole(), QBrush(QPixmap(cfg.BACKGROUND_IMAGEPATHS.get('bg_start'))))
    self.setPalette(palette)
    # 按钮
    # --人机对战
    self.ai_button = PushButton(cfg.BUTTON_IMAGEPATHS.get('ai'), self)
    self.ai_button.move(250, 200)
    self.ai_button.show()
    self.ai_button.click_signal.connect(self.playWithAI)
    # --联机对战
    self.online_button = PushButton(cfg.BUTTON_IMAGEPATHS.get('online'), self)
    self.online_button.move(250, 350)
    self.online_button.show()
    self.online_button.click_signal.connect(self.playOnline)
  '''人机对战'''
  def playWithAI(self):
    self.close()
    self.gaming_ui = playWithAIUI(cfg)
    self.gaming_ui.exit_signal.connect(lambda: sys.exit())
    self.gaming_ui.back_signal.connect(self.show)
    self.gaming_ui.show()
  '''联机对战'''
  def playOnline(self):
    self.close()
    self.gaming_ui = playOnlineUI(cfg, self)
    self.gaming_ui.show()

会pyqt5的应该都可以写出这样的界面,没啥特别的,记得把人机对战和联机对战两个按钮触发后的信号分别绑定到人机对战和联机对战的函数上就行。

然后分别来实现人机对战和联机对战就行了。这里人机对战的算法抄的公众号之前发的那篇AI五子棋的文章里用的算法,所以只要花点心思用PyQt5重新写个游戏界面就行了,效果大概是这样的:

主要的代码实现如下:

'''人机对战'''
class playWithAIUI(QWidget):
    back_signal = pyqtSignal()
    exit_signal = pyqtSignal()
    send_back_signal = False
    def __init__(self, cfg, parent=None, **kwargs):
        super(playWithAIUI, self).__init__(parent)
        self.cfg = cfg
        self.setFixedSize(760, 650)
        self.setWindowTitle('五子棋-微信公众号: Charles的皮卡丘')
        self.setWindowIcon(QIcon(cfg.ICON_FILEPATH))
        # 背景图片
        palette = QPalette()
        palette.setBrush(self.backgroundRole(), QBrush(QPixmap(cfg.BACKGROUND_IMAGEPATHS.get('bg_game'))))
        self.setPalette(palette)
        # 按钮
        self.home_button = PushButton(cfg.BUTTON_IMAGEPATHS.get('home'), self)
        self.home_button.click_signal.connect(self.goHome)
        self.home_button.move(680, 10)
        self.startgame_button = PushButton(cfg.BUTTON_IMAGEPATHS.get('startgame'), self)
        self.startgame_button.click_signal.connect(self.startgame)
        self.startgame_button.move(640, 240)
        self.regret_button = PushButton(cfg.BUTTON_IMAGEPATHS.get('regret'), self)
        self.regret_button.click_signal.connect(self.regret)
        self.regret_button.move(640, 310)
        self.givein_button = PushButton(cfg.BUTTON_IMAGEPATHS.get('givein'), self)
        self.givein_button.click_signal.connect(self.givein)
        self.givein_button.move(640, 380)
        # 落子标志
        self.chessman_sign = QLabel(self)
        sign = QPixmap(cfg.CHESSMAN_IMAGEPATHS.get('sign'))
        self.chessman_sign.setPixmap(sign)
        self.chessman_sign.setFixedSize(sign.size())
        self.chessman_sign.show()
        self.chessman_sign.hide()
        # 棋盘(19*19矩阵)
        self.chessboard = [[None for i in range(19)] for _ in range(19)]
        # 历史记录(悔棋用)
        self.history_record = []
        # 是否在游戏中
        self.is_gaming = True
        # 胜利方
        self.winner = None
        self.winner_info_label = None
        # 颜色分配and目前轮到谁落子
        self.player_color = 'white'
        self.ai_color = 'black'
        self.whoseround = self.player_color
        # 实例化ai
        self.ai_player = aiGobang(self.ai_color, self.player_color)
        # 落子声音加载
        pygame.mixer.init()
        self.drop_sound = pygame.mixer.Sound(cfg.SOUNDS_PATHS.get('drop'))
    '''鼠标左键点击事件-玩家回合'''
    def mousePressEvent(self, event):
        if (event.buttons() != QtCore.Qt.LeftButton) or (self.winner is not None) or (self.whoseround != self.player_color) or (not self.is_gaming):
            return
        # 保证只在棋盘范围内响应
        if event.x() >= 50 and event.x() <= 50 + 30 * 18 + 14 and event.y() >= 50 and event.y() <= 50 + 30 * 18 + 14:
            pos = Pixel2Chesspos(event)
            # 保证落子的地方本来没有人落子
            if self.chessboard[pos[0]][pos[1]]:
                return
            # 实例化一个棋子并显示
            c = Chessman(self.cfg.CHESSMAN_IMAGEPATHS.get(self.whoseround), self)
            c.move(event.pos())
            c.show()
            self.chessboard[pos[0]][pos[1]] = c
            # 落子声音响起
            self.drop_sound.play()
            # 最后落子位置标志对落子位置进行跟随
            self.chessman_sign.show()
            self.chessman_sign.move(c.pos())
            self.chessman_sign.raise_()
            # 记录这次落子
            self.history_record.append([*pos, self.whoseround])
            # 是否胜利了
            self.winner = checkWin(self.chessboard)
            if self.winner:
                self.showGameEndInfo()
                return
            # 切换回合方(其实就是改颜色)
            self.nextRound()
    '''鼠标左键释放操作-调用电脑回合'''
    def mouseReleaseEvent(self, event):
        if (self.winner is not None) or (self.whoseround != self.ai_color) or (not self.is_gaming):
            return
        self.aiAct()
    '''电脑自动下-AI回合'''
    def aiAct(self):
        if (self.winner is not None) or (self.whoseround == self.player_color) or (not self.is_gaming):
            return
        next_pos = self.ai_player.act(self.history_record)
        # 实例化一个棋子并显示
        c = Chessman(self.cfg.CHESSMAN_IMAGEPATHS.get(self.whoseround), self)
        c.move(QPoint(*Chesspos2Pixel(next_pos)))
        c.show()
        self.chessboard[next_pos[0]][next_pos[1]] = c
        # 落子声音响起
        self.drop_sound.play()
        # 最后落子位置标志对落子位置进行跟随
        self.chessman_sign.show()
        self.chessman_sign.move(c.pos())
        self.chessman_sign.raise_()
        # 记录这次落子
        self.history_record.append([*next_pos, self.whoseround])
        # 是否胜利了
        self.winner = checkWin(self.chessboard)
        if self.winner:
            self.showGameEndInfo()
            return
        # 切换回合方(其实就是改颜色)
        self.nextRound()
    '''改变落子方'''
    def nextRound(self):
        self.whoseround = self.player_color if self.whoseround == self.ai_color else self.ai_color
    '''显示游戏结束结果'''
    def showGameEndInfo(self):
        self.is_gaming = False
        info_img = QPixmap(self.cfg.WIN_IMAGEPATHS.get(self.winner))
        self.winner_info_label = QLabel(self)
        self.winner_info_label.setPixmap(info_img)
        self.winner_info_label.resize(info_img.size())
        self.winner_info_label.move(50, 50)
        self.winner_info_label.show()
    '''认输'''
    def givein(self):
        if self.is_gaming and (self.winner is None) and (self.whoseround == self.player_color):
            self.winner = self.ai_color
            self.showGameEndInfo()
    '''悔棋-只有我方回合的时候可以悔棋'''
    def regret(self):
        if (self.winner is not None) or (len(self.history_record) == 0) or (not self.is_gaming) and (self.whoseround != self.player_color):
            return
        for _ in range(2):
            pre_round = self.history_record.pop(-1)
            self.chessboard[pre_round[0]][pre_round[1]].close()
            self.chessboard[pre_round[0]][pre_round[1]] = None
        self.chessman_sign.hide()
    '''开始游戏-之前的对弈必须已经结束才行'''
    def startgame(self):
        if self.is_gaming:
            return
        self.is_gaming = True
        self.whoseround = self.player_color
        for i, j in product(range(19), range(19)):
            if self.chessboard[i][j]:
                self.chessboard[i][j].close()
                self.chessboard[i][j] = None
        self.winner = None
        self.winner_info_label.close()
        self.winner_info_label = None
        self.history_record.clear()
        self.chessman_sign.hide()
    '''关闭窗口事件'''
    def closeEvent(self, event):
        if not self.send_back_signal:
            self.exit_signal.emit()
    '''返回游戏主页面'''
    def goHome(self):
        self.send_back_signal = True
        self.close()
        self.back_signal.emit()

整个逻辑是这样的:

设计并实现游戏的基本界面之后,先默认永远是玩家先手(白子),电脑后手(黑子)。然后,当监听到玩家鼠标左键点击到棋盘网格所在的范围内的时候,捕获该位置,若该位置之前没有人落子过,则玩家成功落子,否则重新等待玩家鼠标左键点击事件。玩家成功落子后,判断是否因为玩家落子而导致游戏结束(即棋盘上有5颗同色子相连了),若游戏结束,则显示游戏结束界面,否则轮到AI落子。AI落子和玩家落子的逻辑类似,然后又轮到玩家落子,以此类推。

需要注意的是:为保证响应的实时性,AI落子算法应当写到鼠标左键点击后释放事件的响应中(感兴趣的小伙伴可以试试写到鼠标点击事件的响应中,这样会导致必须在AI计算结束并落子后,才能显示玩家上一次的落子和AI此次的落子结果)。

开始按钮就是重置游戏,没啥可说的,这里为了避免有些人喜欢耍赖,我实现的时候代码写的是必须完成当前对弈才能重置游戏(毕竟小伙子小姑娘们要学会有耐心地下完一盘棋呀)。

因为是和AI下,所以悔棋按钮直接悔两步,从历史记录列表里pop最后两次落子然后从棋盘对应位置取下这两次落子就OK了,并且保证只有我方回合可以悔棋以避免出现意料之外的逻辑出错。

认输按钮也没啥可说的,就是认输然后提前结束游戏。

接下来我们来实现一下联机对战,这里我们选择使用TCP/IP协议进行联机通信从而实现联机对战。先启动游戏的一方作为服务器端:

通过新开一个线程来实现监听:

threading.Thread(target=self.startListen).start()
'''开始监听客户端的连接'''
def startListen(self):
    while True:
       self.setWindowTitle('五子棋-微信公众号: Charles的皮卡丘 ——> 服务器端启动成功, 等待客户端连接中')
       self.tcp_socket, self.client_ipport = self.tcp_server.accept()
       self.setWindowTitle('五子棋-微信公众号: Charles的皮卡丘 ——> 客户端已连接, 点击开始按钮进行游戏')

后启动方作为客户端连接服务器端并发送客户端玩家的基本信息:

self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_socket.connect(self.server_ipport)
data = {'type': 'nickname', 'data': self.nickname}
self.tcp_socket.sendall(packSocketData(data))
self.setWindowTitle('五子棋-微信公众号: Charles的皮卡丘 ——> 已经成功连接服务器, 点击开始按钮进行游戏')

当客户端连接到服务器端时,服务器端也发送服务器端的玩家基本信息给客户端:

data = {'type': 'nickname', 'data': self.nickname}
self.tcp_socket.sendall(packSocketData(data))

然后客户端和服务器端都利用新开的线程来实现网络数据监听接收:

'''接收客户端数据'''
def receiveClientData(self):
    while True:
        data = receiveAndReadSocketData(self.tcp_socket)
        self.receive_signal.emit(data)
'''接收服务器端数据'''
def receiveServerData(self):
    while True:
        data = receiveAndReadSocketData(self.tcp_socket)
        self.receive_signal.emit(data)

并根据接收到的不同数据在主进程中做成对应的响应:

'''响应接收到的数据'''
def responseForReceiveData(self, data):
    if data['type'] == 'action' and data['detail'] == 'exit':
        QMessageBox.information(self, '提示', '您的对手已退出游戏, 游戏将自动返回主界面')
        self.goHome()
    elif data['type'] == 'action' and data['detail'] == 'startgame':
        self.opponent_player_color, self.player_color = data['data']
        self.whoseround = 'white'
        self.whoseround2nickname_dict = {self.player_color: self.nickname, self.opponent_player_color: self.opponent_nickname}
        res = QMessageBox.information(self, '提示', '对方请求(重新)开始游戏, 您为%s, 您是否同意?' % {'white': '白子', 'black': '黑子'}.get(self.player_color), QMessageBox.Yes | QMessageBox.No)
        if res == QMessageBox.Yes:
            data = {'type': 'reply', 'detail': 'startgame', 'data': True}
            self.tcp_socket.sendall(packSocketData(data))
            self.is_gaming = True
            self.setWindowTitle('五子棋-微信公众号: Charles的皮卡丘 ——> %s走棋' % self.whoseround2nickname_dict.get(self.whoseround))
            for i, j in product(range(19), range(19)):
                if self.chessboard[i][j]:
                    self.chessboard[i][j].close()
                    self.chessboard[i][j] = None
            self.history_record.clear()
            self.winner = None
            if self.winner_info_label:
                self.winner_info_label.close()
            self.winner_info_label = None
            self.chessman_sign.hide()
        else:
            data = {'type': 'reply', 'detail': 'startgame', 'data': False}
            self.tcp_socket.sendall(packSocketData(data))
    elif data['type'] == 'action' and data['detail'] == 'drop':
        pos = data['data']
        # 实例化一个棋子并显示
        c = Chessman(self.cfg.CHESSMAN_IMAGEPATHS.get(self.whoseround), self)
        c.move(QPoint(*Chesspos2Pixel(pos)))
        c.show()
        self.chessboard[pos[0]][pos[1]] = c
        # 落子声音响起
        self.drop_sound.play()
        # 最后落子位置标志对落子位置进行跟随
        self.chessman_sign.show()
        self.chessman_sign.move(c.pos())
        self.chessman_sign.raise_()
        # 记录这次落子
        self.history_record.append([*pos, self.whoseround])
        # 是否胜利了
        self.winner = checkWin(self.chessboard)
        if self.winner:
            self.showGameEndInfo()
            return
        # 切换回合方(其实就是改颜色)
        self.nextRound()
    elif data['type'] == 'action' and data['detail'] == 'givein':
        self.winner = self.player_color
        self.showGameEndInfo()
    elif data['type'] == 'action' and data['detail'] == 'urge':
        self.urge_sound.play()
    elif data['type'] == 'action' and data['detail'] == 'regret':
        res = QMessageBox.information(self, '提示', '对方请求悔棋, 您是否同意?', QMessageBox.Yes | QMessageBox.No)
        if res == QMessageBox.Yes:
            pre_round = self.history_record.pop(-1)
            self.chessboard[pre_round[0]][pre_round[1]].close()
            self.chessboard[pre_round[0]][pre_round[1]] = None
            self.chessman_sign.hide()
            self.nextRound()
            data = {'type': 'reply', 'detail': 'regret', 'data': True}
            self.tcp_socket.sendall(packSocketData(data))
        else:
            data = {'type': 'reply', 'detail': 'regret', 'data': False}
            self.tcp_socket.sendall(packSocketData(data))
    elif data['type'] == 'reply' and data['detail'] == 'startgame':
        if data['data']:
            self.is_gaming = True
            self.setWindowTitle('五子棋-微信公众号: Charles的皮卡丘 ——> %s走棋' % self.whoseround2nickname_dict.get(self.whoseround))
            for i, j in product(range(19), range(19)):
                if self.chessboard[i][j]:
                    self.chessboard[i][j].close()
                    self.chessboard[i][j] = None
            self.history_record.clear()
            self.winner = None
            if self.winner_info_label:
                self.winner_info_label.close()
            self.winner_info_label = None
            self.chessman_sign.hide()
            QMessageBox.information(self, '提示', '对方同意开始游戏请求, 您为%s, 执白者先行.' % {'white': '白子', 'black': '黑子'}.get(self.player_color))
        else:
            QMessageBox.information(self, '提示', '对方拒绝了您开始游戏的请求.')
    elif data['type'] == 'reply' and data['detail'] == 'regret':
        if data['data']:
            pre_round = self.history_record.pop(-1)
            self.chessboard[pre_round[0]][pre_round[1]].close()
            self.chessboard[pre_round[0]][pre_round[1]] = None
            self.nextRound()
            QMessageBox.information(self, '提示', '对方同意了您的悔棋请求.')
        else:
            QMessageBox.information(self, '提示', '对方拒绝了您的悔棋请求.')
    elif data['type'] == 'nickname':
        self.opponent_nickname = data['data']

对战过程实现的基本逻辑和人机对战是一致的,只不过要考虑数据同步问题,所以看起来代码略多了一些。当然对于联机对战,我也做了一些小修改,比如必须点击开始按钮,并经过对方同意之后,才能正式开始对弈,悔棋按钮只有在对方回合才能按,对方同意悔棋后需要记得把落子方切换回自己。然后加了一个催促按钮,同样必须在对方回合才能按。其他好像也没什么特别的改动了。

All done~完整源代码详见相关文件~

相关文件

https://github.com/CharlesPik...

查看原文

赞 8 收藏 3 评论 0

认证与成就

  • 获得 0 次点赞
  • 获得 3 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 3 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2017-08-20
个人主页被 72 人浏览