python项目注册nacos,健康实例数不稳定,是为什么?

使用Tornado框架,向2.0版本的nacos注册服务,然后每隔5秒发送心跳。注册服务,发送心跳都是用的V2版本的API。
可以成功注册,在日志也可以看到每隔5秒1次的心跳发送成功,但是在nacos的管理页面刷新实例列表,实例数和健康实例数一直在变,不稳定。
我会有4个服务节点注册到nacos, 实例数和健康实例数一直在1到4之间变化,每次刷新管理平台页面都不一样。
请问是什么原因呢?有没有人遇到过这个问题?

nacos客户端类

import json, tornado.gen
import aiohttp
from libs.apollo.apollo_util import init_ip
from libs.apollo.client import AsyncApollo
import logging
import asyncio

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Group name used by the NacosClient methods when the caller does not pass one.
DEFAULT_GROUP_NAME = "DEFAULT_GROUP"


class NacosException(Exception):
    """Base class for errors raised by the Nacos client in this module."""


class NacosRequestException(NacosException):
    """Raised when a request could not be completed against any Nacos server."""


class NacosClient(object):
    """Small asynchronous client for the Nacos naming HTTP API.

    The client keeps a list of server addresses and fails over to the next
    one whenever a request raises or is answered with a non-success body.

    Args:
        server_addresses: Comma-separated server base URLs; each one is
            prepended verbatim to the request path.
        namespace: Nacos namespace id, or None to let the server pick its
            default.
        username: Optional user name sent with every request.
        password: Optional password sent with every request.
    """

    # Total time budget for a single HTTP request, in seconds.
    REQUEST_TIMEOUT = 10

    def __init__(self, server_addresses, namespace=None, username=None, password=None):
        # Strip blanks so that "a, b" does not yield a URL starting with a space.
        self.server_list = [address.strip() for address in server_addresses.split(',')]
        self.current_server = self.server_list[0]
        self.namespace = namespace
        self.username = username
        self.password = password
        self.server_offset = 0
        self.service_ip = init_ip()

    def change_server(self):
        """Advance to the next server in the list, wrapping around at the end."""
        self.server_offset = (self.server_offset + 1) % len(self.server_list)
        self.current_server = self.server_list[self.server_offset]

    def get_server(self):
        """Return the server address requests are currently sent to."""
        return self.current_server

    def _build_metadata(self, metadata, params):
        """Store ``metadata`` in ``params``; dicts are serialized to JSON first."""
        if metadata:
            if isinstance(metadata, dict):
                params["metadata"] = json.dumps(metadata)
            else:
                params["metadata"] = metadata

    @staticmethod
    def _drop_none(params):
        """Return a copy of ``params`` without the entries whose value is None.

        Form encoding stringifies every value, so an unset namespace or
        credential would otherwise reach the server as the literal text "None".
        """
        return {key: value for key, value in params.items() if value is not None}

    async def _async_request(self, method, url, **kwargs):
        """Send one HTTP request and return the decoded JSON body."""
        conn = aiohttp.TCPConnector(enable_cleanup_closed=True, ssl=False, limit=5)
        # aiohttp deprecates bare numeric timeouts in favour of ClientTimeout.
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
        async with aiohttp.ClientSession(connector=conn, timeout=timeout) as session:
            async with session.request(method, url, **kwargs) as response:
                return await response.json()

    async def _async_post(self, url, **kwargs):
        """POST to ``url`` and return the decoded JSON body."""
        return await self._async_request('POST', url, **kwargs)

    async def _async_put(self, url, **kwargs):
        """PUT to ``url`` and return the decoded JSON body."""
        return await self._async_request('PUT', url, **kwargs)

    async def _async_delete(self, url, **kwargs):
        """DELETE ``url`` and return the decoded JSON body."""
        return await self._async_request('DELETE', url, **kwargs)

    async def _request_with_failover(self, send, url, params, is_success):
        """Try ``send`` against each server once until one reports success.

        Args:
            send: One of the ``_async_*`` coroutine methods.
            url: Request path appended to the current server address.
            params: Form fields; None-valued entries are dropped before sending.
            is_success: Callable that inspects the decoded response body.

        Returns:
            True as soon as a server answers successfully, False when every
            server in the list has been tried without success.
        """
        payload = self._drop_none(params)
        for attempt in range(len(self.server_list)):
            if attempt:
                # The previous server failed; move on to the next one.
                self.change_server()
            try:
                response = await send(self.current_server + url, data=payload)
                logger.info(response)
                if is_success(response):
                    return True
            except Exception:
                # Any failure (network, decoding, unexpected body) triggers failover.
                logger.exception("Nacos request to %s%s failed", self.current_server, url)
        return False

    @staticmethod
    def _data_is_ok(response):
        """Tell whether a response body carries ``data == 'ok'``."""
        return isinstance(response, dict) and response.get('data') == 'ok'

    @staticmethod
    def _beat_is_ok(response):
        """Tell whether a heartbeat response body carries ``code == 10200``."""
        return isinstance(response, dict) and response.get('code') == 10200

    async def register(self, service_name, port, cluster_name='default', weight=1.0, metadata=None,
                       enable=True, healthy=True, ephemeral=True, group_name=DEFAULT_GROUP_NAME):
        """Register this instance with Nacos.

        Args:
            service_name: Service the instance belongs to.
            port: Port the instance is reachable on.
            cluster_name: Cluster the instance is placed in.
            weight: Load-balancing weight.
            metadata: Optional dict (sent as JSON) or pre-serialized string.
            enable: Whether the instance should be enabled.
            healthy: Initial health flag.
            ephemeral: Whether the instance is ephemeral.
            group_name: Group the service belongs to.

        Returns:
            The string 'ok' once a server accepted the registration.

        Raises:
            NacosRequestException: If no server accepted the registration.
        """
        url = '/nacos/v2/ns/instance'
        params = {
            'ip': self.service_ip,
            'port': port,
            'serviceName': service_name,
            'weight': weight,
            # The Nacos API names this field "enabled"; "enable" is ignored.
            'enabled': enable,
            'healthy': healthy,
            'clusterName': cluster_name,
            'ephemeral': ephemeral,
            'groupName': group_name,
            'namespaceId': self.namespace,
            'username': self.username,
            'password': self.password
        }
        self._build_metadata(metadata, params)
        if await self._request_with_failover(self._async_post, url, params, self._data_is_ok):
            return 'ok'
        raise NacosRequestException("All server are not available")

    async def deregister(self, service_name, port, cluster_name='default', ephemeral=True, group_name=DEFAULT_GROUP_NAME):
        """Remove this instance from Nacos.

        Args:
            service_name: Service the instance belongs to.
            port: Port the instance was registered with.
            cluster_name: Cluster the instance was placed in.
            ephemeral: Whether the instance is ephemeral.
            group_name: Group the service belongs to.

        Returns:
            The string 'ok' once a server accepted the request.

        Raises:
            NacosRequestException: If no server accepted the request.
        """
        url = '/nacos/v2/ns/instance'
        params = {
            'serviceName': service_name,
            'ip': self.service_ip,
            'port': port,
            'namespaceId': self.namespace,
            'healthy': True,
            'username': self.username,
            'password': self.password,
            'ephemeral': ephemeral,
            'groupName': group_name,
            'clusterName': cluster_name
        }
        if await self._request_with_failover(self._async_delete, url, params, self._data_is_ok):
            return 'ok'
        raise NacosRequestException("deregister instance error!")

    async def send_beat(self, service_name, port, cluster_name='default', weight=1.0,
                        ephemeral=True, group_name=DEFAULT_GROUP_NAME):
        """Send one heartbeat for this instance.

        Args:
            service_name: Service the instance belongs to.
            port: Port the instance was registered with.
            cluster_name: Cluster the instance was placed in.
            weight: Load-balancing weight reported in the beat.
            ephemeral: Whether the instance is ephemeral.
            group_name: Group the service belongs to.

        Returns:
            The string 'ok' when a server answered with code 10200, otherwise
            None. Failure is reported through the return value and the log
            rather than an exception, so a periodic caller keeps running.
        """
        url = '/nacos/v2/ns/instance/beat'
        grouped_name = f'{group_name}@@{service_name}'
        beat_data = {
            'serviceName': grouped_name,
            'ip': self.service_ip,
            'port': port,
            'weight': weight,
            'ephemeral': ephemeral,
            'cluster': cluster_name
        }
        params = {
            'serviceName': grouped_name,
            'beat': json.dumps(beat_data),
            'groupName': group_name,
            'namespaceId': self.namespace,
            'username': self.username,
            'password': self.password
        }
        if await self._request_with_failover(self._async_put, url, params, self._beat_is_ok):
            return 'ok'
        logger.error("Heartbeat for %s was not accepted by any Nacos server", grouped_name)
        return None

async def init_nacos_new():
    """Build a NacosClient from settings fetched through AsyncApollo.

    The four values are awaited one after another, in the order host,
    namespace, username, password, and passed positionally to NacosClient.
    """
    return NacosClient(
        await AsyncApollo.get_value('nacos_host', namespace='risk-model-nacos'),
        await AsyncApollo.get_value('nacos_namespace', namespace=''),
        await AsyncApollo.get_value('nacos_username', namespace=''),
        await AsyncApollo.get_value('nacos_password', namespace=''),
    )


# Shared client instance used by the helper coroutines below. It is built at
# import time by running init_nacos_new() to completion on the event loop
# returned by asyncio.get_event_loop(), so importing this module performs the
# AsyncApollo lookups.
nacos_client = asyncio.get_event_loop().run_until_complete(init_nacos_new())


async def register_instance(service_name, port=8094):
    """Register this process with Nacos through the shared ``nacos_client``.

    Args:
        service_name: Name the instance is registered under.
        port: Port advertised to Nacos. Defaults to 8094, the value that used
            to be hard-coded; pass the port the server really listens on when
            it differs.

    Raises:
        NacosRequestException: Propagated from ``nacos_client.register`` when
            no server accepts the registration.
    """
    logger.info('start register instance to nacos!')
    await nacos_client.register(service_name, port)

async def deregister_instance(service_name, port=8094):
    """Remove this process from Nacos through the shared ``nacos_client``.

    Args:
        service_name: Name the instance was registered under.
        port: Port the instance was registered with. Defaults to 8094, the
            value that used to be hard-coded.

    Raises:
        NacosRequestException: Propagated from ``nacos_client.deregister``
            when no server accepts the request.
    """
    logger.info('start deregister instance from nacos!')
    await nacos_client.deregister(service_name, port)

async def send_heartbeat(service_name, port=8094, interval=5):
    """Send heartbeats to Nacos forever, pausing ``interval`` seconds between them.

    Args:
        service_name: Name the instance was registered under.
        port: Port the instance was registered with. Defaults to 8094, the
            value that used to be hard-coded.
        interval: Seconds to sleep between two heartbeats. Defaults to 5.
    """
    while True:
        result = await nacos_client.send_beat(service_name, port)
        # send_beat returns 'ok' only when a server accepted the beat; logging
        # success unconditionally would hide rejected heartbeats.
        if result == 'ok':
            logger.info('Send heartbeat successful')
        else:
            logger.warning('Send heartbeat failed, next attempt in %s seconds', interval)
        await tornado.gen.sleep(interval)

项目启动:

def start_app(args):
    """Create the Tornado application, schedule the Nacos tasks and run the loop.

    Args:
        args: Object providing the ``port`` and ``host`` to listen on.
    """
    loop = tornado.ioloop.IOLoop.current()

    application = Application(urls, log_function=log_func, debug=CurrentConfig.DEBUG)
    application.listen(args.port, args.host, xheaders=True)

    # Registration is scheduled first, then the periodic heartbeat.
    for background_task in (register_instance, send_heartbeat):
        loop.spawn_callback(background_task, name)

    loop.start()
阅读 1.9k
1 个回答
✓ 已被采纳新手上路,请多包涵

没人回复,我自己来回复吧。
因为使用的是2.X版本的Nacos,所以下意识的会使用V2版本的API发送心跳,出现了上述我说的问题。
于是我把注册,发送心跳,注销的接口都改为了V1版本,似乎解决了健康实例数不稳定的问题。
这是否意味着V2版本的API有一些问题呢?官方的支持2.X版本的python sdk一直都没有,所以只能先这样了。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题