xddd

xddd 查看完整档案

杭州编辑浙江工业大学  |  控制理论与控制工程 编辑Whale  |  后端 编辑 sf.gg/u/xddd 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 个人简介什么都没有

个人动态

xddd 收藏了文章 · 2020-01-16

《领域驱动设计之PHP实现》全书翻译 - 应用服务

应用服务

  1. 《领域驱动设计之PHP实现》全书翻译 - DDD入门
  2. 《领域驱动设计之PHP实现》全书翻译 - 架构风格
  3. 《领域驱动设计之PHP实现》全书翻译 - 值对象
  4. 《领域驱动设计之PHP实现》全书翻译 - 实体
  5. 《领域驱动设计之PHP实现》全书翻译 - 服务
  6. 《领域驱动设计之PHP实现》全书翻译 - 领域事件
  7. 《领域驱动设计之PHP实现》全书翻译 - 模块
  8. 《领域驱动设计之PHP实现》全书翻译 - 聚合
  9. 《领域驱动设计之PHP实现》全书翻译 - 工厂
  10. 《领域驱动设计之PHP实现》全书翻译 - 仓储
  11. 《领域驱动设计之PHP实现》全书翻译 - 应用服务
  12. 《领域驱动设计之PHP实现》全书翻译 - 集成上下文
  13. 《用PHP实现六边形架构》

应用层是将领域模型与查询或更改其状态的客户端分离的层。应用服务是此层的构建块。正如 Vaughn Vernon 所说:“应用服务是领域模型的直接客户端。” 你可以考虑把应用服务当作外部世界(HTML 表单,API 客户端,命令行,框架,UI 等等)与领域模型自身之间的连接点。考虑向外部展示系统的顶级用例,也许会有帮助。例如:“作为来宾,我想注册”,“作为以登录用户,我要购买产品”,等等。

在这一章,我们将解释怎样实现应用服务,理解命令模式(Command pattern)的角色,并且确定应用服务的职责。所以,让我们考虑一个注册新用户(signing up a new user)的用例。

从概念上讲,为注册新用户,我们必须:

  • 从客户端获取电子邮箱(email)和密码(password)
  • 检查电子邮箱(email)是否在使用
  • 创建一个新用户(user)
  • 将用户(user)添加到已有用户集合(user set)
  • 返回我们刚创建的用户(user)

让我们开始干吧!

请求(Requests)

我们需要发送电子邮件(email)和密码(password)给应用服务。这有许多方法可以从客户端来做这样事情(HTML 表单,API 客户端,或者甚至命令行)。我们可以通过用方法签名发送一个标准的参数(email 和 password),或者构建并发送一个含有这些信息的数据结构。后面的这种方法,即发送 DTO,把一些有趣的功能拿到台面上。通过发送对象,可以在命令总线上对其进行序列化和排队。也可以添加类型安全和一些 IDE 帮助。

数据传输对象(Data Transfer Object)

DTO 是一种在过程之间转移数据的数据结构。不要把它误认为是一种全能的对象。DTO 除了存储和检索它自身数据(存取器),没有其他任何行为。DTO 是简单的对象,它不应该含有任何需要测试的业务逻辑。

正如 Vaughn Vernon 所说:

应用服务方法签名仅使用基本类型(整数,字符串等等),或者 DTO 作为这些方法的替代方法,更好的方法可能是设计命令对象(Command Object)。这不一定是正确或错误的方法,这主要取决于你的口味和目标。

一个含有应用服务所含数据的 DTO 实现可能像这样:

namespace Lw\Application\Service\User;
class SignUpUserRequest
{
    private $email;
    private $password;

    public function __construct($email, $password)
    {
        $this->email = $email;
        $this->password = $password;
    }

    public function email()
    {
        return $this->email;
    }

    public function password()
    {
        return $this->password;
    }
}

正如你所见,SignUpUserRequest 没有行为,只有数据。这可能来自于一个 HTML 表单或者一个 API 端点,尽管我们不关心这些。

构建应用服务请求

从你最喜欢的框架交付机制创建一个请求,应该是最直观的。在 web 中,你可以将控制器请求中的参数打包成一个 DTO 并将它们下发给服务。对 CLI 命令来说也是相同的原则:读取输入参数并下发。

通过使用 Symfony,我们可以提取来自 HttpFoundation 组件的请求中的所需数据:

// ...
class UsersController extends Controller
{
    /**
     * @Route('/signup', name = 'signup')
     * @param Request $request
     * @return Response
     */
    public function signUpAction(Request $request)
    {
// ...
        $signUpUserRequest = new SignUpUserRequest(
            $request->get('email'),
            $request->get('password')
        );
// ...
    }
// ...

在一个使用 Form 组件来捕获和验证参数的更精细的 Silex 应用程序上,它看起来像这样:

// ...
$app->match('/signup', function (Request $request) use ($app) {
    $form = $app['sign_up_form'];
    $form->handleRequest($request);
    if ($form->isValid()) {
        $data = $form->getData();
        try {
            $app['sign_in_user_application_service']->execute(
                new SignUpUserRequest(
                    $data['email'],
                    $data['password']
                )
            );
            return $app->redirect(
                $app['url_generator']->generate('login')
            );
        } catch (UserAlreadyExistsException $e) {
            $form
                ->get('email')
                ->addError(
                    new FormError(
                        'Email is already registered by another user'
                    )
                );
        } catch (Exception $e) {
            $form
                ->addError(
                    new FormError(
                        'There was an error, please get in touch with us'
                    )
                );
        }
    }
    return $app['twig']->render('signup.html.twig', [
        'form' => $form->createView(),
    ]);
});

请求的设计

在设计你的请求对象时,你应该总是遵循这些原则:使用基本类型,序列化设计,并且不包含业务逻辑在里面。这样,你可以在单元测试时节省开支。

使用基本类型

我们推荐使用基本类型来构建你的请求对象(就是字符串,整数,布尔值等等)。我们仅仅是抽象出输入参数。你应该能够从交付机制当中独立地消费应用服务。即使是非常复杂的 HTML 表单,也总是可以在控制器级别转换为基本类型。你应该不想混淆你的框架和业务逻辑。

在某些情况下,很容易直接使用值对象。不要这样做。值对象定义的更新将影响所有客户端,并且你会将客户端与领域逻辑耦合在一起。

序列化

使用基本类型的一个副作用就是,任何请求对象可以轻松地序列化为字符串,发送并存储在消息系统或者数据库中。

无业务逻辑

避免在你的请求对象中加入任何业务甚至验证逻辑。验证应该放到你的领域中(这里指的是实体,值对象,领域服务等等)。验证是保持业务不变性和领域约束的方法。

无测试

应用程序请求是数据结构,不是对象。数据结构的单元测试就像测试 getters 和 setters。这没有行为需要测试,因此对请求对象和 DTO 进行单元测试没有太多价值。这些结构将作为更复杂的测试(例如集成测试或验收测试)的副作用而覆盖。

命令是请求对象的替代方法。我们设计一个具有多种应用方法服务,并且每个方法都含有你放到 Request 中的参数。对于简单的应用程序来说这是可以的,但后面我们就得操心这个问题。

应用服务剖析

当我们在请求中封装好了数据,就该处理业务逻辑了。正如 Vaughn Vernon 所说:“尽量保证应用服务很小很薄,仅仅用它们在模型上协调任务。”

首先要做的事情就是从请求中提取必要信息,即 email 和 password。必要的话,我们需要确认是否含有特殊 email 的用户。如果不关心这些的话,那么我们创建和添加用户到 UserRepository。在发现有用户具有相同 email 的特殊情况下,我们抛出一个异常以便客户端以自己的方式处理(显示错误,重试,或者直接忽略它)。

namespace Lw\Application\Service\User;

use Ddd\Application\Service\ApplicationService;
use Lw\Domain\Model\User\User;
use Lw\Domain\Model\User\UserAlreadyExistsException;
use Lw\Domain\Model\User\UserRepository;

class SignUpUserService
{
    private $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function execute(SignUpUserRequest $request)
    {
        $email = $request->email();
        $password = $request->password();
        $user = $this->userRepository->ofEmail($email);
        if ($user) {
            throw new UserAlreadyExistsException();
        }
        $this->userRepository->add(
            new User(
                $this->userRepository->nextIdentity(),
                $email,
                $password
            )
        );
    }
}

漂亮!如果你想知道构造函数里的 UserRepository 是做什么的,我们会在后续向你展示。

处理异常

应用服务抛出异常是向客户端反馈不正常的情况和流程的方法。这一层上的异常与业务逻辑有关(像查找一个用户),但并不是实现细节(像 PDOException,PredisException, 或者 DoctrineException)。

依赖倒置

处理用户不是服务的职责。正如我们在第 10 章,仓储中所见,有一个专门的类来处理 User 集合:UserRepository。这是一个从应用服务到仓储的依赖。我们不想用仓储的具体实现耦合应用服务,因此这之后会导致我们用基础设施细节耦合服务。所以我们依赖具体实现依赖的契约(接口),即 UserRepository。

UserRepository 的特定实现会在运行时被构建和传送,例如用 DoctrineUserRepository,一个使用 Doctine 的专门实现。传递一个专门的实现在测试时同样正常。例如,NotAvailableUserRepository 可以是一个专门的实现,它会在一个操作每个执行时抛出一个异常。这样,我们可以测试所有应用服务行为,包括悲观路径(sad paths),即使当出现了问题,应用服务也必须正常工作。

应用服务也可以依赖领域服务(如 GetBadgesByUser)。在运行时,此类服务的实现可能相当复杂。可以想像一个通过 HTTP 协议整合上下文的 HttpGetBadgesByUser。

依赖抽象,我们可以使我们的应用服务不受底层基础设施更改的影响。

应用服务实例

仅实例化应用服务很容易,但根据依赖构建的复杂度,构建依赖树的可能很棘手。为此,大多数框架都带有依赖注入容器。如果没有,你最后会在控制器的某处得到类似以下的代码:

$redisClient = new Predis\Client([
    'scheme' => 'tcp',
    'host' => '10.0.0.1',
    'port' => 6379
]);
$userRepository = new RedisUserRepository($redisClient);
$signUp = new SignUpUserService($userRepository);
$signUp->execute(new SignUpUserRequest(
    'user@example.com',
    'password'
));

我们决定为 UserRepository 使用 Redis 的实现。在之前的代码示例中,为构建一个在内部使用 Redis 的仓储,我们构建了所有所需的依赖项。这些依赖是:一个 Predis 客户端,以及所有连接到 Redis 服务器的参数。这不仅效率低下,还会在控制器内重复传播。

你可以将创建逻辑重构为一个工厂,或者你可以使用依赖注入容器(大多数现代框架都包含了它)。

使用依赖注入容器是否不好?

一点也不。依赖注入只是一个工具。它们有助于剥离构建依赖时的复杂性。对构建基础构件也很有用。Symfony 提供了一个完整的实现。

请考虑以下事实,将整个容器作为一个整体传递给服务之一是不好的做法。这就像将整个应用程序的上下文与领域耦合在一起。如果一个服务需要指定的对象,请从框架来创建它们,并且把它们作为依赖传递给服务。但不要使服务觉察到其中的上下文。

让我们看看如何在 Silex 中构建依赖:

$app = new \Silex\Application();
$app['redis_parameters'] = [
    'scheme' => 'tcp',
    'host' => '127.0.0.1',
    'port' => 6379
];
$app['redis'] = $app->share(function ($app) {
    return new Predis\Client($app['redis_parameters']);
});
$app['user_repository'] = $app->share(function($app) {
    return new RedisUserRepository(
        $app['redis']
    );
});
$app['sign_up_user_application_service'] = $app->share(function($app) {
    return new SignUpUserService(
        $app['user_repository']
    );
});
// ...
$app->match('/signup' ,function (Request $request) use ($app) {
// ...
    $app['sign_up_user_application_service']->execute(
        new SignUpUserRequest(
            $request->get('email'),
            $request->get('password')
        )
    );
// ...
});

正如你所见,$app 被当作服务容器使用。我们注册了所有所需的组件,以及它们的依赖。sign_up_user_application_service 依赖上面的定义。改变 user_repository 的实现就像返回其他东西一样简单(MySQL,MongoDB 等等),所以我们根本不需要改变服务代码。

Symfony 应用里等效的内容如下:

<?xml version=" 1.0" ?>
<container xmlns="http://symfony.com/schema/dic/services"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="
http://symfony.com/schema/dic/services
http://symfony.com/schema/dic/services/services-1.0.xsd">
    <services>
        <service
                id="sign_up_user_application_service"
                class="SignUpUserService">
            <argument type="service" id="user_repository" />
        </service>
        <service
                id="user_repository"
                class="RedisUserRepository">
            <argument type="service">
                <service class="Predis\Client" />
            </argument>
        </service>
    </services>
</container>

现在,你有了在 Symonfy Service Container 中的应用服务定义,之后获取它也非常直观。所有交付机制(Web 控制器,REST 控制器,甚至控制台命令)都共享相同的定义。在任何实现 ContainerWare 接口的类上,服务都是可用的。获取服务与 $this->get('sign_up_user_application_service') 一样容易。

总而言之,你怎样构建服务(adhoc,服务容器,工厂等等)并不重要。重要的是保持你的应用服务设置在基础设施边界之外。

自定义一个应用服务

自定义服务的主要方法是选择你要传递的依赖。根据你服务容器能力,这可能有一点棘手,因此你可以添加一个 setter 方法来即时改变依赖。例如,你可能需要更改依赖输出,以便你能设置一个默认项然后改变它。如果逻辑变得过于复杂,你可以创建一个应用服务工厂,它可以为你处理这种局面。

执行

这有两种调用应用服务的方法:一个是每个用例对应一个含有单个 execution 方法的专用类,以及多个应用服务和用例在同一个类。

一个类一个应用服务

这是我们更喜欢的方法,并且这可能适合所有场景:

class SignUpUserService
{
// ...
    public function execute(SignUpUserRequest $request)
    {
// ...
    }
}

每个应用服务使用一个专用的类使得代码在应对外部变化时更健壮(单一职责原则)。这几乎没有原因去改变类,因为服务仅仅只做一件事。应用服务会更容易测试,理解,因为它们只做一件事。这使得实现一个通用的应用服务契约更容易,使类装饰更简单(查看第 10 章,仓储的子章节 事务)。这将同样会更高内聚,因为所有依赖由单一的用例所独有。

execution 方法可以有一个更具表达性的名称,例如 signUp。但是,命令模式的执行通过应用服务标准格式化了通用契约,从而使得这更容易装饰,这对于事务非常方面。

一个类多个应用服务方法

有时候,将内聚的应用服务组织在同一个类中可能是个好主意:

class UserService
{
// ...
    public function signUp(SignUpUserRequest $request)
    {
// ...
    }
    public function signIn(SignUpUserRequest $request)
    {
// ...
    }
    public function logOut(LogOutUserRequest $request)
    {
// ...
    }
}

我们并不推荐这种做法,因为并不是所有应用服务都 100% 内聚的。一些服务可能需要不同的依赖,从而导致你的应用服务依赖其并不需要的东西。另一个问题是这种类会快速成长。因为它违反了单一职责原则,这将导致有多种原因改变它从而破坏它。

返回值

注册后,我们可能会考虑将用户重定向到个人资料页面。回传所需信息给控制器最自然的方式是直接从服务中返回实体。

class SignUpUserService
{
// ...
    public function execute(SignUpUserRequest $request)
    {
        $user = new User(
            $this->userRepository->nextIdentity(),
            $email,
            $password
        );
        $this->userRepository->add($user);
        return $user;
    }
}

然后,我们会从控制器中拿到 id 字段,然后重定向到别的地方。但是,三思而后行。我们返回了功能齐全的实体给控制器,这将使得交付机制可以绕过应用层直接与领域交互。

假设 User 实体提供了一个 updateEmailAddress 方法。你可以不这样做,但从长远来看,有人可能会考虑使用它:

$app-> match( '/signup' , function (Request $request) use ($app) {
// ...
    $user = $app['sign_up_user_application_service']->execute(
        new SignUpUserRequest(
            $request->get('email'),
            $request->get('password'))
    );
    $user->updateEmailAddress('shouldnotupdate@email.com');
// ...
});

不仅仅是那样,而且表现层所需的数据与领域管理的数据不相同。我们并不想围绕表现层演变和耦合领域层。相反,我们想自由进化。

为达到此目的,我们需要一种弹性的方式来解耦这两层。

来自聚合实例的 DTO

我们可以用表现层所需的信息返回干净的(sterile)数据。正如我们之前所见,DTO 非常适合这种场景。我们仅仅需要在应用服务中组合它们并将其返回给客户端:

class UserDTO
{
    private $email ;
// ...
    public function __construct(User $user)
    {
        $this->email = $user->email ();
// ...
    }
    public function email ()
    {
        return $this->email ;
    }
}

UserDTO 将在表现层上从 User 实体暴露我们任何所需的数据,从而避免暴露行为:

class SignUpUserService
{
    public function execute(SignUpUserRequest $request)
    {
// ...
        $user = // ...
    return new UserDTO($user);
    }
}

使命达成!现在我们可以把参数传给模板引擎并把它们转换进 挂件(widgets),标签(tags),或者子模板(subtemplates),或者用数据做任何我们想在表现层一侧所做的:

$app->match('/signup' , function (Request $request) use ($app) {
    /**
     * @var UserDTO $user
     */
    $userDto=$app['sign_up_user_application_service']->execute(
        new SignUpUserRequest(
            $request->get('email'),
            $request->get('password')
        )
    );
// ...
});

但是,让应用服务决定如何构建 DTO 揭示了另一个限制。由于构建 DTO 仅仅取决于应用服务,因此很难适配不同的客户端。考虑在同一用例上,Web 控制器重定向所需的数据和 REST 响应所需的数据,根本不相同。

让我们允许客户端通过传递特定的 DTO 汇编器(Assembler)来定义如何构建 DTO:

class SignUpUserService
{
    private $userDtoAssembler;
    public function __construct(
        UserRepository $userRepository,
        UserDTOAssembler $userDtoAssembler
    ) {
        $this->userRepository = $userRepository;
        $this->userDtoAssembler = $userDtoAssembler;
    }
    public function execute(SignUpUserRequest $request)
    {
        $user = // ...
    return $this->userDtoAssembler->assemble($user);
    }
}

现在客户端可以通过传递一个指定的 UserDTOAssembler 来自定义回复。

数据转换器(Data Transformer)

在一些情况下,为更复杂回复(像 JSON,XML,CSV,和 iCAL 契约)生成中间 DTO 可能被视为不必要的开销。我们可以将表述输出到缓冲区中,然后在交付端获取它。

转换器有助于降低由高层次领域概念到低层次客户端细节的开销。让我们看一个例子:

interface UserDataTransformer
{
    public function write(User $user);
    /**
     * @return mixed
     */
    public function read();
}

考虑为一个给定产品生成不同的数据表述的示例。通常,产品信息通过一个 web 接口(HTML)提供,但我们可能对提供其他格式感兴趣,例如 XML,JSON,或者 CSV。这可能会启动与其他服务的集成。

考虑一个类似 blog 的例子。我们可以用 HTML 暴露我们潜在的作者给外部,但一些用户对通过 RSS 消费我们的文章感兴趣。这个用例(应用服务)仍然相同。而表述却不一样。

DTO 是一个干净且简单的方案,它能够以不同表述形式传递给模板引擎,但最后数据转换这一步的逻辑可能有点复杂,因为这样的模板逻辑可能会成为一个需要维护,测试和理解的问题。

数据转换器可能在特定的例子上会更好。他们对领域概念(聚合,实体等等)来说是黑盒子,因为输入和只读表述(XML,JSON,CSV 等等)与输出一样。这些转换器也很容易测试:

class JsonUserDataTransformer implements UserDataTransformer
{
    private $data;
    public function write(User $user)
    {
// More complex logic could be placed here
// As using JMSSerializer, native json, etc.
        $this->data = json_encode($user);
    }
    /**
     * @return string
     */
    public function read()
    {
        return $this->data;
    }
}

这很简单。想知道 XML 或者 CSV 是怎样的?让我们看看怎样通过应用服务整合数据转换器:

class SignUpUserService
{
    private $userRepository;
    private $userDataTransformer;
    public function __construct(
        UserRepository $userRepository,
        UserDataTransformer $userDataTransformer
    ) {
        $this->userRepository = $userRepository;
        $this->userDataTransformer = $userDataTransformer;
    }
    public function execute(SignUpUserRequest $request)
    {
        $user = // ...
            $this->userDataTransformer()->write($user);
    }
    /**
     * @return UserDataTransformer
     */
    public function userDataTransformer()
    {
        return $this->userDataTransformer;
    }
}

这与 DTO 汇编器方法相似,但是这一次没有返回一个具体的值。数据转换器用来持有数据和与数据交互。

使用 DTO 最主要的问题是过度写入它们。大多数时候,你的领域概念和 DTO 表述会呈现相同的结构。大多数时候,你会觉得没必要花时间去做一个这样的映射。这的确令人沮丧,表述与聚合的关系不是 1:1。你可以将两个聚合以一个表述呈现。你也可以用多种方式呈现同一相聚合。你怎样做取决于你的用例。

不过,依据 Martin Fowler 所说:

当你在表现展示台的模型与基础领域模型之间存在重大不匹配时,使用 DTO 之类的方法很有用。在这种情况下,从领域模型映射特定表述的外观/网关(facade/gateway并且为表述呈现一个方便的接口是有意义的。这是值得做的,但是仅对于具有这种不匹配的场景才值得做(在这种情况下,这不是多余的工作,因为无论如何你都需要在场景下进行操作。)

我们认为从长远角度来看是值得投资的。在大中型项目中,接口表述和领域概念总是无规律的变化。你可能想将它们各自解耦以便为更新降低摩擦。使用 DTO 或数据转换器允许你自由演变模型而不必总是考虑破坏布局(layout)。

复合层上的多个应用服务

大多数时候,布局(layout)总是与单个应用服务不一样。我们的项目有相同复杂的接口。

考虑一个特定项目的首页。我们该怎样渲染如此多的用例?这有一些选项,让我们看看吧。

AJAX 内容的集成

你可以让浏览器直接通过 AJAX 或 Hijax 请求不同端点并在布局上组合数据。这可以避免在你的控制器混合多个应用服务,但它可能有性能开销,因为触发了多个请求。

ESI 内容的集成

Edge Side Includes(ESI)是一种小型的标记语言,与之前的方法相似,但是是在服务器端。它需要额外的精力来配置额外的中间件,例如 NGINX 或 Varnish,以使其正常工作。

Symfony 子请求

如果你使用 Symfony,子请求可能是一个有趣的选择。依据 Symfony Documentation:

除了发送给 HttpKernel::handle 的主请求之外,你也可以发送所谓的子请求(sub request)。子请求的外观和行为看起来与其它请求一样,但通常用来渲染页面的一小部分而不是整个页面。你大多数时候从控制器发送子请求(或者从模板内部,它由你的控制器渲染)。这会创建了另一个完整的请求 - 回复周期,在此周期,新的请求被转换为回复。内部唯一不同的是一些监听器(例如,安全)只能根据主请求执行操作。每个监听器都传递 KernelEvent 的某个子类,该子类的 MasterRequest() 可用于检查当前请求是主请求还是子请求。

这非常棒,因为在没有 AJAX 开销或者不使用复杂的 ESI 配置的情况下,你将会从调用独立的应用服务中受益。

一个控制器(controller),多个应用服务

最后一个选择可能是用同一个控制器管理多个应用服务,从而控制器的逻辑会变得有点脏,因为它要处理和合成传递给视图的回复。

测试应用服务

由于你对测试应用服务自身行为感兴趣,因此没必要将其转换为具有针对真实数据的复杂设置的集成测试。你对测试低层次细节是不感兴趣的,因此在大多数情况下,单元测试就足够了。

class SignUpUserServiceTest extends \PHPUnit_Framework_TestCase
{
    /**
     * @var \Lw\Domain\Model\User\UserRepository
     */
    private $userRepository;
    /**
     * @var SignUpUserService
     */
    private $signUpUserService;

    public function setUp()
    {
        $this->userRepository = new InMemoryUserRepository();
        $this->signUpUserService = new SignUpUserService(
            $this->userRepository
        );
    }

    /**
     * @test
     * @expectedException
     * \Lw\Domain\Model\User\UserAlreadyExistsException
     */
    public function alreadyExistingEmailShouldThrowAnException()
    {
        $this->executeSignUp();
        $this->executeSignUp();
    }

    private function executeSignUp()
    {
        return $this->signUpUserService->execute(
            new SignUpUserRequest(
                'user@example.com',
                'password'
            )
        );
    }

    /**
     * @test
     */
    public function afterUserSignUpItShouldBeInTheRepository()
    {
        $user = $this->executeSignUp();
        $this->assertSame(
            $user,
            $this->userRepository->ofId($user->id())
        );
    }
}

我们为 User 仓储提供了一个内存实现。这就是所谓的 Fake:仓储的全功能实现使我们的测试成为一个单元。我们不需要去测试类的行为。那会使我们的测试缓慢而脆弱。

检查领域事件的归属也很有趣。如果创建用户触发了用户注册的事件,则确保该事件触发可能是一个好主意:

class SignUpUserServiceTest extends \PHPUnit_Framework_TestCase
{
// ...
    /**
     * @test
     */
    public function itShouldPublishUserRegisteredEvent()
    {
        $subscriber = new SpySubscriber();
        $id = DomainEventPublisher::instance()->subscribe($subscriber);
        $user = $this->executeSignUp();
        $userId = $user->id();
        DomainEventPublisher::instance()->unsubscribe($id);
        $this->assertUserRegisteredEventPublished(
            $subscriber, $userId
        );
    }
    private function assertUserRegisteredEventPublished(
        $subscriber, $userId
    ) {
        $this->assertInstanceOf(
            'UserRegistered', $subscriber->domainEvent
        );
        $this->assertTrue(
            $subscriber->domainEvent->userId()->equals($userId)
        );
    }
}
class SpySubscriber implements DomainEventSubscriber
{
    public $domainEvent;
    public function handle($aDomainEvent)
    {
        $this->domainEvent = $aDomainEvent;
    }
    public function isSubscribedTo($aDomainEvent)
    {
        return true;
    }
}

事务

事务是与持久化机制相关的实现细节。领域层不需要关心底层实现细节。考虑在这一层开始,提交,或者回滚一个事务是种坏味道。这一层的细节属于基础设施层。

处理事务最好的方式是不处理它们。我们可以用一个装饰器包装我们的应用服务来自动处理事务会话。

我们已经在我们的一个仓储中为这个问题实现了一个方案,同时你可以在这里检查它:

interface TransactionalSession
{
    /**
     * @return mixed
     */
    public function executeAtomically(callable $operation);
}

这个契约只用了一小块代码并且自动执行。取决于你的持久化机制,你会得到不同的实现。

让我们看看怎样用 Doctrine ORM 来做:

class DoctrineSession implements TransactionalSession
{
    private $entityManager;

    public function __construct(EntityManager $entityManager)
    {
        $this->entityManager = $entityManager;
    }

    public function executeAtomically(callable $operation)
    {
        return $this->entityManager->transactional($operation);
    }
}

客户端是怎样使用上面的代码:

/** @var EntityManager $em */
$nonTxApplicationService = new SignUpUserService(
    $em->getRepository('BoundedContext\Domain\Model\User\User')
);
$txApplicationService = new TransactionalApplicationService(
    $nonTxApplicationService,
    new DoctrineSession($em)
);
$response = $txApplicationService->execute(
    new SignUpUserRequest(
        'user@example.com',
        'password'
    )
);

现在我们有了事务会话的 Doctrine 实现,为我们的应用服务创建一个装饰器会很棒。通过这种方法,我们使得事务性请求对领域透明化:

class TransactionalApplicationService implements ApplicationService
{
    private $session;
    private $service;
    public function __construct(
        ApplicationService $service, TransactionalSession $session
    ) {
        $this->session = $session;
        $this->service = $service;
    }
    public function execute(BaseRequest $request)
    {
        $operation = function () use ($request) {
            return $this->service->execute($request);
        };
        return $this->session->executeAtomically($operation);
    }
}

使用 Doctrine Session 的一个很好的副作用是,它会自动管理 flush 方法,因此你无需在领域或基础设施中添加 flush。

安全

如果你想知道一般如何管理和处理用户凭据和安全性,除非这是你领域的责任,否则我们建议让框架来处理它。用户会话是交付机制的关注点。用这样的概念污染领域将使开发变得更加困难。

领域事件

领域事件监听器不得不在应用服务执行之前配置好,否则没有人会被通知到。在某些情况下,必须先明确并配置监听器,然后才能执行应用服务。

// ...
$subscriber = new SpySubscriber();
DomainEventPublisher::instance()->subscribe($subscriber);
$applicationService = // ...
$applicationService->execute(...);

大多数时候,这可以通过配置依赖注入容器做到。

命令助手(Command Handlers)

执行应用服务的一个有趣的方式是通过一个命令总线(Command Bus)库。一个好的选择是 Tactician。来自 Tactician 官网上的介绍:

什么是命令总线?这个术语大多数用于当我们用服务层组合命令模式时。它的职责是取出一个命令对象(描述用户想做什么)并且匹配一个 Handler(用来执行)。这可以使你的代码结构整齐。

我们的应用服务就是服务层,并且我们的请求对象看起来非常像命令。

如果我们有一个链接到所有应用服务的机制,并且基于请求执行正确的请求,那不是很好吗?好吧,这实际上就是命令总线。

Tiactician 库和其他选择

Tactician 是一个命令总线库,它允许你在应用服务中使用命令模式。它对于应用服务尤其方便,但是你可以使用任何输入形式。

让我们看看 Tiactician 官网的例子:

// You build a simple message object like this:
class PurchaseProductCommand
{
    protected $productId;
    protected $userId;
// ...and constructor to assign those properties...
}
// And a Handler class that expects it:
class PurchaseProductHandler
{
    public function handle(PurchaseProductCommand $command)
    {
// use command to update your models, etc
    }
}
// And then in your Controllers, you can fill in the command using your favorite
// form or serializer library, then drop it in the CommandBus and you're done!
$command = new PurchaseProductCommand(42, 29);
$commandBus->handle($command);

这就是了,Tactician 是 $commandBus 服务。它搭建了所有查找正确的 handler 和 方法的管道,这可以避免许多样板代码,这里命令和 Handlers 仅仅是正常的类,但是你可以配置最适合你应用的一个。

总而言之,我们可以总结,命令就是请求对象,并且命令 Handlers 就是应用服务。

Tactician 一个非常酷的地方就是它们非常容易扩展。Tactician 为公用任务提供插件,像日志和数据库事务。这样,你可以忘掉在每个 handler 上做的连接。

Tactician 另一个有意思的插件言归正传 Bernard 集成。Bernard 是一个异步工作队列,它允许你将一些任务放到之后的进程。大量的进程会阻碍回复。大多数时候,我们可以分流以及在之后延迟它们的执行。最佳实践是,一旦分支进程完成,就立刻回复消费者让他们知道。

Matthias Noback 开发了另一个相似的项目,叫做 SimpleBus,它可以作为 Tactician 的替代方案。主要区别是 SimpleBus Command Handlers 没有返回值。

小结

应用服务呈现你限界上下文中的应用服务。高层次的用例应该简单且薄,因为它们的目的是围绕领域协调演变。应用服务是领域逻辑交互的入口。我们看到请求和命令保持事物有条不紊。DTO 和 数据转换器允许我们从领域概念中解耦数据表述。用依赖注入容器构建应用服务非常直观。并且在复杂布局中组合应用服务,我们有大量的方法。

查看原文

xddd 赞了文章 · 2020-01-16

《领域驱动设计之PHP实现》全书翻译 - 应用服务

应用服务

  1. 《领域驱动设计之PHP实现》全书翻译 - DDD入门
  2. 《领域驱动设计之PHP实现》全书翻译 - 架构风格
  3. 《领域驱动设计之PHP实现》全书翻译 - 值对象
  4. 《领域驱动设计之PHP实现》全书翻译 - 实体
  5. 《领域驱动设计之PHP实现》全书翻译 - 服务
  6. 《领域驱动设计之PHP实现》全书翻译 - 领域事件
  7. 《领域驱动设计之PHP实现》全书翻译 - 模块
  8. 《领域驱动设计之PHP实现》全书翻译 - 聚合
  9. 《领域驱动设计之PHP实现》全书翻译 - 工厂
  10. 《领域驱动设计之PHP实现》全书翻译 - 仓储
  11. 《领域驱动设计之PHP实现》全书翻译 - 应用服务
  12. 《领域驱动设计之PHP实现》全书翻译 - 集成上下文
  13. 《用PHP实现六边形架构》

应用层是将领域模型与查询或更改其状态的客户端分离的层。应用服务是此层的构建块。正如 Vaughn Vernon 所说:“应用服务是领域模型的直接客户端。” 你可以考虑把应用服务当作外部世界(HTML 表单,API 客户端,命令行,框架,UI 等等)与领域模型自身之间的连接点。考虑向外部展示系统的顶级用例,也许会有帮助。例如:“作为来宾,我想注册”,“作为以登录用户,我要购买产品”,等等。

在这一章,我们将解释怎样实现应用服务,理解命令模式(Command pattern)的角色,并且确定应用服务的职责。所以,让我们考虑一个注册新用户(signing up a new user)的用例。

从概念上讲,为注册新用户,我们必须:

  • 从客户端获取电子邮箱(email)和密码(password)
  • 检查电子邮箱(email)是否在使用
  • 创建一个新用户(user)
  • 将用户(user)添加到已有用户集合(user set)
  • 返回我们刚创建的用户(user)

让我们开始干吧!

请求(Requests)

我们需要发送电子邮件(email)和密码(password)给应用服务。这有许多方法可以从客户端来做这样事情(HTML 表单,API 客户端,或者甚至命令行)。我们可以通过用方法签名发送一个标准的参数(email 和 password),或者构建并发送一个含有这些信息的数据结构。后面的这种方法,即发送 DTO,把一些有趣的功能拿到台面上。通过发送对象,可以在命令总线上对其进行序列化和排队。也可以添加类型安全和一些 IDE 帮助。

数据传输对象(Data Transfer Object)

DTO 是一种在过程之间转移数据的数据结构。不要把它误认为是一种全能的对象。DTO 除了存储和检索它自身数据(存取器),没有其他任何行为。DTO 是简单的对象,它不应该含有任何需要测试的业务逻辑。

正如 Vaughn Vernon 所说:

应用服务方法签名仅使用基本类型(整数,字符串等等),或者 DTO 作为这些方法的替代方法,更好的方法可能是设计命令对象(Command Object)。这不一定是正确或错误的方法,这主要取决于你的口味和目标。

一个含有应用服务所含数据的 DTO 实现可能像这样:

namespace Lw\Application\Service\User;
class SignUpUserRequest
{
    private $email;
    private $password;

    public function __construct($email, $password)
    {
        $this->email = $email;
        $this->password = $password;
    }

    public function email()
    {
        return $this->email;
    }

    public function password()
    {
        return $this->password;
    }
}

正如你所见,SignUpUserRequest 没有行为,只有数据。这可能来自于一个 HTML 表单或者一个 API 端点,尽管我们不关心这些。

构建应用服务请求

从你最喜欢的框架交付机制创建一个请求,应该是最直观的。在 web 中,你可以将控制器请求中的参数打包成一个 DTO 并将它们下发给服务。对 CLI 命令来说也是相同的原则:读取输入参数并下发。

通过使用 Symfony,我们可以提取来自 HttpFoundation 组件的请求中的所需数据:

// ...
class UsersController extends Controller
{
    /**
     * @Route('/signup', name = 'signup')
     * @param Request $request
     * @return Response
     */
    public function signUpAction(Request $request)
    {
// ...
        $signUpUserRequest = new SignUpUserRequest(
            $request->get('email'),
            $request->get('password')
        );
// ...
    }
// ...

在一个使用 Form 组件来捕获和验证参数的更精细的 Silex 应用程序上,它看起来像这样:

// ...
$app->match('/signup', function (Request $request) use ($app) {
    $form = $app['sign_up_form'];
    $form->handleRequest($request);
    if ($form->isValid()) {
        $data = $form->getData();
        try {
            $app['sign_in_user_application_service']->execute(
                new SignUpUserRequest(
                    $data['email'],
                    $data['password']
                )
            );
            return $app->redirect(
                $app['url_generator']->generate('login')
            );
        } catch (UserAlreadyExistsException $e) {
            $form
                ->get('email')
                ->addError(
                    new FormError(
                        'Email is already registered by another user'
                    )
                );
        } catch (Exception $e) {
            $form
                ->addError(
                    new FormError(
                        'There was an error, please get in touch with us'
                    )
                );
        }
    }
    return $app['twig']->render('signup.html.twig', [
        'form' => $form->createView(),
    ]);
});

请求的设计

在设计你的请求对象时,你应该总是遵循这些原则:使用基本类型,序列化设计,并且不包含业务逻辑在里面。这样,你可以在单元测试时节省开支。

使用基本类型

我们推荐使用基本类型来构建你的请求对象(就是字符串,整数,布尔值等等)。我们仅仅是抽象出输入参数。你应该能够从交付机制当中独立地消费应用服务。即使是非常复杂的 HTML 表单,也总是可以在控制器级别转换为基本类型。你应该不想混淆你的框架和业务逻辑。

在某些情况下,很容易直接使用值对象。不要这样做。值对象定义的更新将影响所有客户端,并且你会将客户端与领域逻辑耦合在一起。

序列化

使用基本类型的一个副作用就是,任何请求对象可以轻松地序列化为字符串,发送并存储在消息系统或者数据库中。

无业务逻辑

避免在你的请求对象中加入任何业务甚至验证逻辑。验证应该放到你的领域中(这里指的是实体,值对象,领域服务等等)。验证是保持业务不变性和领域约束的方法。

无测试

应用程序请求是数据结构,不是对象。数据结构的单元测试就像测试 getters 和 setters。这没有行为需要测试,因此对请求对象和 DTO 进行单元测试没有太多价值。这些结构将作为更复杂的测试(例如集成测试或验收测试)的副作用而覆盖。

命令是请求对象的替代方法。我们设计一个具有多种应用方法服务,并且每个方法都含有你放到 Request 中的参数。对于简单的应用程序来说这是可以的,但后面我们就得操心这个问题。

应用服务剖析

当我们在请求中封装好了数据,就该处理业务逻辑了。正如 Vaughn Vernon 所说:“尽量保证应用服务很小很薄,仅仅用它们在模型上协调任务。”

首先要做的事情就是从请求中提取必要信息,即 email 和 password。必要的话,我们需要确认是否含有特殊 email 的用户。如果不关心这些的话,那么我们创建和添加用户到 UserRepository。在发现有用户具有相同 email 的特殊情况下,我们抛出一个异常以便客户端以自己的方式处理(显示错误,重试,或者直接忽略它)。

namespace Lw\Application\Service\User;

use Ddd\Application\Service\ApplicationService;
use Lw\Domain\Model\User\User;
use Lw\Domain\Model\User\UserAlreadyExistsException;
use Lw\Domain\Model\User\UserRepository;

class SignUpUserService
{
    private $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function execute(SignUpUserRequest $request)
    {
        $email = $request->email();
        $password = $request->password();
        $user = $this->userRepository->ofEmail($email);
        if ($user) {
            throw new UserAlreadyExistsException();
        }
        $this->userRepository->add(
            new User(
                $this->userRepository->nextIdentity(),
                $email,
                $password
            )
        );
    }
}

漂亮!如果你想知道构造函数里的 UserRepository 是做什么的,我们会在后续向你展示。

处理异常

应用服务抛出异常是向客户端反馈不正常的情况和流程的方法。这一层上的异常与业务逻辑有关(像查找一个用户),但并不是实现细节(像 PDOException,PredisException, 或者 DoctrineException)。

依赖倒置

处理用户不是服务的职责。正如我们在第 10 章,仓储中所见,有一个专门的类来处理 User 集合:UserRepository。这是一个从应用服务到仓储的依赖。我们不想用仓储的具体实现耦合应用服务,因此这之后会导致我们用基础设施细节耦合服务。所以我们依赖具体实现依赖的契约(接口),即 UserRepository。

UserRepository 的特定实现会在运行时被构建和传送,例如用 DoctrineUserRepository,一个使用 Doctine 的专门实现。传递一个专门的实现在测试时同样正常。例如,NotAvailableUserRepository 可以是一个专门的实现,它会在一个操作每个执行时抛出一个异常。这样,我们可以测试所有应用服务行为,包括悲观路径(sad paths),即使当出现了问题,应用服务也必须正常工作。

应用服务也可以依赖领域服务(如 GetBadgesByUser)。在运行时,此类服务的实现可能相当复杂。可以想像一个通过 HTTP 协议整合上下文的 HttpGetBadgesByUser。

依赖抽象,我们可以使我们的应用服务不受底层基础设施更改的影响。

应用服务实例

仅实例化应用服务很容易,但根据依赖构建的复杂度,构建依赖树的可能很棘手。为此,大多数框架都带有依赖注入容器。如果没有,你最后会在控制器的某处得到类似以下的代码:

$redisClient = new Predis\Client([
    'scheme' => 'tcp',
    'host' => '10.0.0.1',
    'port' => 6379
]);
$userRepository = new RedisUserRepository($redisClient);
$signUp = new SignUpUserService($userRepository);
$signUp->execute(new SignUpUserRequest(
    'user@example.com',
    'password'
));

我们决定为 UserRepository 使用 Redis 的实现。在之前的代码示例中,为构建一个在内部使用 Redis 的仓储,我们构建了所有所需的依赖项。这些依赖是:一个 Predis 客户端,以及所有连接到 Redis 服务器的参数。这不仅效率低下,还会在控制器内重复传播。

你可以将创建逻辑重构为一个工厂,或者你可以使用依赖注入容器(大多数现代框架都包含了它)。

使用依赖注入容器是否不好?

一点也不。依赖注入只是一个工具。它们有助于剥离构建依赖时的复杂性。对构建基础构件也很有用。Symfony 提供了一个完整的实现。

请考虑以下事实,将整个容器作为一个整体传递给服务之一是不好的做法。这就像将整个应用程序的上下文与领域耦合在一起。如果一个服务需要指定的对象,请从框架来创建它们,并且把它们作为依赖传递给服务。但不要使服务觉察到其中的上下文。

让我们看看如何在 Silex 中构建依赖:

$app = new \Silex\Application();
$app['redis_parameters'] = [
    'scheme' => 'tcp',
    'host' => '127.0.0.1',
    'port' => 6379
];
$app['redis'] = $app->share(function ($app) {
    return new Predis\Client($app['redis_parameters']);
});
$app['user_repository'] = $app->share(function($app) {
    return new RedisUserRepository(
        $app['redis']
    );
});
$app['sign_up_user_application_service'] = $app->share(function($app) {
    return new SignUpUserService(
        $app['user_repository']
    );
});
// ...
$app->match('/signup' ,function (Request $request) use ($app) {
// ...
    $app['sign_up_user_application_service']->execute(
        new SignUpUserRequest(
            $request->get('email'),
            $request->get('password')
        )
    );
// ...
});

正如你所见,$app 被当作服务容器使用。我们注册了所有所需的组件,以及它们的依赖。sign_up_user_application_service 依赖上面的定义。改变 user_repository 的实现就像返回其他东西一样简单(MySQL,MongoDB 等等),所以我们根本不需要改变服务代码。

Symfony 应用里等效的内容如下:

<?xml version=" 1.0" ?>
<container xmlns="http://symfony.com/schema/dic/services"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="
http://symfony.com/schema/dic/services
http://symfony.com/schema/dic/services/services-1.0.xsd">
    <services>
        <service
                id="sign_up_user_application_service"
                class="SignUpUserService">
            <argument type="service" id="user_repository" />
        </service>
        <service
                id="user_repository"
                class="RedisUserRepository">
            <argument type="service">
                <service class="Predis\Client" />
            </argument>
        </service>
    </services>
</container>

现在,你有了在 Symonfy Service Container 中的应用服务定义,之后获取它也非常直观。所有交付机制(Web 控制器,REST 控制器,甚至控制台命令)都共享相同的定义。在任何实现 ContainerWare 接口的类上,服务都是可用的。获取服务与 $this->get('sign_up_user_application_service') 一样容易。

总而言之,你怎样构建服务(adhoc,服务容器,工厂等等)并不重要。重要的是保持你的应用服务设置在基础设施边界之外。

自定义一个应用服务

自定义服务的主要方法是选择你要传递的依赖。根据你服务容器能力,这可能有一点棘手,因此你可以添加一个 setter 方法来即时改变依赖。例如,你可能需要更改依赖输出,以便你能设置一个默认项然后改变它。如果逻辑变得过于复杂,你可以创建一个应用服务工厂,它可以为你处理这种局面。

执行

这有两种调用应用服务的方法:一个是每个用例对应一个含有单个 execution 方法的专用类,以及多个应用服务和用例在同一个类。

一个类一个应用服务

这是我们更喜欢的方法,并且这可能适合所有场景:

class SignUpUserService
{
// ...
    public function execute(SignUpUserRequest $request)
    {
// ...
    }
}

每个应用服务使用一个专用的类使得代码在应对外部变化时更健壮(单一职责原则)。这几乎没有原因去改变类,因为服务仅仅只做一件事。应用服务会更容易测试,理解,因为它们只做一件事。这使得实现一个通用的应用服务契约更容易,使类装饰更简单(查看第 10 章,仓储的子章节 事务)。这将同样会更高内聚,因为所有依赖由单一的用例所独有。

execution 方法可以有一个更具表达性的名称,例如 signUp。但是,命令模式的执行通过应用服务标准格式化了通用契约,从而使得这更容易装饰,这对于事务非常方面。

一个类多个应用服务方法

有时候,将内聚的应用服务组织在同一个类中可能是个好主意:

class UserService
{
// ...
    public function signUp(SignUpUserRequest $request)
    {
// ...
    }
    public function signIn(SignUpUserRequest $request)
    {
// ...
    }
    public function logOut(LogOutUserRequest $request)
    {
// ...
    }
}

我们并不推荐这种做法,因为并不是所有应用服务都 100% 内聚的。一些服务可能需要不同的依赖,从而导致你的应用服务依赖其并不需要的东西。另一个问题是这种类会快速成长。因为它违反了单一职责原则,这将导致有多种原因改变它从而破坏它。

返回值

注册后,我们可能会考虑将用户重定向到个人资料页面。回传所需信息给控制器最自然的方式是直接从服务中返回实体。

class SignUpUserService
{
// ...
    public function execute(SignUpUserRequest $request)
    {
        $user = new User(
            $this->userRepository->nextIdentity(),
            $email,
            $password
        );
        $this->userRepository->add($user);
        return $user;
    }
}

然后,我们会从控制器中拿到 id 字段,然后重定向到别的地方。但是,三思而后行。我们返回了功能齐全的实体给控制器,这将使得交付机制可以绕过应用层直接与领域交互。

假设 User 实体提供了一个 updateEmailAddress 方法。你可以不这样做,但从长远来看,有人可能会考虑使用它:

$app-> match( '/signup' , function (Request $request) use ($app) {
// ...
    $user = $app['sign_up_user_application_service']->execute(
        new SignUpUserRequest(
            $request->get('email'),
            $request->get('password'))
    );
    $user->updateEmailAddress('shouldnotupdate@email.com');
// ...
});

不仅仅是那样,而且表现层所需的数据与领域管理的数据不相同。我们并不想围绕表现层演变和耦合领域层。相反,我们想自由进化。

为达到此目的,我们需要一种弹性的方式来解耦这两层。

来自聚合实例的 DTO

我们可以用表现层所需的信息返回干净的(sterile)数据。正如我们之前所见,DTO 非常适合这种场景。我们仅仅需要在应用服务中组合它们并将其返回给客户端:

class UserDTO
{
    private $email ;
// ...
    public function __construct(User $user)
    {
        $this->email = $user->email ();
// ...
    }
    public function email ()
    {
        return $this->email ;
    }
}

UserDTO 将在表现层上从 User 实体暴露我们任何所需的数据,从而避免暴露行为:

class SignUpUserService
{
    public function execute(SignUpUserRequest $request)
    {
// ...
        $user = // ...
    return new UserDTO($user);
    }
}

使命达成!现在我们可以把参数传给模板引擎并把它们转换进 挂件(widgets),标签(tags),或者子模板(subtemplates),或者用数据做任何我们想在表现层一侧所做的:

$app->match('/signup' , function (Request $request) use ($app) {
    /**
     * @var UserDTO $user
     */
    $userDto=$app['sign_up_user_application_service']->execute(
        new SignUpUserRequest(
            $request->get('email'),
            $request->get('password')
        )
    );
// ...
});

但是,让应用服务决定如何构建 DTO 揭示了另一个限制。由于构建 DTO 仅仅取决于应用服务,因此很难适配不同的客户端。考虑在同一用例上,Web 控制器重定向所需的数据和 REST 响应所需的数据,根本不相同。

让我们允许客户端通过传递特定的 DTO 汇编器(Assembler)来定义如何构建 DTO:

class SignUpUserService
{
    private $userDtoAssembler;
    public function __construct(
        UserRepository $userRepository,
        UserDTOAssembler $userDtoAssembler
    ) {
        $this->userRepository = $userRepository;
        $this->userDtoAssembler = $userDtoAssembler;
    }
    public function execute(SignUpUserRequest $request)
    {
        $user = // ...
    return $this->userDtoAssembler->assemble($user);
    }
}

现在客户端可以通过传递一个指定的 UserDTOAssembler 来自定义回复。

数据转换器(Data Transformer)

在一些情况下,为更复杂回复(像 JSON,XML,CSV,和 iCAL 契约)生成中间 DTO 可能被视为不必要的开销。我们可以将表述输出到缓冲区中,然后在交付端获取它。

转换器有助于降低由高层次领域概念到低层次客户端细节的开销。让我们看一个例子:

interface UserDataTransformer
{
    public function write(User $user);
    /**
     * @return mixed
     */
    public function read();
}

考虑为一个给定产品生成不同的数据表述的示例。通常,产品信息通过一个 web 接口(HTML)提供,但我们可能对提供其他格式感兴趣,例如 XML,JSON,或者 CSV。这可能会启动与其他服务的集成。

考虑一个类似 blog 的例子。我们可以用 HTML 暴露我们潜在的作者给外部,但一些用户对通过 RSS 消费我们的文章感兴趣。这个用例(应用服务)仍然相同。而表述却不一样。

DTO 是一个干净且简单的方案,它能够以不同表述形式传递给模板引擎,但最后数据转换这一步的逻辑可能有点复杂,因为这样的模板逻辑可能会成为一个需要维护,测试和理解的问题。

数据转换器可能在特定的例子上会更好。他们对领域概念(聚合,实体等等)来说是黑盒子,因为输入和只读表述(XML,JSON,CSV 等等)与输出一样。这些转换器也很容易测试:

class JsonUserDataTransformer implements UserDataTransformer
{
    private $data;
    public function write(User $user)
    {
// More complex logic could be placed here
// As using JMSSerializer, native json, etc.
        $this->data = json_encode($user);
    }
    /**
     * @return string
     */
    public function read()
    {
        return $this->data;
    }
}

这很简单。想知道 XML 或者 CSV 是怎样的?让我们看看怎样通过应用服务整合数据转换器:

class SignUpUserService
{
    private $userRepository;
    private $userDataTransformer;
    public function __construct(
        UserRepository $userRepository,
        UserDataTransformer $userDataTransformer
    ) {
        $this->userRepository = $userRepository;
        $this->userDataTransformer = $userDataTransformer;
    }
    public function execute(SignUpUserRequest $request)
    {
        $user = // ...
            $this->userDataTransformer()->write($user);
    }
    /**
     * @return UserDataTransformer
     */
    public function userDataTransformer()
    {
        return $this->userDataTransformer;
    }
}

这与 DTO 汇编器方法相似,但是这一次没有返回一个具体的值。数据转换器用来持有数据和与数据交互。

使用 DTO 最主要的问题是过度写入它们。大多数时候,你的领域概念和 DTO 表述会呈现相同的结构。大多数时候,你会觉得没必要花时间去做一个这样的映射。这的确令人沮丧,表述与聚合的关系不是 1:1。你可以将两个聚合以一个表述呈现。你也可以用多种方式呈现同一相聚合。你怎样做取决于你的用例。

不过,依据 Martin Fowler 所说:

当你在表现展示台的模型与基础领域模型之间存在重大不匹配时,使用 DTO 之类的方法很有用。在这种情况下,从领域模型映射特定表述的外观/网关(facade/gateway并且为表述呈现一个方便的接口是有意义的。这是值得做的,但是仅对于具有这种不匹配的场景才值得做(在这种情况下,这不是多余的工作,因为无论如何你都需要在场景下进行操作。)

我们认为从长远角度来看是值得投资的。在大中型项目中,接口表述和领域概念总是无规律的变化。你可能想将它们各自解耦以便为更新降低摩擦。使用 DTO 或数据转换器允许你自由演变模型而不必总是考虑破坏布局(layout)。

复合层上的多个应用服务

大多数时候,布局(layout)总是与单个应用服务不一样。我们的项目有相同复杂的接口。

考虑一个特定项目的首页。我们该怎样渲染如此多的用例?这有一些选项,让我们看看吧。

AJAX 内容的集成

你可以让浏览器直接通过 AJAX 或 Hijax 请求不同端点并在布局上组合数据。这可以避免在你的控制器混合多个应用服务,但它可能有性能开销,因为触发了多个请求。

ESI 内容的集成

Edge Side Includes(ESI)是一种小型的标记语言,与之前的方法相似,但是是在服务器端。它需要额外的精力来配置额外的中间件,例如 NGINX 或 Varnish,以使其正常工作。

Symfony 子请求

如果你使用 Symfony,子请求可能是一个有趣的选择。依据 Symfony Documentation:

除了发送给 HttpKernel::handle 的主请求之外,你也可以发送所谓的子请求(sub request)。子请求的外观和行为看起来与其它请求一样,但通常用来渲染页面的一小部分而不是整个页面。你大多数时候从控制器发送子请求(或者从模板内部,它由你的控制器渲染)。这会创建了另一个完整的请求 - 回复周期,在此周期,新的请求被转换为回复。内部唯一不同的是一些监听器(例如,安全)只能根据主请求执行操作。每个监听器都传递 KernelEvent 的某个子类,该子类的 MasterRequest() 可用于检查当前请求是主请求还是子请求。

这非常棒,因为在没有 AJAX 开销或者不使用复杂的 ESI 配置的情况下,你将会从调用独立的应用服务中受益。

一个控制器(controller),多个应用服务

最后一个选择可能是用同一个控制器管理多个应用服务,从而控制器的逻辑会变得有点脏,因为它要处理和合成传递给视图的回复。

测试应用服务

由于你对测试应用服务自身行为感兴趣,因此没必要将其转换为具有针对真实数据的复杂设置的集成测试。你对测试低层次细节是不感兴趣的,因此在大多数情况下,单元测试就足够了。

class SignUpUserServiceTest extends \PHPUnit_Framework_TestCase
{
    /**
     * @var \Lw\Domain\Model\User\UserRepository
     */
    private $userRepository;
    /**
     * @var SignUpUserService
     */
    private $signUpUserService;

    public function setUp()
    {
        $this->userRepository = new InMemoryUserRepository();
        $this->signUpUserService = new SignUpUserService(
            $this->userRepository
        );
    }

    /**
     * @test
     * @expectedException
     * \Lw\Domain\Model\User\UserAlreadyExistsException
     */
    public function alreadyExistingEmailShouldThrowAnException()
    {
        $this->executeSignUp();
        $this->executeSignUp();
    }

    private function executeSignUp()
    {
        return $this->signUpUserService->execute(
            new SignUpUserRequest(
                'user@example.com',
                'password'
            )
        );
    }

    /**
     * @test
     */
    public function afterUserSignUpItShouldBeInTheRepository()
    {
        $user = $this->executeSignUp();
        $this->assertSame(
            $user,
            $this->userRepository->ofId($user->id())
        );
    }
}

我们为 User 仓储提供了一个内存实现。这就是所谓的 Fake:仓储的全功能实现使我们的测试成为一个单元。我们不需要去测试类的行为。那会使我们的测试缓慢而脆弱。

检查领域事件的归属也很有趣。如果创建用户触发了用户注册的事件,则确保该事件触发可能是一个好主意:

class SignUpUserServiceTest extends \PHPUnit_Framework_TestCase
{
// ...
    /**
     * @test
     */
    public function itShouldPublishUserRegisteredEvent()
    {
        $subscriber = new SpySubscriber();
        $id = DomainEventPublisher::instance()->subscribe($subscriber);
        $user = $this->executeSignUp();
        $userId = $user->id();
        DomainEventPublisher::instance()->unsubscribe($id);
        $this->assertUserRegisteredEventPublished(
            $subscriber, $userId
        );
    }
    private function assertUserRegisteredEventPublished(
        $subscriber, $userId
    ) {
        $this->assertInstanceOf(
            'UserRegistered', $subscriber->domainEvent
        );
        $this->assertTrue(
            $subscriber->domainEvent->userId()->equals($userId)
        );
    }
}
class SpySubscriber implements DomainEventSubscriber
{
    public $domainEvent;
    public function handle($aDomainEvent)
    {
        $this->domainEvent = $aDomainEvent;
    }
    public function isSubscribedTo($aDomainEvent)
    {
        return true;
    }
}

事务

事务是与持久化机制相关的实现细节。领域层不需要关心底层实现细节。考虑在这一层开始,提交,或者回滚一个事务是种坏味道。这一层的细节属于基础设施层。

处理事务最好的方式是不处理它们。我们可以用一个装饰器包装我们的应用服务来自动处理事务会话。

我们已经在我们的一个仓储中为这个问题实现了一个方案,同时你可以在这里检查它:

interface TransactionalSession
{
    /**
     * @return mixed
     */
    public function executeAtomically(callable $operation);
}

这个契约只用了一小块代码并且自动执行。取决于你的持久化机制,你会得到不同的实现。

让我们看看怎样用 Doctrine ORM 来做:

class DoctrineSession implements TransactionalSession
{
    private $entityManager;

    public function __construct(EntityManager $entityManager)
    {
        $this->entityManager = $entityManager;
    }

    public function executeAtomically(callable $operation)
    {
        return $this->entityManager->transactional($operation);
    }
}

客户端是怎样使用上面的代码:

/** @var EntityManager $em */
$nonTxApplicationService = new SignUpUserService(
    $em->getRepository('BoundedContext\Domain\Model\User\User')
);
$txApplicationService = new TransactionalApplicationService(
    $nonTxApplicationService,
    new DoctrineSession($em)
);
$response = $txApplicationService->execute(
    new SignUpUserRequest(
        'user@example.com',
        'password'
    )
);

现在我们有了事务会话的 Doctrine 实现,为我们的应用服务创建一个装饰器会很棒。通过这种方法,我们使得事务性请求对领域透明化:

class TransactionalApplicationService implements ApplicationService
{
    private $session;
    private $service;
    public function __construct(
        ApplicationService $service, TransactionalSession $session
    ) {
        $this->session = $session;
        $this->service = $service;
    }
    public function execute(BaseRequest $request)
    {
        $operation = function () use ($request) {
            return $this->service->execute($request);
        };
        return $this->session->executeAtomically($operation);
    }
}

使用 Doctrine Session 的一个很好的副作用是,它会自动管理 flush 方法,因此你无需在领域或基础设施中添加 flush。

安全

如果你想知道一般如何管理和处理用户凭据和安全性,除非这是你领域的责任,否则我们建议让框架来处理它。用户会话是交付机制的关注点。用这样的概念污染领域将使开发变得更加困难。

领域事件

领域事件监听器不得不在应用服务执行之前配置好,否则没有人会被通知到。在某些情况下,必须先明确并配置监听器,然后才能执行应用服务。

// ...
$subscriber = new SpySubscriber();
DomainEventPublisher::instance()->subscribe($subscriber);
$applicationService = // ...
$applicationService->execute(...);

大多数时候,这可以通过配置依赖注入容器做到。

命令助手(Command Handlers)

执行应用服务的一个有趣的方式是通过一个命令总线(Command Bus)库。一个好的选择是 Tactician。来自 Tactician 官网上的介绍:

什么是命令总线?这个术语大多数用于当我们用服务层组合命令模式时。它的职责是取出一个命令对象(描述用户想做什么)并且匹配一个 Handler(用来执行)。这可以使你的代码结构整齐。

我们的应用服务就是服务层,并且我们的请求对象看起来非常像命令。

如果我们有一个链接到所有应用服务的机制,并且基于请求执行正确的请求,那不是很好吗?好吧,这实际上就是命令总线。

Tiactician 库和其他选择

Tactician 是一个命令总线库,它允许你在应用服务中使用命令模式。它对于应用服务尤其方便,但是你可以使用任何输入形式。

让我们看看 Tiactician 官网的例子:

// You build a simple message object like this:
class PurchaseProductCommand
{
    protected $productId;
    protected $userId;
// ...and constructor to assign those properties...
}
// And a Handler class that expects it:
class PurchaseProductHandler
{
    public function handle(PurchaseProductCommand $command)
    {
// use command to update your models, etc
    }
}
// And then in your Controllers, you can fill in the command using your favorite
// form or serializer library, then drop it in the CommandBus and you're done!
$command = new PurchaseProductCommand(42, 29);
$commandBus->handle($command);

这就是了,Tactician 是 $commandBus 服务。它搭建了所有查找正确的 handler 和 方法的管道,这可以避免许多样板代码,这里命令和 Handlers 仅仅是正常的类,但是你可以配置最适合你应用的一个。

总而言之,我们可以总结,命令就是请求对象,并且命令 Handlers 就是应用服务。

Tactician 一个非常酷的地方就是它们非常容易扩展。Tactician 为公用任务提供插件,像日志和数据库事务。这样,你可以忘掉在每个 handler 上做的连接。

Tactician 另一个有意思的插件言归正传 Bernard 集成。Bernard 是一个异步工作队列,它允许你将一些任务放到之后的进程。大量的进程会阻碍回复。大多数时候,我们可以分流以及在之后延迟它们的执行。最佳实践是,一旦分支进程完成,就立刻回复消费者让他们知道。

Matthias Noback 开发了另一个相似的项目,叫做 SimpleBus,它可以作为 Tactician 的替代方案。主要区别是 SimpleBus Command Handlers 没有返回值。

小结

应用服务呈现你限界上下文中的应用服务。高层次的用例应该简单且薄,因为它们的目的是围绕领域协调演变。应用服务是领域逻辑交互的入口。我们看到请求和命令保持事物有条不紊。DTO 和 数据转换器允许我们从领域概念中解耦数据表述。用依赖注入容器构建应用服务非常直观。并且在复杂布局中组合应用服务,我们有大量的方法。

查看原文

赞 5 收藏 4 评论 0

xddd 收藏了文章 · 2019-09-07

实战Go内存泄露

最近解决了我们项目中的一个内存泄露问题,事实再次证明pprof是一个好工具,但掌握好工具的正确用法,才能发挥好工具的威力,不然就算你手里有屠龙刀,也成不了天下第一,本文就是带你用pprof定位内存泄露问题。

关于Go的内存泄露有这么一句话不知道你听过没有:

10次内存泄露,有9次是goroutine泄露。

我所解决的问题,也是goroutine泄露导致的内存泄露,所以这篇文章主要介绍Go程序的goroutine泄露,掌握了如何定位和解决goroutine泄露,就掌握了内存泄露的大部分场景

本文草稿最初数据都是生产坏境数据,为了防止敏感内容泄露,全部替换成了demo数据,demo的数据比生产环境数据简单多了,更适合入门理解,有助于掌握pprof。

go pprof基本知识

定位goroutine泄露会使用到pprof,pprof是Go的性能工具,在开始介绍内存泄露前,先简单介绍下pprof的基本使用,更详细的使用给大家推荐了资料。

什么是pprof

pprof是Go的性能分析工具,在程序运行过程中,可以记录程序的运行信息,可以是CPU使用情况、内存使用情况、goroutine运行情况等,当需要性能调优或者定位Bug时候,这些记录的信息是相当重要。

基本使用

使用pprof有多种方式,Go已经现成封装好了1个:net/http/pprof,使用简单的几行命令,就可以开启pprof,记录运行信息,并且提供了Web服务,能够通过浏览器和命令行2种方式获取运行数据。

看个最简单的pprof的例子:

文件:golang_step_by_step/pprof/pprof/demo.go

package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
)

func main() {
    // 开启pprof,监听请求
    ip := "0.0.0.0:6060"
    if err := http.ListenAndServe(ip, nil); err != nil {
        fmt.Printf("start pprof failed on %s\n", ip)
    }
}

提醒:本文所有代码部分可左右滑动

浏览器方式

image-20190516173924325

输入网址ip:port/debug/pprof/打开pprof主页,从上到下依次是5类profile信息

  1. block:goroutine的阻塞信息,本例就截取自一个goroutine阻塞的demo,但block为0,没掌握block的用法
  2. goroutine:所有goroutine的信息,下面的full goroutine stack dump是输出所有goroutine的调用栈,是goroutine的debug=2,后面会详细介绍。
  3. heap:堆内存的信息
  4. mutex:锁的信息
  5. threadcreate:线程信息

这篇文章我们主要关注goroutine和heap,这两个都会打印调用栈信息,goroutine里面还会包含goroutine的数量信息,heap则是内存分配信息,本文用不到的地方就不展示了,最后推荐几篇文章大家去看。

命令行方式

当连接在服务器终端上的时候,是没有浏览器可以使用的,Go提供了命令行的方式,能够获取以上5类信息,这种方式用起来更方便。

使用命令go tool pprof url可以获取指定的profile文件,此命令会发起http请求,然后下载数据到本地,之后进入交互式模式,就像gdb一样,可以使用命令查看运行信息,以下是5类请求的方式:

# 下载cpu profile,默认从当前开始收集30s的cpu使用情况,需要等待30s
go tool pprof http://localhost:6060/debug/pprof/profile   # 30-second CPU profile
go tool pprof http://localhost:6060/debug/pprof/profile?seconds=120     # wait 120s

# 下载heap profile
go tool pprof http://localhost:6060/debug/pprof/heap      # heap profile

# 下载goroutine profile
go tool pprof http://localhost:6060/debug/pprof/goroutine # goroutine profile

# 下载block profile
go tool pprof http://localhost:6060/debug/pprof/block     # goroutine blocking profile

# 下载mutex profile
go tool pprof http://localhost:6060/debug/pprof/mutex

上面的pprof/demo.go太简单了,如果去获取内存profile,几乎获取不到什么,换一个Demo进行内存profile的展示:

文件:golang_step_by_step/pprof/heap/demo2.go

// 展示内存增长和pprof,并不是泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

// 运行一段时间:fatal error: runtime: out of memory
func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    tick := time.Tick(time.Second / 100)
    var buf []byte
    for range tick {
        buf = append(buf, make([]byte, 1024*1024)...)
    }
}

上面这个demo会不断的申请内存,把它编译运行起来,然后执行:

$ go tool pprof http://localhost:6060/debug/pprof/heap

Fetching profile over HTTP from http://localhost:6060/debug/pprof/heap
Saved profile in /home/ubuntu/pprof/pprof.demo.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz       //<--- 下载到的内存profile文件
File: demo // 程序名称
Build ID: a9069a125ee9c0df3713b2149ca859e8d4d11d5a
Type: inuse_space
Time: May 16, 2019 at 8:55pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) help  // 使用help打印所有可用命令
  Commands:
    callgrind        Outputs a graph in callgrind format
    comments         Output all profile comments
    disasm           Output assembly listings annotated with samples
    dot              Outputs a graph in DOT format
    eog              Visualize graph through eog
    evince           Visualize graph through evince
    gif              Outputs a graph image in GIF format
    gv               Visualize graph through gv
    kcachegrind      Visualize report in KCachegrind
    list             Output annotated source for functions matching regexp
    pdf              Outputs a graph in PDF format
    peek             Output callers/callees of functions matching regexp
    png              Outputs a graph image in PNG format
    proto            Outputs the profile in compressed protobuf format
    ps               Outputs a graph in PS format
    raw              Outputs a text representation of the raw profile
    svg              Outputs a graph in SVG format
    tags             Outputs all tags in the profile
    text             Outputs top entries in text form
    top              Outputs top entries in text form
    topproto         Outputs top entries in compressed protobuf format
    traces           Outputs all profile samples in text form
    tree             Outputs a text rendering of call graph
    web              Visualize graph through web browser
    weblist          Display annotated source in a web browser
    o/options        List options and their current values
    quit/exit/^D     Exit pprof
    
    ....

以上信息我们只关注2个地方:

  1. 下载得到的文件:/home/ubuntu/pprof/pprof.demo.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz,这其中包含了程序名demo,profile类型alloc已分配的内存,inuse代表使用中的内存。
  2. help可以获取帮助,最先会列出支持的命令,想掌握pprof,要多看看,多尝试。

关于命令,本文只会用到3个,我认为也是最常用的:toplisttraces,分别介绍一下。

top

按指标大小列出前10个函数,比如内存是按内存占用多少,CPU是按执行时间多少。

(pprof) top
Showing nodes accounting for 814.62MB, 100% of 814.62MB total
      flat  flat%   sum%        cum   cum%
  814.62MB   100%   100%   814.62MB   100%  main.main
         0     0%   100%   814.62MB   100%  runtime.main

top会列出5个统计数据:

  • flat: 本函数占用的内存量。
  • flat%: 本函数内存占使用中内存总量的百分比。
  • sum%: 前面每一行flat百分比的和,比如第2行虽然的100% 是 100% + 0%。
  • cum: 是累计量,加入main函数调用了函数f,函数f占用的内存量,也会记进来。
  • cum%: 是累计量占总量的百分比。

list

查看某个函数的代码,以及该函数每行代码的指标信息,如果函数名不明确,会进行模糊匹配,比如list main会列出main.mainruntime.main

(pprof) list main.main  // 精确列出函数
Total: 814.62MB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  814.62MB   814.62MB (flat, cum)   100% of Total
         .          .     20:    }()
         .          .     21:
         .          .     22:    tick := time.Tick(time.Second / 100)
         .          .     23:    var buf []byte
         .          .     24:    for range tick {
  814.62MB   814.62MB     25:        buf = append(buf, make([]byte, 1024*1024)...)
         .          .     26:    }
         .          .     27:}
         .          .     28:
(pprof) list main  // 匹配所有函数名带main的函数
Total: 814.62MB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  814.62MB   814.62MB (flat, cum)   100% of Total
         .          .     20:    }()
         .          .     21:
..... // 省略几行
         .          .     28:
ROUTINE ======================== runtime.main in /usr/lib/go-1.10/src/runtime/proc.go
         0   814.62MB (flat, cum)   100% of Total
         .          .    193:        // A program compiled with -buildmode=c-archive or c-shared
..... // 省略几行

可以看到在main.main中的第25行占用了814.62MB内存,左右2个数据分别是flat和cum,含义和top中解释的一样。

traces

打印所有调用栈,以及调用栈的指标信息。

(pprof) traces
File: demo2
Type: inuse_space
Time: May 16, 2019 at 7:08pm (CST)
-----------+-------------------------------------------------------
     bytes:  813.46MB
  813.46MB   main.main
             runtime.main
-----------+-------------------------------------------------------
     bytes:  650.77MB
         0   main.main
             runtime.main
....... // 省略几十行

每个- - - - - 隔开的是一个调用栈,能看到runtime.main调用了main.main,并且main.main中占用了813.46MB内存。

其他的profile操作和内存是类似的,这里就不展示了。

这里只是简单介绍本文用到的pprof的功能,pprof功能很强大,也经常和benchmark结合起来,但这不是本文的重点,所以就不多介绍了,为大家推荐几篇文章,一定要好好研读、实践:

  1. Go官方博客关于pprof的介绍,很详细,也包含样例,可以实操:Profiling Go Programs
  2. 跟煎鱼也讨论过pprof,煎鱼的这篇文章也很适合入门: Golang 大杀器之性能剖析 PProf

什么是内存泄露

内存泄露指的是程序运行过程中已不再使用的内存,没有被释放掉,导致这些内存无法被使用,直到程序结束这些内存才被释放的问题。

Go虽然有GC来回收不再使用的堆内存,减轻了开发人员对内存的管理负担,但这并不意味着Go程序不再有内存泄露问题。在Go程序中,如果没有Go语言的编程思维,也不遵守良好的编程实践,就可能埋下隐患,造成内存泄露问题。

怎么发现内存泄露

在Go中发现内存泄露有2种方法,一个是通用的监控工具,另一个是go pprof:

  1. 监控工具:固定周期对进程的内存占用情况进行采样,数据可视化后,根据内存占用走势(持续上升),很容易发现是否发生内存泄露。
  2. go pprof:适合没有监控工具的情况,使用Go提供的pprof工具判断是否发生内存泄露。

这2种方式分别介绍一下。

监控工具查看进程内在占用情况

如果使用云平台部署Go程序,云平台都提供了内存查看的工具,可以查看OS的内存占用情况和某个进程的内存占用情况,比如阿里云,我们在1个云主机上只部署了1个Go服务,所以OS的内存占用情况,基本是也反映了进程内存占用情况,OS内存占用情况如下,可以看到随着时间的推进,内存的占用率在不断的提高,这是内存泄露的最明显现象

image-20190512111200988

如果没有云平台这种内存监控工具,可以制作一个简单的内存记录工具。

1、建立一个脚本prog_mem.sh,获取进程占用的物理内存情况,脚本内容如下:

#!/bin/bash
prog_name="your_programe_name"
prog_mem=$(pidstat  -r -u -h -C $prog_name |awk 'NR==4{print $12}')
time=$(date "+%Y-%m-%d %H:%M:%S")
echo $time"\tmemory(Byte)\t"$prog_mem >>~/record/prog_mem.log

2、然后使用crontab建立定时任务,每分钟记录1次。使用crontab -e编辑crontab配置,在最后增加1行:

*/1 * * * * ~/record/prog_mem.sh

脚本输出的内容保存在prog_mem.log,只要大体浏览一下就可以发现内存的增长情况,判断是否存在内存泄露。如果需要可视化,可以直接黏贴prog_mem.log内容到Excel等表格工具,绘制内存占用图。

image-20190512172935195

go pprof发现存在内存问题

有情提醒:如果对pprof不了解,可以先看go pprof基本知识,这是下一节,看完再倒回来看。

如果你Google或者百度,Go程序内存泄露的文章,它总会告诉你使用pprof heap,能够生成漂亮的调用路径图,火焰图等等,然后你根据调用路径就能定位内存泄露问题,我最初也是对此深信不疑,尝试了若干天后,只是发现内存泄露跟某种场景有关,根本找不到内存泄露的根源,如果哪位朋友用heap就能定位内存泄露的线上问题,麻烦介绍下

后来读了Dave的《High Performance Go Workshop》,刷新了对heap的认识,内存pprof的简要内容如下:

image-20190512114048868

Dave讲了以下几点:

  1. 内存profiling记录的是堆内存分配的情况,以及调用栈信息,并不是进程完整的内存情况,猜测这也是在go pprof中称为heap而不是memory的原因。
  2. 栈内存的分配是在调用栈结束后会被释放的内存,所以并不在内存profile中
  3. 内存profiling是基于抽样的,默认是每1000次堆内存分配,执行1次profile记录。
  4. 因为内存profiling是基于抽样和它跟踪的是已分配的内存,而不是使用中的内存,(比如有些内存已经分配,看似使用,但实际以及不使用的内存,比如内存泄露的那部分),所以不能使用内存profiling衡量程序总体的内存使用情况
  5. Dave个人观点:使用内存profiling不能够发现内存泄露

基于目前对heap的认知,我有2个观点:

  1. heap能帮助我们发现内存问题,但不一定能发现内存泄露问题,这个看法与Dave是类似的。heap记录了内存分配的情况,我们能通过heap观察内存的变化,增长与减少,内存主要被哪些代码占用了,程序存在内存问题,这只能说明内存有使用不合理的地方,但并不能说明这是内存泄露。
  2. heap在帮助定位内存泄露原因上贡献的力量微乎其微。如第一条所言,能通过heap找到占用内存多的位置,但这个位置通常不一定是内存泄露,就算是内存泄露,也只是内存泄露的结果,并不是真正导致内存泄露的根源。

接下来,我介绍怎么用heap发现问题,然后再解释为什么heap几乎不能定位内存泄露的根因。

怎么用heap发现内存问题

使用pprof的heap能够获取程序运行时的内存信息,在程序平稳运行的情况下,每个一段时间使用heap获取内存的profile,然后使用base能够对比两个profile文件的差别,就像diff命令一样显示出增加和减少的变化,使用一个简单的demo来说明heap和base的使用,依然使用demo2进行展示。

文件:golang_step_by_step/pprof/heap/demo2.go

// 展示内存增长和pprof,并不是泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

// 运行一段时间:fatal error: runtime: out of memory
func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    tick := time.Tick(time.Second / 100)
    var buf []byte
    for range tick {
        buf = append(buf, make([]byte, 1024*1024)...)
    }
}

将上面代码运行起来,执行以下命令获取profile文件,Ctrl-D退出,1分钟后再获取1次。

go tool pprof http://localhost:6060/debug/pprof/heap

我已经获取到了两个profile文件:

$ ls
pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz
pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.002.pb.gz

使用base把001文件作为基准,然后用002和001对比,先执行toptop的对比,然后执行list main列出main函数的内存对比,结果如下:

$ go tool pprof -base pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.002.pb.gz

File: demo2
Type: inuse_space
Time: May 14, 2019 at 2:33pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) top
Showing nodes accounting for 970.34MB, 32.30% of 3003.99MB total
      flat  flat%   sum%        cum   cum%
  970.34MB 32.30% 32.30%   970.34MB 32.30%  main.main   // 看这
         0     0% 32.30%   970.34MB 32.30%  runtime.main
(pprof)
(pprof)
(pprof) list main.main
Total: 2.93GB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  970.34MB   970.34MB (flat, cum) 32.30% of Total
         .          .     20:    }()
         .          .     21:
         .          .     22:    tick := time.Tick(time.Second / 100)
         .          .     23:    var buf []byte
         .          .     24:    for range tick {
  970.34MB   970.34MB     25:        buf = append(buf, make([]byte, 1024*1024)...) // 看这
         .          .     26:    }
         .          .     27:}
         .          .     28:

top列出了main.mainruntime.mainmain.main就是我们编写的main函数,runtime.main是runtime包中的main函数,也就是所有main函数的入口,这里不多介绍了,有兴趣可以看之前的调度器文章《Go调度器系列(2)宏观看调度器》

top显示main.main 第2次内存占用,比第1次内存占用多了970.34MB。

list main.main告诉了我们增长的内存都在这一行:

buf = append(buf, make([]byte, 1024*1024)...)

001和002 profile的文件不进去看了,你本地测试下计算差值,绝对是刚才对比出的970.34MB。

heap“不能”定位内存泄露

heap能显示内存的分配情况,以及哪行代码占用了多少内存,我们能轻易的找到占用内存最多的地方,如果这个地方的数值还在不断怎大,基本可以认定这里就是内存泄露的位置。

曾想按图索骥,从内存泄露的位置,根据调用栈向上查找,总能找到内存泄露的原因,这种方案看起来是不错的,但实施起来却找不到内存泄露的原因,结果是事半功倍。

原因在于一个Go程序,其中有大量的goroutine,这其中的调用关系也许有点复杂,也许内存泄露是在某个三方包里。举个栗子,比如下面这幅图,每个椭圆代表1个goroutine,其中的数字为编号,箭头代表调用关系。heap profile显示g111(最下方标红节点)这个协程的代码出现了泄露,任何一个从g101到g111的调用路径都可能造成了g111的内存泄露,有2类可能:

  1. 该goroutine只调用了少数几次,但消耗了大量的内存,说明每个goroutine调用都消耗了不少内存,内存泄露的原因基本就在该协程内部
  2. 该goroutine的调用次数非常多,虽然每个协程调用过程中消耗的内存不多,但该调用路径上,协程数量巨大,造成消耗大量的内存,并且这些goroutine由于某种原因无法退出,占用的内存不会释放,内存泄露的原因在到g111调用路径上某段代码实现有问题,造成创建了大量的g111

第2种情况,就是goroutine泄露,这是通过heap无法发现的,所以heap在定位内存泄露这件事上,发挥的作用不大

image-20190512144150064


goroutine泄露怎么导致内存泄露

什么是goroutine泄露

如果你启动了1个goroutine,但并没有符合预期的退出,直到程序结束,此goroutine才退出,这种情况就是goroutine泄露。

提前思考:什么会导致goroutine无法退出/阻塞?

goroutine泄露怎么导致内存泄露

每个goroutine占用2KB内存,泄露1百万goroutine至少泄露2KB * 1000000 = 2GB内存,为什么说至少呢?

goroutine执行过程中还存在一些变量,如果这些变量指向堆内存中的内存,GC会认为这些内存仍在使用,不会对其进行回收,这些内存谁都无法使用,造成了内存泄露。

所以goroutine泄露有2种方式造成内存泄露:

  1. goroutine本身的栈所占用的空间造成内存泄露。
  2. goroutine中的变量所占用的堆内存导致堆内存泄露,这一部分是能通过heap profile体现出来的。

Dave在文章中也提到了,如果不知道何时停止一个goroutine,这个goroutine就是潜在的内存泄露:

7.1.1 Know when to stop a goroutine

If you don’t know the answer, that’s a potential memory leak as the goroutine will pin its stack’s memory on the heap, as well as any heap allocated variables reachable from the stack.

怎么确定是goroutine泄露引发的内存泄露

掌握了前面的pprof命令行的基本用法,很快就可以确认是否是goroutine泄露导致内存泄露,如果你不记得了,马上回去看一下go pprof基本知识

判断依据:在节点正常运行的情况下,隔一段时间获取goroutine的数量,如果后面获取的那次,某些goroutine比前一次多,如果多获取几次,是持续增长的,就极有可能是goroutine泄露

goroutine导致内存泄露的demo:

文件:golang_step_by_step/pprof/goroutine/leak_demo1.go

// goroutine泄露导致内存泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    outCh := make(chan int)
    // 死代码,永不读取
    go func() {
        if false {
            <-outCh
        }
        select {}
    }()

    // 每s起100个goroutine,goroutine会阻塞,不释放内存
    tick := time.Tick(time.Second / 100)
    i := 0
    for range tick {
        i++
        fmt.Println(i)
        alloc1(outCh)
    }
}

func alloc1(outCh chan<- int) {
    go alloc2(outCh)
}

func alloc2(outCh chan<- int) {
    func() {
        defer fmt.Println("alloc-fm exit")
        // 分配内存,假用一下
        buf := make([]byte, 1024*1024*10)
        _ = len(buf)
        fmt.Println("alloc done")

        outCh <- 0 // 53行
    }()
}

编译并运行以上代码,然后使用go tool pprof获取gorourine的profile文件。

go tool pprof http://localhost:6060/debug/pprof/goroutine

已经通过pprof命令获取了2个goroutine的profile文件:

$ ls
/home/ubuntu/pprof/pprof.leak_demo.goroutine.001.pb.gz
/home/ubuntu/pprof/pprof.leak_demo.goroutine.002.pb.gz

同heap一样,我们可以使用base对比2个goroutine profile文件:

$go tool pprof -base pprof.leak_demo.goroutine.001.pb.gz pprof.leak_demo.goroutine.002.pb.gz

File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof) top
Showing nodes accounting for 20312, 100% of 20312 total
      flat  flat%   sum%        cum   cum%
     20312   100%   100%      20312   100%  runtime.gopark
         0     0%   100%      20312   100%  main.alloc2
         0     0%   100%      20312   100%  main.alloc2.func1
         0     0%   100%      20312   100%  runtime.chansend
         0     0%   100%      20312   100%  runtime.chansend1
         0     0%   100%      20312   100%  runtime.goparkunlock
(pprof)

可以看到运行到runtime.gopark的goroutine数量增加了20312个。再通过002文件,看一眼执行到gopark的goroutine数量,即挂起的goroutine数量:

go tool pprof pprof.leak_demo.goroutine.002.pb.gz
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:47pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 24330, 100% of 24331 total
Dropped 32 nodes (cum <= 121)
      flat  flat%   sum%        cum   cum%
     24330   100%   100%      24330   100%  runtime.gopark
         0     0%   100%      24326   100%  main.alloc2
         0     0%   100%      24326   100%  main.alloc2.func1
         0     0%   100%      24326   100%  runtime.chansend
         0     0%   100%      24326   100%  runtime.chansend1
         0     0%   100%      24327   100%  runtime.goparkunlock

显示有24330个goroutine被挂起,这不是goroutine泄露这是啥?已经能确定八九成goroutine泄露了。

是什么导致如此多的goroutine被挂起而无法退出?接下来就看怎么定位goroutine泄露。


定位goroutine泄露的2种方法

使用pprof有2种方式,一种是web网页,一种是go tool pprof命令行交互,这两种方法查看goroutine都支持,但有轻微不同,也有各自的优缺点。

我们先看Web的方式,再看命令行交互的方式,这两种都很好使用,结合起来用也不错。

Web可视化查看

Web方式适合web服务器的端口能访问的情况,使用起来方便,有2种方式:

  1. 查看某条调用路径上,当前阻塞在此goroutine的数量
  2. 查看所有goroutine的运行栈(调用路径),可以显示阻塞在此的时间

方式一

url请求中设置debug=1:

http://ip:port/debug/pprof/goroutine?debug=1

效果如下:

看起来密密麻麻的,其实简单又十分有用,看上图标出来的部分,手机上图看起来可能不方便,那就放大图片,或直接看下面各字段的含义:

  1. goroutine profile: total 32023:32023是goroutine的总数量
  2. 32015 @ 0x42e15a 0x42e20e 0x40534b 0x4050e5 ...:32015代表当前有32015个goroutine运行这个调用栈,并且停在相同位置,@后面的十六进制,现在用不到这个数据,所以暂不深究了。
  3. 下面是当前goroutine的调用栈,列出了函数和所在文件的行数,这个行数对定位很有帮助,如下:
32015 @ 0x42e15a 0x42e20e 0x40534b 0x4050e5 0x6d8559 0x6d831b 0x45abe1
#    0x6d8558    main.alloc2.func1+0xf8    /home/ubuntu/heap/leak_demo.go:53
#    0x6d831a    main.alloc2+0x2a    /home/ubuntu/heap/leak_demo.go:54

根据上面的提示,就能判断32015个goroutine运行到leak_demo.go的53行:

func alloc2(outCh chan<- int) {
    func() {
        defer fmt.Println("alloc-fm exit")
        // 分配内存,假用一下
        buf := make([]byte, 1024*1024*10)
        _ = len(buf)
        fmt.Println("alloc done")

        outCh <- 0 // 53行
    }()
}

阻塞的原因是outCh这个写操作无法完成,outCh是无缓冲的通道,并且由于以下代码是死代码,所以goroutine始终没有从outCh读数据,造成outCh阻塞,进而造成无数个alloc2的goroutine阻塞,形成内存泄露:

if false {
    <-outCh
}

方式二

url请求中设置debug=2:

http://ip:port/debug/pprof/goroutine?debug=2

第2种方式和第1种方式是互补的,它可以看到每个goroutine的信息:

  1. goroutine 20 [chan send, 2 minutes]:20是goroutine id,[]中是当前goroutine的状态,阻塞在写channel,并且阻塞了2分钟,长时间运行的系统,你能看到阻塞时间更长的情况。
  2. 同时,也可以看到调用栈,看当前执行停到哪了:leak_demo.go的53行,
goroutine 20 [chan send, 2 minutes]:
main.alloc2.func1(0xc42015e060)
    /home/ubuntu/heap/leak_demo.go:53 +0xf9  // 这
main.alloc2(0xc42015e060)
    /home/ubuntu/heap/leak_demo.go:54 +0x2b
created by main.alloc1
    /home/ubuntu/heap/leak_demo.go:42 +0x3f

命令行交互式方法

Web的方法是简单粗暴,无需登录服务器,浏览器打开看看就行了。但就像前面提的,没有浏览器可访问时,命令行交互式才是最佳的方式,并且也是手到擒来,感觉比Web一样方便。

命令行交互式只有1种获取goroutine profile的方法,不像Web网页分debug=1debug=22中方式,并将profile文件保存到本地:

// 注意命令没有`debug=1`,debug=1,加debug有些版本的go不支持
$ go tool pprof http://0.0.0.0:6060/debug/pprof/goroutine
Fetching profile over HTTP from http://localhost:6061/debug/pprof/goroutine
Saved profile in /home/ubuntu/pprof/pprof.leak_demo.goroutine.001.pb.gz  // profile文件保存位置
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)

命令行只需要掌握3个命令就好了,上面介绍过了,详细的倒回去看top, list, traces

  1. top:显示正运行到某个函数goroutine的数量
  2. traces:显示所有goroutine的调用栈
  3. list:列出代码详细的信息。

我们依然使用leak_demo.go这个demo,

$  go tool pprof -base pprof.leak_demo.goroutine.001.pb.gz pprof.leak_demo.goroutine.002.pb.gz
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) top
Showing nodes accounting for 20312, 100% of 20312 total
      flat  flat%   sum%        cum   cum%
     20312   100%   100%      20312   100%  runtime.gopark
         0     0%   100%      20312   100%  main.alloc2
         0     0%   100%      20312   100%  main.alloc2.func1
         0     0%   100%      20312   100%  runtime.chansend
         0     0%   100%      20312   100%  runtime.chansend1
         0     0%   100%      20312   100%  runtime.goparkunlock
(pprof)
(pprof) traces
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
-----------+-------------------------------------------------------
     20312   runtime.gopark
             runtime.goparkunlock
             runtime.chansend
             runtime.chansend1 // channel发送
             main.alloc2.func1 // alloc2中的匿名函数
             main.alloc2
-----------+-------------------------------------------------------

top命令在怎么确定是goroutine泄露引发的内存泄露介绍过了,直接看traces命令,traces能列出002中比001中多的那些goroutine的调用栈,这里只有1个调用栈,有20312个goroutine都执行这个调用路径,可以看到alloc2中的匿名函数alloc2.func1调用了写channel的操作,然后阻塞挂起了goroutine,使用list列出alloc2.func1的代码,显示有20312个goroutine阻塞在53行:

(pprof) list main.alloc2.func1
Total: 20312
ROUTINE ======================== main.alloc2.func1 in /home/ubuntu/heap/leak_demo.go
         0      20312 (flat, cum)   100% of Total
         .          .     48:        // 分配内存,假用一下
         .          .     49:        buf := make([]byte, 1024*1024*10)
         .          .     50:        _ = len(buf)
         .          .     51:        fmt.Println("alloc done")
         .          .     52:
         .      20312     53:        outCh <- 0  // 看这
         .          .     54:    }()
         .          .     55:}
         .          .     56:

友情提醒:使用list命令的前提是程序的源码在当前机器,不然可没法列出源码。服务器上,通常没有源码,那我们咋办呢?刚才介绍了Web查看的方式,那里会列出代码行数,我们可以使用wget下载网页:

$ wget http://localhost:6060/debug/pprof/goroutine?debug=1

下载网页后,使用编辑器打开文件,使用关键字main.alloc2.func1进行搜索,找到与当前相同的调用栈,就可以看到该goroutine阻塞在哪一行了,不要忘记使用debug=2还可以看到阻塞了多久和原因,Web方式中已经介绍了,此处省略代码几十行。


总结

文章略长,但全是干货,感谢阅读到这。然读到着了,跟定很想掌握pprof,建议实践一把,现在和大家温习一把本文的主要内容。

goroutine泄露的本质

goroutine泄露的本质是channel阻塞,无法继续向下执行,导致此goroutine关联的内存都无法释放,进一步造成内存泄露。

goroutine泄露的发现和定位

利用好go pprof获取goroutine profile文件,然后利用3个命令top、traces、list定位内存泄露的原因。

goroutine泄露的场景

泄露的场景不仅限于以下两类,但因channel相关的泄露是最多的。

  1. channel的读或者写:

    1. 无缓冲channel的阻塞通常是写操作因为没有读而阻塞
    2. 有缓冲的channel因为缓冲区满了,写操作阻塞
    3. 期待从channel读数据,结果没有goroutine写
  2. select操作,select里也是channel操作,如果所有case上的操作阻塞,goroutine也无法继续执行。

编码goroutine泄露的建议

为避免goroutine泄露造成内存泄露,启动goroutine前要思考清楚:

  1. goroutine如何退出?
  2. 是否会有阻塞造成无法退出?如果有,那么这个路径是否会创建大量的goroutine?

示例源码

本文所有示例源码,及历史文章、代码都存储在Github,阅读原文可直接跳转,Github:https://github.com/Shitaibin/golang_step_by_step/tree/master/pprof

推荐阅读

这些既是参考资料也是推荐阅读的文章,不容错过。

【Go Blog关于pprof详细介绍和Demo】 https://blog.golang.org/profi...

【Dave关于高性能Go程序的workshop】 https://dave.cheney.net/high-...

【煎鱼pprof文章,很适合入门 Golang大杀器之性能剖析PProf】 https://segmentfault.com/a/11...

【SO上goroutine调用栈各字段的介绍】https://stackoverflow.com/a/3...

【我的老文,有runtime.main的介绍,想学习调度器,可以看下系列文章 Go调度器系列(2)宏观看调度器】http://lessisbetter.site/2019...

  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/05/18/go-goroutine-leak/

查看原文

xddd 赞了文章 · 2019-09-07

实战Go内存泄露

最近解决了我们项目中的一个内存泄露问题,事实再次证明pprof是一个好工具,但掌握好工具的正确用法,才能发挥好工具的威力,不然就算你手里有屠龙刀,也成不了天下第一,本文就是带你用pprof定位内存泄露问题。

关于Go的内存泄露有这么一句话不知道你听过没有:

10次内存泄露,有9次是goroutine泄露。

我所解决的问题,也是goroutine泄露导致的内存泄露,所以这篇文章主要介绍Go程序的goroutine泄露,掌握了如何定位和解决goroutine泄露,就掌握了内存泄露的大部分场景

本文草稿最初数据都是生产坏境数据,为了防止敏感内容泄露,全部替换成了demo数据,demo的数据比生产环境数据简单多了,更适合入门理解,有助于掌握pprof。

go pprof基本知识

定位goroutine泄露会使用到pprof,pprof是Go的性能工具,在开始介绍内存泄露前,先简单介绍下pprof的基本使用,更详细的使用给大家推荐了资料。

什么是pprof

pprof是Go的性能分析工具,在程序运行过程中,可以记录程序的运行信息,可以是CPU使用情况、内存使用情况、goroutine运行情况等,当需要性能调优或者定位Bug时候,这些记录的信息是相当重要。

基本使用

使用pprof有多种方式,Go已经现成封装好了1个:net/http/pprof,使用简单的几行命令,就可以开启pprof,记录运行信息,并且提供了Web服务,能够通过浏览器和命令行2种方式获取运行数据。

看个最简单的pprof的例子:

文件:golang_step_by_step/pprof/pprof/demo.go

package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
)

func main() {
    // 开启pprof,监听请求
    ip := "0.0.0.0:6060"
    if err := http.ListenAndServe(ip, nil); err != nil {
        fmt.Printf("start pprof failed on %s\n", ip)
    }
}

提醒:本文所有代码部分可左右滑动

浏览器方式

image-20190516173924325

输入网址ip:port/debug/pprof/打开pprof主页,从上到下依次是5类profile信息

  1. block:goroutine的阻塞信息,本例就截取自一个goroutine阻塞的demo,但block为0,没掌握block的用法
  2. goroutine:所有goroutine的信息,下面的full goroutine stack dump是输出所有goroutine的调用栈,是goroutine的debug=2,后面会详细介绍。
  3. heap:堆内存的信息
  4. mutex:锁的信息
  5. threadcreate:线程信息

这篇文章我们主要关注goroutine和heap,这两个都会打印调用栈信息,goroutine里面还会包含goroutine的数量信息,heap则是内存分配信息,本文用不到的地方就不展示了,最后推荐几篇文章大家去看。

命令行方式

当连接在服务器终端上的时候,是没有浏览器可以使用的,Go提供了命令行的方式,能够获取以上5类信息,这种方式用起来更方便。

使用命令go tool pprof url可以获取指定的profile文件,此命令会发起http请求,然后下载数据到本地,之后进入交互式模式,就像gdb一样,可以使用命令查看运行信息,以下是5类请求的方式:

# 下载cpu profile,默认从当前开始收集30s的cpu使用情况,需要等待30s
go tool pprof http://localhost:6060/debug/pprof/profile   # 30-second CPU profile
go tool pprof http://localhost:6060/debug/pprof/profile?seconds=120     # wait 120s

# 下载heap profile
go tool pprof http://localhost:6060/debug/pprof/heap      # heap profile

# 下载goroutine profile
go tool pprof http://localhost:6060/debug/pprof/goroutine # goroutine profile

# 下载block profile
go tool pprof http://localhost:6060/debug/pprof/block     # goroutine blocking profile

# 下载mutex profile
go tool pprof http://localhost:6060/debug/pprof/mutex

上面的pprof/demo.go太简单了,如果去获取内存profile,几乎获取不到什么,换一个Demo进行内存profile的展示:

文件:golang_step_by_step/pprof/heap/demo2.go

// 展示内存增长和pprof,并不是泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

// 运行一段时间:fatal error: runtime: out of memory
func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    tick := time.Tick(time.Second / 100)
    var buf []byte
    for range tick {
        buf = append(buf, make([]byte, 1024*1024)...)
    }
}

上面这个demo会不断的申请内存,把它编译运行起来,然后执行:

$ go tool pprof http://localhost:6060/debug/pprof/heap

Fetching profile over HTTP from http://localhost:6060/debug/pprof/heap
Saved profile in /home/ubuntu/pprof/pprof.demo.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz       //<--- 下载到的内存profile文件
File: demo // 程序名称
Build ID: a9069a125ee9c0df3713b2149ca859e8d4d11d5a
Type: inuse_space
Time: May 16, 2019 at 8:55pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) help  // 使用help打印所有可用命令
  Commands:
    callgrind        Outputs a graph in callgrind format
    comments         Output all profile comments
    disasm           Output assembly listings annotated with samples
    dot              Outputs a graph in DOT format
    eog              Visualize graph through eog
    evince           Visualize graph through evince
    gif              Outputs a graph image in GIF format
    gv               Visualize graph through gv
    kcachegrind      Visualize report in KCachegrind
    list             Output annotated source for functions matching regexp
    pdf              Outputs a graph in PDF format
    peek             Output callers/callees of functions matching regexp
    png              Outputs a graph image in PNG format
    proto            Outputs the profile in compressed protobuf format
    ps               Outputs a graph in PS format
    raw              Outputs a text representation of the raw profile
    svg              Outputs a graph in SVG format
    tags             Outputs all tags in the profile
    text             Outputs top entries in text form
    top              Outputs top entries in text form
    topproto         Outputs top entries in compressed protobuf format
    traces           Outputs all profile samples in text form
    tree             Outputs a text rendering of call graph
    web              Visualize graph through web browser
    weblist          Display annotated source in a web browser
    o/options        List options and their current values
    quit/exit/^D     Exit pprof
    
    ....

以上信息我们只关注2个地方:

  1. 下载得到的文件:/home/ubuntu/pprof/pprof.demo.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz,这其中包含了程序名demo,profile类型alloc已分配的内存,inuse代表使用中的内存。
  2. help可以获取帮助,最先会列出支持的命令,想掌握pprof,要多看看,多尝试。

关于命令,本文只会用到3个,我认为也是最常用的:toplisttraces,分别介绍一下。

top

按指标大小列出前10个函数,比如内存是按内存占用多少,CPU是按执行时间多少。

(pprof) top
Showing nodes accounting for 814.62MB, 100% of 814.62MB total
      flat  flat%   sum%        cum   cum%
  814.62MB   100%   100%   814.62MB   100%  main.main
         0     0%   100%   814.62MB   100%  runtime.main

top会列出5个统计数据:

  • flat: 本函数占用的内存量。
  • flat%: 本函数内存占使用中内存总量的百分比。
  • sum%: 前面每一行flat百分比的和,比如第2行虽然的100% 是 100% + 0%。
  • cum: 是累计量,加入main函数调用了函数f,函数f占用的内存量,也会记进来。
  • cum%: 是累计量占总量的百分比。

list

查看某个函数的代码,以及该函数每行代码的指标信息,如果函数名不明确,会进行模糊匹配,比如list main会列出main.mainruntime.main

(pprof) list main.main  // 精确列出函数
Total: 814.62MB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  814.62MB   814.62MB (flat, cum)   100% of Total
         .          .     20:    }()
         .          .     21:
         .          .     22:    tick := time.Tick(time.Second / 100)
         .          .     23:    var buf []byte
         .          .     24:    for range tick {
  814.62MB   814.62MB     25:        buf = append(buf, make([]byte, 1024*1024)...)
         .          .     26:    }
         .          .     27:}
         .          .     28:
(pprof) list main  // 匹配所有函数名带main的函数
Total: 814.62MB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  814.62MB   814.62MB (flat, cum)   100% of Total
         .          .     20:    }()
         .          .     21:
..... // 省略几行
         .          .     28:
ROUTINE ======================== runtime.main in /usr/lib/go-1.10/src/runtime/proc.go
         0   814.62MB (flat, cum)   100% of Total
         .          .    193:        // A program compiled with -buildmode=c-archive or c-shared
..... // 省略几行

可以看到在main.main中的第25行占用了814.62MB内存,左右2个数据分别是flat和cum,含义和top中解释的一样。

traces

打印所有调用栈,以及调用栈的指标信息。

(pprof) traces
File: demo2
Type: inuse_space
Time: May 16, 2019 at 7:08pm (CST)
-----------+-------------------------------------------------------
     bytes:  813.46MB
  813.46MB   main.main
             runtime.main
-----------+-------------------------------------------------------
     bytes:  650.77MB
         0   main.main
             runtime.main
....... // 省略几十行

每个- - - - - 隔开的是一个调用栈,能看到runtime.main调用了main.main,并且main.main中占用了813.46MB内存。

其他的profile操作和内存是类似的,这里就不展示了。

这里只是简单介绍本文用到的pprof的功能,pprof功能很强大,也经常和benchmark结合起来,但这不是本文的重点,所以就不多介绍了,为大家推荐几篇文章,一定要好好研读、实践:

  1. Go官方博客关于pprof的介绍,很详细,也包含样例,可以实操:Profiling Go Programs
  2. 跟煎鱼也讨论过pprof,煎鱼的这篇文章也很适合入门: Golang 大杀器之性能剖析 PProf

什么是内存泄露

内存泄露指的是程序运行过程中已不再使用的内存,没有被释放掉,导致这些内存无法被使用,直到程序结束这些内存才被释放的问题。

Go虽然有GC来回收不再使用的堆内存,减轻了开发人员对内存的管理负担,但这并不意味着Go程序不再有内存泄露问题。在Go程序中,如果没有Go语言的编程思维,也不遵守良好的编程实践,就可能埋下隐患,造成内存泄露问题。

怎么发现内存泄露

在Go中发现内存泄露有2种方法,一个是通用的监控工具,另一个是go pprof:

  1. 监控工具:固定周期对进程的内存占用情况进行采样,数据可视化后,根据内存占用走势(持续上升),很容易发现是否发生内存泄露。
  2. go pprof:适合没有监控工具的情况,使用Go提供的pprof工具判断是否发生内存泄露。

这2种方式分别介绍一下。

监控工具查看进程内在占用情况

如果使用云平台部署Go程序,云平台都提供了内存查看的工具,可以查看OS的内存占用情况和某个进程的内存占用情况,比如阿里云,我们在1个云主机上只部署了1个Go服务,所以OS的内存占用情况,基本是也反映了进程内存占用情况,OS内存占用情况如下,可以看到随着时间的推进,内存的占用率在不断的提高,这是内存泄露的最明显现象

image-20190512111200988

如果没有云平台这种内存监控工具,可以制作一个简单的内存记录工具。

1、建立一个脚本prog_mem.sh,获取进程占用的物理内存情况,脚本内容如下:

#!/bin/bash
prog_name="your_programe_name"
prog_mem=$(pidstat  -r -u -h -C $prog_name |awk 'NR==4{print $12}')
time=$(date "+%Y-%m-%d %H:%M:%S")
echo $time"\tmemory(Byte)\t"$prog_mem >>~/record/prog_mem.log

2、然后使用crontab建立定时任务,每分钟记录1次。使用crontab -e编辑crontab配置,在最后增加1行:

*/1 * * * * ~/record/prog_mem.sh

脚本输出的内容保存在prog_mem.log,只要大体浏览一下就可以发现内存的增长情况,判断是否存在内存泄露。如果需要可视化,可以直接黏贴prog_mem.log内容到Excel等表格工具,绘制内存占用图。

image-20190512172935195

go pprof发现存在内存问题

有情提醒:如果对pprof不了解,可以先看go pprof基本知识,这是下一节,看完再倒回来看。

如果你Google或者百度,Go程序内存泄露的文章,它总会告诉你使用pprof heap,能够生成漂亮的调用路径图,火焰图等等,然后你根据调用路径就能定位内存泄露问题,我最初也是对此深信不疑,尝试了若干天后,只是发现内存泄露跟某种场景有关,根本找不到内存泄露的根源,如果哪位朋友用heap就能定位内存泄露的线上问题,麻烦介绍下

后来读了Dave的《High Performance Go Workshop》,刷新了对heap的认识,内存pprof的简要内容如下:

image-20190512114048868

Dave讲了以下几点:

  1. 内存profiling记录的是堆内存分配的情况,以及调用栈信息,并不是进程完整的内存情况,猜测这也是在go pprof中称为heap而不是memory的原因。
  2. 栈内存的分配是在调用栈结束后会被释放的内存,所以并不在内存profile中
  3. 内存profiling是基于抽样的,默认是每1000次堆内存分配,执行1次profile记录。
  4. 因为内存profiling是基于抽样和它跟踪的是已分配的内存,而不是使用中的内存,(比如有些内存已经分配,看似使用,但实际以及不使用的内存,比如内存泄露的那部分),所以不能使用内存profiling衡量程序总体的内存使用情况
  5. Dave个人观点:使用内存profiling不能够发现内存泄露

基于目前对heap的认知,我有2个观点:

  1. heap能帮助我们发现内存问题,但不一定能发现内存泄露问题,这个看法与Dave是类似的。heap记录了内存分配的情况,我们能通过heap观察内存的变化,增长与减少,内存主要被哪些代码占用了,程序存在内存问题,这只能说明内存有使用不合理的地方,但并不能说明这是内存泄露。
  2. heap在帮助定位内存泄露原因上贡献的力量微乎其微。如第一条所言,能通过heap找到占用内存多的位置,但这个位置通常不一定是内存泄露,就算是内存泄露,也只是内存泄露的结果,并不是真正导致内存泄露的根源。

接下来,我介绍怎么用heap发现问题,然后再解释为什么heap几乎不能定位内存泄露的根因。

怎么用heap发现内存问题

使用pprof的heap能够获取程序运行时的内存信息,在程序平稳运行的情况下,每个一段时间使用heap获取内存的profile,然后使用base能够对比两个profile文件的差别,就像diff命令一样显示出增加和减少的变化,使用一个简单的demo来说明heap和base的使用,依然使用demo2进行展示。

文件:golang_step_by_step/pprof/heap/demo2.go

// 展示内存增长和pprof,并不是泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

// 运行一段时间:fatal error: runtime: out of memory
func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    tick := time.Tick(time.Second / 100)
    var buf []byte
    for range tick {
        buf = append(buf, make([]byte, 1024*1024)...)
    }
}

将上面代码运行起来,执行以下命令获取profile文件,Ctrl-D退出,1分钟后再获取1次。

go tool pprof http://localhost:6060/debug/pprof/heap

我已经获取到了两个profile文件:

$ ls
pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz
pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.002.pb.gz

使用base把001文件作为基准,然后用002和001对比,先执行toptop的对比,然后执行list main列出main函数的内存对比,结果如下:

$ go tool pprof -base pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz pprof.demo2.alloc_objects.alloc_space.inuse_objects.inuse_space.002.pb.gz

File: demo2
Type: inuse_space
Time: May 14, 2019 at 2:33pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) top
Showing nodes accounting for 970.34MB, 32.30% of 3003.99MB total
      flat  flat%   sum%        cum   cum%
  970.34MB 32.30% 32.30%   970.34MB 32.30%  main.main   // 看这
         0     0% 32.30%   970.34MB 32.30%  runtime.main
(pprof)
(pprof)
(pprof) list main.main
Total: 2.93GB
ROUTINE ======================== main.main in /home/ubuntu/heap/demo2.go
  970.34MB   970.34MB (flat, cum) 32.30% of Total
         .          .     20:    }()
         .          .     21:
         .          .     22:    tick := time.Tick(time.Second / 100)
         .          .     23:    var buf []byte
         .          .     24:    for range tick {
  970.34MB   970.34MB     25:        buf = append(buf, make([]byte, 1024*1024)...) // 看这
         .          .     26:    }
         .          .     27:}
         .          .     28:

top列出了main.mainruntime.mainmain.main就是我们编写的main函数,runtime.main是runtime包中的main函数,也就是所有main函数的入口,这里不多介绍了,有兴趣可以看之前的调度器文章《Go调度器系列(2)宏观看调度器》

top显示main.main 第2次内存占用,比第1次内存占用多了970.34MB。

list main.main告诉了我们增长的内存都在这一行:

buf = append(buf, make([]byte, 1024*1024)...)

001和002 profile的文件不进去看了,你本地测试下计算差值,绝对是刚才对比出的970.34MB。

heap“不能”定位内存泄露

heap能显示内存的分配情况,以及哪行代码占用了多少内存,我们能轻易的找到占用内存最多的地方,如果这个地方的数值还在不断怎大,基本可以认定这里就是内存泄露的位置。

曾想按图索骥,从内存泄露的位置,根据调用栈向上查找,总能找到内存泄露的原因,这种方案看起来是不错的,但实施起来却找不到内存泄露的原因,结果是事半功倍。

原因在于一个Go程序,其中有大量的goroutine,这其中的调用关系也许有点复杂,也许内存泄露是在某个三方包里。举个栗子,比如下面这幅图,每个椭圆代表1个goroutine,其中的数字为编号,箭头代表调用关系。heap profile显示g111(最下方标红节点)这个协程的代码出现了泄露,任何一个从g101到g111的调用路径都可能造成了g111的内存泄露,有2类可能:

  1. 该goroutine只调用了少数几次,但消耗了大量的内存,说明每个goroutine调用都消耗了不少内存,内存泄露的原因基本就在该协程内部
  2. 该goroutine的调用次数非常多,虽然每个协程调用过程中消耗的内存不多,但该调用路径上,协程数量巨大,造成消耗大量的内存,并且这些goroutine由于某种原因无法退出,占用的内存不会释放,内存泄露的原因在到g111调用路径上某段代码实现有问题,造成创建了大量的g111

第2种情况,就是goroutine泄露,这是通过heap无法发现的,所以heap在定位内存泄露这件事上,发挥的作用不大

image-20190512144150064


goroutine泄露怎么导致内存泄露

什么是goroutine泄露

如果你启动了1个goroutine,但并没有符合预期的退出,直到程序结束,此goroutine才退出,这种情况就是goroutine泄露。

提前思考:什么会导致goroutine无法退出/阻塞?

goroutine泄露怎么导致内存泄露

每个goroutine占用2KB内存,泄露1百万goroutine至少泄露2KB * 1000000 = 2GB内存,为什么说至少呢?

goroutine执行过程中还存在一些变量,如果这些变量指向堆内存中的内存,GC会认为这些内存仍在使用,不会对其进行回收,这些内存谁都无法使用,造成了内存泄露。

所以goroutine泄露有2种方式造成内存泄露:

  1. goroutine本身的栈所占用的空间造成内存泄露。
  2. goroutine中的变量所占用的堆内存导致堆内存泄露,这一部分是能通过heap profile体现出来的。

Dave在文章中也提到了,如果不知道何时停止一个goroutine,这个goroutine就是潜在的内存泄露:

7.1.1 Know when to stop a goroutine

If you don’t know the answer, that’s a potential memory leak as the goroutine will pin its stack’s memory on the heap, as well as any heap allocated variables reachable from the stack.

怎么确定是goroutine泄露引发的内存泄露

掌握了前面的pprof命令行的基本用法,很快就可以确认是否是goroutine泄露导致内存泄露,如果你不记得了,马上回去看一下go pprof基本知识

判断依据:在节点正常运行的情况下,隔一段时间获取goroutine的数量,如果后面获取的那次,某些goroutine比前一次多,如果多获取几次,是持续增长的,就极有可能是goroutine泄露

goroutine导致内存泄露的demo:

文件:golang_step_by_step/pprof/goroutine/leak_demo1.go

// goroutine泄露导致内存泄露
package main

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "os"
    "time"
)

func main() {
    // 开启pprof
    go func() {
        ip := "0.0.0.0:6060"
        if err := http.ListenAndServe(ip, nil); err != nil {
            fmt.Printf("start pprof failed on %s\n", ip)
            os.Exit(1)
        }
    }()

    outCh := make(chan int)
    // 死代码,永不读取
    go func() {
        if false {
            <-outCh
        }
        select {}
    }()

    // 每s起100个goroutine,goroutine会阻塞,不释放内存
    tick := time.Tick(time.Second / 100)
    i := 0
    for range tick {
        i++
        fmt.Println(i)
        alloc1(outCh)
    }
}

func alloc1(outCh chan<- int) {
    go alloc2(outCh)
}

func alloc2(outCh chan<- int) {
    func() {
        defer fmt.Println("alloc-fm exit")
        // 分配内存,假用一下
        buf := make([]byte, 1024*1024*10)
        _ = len(buf)
        fmt.Println("alloc done")

        outCh <- 0 // 53行
    }()
}

编译并运行以上代码,然后使用go tool pprof获取gorourine的profile文件。

go tool pprof http://localhost:6060/debug/pprof/goroutine

已经通过pprof命令获取了2个goroutine的profile文件:

$ ls
/home/ubuntu/pprof/pprof.leak_demo.goroutine.001.pb.gz
/home/ubuntu/pprof/pprof.leak_demo.goroutine.002.pb.gz

同heap一样,我们可以使用base对比2个goroutine profile文件:

$go tool pprof -base pprof.leak_demo.goroutine.001.pb.gz pprof.leak_demo.goroutine.002.pb.gz

File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof) top
Showing nodes accounting for 20312, 100% of 20312 total
      flat  flat%   sum%        cum   cum%
     20312   100%   100%      20312   100%  runtime.gopark
         0     0%   100%      20312   100%  main.alloc2
         0     0%   100%      20312   100%  main.alloc2.func1
         0     0%   100%      20312   100%  runtime.chansend
         0     0%   100%      20312   100%  runtime.chansend1
         0     0%   100%      20312   100%  runtime.goparkunlock
(pprof)

可以看到运行到runtime.gopark的goroutine数量增加了20312个。再通过002文件,看一眼执行到gopark的goroutine数量,即挂起的goroutine数量:

go tool pprof pprof.leak_demo.goroutine.002.pb.gz
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:47pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 24330, 100% of 24331 total
Dropped 32 nodes (cum <= 121)
      flat  flat%   sum%        cum   cum%
     24330   100%   100%      24330   100%  runtime.gopark
         0     0%   100%      24326   100%  main.alloc2
         0     0%   100%      24326   100%  main.alloc2.func1
         0     0%   100%      24326   100%  runtime.chansend
         0     0%   100%      24326   100%  runtime.chansend1
         0     0%   100%      24327   100%  runtime.goparkunlock

显示有24330个goroutine被挂起,这不是goroutine泄露这是啥?已经能确定八九成goroutine泄露了。

是什么导致如此多的goroutine被挂起而无法退出?接下来就看怎么定位goroutine泄露。


定位goroutine泄露的2种方法

使用pprof有2种方式,一种是web网页,一种是go tool pprof命令行交互,这两种方法查看goroutine都支持,但有轻微不同,也有各自的优缺点。

我们先看Web的方式,再看命令行交互的方式,这两种都很好使用,结合起来用也不错。

Web可视化查看

Web方式适合web服务器的端口能访问的情况,使用起来方便,有2种方式:

  1. 查看某条调用路径上,当前阻塞在此goroutine的数量
  2. 查看所有goroutine的运行栈(调用路径),可以显示阻塞在此的时间

方式一

url请求中设置debug=1:

http://ip:port/debug/pprof/goroutine?debug=1

效果如下:

看起来密密麻麻的,其实简单又十分有用,看上图标出来的部分,手机上图看起来可能不方便,那就放大图片,或直接看下面各字段的含义:

  1. goroutine profile: total 32023:32023是goroutine的总数量
  2. 32015 @ 0x42e15a 0x42e20e 0x40534b 0x4050e5 ...:32015代表当前有32015个goroutine运行这个调用栈,并且停在相同位置,@后面的十六进制,现在用不到这个数据,所以暂不深究了。
  3. 下面是当前goroutine的调用栈,列出了函数和所在文件的行数,这个行数对定位很有帮助,如下:
32015 @ 0x42e15a 0x42e20e 0x40534b 0x4050e5 0x6d8559 0x6d831b 0x45abe1
#    0x6d8558    main.alloc2.func1+0xf8    /home/ubuntu/heap/leak_demo.go:53
#    0x6d831a    main.alloc2+0x2a    /home/ubuntu/heap/leak_demo.go:54

根据上面的提示,就能判断32015个goroutine运行到leak_demo.go的53行:

func alloc2(outCh chan<- int) {
    func() {
        defer fmt.Println("alloc-fm exit")
        // 分配内存,假用一下
        buf := make([]byte, 1024*1024*10)
        _ = len(buf)
        fmt.Println("alloc done")

        outCh <- 0 // 53行
    }()
}

阻塞的原因是outCh这个写操作无法完成,outCh是无缓冲的通道,并且由于以下代码是死代码,所以goroutine始终没有从outCh读数据,造成outCh阻塞,进而造成无数个alloc2的goroutine阻塞,形成内存泄露:

if false {
    <-outCh
}

方式二

url请求中设置debug=2:

http://ip:port/debug/pprof/goroutine?debug=2

第2种方式和第1种方式是互补的,它可以看到每个goroutine的信息:

  1. goroutine 20 [chan send, 2 minutes]:20是goroutine id,[]中是当前goroutine的状态,阻塞在写channel,并且阻塞了2分钟,长时间运行的系统,你能看到阻塞时间更长的情况。
  2. 同时,也可以看到调用栈,看当前执行停到哪了:leak_demo.go的53行,
goroutine 20 [chan send, 2 minutes]:
main.alloc2.func1(0xc42015e060)
    /home/ubuntu/heap/leak_demo.go:53 +0xf9  // 这
main.alloc2(0xc42015e060)
    /home/ubuntu/heap/leak_demo.go:54 +0x2b
created by main.alloc1
    /home/ubuntu/heap/leak_demo.go:42 +0x3f

命令行交互式方法

Web的方法是简单粗暴,无需登录服务器,浏览器打开看看就行了。但就像前面提的,没有浏览器可访问时,命令行交互式才是最佳的方式,并且也是手到擒来,感觉比Web一样方便。

命令行交互式只有1种获取goroutine profile的方法,不像Web网页分debug=1debug=22中方式,并将profile文件保存到本地:

// 注意命令没有`debug=1`,debug=1,加debug有些版本的go不支持
$ go tool pprof http://0.0.0.0:6060/debug/pprof/goroutine
Fetching profile over HTTP from http://localhost:6061/debug/pprof/goroutine
Saved profile in /home/ubuntu/pprof/pprof.leak_demo.goroutine.001.pb.gz  // profile文件保存位置
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)

命令行只需要掌握3个命令就好了,上面介绍过了,详细的倒回去看top, list, traces

  1. top:显示正运行到某个函数goroutine的数量
  2. traces:显示所有goroutine的调用栈
  3. list:列出代码详细的信息。

我们依然使用leak_demo.go这个demo,

$  go tool pprof -base pprof.leak_demo.goroutine.001.pb.gz pprof.leak_demo.goroutine.002.pb.gz
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)
(pprof)
(pprof) top
Showing nodes accounting for 20312, 100% of 20312 total
      flat  flat%   sum%        cum   cum%
     20312   100%   100%      20312   100%  runtime.gopark
         0     0%   100%      20312   100%  main.alloc2
         0     0%   100%      20312   100%  main.alloc2.func1
         0     0%   100%      20312   100%  runtime.chansend
         0     0%   100%      20312   100%  runtime.chansend1
         0     0%   100%      20312   100%  runtime.goparkunlock
(pprof)
(pprof) traces
File: leak_demo
Type: goroutine
Time: May 16, 2019 at 2:44pm (CST)
-----------+-------------------------------------------------------
     20312   runtime.gopark
             runtime.goparkunlock
             runtime.chansend
             runtime.chansend1 // channel发送
             main.alloc2.func1 // alloc2中的匿名函数
             main.alloc2
-----------+-------------------------------------------------------

top命令在怎么确定是goroutine泄露引发的内存泄露介绍过了,直接看traces命令,traces能列出002中比001中多的那些goroutine的调用栈,这里只有1个调用栈,有20312个goroutine都执行这个调用路径,可以看到alloc2中的匿名函数alloc2.func1调用了写channel的操作,然后阻塞挂起了goroutine,使用list列出alloc2.func1的代码,显示有20312个goroutine阻塞在53行:

(pprof) list main.alloc2.func1
Total: 20312
ROUTINE ======================== main.alloc2.func1 in /home/ubuntu/heap/leak_demo.go
         0      20312 (flat, cum)   100% of Total
         .          .     48:        // 分配内存,假用一下
         .          .     49:        buf := make([]byte, 1024*1024*10)
         .          .     50:        _ = len(buf)
         .          .     51:        fmt.Println("alloc done")
         .          .     52:
         .      20312     53:        outCh <- 0  // 看这
         .          .     54:    }()
         .          .     55:}
         .          .     56:

友情提醒:使用list命令的前提是程序的源码在当前机器,不然可没法列出源码。服务器上,通常没有源码,那我们咋办呢?刚才介绍了Web查看的方式,那里会列出代码行数,我们可以使用wget下载网页:

$ wget http://localhost:6060/debug/pprof/goroutine?debug=1

下载网页后,使用编辑器打开文件,使用关键字main.alloc2.func1进行搜索,找到与当前相同的调用栈,就可以看到该goroutine阻塞在哪一行了,不要忘记使用debug=2还可以看到阻塞了多久和原因,Web方式中已经介绍了,此处省略代码几十行。


总结

文章略长,但全是干货,感谢阅读到这。然读到着了,跟定很想掌握pprof,建议实践一把,现在和大家温习一把本文的主要内容。

goroutine泄露的本质

goroutine泄露的本质是channel阻塞,无法继续向下执行,导致此goroutine关联的内存都无法释放,进一步造成内存泄露。

goroutine泄露的发现和定位

利用好go pprof获取goroutine profile文件,然后利用3个命令top、traces、list定位内存泄露的原因。

goroutine泄露的场景

泄露的场景不仅限于以下两类,但因channel相关的泄露是最多的。

  1. channel的读或者写:

    1. 无缓冲channel的阻塞通常是写操作因为没有读而阻塞
    2. 有缓冲的channel因为缓冲区满了,写操作阻塞
    3. 期待从channel读数据,结果没有goroutine写
  2. select操作,select里也是channel操作,如果所有case上的操作阻塞,goroutine也无法继续执行。

编码goroutine泄露的建议

为避免goroutine泄露造成内存泄露,启动goroutine前要思考清楚:

  1. goroutine如何退出?
  2. 是否会有阻塞造成无法退出?如果有,那么这个路径是否会创建大量的goroutine?

示例源码

本文所有示例源码,及历史文章、代码都存储在Github,阅读原文可直接跳转,Github:https://github.com/Shitaibin/golang_step_by_step/tree/master/pprof

推荐阅读

这些既是参考资料也是推荐阅读的文章,不容错过。

【Go Blog关于pprof详细介绍和Demo】 https://blog.golang.org/profi...

【Dave关于高性能Go程序的workshop】 https://dave.cheney.net/high-...

【煎鱼pprof文章,很适合入门 Golang大杀器之性能剖析PProf】 https://segmentfault.com/a/11...

【SO上goroutine调用栈各字段的介绍】https://stackoverflow.com/a/3...

【我的老文,有runtime.main的介绍,想学习调度器,可以看下系列文章 Go调度器系列(2)宏观看调度器】http://lessisbetter.site/2019...

  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/05/18/go-goroutine-leak/

查看原文

赞 80 收藏 53 评论 9

xddd 收藏了文章 · 2019-08-24

聊一聊,Golang “相对”路径问题

原文地址:聊一聊,Golang “相对”路径问题

前言

Golang 中存在各种运行方式,如何正确的引用文件路径成为一个值得商议的问题

gin-blog 为例,当我们在项目根目录下,执行 go run main.go 时能够正常运行(go build也是正常的)

[$ gin-blog]# go run main.go
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:    export GIN_MODE=release
 - using code:    gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /api/v1/tags              --> gin-blog/routers/api/v1.GetTags (3 handlers)
...

那么在不同的目录层级下,不同的方式运行,又是怎么样的呢,带着我们的疑问去学习

问题

1、 go run
我们上移目录层级,到 $GOPATH/src 下,执行 go run gin-blog/main.go

[$ src]# go run gin-blog/main.go
2018/03/12 16:06:13 Fail to parse 'conf/app.ini': open conf/app.ini: no such file or directory
exit status 1

2、 go build,执行 ./gin-blog/main

[$ src]# ./gin-blog/main
2018/03/12 16:49:35 Fail to parse 'conf/app.ini': open conf/app.ini: no such file or directory

这时候你要打一个大大的问号,就是我的程序读取到什么地方去了


我们通过分析得知,Golang的相对路径是相对于执行命令时的目录;自然也就读取不到了

思考

既然已经知道问题的所在点,我们就可以寻思做点什么 : )

我们想到相对路径是相对执行命令的目录,那么我们获取可执行文件的地址,拼接起来不就好了吗?

实践

我们编写获取当前可执行文件路径的方法

import (
    "path/filepath"
    "os"
    "os/exec"
    "string"
)

func GetAppPath() string {
    file, _ := exec.LookPath(os.Args[0])
    path, _ := filepath.Abs(file)
    index := strings.LastIndex(path, string(os.PathSeparator))

    return path[:index]
}

将其放到启动代码处查看路径

log.Println(GetAppPath())

我们分别执行以下两个命令,查看输出结果
1、 go run

$ go run main.go
2018/03/12 18:45:40 /tmp/go-build962610262/b001/exe

2、 go build

$ ./main
2018/03/12 18:49:44 $GOPATH/src/gin-blog

剖析

我们聚焦在 go run 的输出结果上,发现它是一个临时文件的地址,这是为什么呢?

go help run中,我们可以看到

Run compiles and runs the main package comprising the named Go source files.
A Go source file is defined to be a file ending in a literal ".go" suffix.

也就是 go run 执行时会将文件放到 /tmp/go-build... 目录下,编译并运行

因此go run main.go出现/tmp/go-build962610262/b001/exe结果也不奇怪了,因为它已经跑到临时目录下去执行可执行文件了


这就已经很清楚了,那么我们想想,会出现哪些问题呢

  • 依赖相对路径的文件,出现路径出错的问题
  • go rungo build 不一样,一个到临时目录下执行,一个可手动在编译后的目录下执行,路径的处理方式会不同
  • 不断go run,不断产生新的临时文件

这其实就是根本原因了,因为 go rungo build 的编译文件执行路径并不同,执行的层级也有可能不一样,自然而然就出现各种读取不到的奇怪问题了

解决方案

一、获取编译后的可执行文件路径

1、 将配置文件的相对路径与GetAppPath()的结果相拼接,可解决go build main.go的可执行文件跨目录执行的问题(如:./src/gin-blog/main

import (
    "path/filepath"
    "os"
    "os/exec"
    "string"
)

func GetAppPath() string {
    file, _ := exec.LookPath(os.Args[0])
    path, _ := filepath.Abs(file)
    index := strings.LastIndex(path, string(os.PathSeparator))

    return path[:index]
}

但是这种方式,对于go run依旧无效,这时候就需要2来补救

2、 通过传递参数指定路径,可解决go run的问题

package main

import (
    "flag"
    "fmt"
)

func main() {
    var appPath string
    flag.StringVar(&appPath, "app-path", "app-path")
    flag.Parse()
    fmt.Printf("App path: %s", appPath)
}

运行

go run main.go --app-path "Your project address"

二、增加os.Getwd()进行多层判断

参见 beego 读取 app.conf 的代码

该写法可兼容 go build 和在项目根目录执行 go run ,但是若跨目录执行 go run 就不行

三、配置全局系统变量

我们可以通过os.Getenv来获取系统全局变量,然后与相对路径进行拼接

1、 设置项目工作区

简单来说,就是设置项目(应用)的工作路径,然后与配置文件、日志文件等相对路径进行拼接,达到相对的绝对路径来保证路径一致

参见 gogs 读取GOGS_WORK_DIR进行拼接的代码

2、 利用系统自带变量

简单来说就是通过系统自带的全局变量,例如$HOME等,将配置文件存放在$HOME/conf/etc/conf

这样子就能更加固定的存放配置文件,不需要额外去设置一个环境变量

(这点今早与一位SFer讨论了一波,感谢)

拓展

go test 在一些场景下也会遇到路径问题,因为go test只能够在当前目录执行,所以在执行测试用例的时候,你的执行目录已经是测试目录了

需要注意的是,如果采用获取外部参数的办法,用 os.args 时,go test -argsgo rungo build 会有命令行参数位置的不一致问题

小结

这三种解决方案,在目前可见的开源项目或介绍中都能找到这些的身影

优缺点也是显而易见的,我认为应在不同项目选定合适的解决方案即可

建议大家不要强依赖读取配置文件的模块,应当将其“堆积木”化,需要什么配置才去注册什么配置变量,可以解决一部分的问题

大家又有什么想法呢,在SF 这里 讨论一波?

查看原文

xddd 收藏了文章 · 2019-08-22

树莓派学习手记——制作一个空调遥控器(红外接收、发射的实现)

使用树莓派搭配红外管,进行接收、发射红外信号是很方便的,同时红外信号也有很广泛的用途。这次我们将总结使用树莓派制作一个空调红外遥控器的过程。

准备工具

  • 红外接收管(参考型号HS0038B)
  • 红外发射管(参考型号TSAL6200)
  • 遥控器(或能使用万能遥控器的手机)
  • 用作开关的三极管、限流电阻(非必须、参考型号S9013)

使用开关三极管可以有效增强红外发射管的性能,但不是必须的。不使用三极管也能在三五米范围内成功遥控空调。这些材料总共费用不超过1块钱,反而是快递费比较贵了。

看到遥控器、接收管、发射管,相信已经有人明白了制作遥控器的原理。是的,我们只需要事先把遥控器发射出的红外信号记录下来,然后通过树莓派依样画葫芦地把这个信号发射出去,一个“克隆”版的遥控器就做好了。

硬件连接

*注意:两个GPIO引脚是固定的,与后续安装的软件有关。

接收管信号输出脚 OUT → GPIO18

发射管正极(不使用开关三极管的情况下) → GPIO17

如果你手头上没有开关三极管,直接将红外发射管正极接在GPIO17,如下所示:

如果接入三极管,用GPIO17连接基极,控制发射极和集电极的通断:

(偷懒了没有接入限流电阻,在意的同学自行接入)

安装lirc

解决方案来自:LIRC: Linux Infrared Remote Control for Raspberry Pi
sudo apt update
sudo apt install lirc

修改CONFIG.TXT

修改文件 /boot/config.txt

sudo nano /boot/config.txt

找到 infrared module 的部分,去除注释,17改18,18改17

# Uncomment this to enable infrared communication.
dtoverlay=gpio-ir,gpio_pin=18
dtoverlay=gpio-ir-tx,gpio_pin=17

!!!注意:config.txt的配置内容,似乎根据不同Linux内核版本有微妙的变化,手头上暂时没有其他平台可以测试。如果后续测试时出问题,请Google关键词“lirc lirc-rpi gpio-ir”查阅相关资料。

修改驱动配置

修改文件 /etc/lirc/lirc_options.conf

sudo nano /etc/lirc/lirc_options.conf
# 把:
driver = devinput
device = auto

# 修改为:
driver = default
device = /dev/lirc1

最后,重启树莓派。

简单测试是否正常

# 必须停止lircd服务才能进入接收红外信号模式
sudo service lircd stop
mode2 -d /dev/lirc1

运行上述命令后,用遥控器对着接收管随便按一些按钮,如果出现形式如下的输出就表示正常:

space 16777215
pulse 8999
space 4457
pulse 680
space 1627
......

录入红外信号

解决方案来自:How to Control Your Air Conditioner with Raspberry Pi Board and ANAVI Infrared pHAT

lirc有一个自动录入红外信号、生成遥控器文件的功能。但此方法只适用于简单设备,比如风扇,这里就不记录过程了。有需要的直接运行 irrecord -d /dev/lirc0 --disable-namespace ,按提示做完后把生成的文件放到 /etc/lirc/lircd.conf.d/ 目录就行了。

这边就主要针对空调这种复杂设备,记录录入红外信号的过程。

另外,简单了解一下红外NEC协议可以帮助你理解配置的过程。


为什么无法直接录制复杂设备的红外控制信号?

因为空调遥控器每次发送的信号不是单纯的一个"byte",与其说它是“控制信号”,不如说是一个“状态”、“情景”。后文还会有实例帮助你理解。

生成遥控器配置文件的样板

空调这类复杂设备的遥控器配置文件,是需要自己手动输入的。但不可能整个文件都自己写——我们连格式都不知道。

所以我们需要用刚才提到的自动录入功能生成一个样板,但请记住,这个样板中记录的信号极可能是不正确的!我们只是通过它来了解配置内容的格式。

开始自动录制:

# 请cd到有读写权限的目录下,因为需要创建一个遥控器配置文件
# 参数-f --force 表示 Force raw mode
irrecord -f -d /dev/lirc1 --disable-namespace

认真阅读提示信息,根据提示按Enter、输入 遥控器名称 、按Enter、按照要求随机按遥控器、输入 按钮名称 、按对应的遥控器按钮。由于只是为了生成样板,所以录制一个按钮就够了。完成录制后,当前目录下会生成一个遥控器配置文件 遥控器名称.lircd.conf

如果发现录制过程十分缓慢,最后提示“未发现gap”之类的信息,请尝试跳过自动生成这一步,复制下面的配置文件当做生成的配置,直接进入下一步。(我在录制一些老式空调的命令时遇到了这种问题,只能这样解决,如果你有什么想法恳请提出)

我在录制时输入的 遥控器名称 是aircon,录制的一个按钮是on,所以配置文件的内容形式如下:

begin remote

  name  aircon
  flags RAW_CODES
  eps            30
  aeps          100

  gap          19991

      begin raw_codes

          name on
             9042    4438     700    1602     705     526
              678     528     681     531     674     527
              679     528     679     528     677     527
              677     528     679     528     678     528
              677    1632     676     529     676     531
              676     531     649     556     672     532
              650     558     654     552     652     553
              649     558     648    1661     650     558
              648     558     648    1661     649     562
              644     558     647     558     648    1657
              651     558     647    1659     650     557
              653     553     648    1660     648     557
              649

      end raw_codes

end remote

如果你阅读了红外NEC协议,就能马上意识到,这一串数字其实就是红外信号脉冲(pulse)、空白(space)的持续时间。

手动编辑遥控器配置文件

打开刚才生成的样板文件 遥控器名称.lircd.conf ,很容易发现 begin raw_codesend raw_codes 之间的内容就是需要我们手动修改的内容。刚才也提到过,样板中记录的信号极可能是不正确的,所以我们先把自动生成的 on 按钮下方的信号数据删除掉。

还记得刚才测试时使用的mode2命令吗。我们现在需要做的就是使用mode2命令接收遥控器发出的信号,然后将其加入到文件 遥控器名称.lircd.conf 中。首先,我们来录入正确on 按钮的信号数据:

# -m --mode 使用行列显示模式,不显示pulse、space
mode2 -m -d /dev/lirc1

按下遥控器上的“开”按钮,得到形式如下的输出:

 16777215

     9059     4432      706     1604      706      528
      679      524      681     1603      703      526
      680     1602      715     1596      704      526
      679      527      679      527      680      527
      679     1604      705      530      673      530
      674      529      682      529      675      530
      674      532      674      532      650      557
      648      556      654     1653      676      533
      649      559      647     1667      639      559
      648      558      656      553      647     1658
      648      558      650     1659      649      559
      647      559      648     1659      648      558
      646    19991

      648      558      648      558      650      567
      638      557      648     1668      640      557
      649      558      650      558      646     1660
      650      556      649      557      649      559
      654      552      648     1657      651      558
      647      554      660      549      649      559
      647      557      649      559      648      559
      647      557      644      561      648      559
      648      556      647      560      648      556
      652      563      642     1658      648     1661
      649     1660      646     1658      650

除去第一行很大的那个数,把其他数据全部复制,粘贴到配置文件的 name on 下方。例如现在我必须删除“16777215”这个数,剩下的内容粘贴到配置文件的 name on 下方。

重复上述操作,增加更多的按钮,例如 name offname 26C 等。最后我录制了3个按钮,配置文件编辑成了这样:

begin remote

  name  aircon
  flags RAW_CODES
  eps            30
  aeps          100

  gap          19991

      begin raw_codes

          name on
             9059     4432      706     1604      706      528
              679      524      681     1603      703      526
              680     1602      715     1596      704      526
              679      527      679      527      680      527
              679     1604      705      530      673      530
              674      529      682      529      675      530
              674      532      674      532      650      557
              648      556      654     1653      676      533
              649      559      647     1667      639      559
              648      558      656      553      647     1658
              648      558      650     1659      649      559
              647      559      648     1659      648      558
              646    19991

              648      558      648      558      650      567
              638      557      648     1668      640      557
              649      558      650      558      646     1660
              650      556      649      557      649      559
              654      552      648     1657      651      558
              647      554      660      549      649      559
              647      557      649      559      648      559
              647      557      644      561      648      559
              648      556      647      560      648      556
              652      563      642     1658      648     1661
              649     1660      646     1658      650
          
          name off
             9029     4432      715     1594      706      526
              682      523      681      525      680      526
              681     1601      708     1607      699      524
              688      519      682      526      678      527
              681     1601      708      524      687      520
              682      525      677      527      677      529
              675      531      676      531      674      532
              651      558      646     1659      650      557
              648      557      650     1659      653      554
              650      559      647      558      649     1657
              649      558      648     1661      648      557
              646      562      645     1666      643      558
              649    19992

              651      555      650      558      648      562
              645      557      648     1661      653      552
              646      560      650      557      648     1657
              649      561      647      557      647      558
              650      556      650     1659      649      559
              647      557      649      558      648      559
              647      557      651      564      642      559
              646      557      649      557      657      552
              647      557      648      558      650      557
              645      560      653     1653      646     1661
              650     1659      648      558      647

          name 26C
             9026     4430      705     1604      706      528
              679      535      670     1604      705      527
              675      532      679     1607      702      530
              673      531      683     1625      672      535
              672     1633      676      530      673      534
              649      558      648      563      642      556
              651      556      650      558      672      532
              649      556      652     1659      648      558
              656      551      646     1659      650      558
              648      558      648      558      649     1658
              649      561      648     1659      647      559
              650      556      648     1660      646      559
              647    19990

              648     1659      649      558      648      558
              647      558      650     1658      650      557
              650      555      650      558      648      558
              649      555      652      561      667      534
              648      559      648     1658      656      550
              650      557      672      533      649      555
              650      559      649      558      647      559
              648      558      648      566      641      558
              647      558      648      558      650      558
              648      558      648     1660      646      558
              648      558      646      562      647

      end raw_codes

end remote

是的,如果你想要实现完整的控制,你就需要把所有按钮都录制一遍。如果你对配置文件中开头的eps、aeps等参数感兴趣,或者最后遥控不太正常,阅读lircd.conf manual或许能帮到你。我使用的是默认的数值,一切工作正常。

(2020 年 12 月 1 日修改)
这里插入一步,由于软硬件更新,较新版本的树莓派内核和 lircd 软件,需要修改这个配置文件:

sudo nano /etc/lirc/lirc_options.conf
# device          = /dev/lirc1 修改为
device          = /dev/lirc0

最后,把配置文件复制到指定目录 /etc/lirc/lircd.conf/ 并重启lircd服务:

sudo cp aircon.lircd.conf /etc/lirc/lircd.conf.d/
sudo service lircd restart

*后续步骤出现问题的同学可以使用service lircd status查看服务启动的log,帮助定位bug。

发射信号

终于,我们可以尝试着使用树莓派控制空调了。如果你没有使用开关三极管,你可能需要把树莓派拿到靠近空调的地方,并且把红外发射管对准空调。如果你使用了三极管,那么注意树莓派和空调之间不要有明显的物体阻隔即可。

# 发射命令:irsend SEND_ONCE 遥控器名称 按钮名称
irsend SEND_ONCE aircon on

如果前面的步骤一切正常,但在发射信号时报错“transmission failed”。请检查生成的遥控器配置文件,查看flags项,若是flags RAW_CODES|CONST_LENGTH,请尝试将其修改成flags RAW_CODES并重启lircd服务。再测试能否发射信号。

按钮?不如说是情景

最后,我们来讨论一个比较有意思的东西。

考虑一下这种情况:我为了录入 + 按钮,运行mode2命令开始录制。在遥控器显示温度23℃时按 + ,然后按照前面的方法编辑配置文件,写入了按钮 name add

此时空调屏幕上显示温度是24℃。提问:如果我运行

irsend SEND_ONCE aircon add

空调会:

  1. 温度提升到25℃
  2. “滴”地响一声,然后什么都没发生,保持在24℃

很遗憾,后者发生了。

实际上遥控器每按下一次按钮发送的信息是一个“情景”,我刚才录制的 add 按钮实际上是表示“温度设为24℃、进入制冷模式、风速设为自动...”这样的一个“情景”。如果你在空调温度20℃时运行add命令,那么它就会一次性提升到24℃!

这意味着,如果你想要设置任意温度,你需要把每一度都录制一遍,因为 +- 命令根本就不存在。

当然,这也不全是坏事。

我录制了一个按钮 26C ,功能是将温度调到26℃。然后我意识到, 26C 这个按钮同时包含了开关状态的信息。是的!在空调关闭的情况下,如果我直接发送命令:

irsend SEND_ONCE aircon 26C

那么空调会打开,并且调整到26℃!

于是,我录制了一个按钮 Sleep ,它将空调设置为“26℃、风速设为低、开启扫风、开启静音睡眠模式”。睡前运行一次 irsend SEND_ONCE aircon Sleep ,感觉离智能家居又近了一步 23333 (•̀ω•́)✧。

小结

其实写完这篇总结还是有点慌的,因为不管是树莓派版本、软件版本、红外管型号还是空调的型号,大家都是不一样的,说不准哪一步我这么做放别人那就是错的呢。事实上,我自己在做的过程中参考的一些博客就和我的实际情况有些出入了。只能希望这篇总结能够有一定的参考价值。最后,感谢你阅读文章!

查看原文

xddd 赞了文章 · 2019-08-22

树莓派学习手记——制作一个空调遥控器(红外接收、发射的实现)

使用树莓派搭配红外管,进行接收、发射红外信号是很方便的,同时红外信号也有很广泛的用途。这次我们将总结使用树莓派制作一个空调红外遥控器的过程。

准备工具

  • 红外接收管(参考型号HS0038B)
  • 红外发射管(参考型号TSAL6200)
  • 遥控器(或能使用万能遥控器的手机)
  • 用作开关的三极管、限流电阻(非必须、参考型号S9013)

使用开关三极管可以有效增强红外发射管的性能,但不是必须的。不使用三极管也能在三五米范围内成功遥控空调。这些材料总共费用不超过1块钱,反而是快递费比较贵了。

看到遥控器、接收管、发射管,相信已经有人明白了制作遥控器的原理。是的,我们只需要事先把遥控器发射出的红外信号记录下来,然后通过树莓派依样画葫芦地把这个信号发射出去,一个“克隆”版的遥控器就做好了。

硬件连接

*注意:两个GPIO引脚是固定的,与后续安装的软件有关。

接收管信号输出脚 OUT → GPIO18

发射管正极(不使用开关三极管的情况下) → GPIO17

如果你手头上没有开关三极管,直接将红外发射管正极接在GPIO17,如下所示:

如果接入三极管,用GPIO17连接基极,控制发射极和集电极的通断:

(偷懒了没有接入限流电阻,在意的同学自行接入)

安装lirc

解决方案来自:LIRC: Linux Infrared Remote Control for Raspberry Pi
sudo apt update
sudo apt install lirc

修改CONFIG.TXT

修改文件 /boot/config.txt

sudo nano /boot/config.txt

找到 infrared module 的部分,去除注释,17改18,18改17

# Uncomment this to enable infrared communication.
dtoverlay=gpio-ir,gpio_pin=18
dtoverlay=gpio-ir-tx,gpio_pin=17

!!!注意:config.txt的配置内容,似乎根据不同Linux内核版本有微妙的变化,手头上暂时没有其他平台可以测试。如果后续测试时出问题,请Google关键词“lirc lirc-rpi gpio-ir”查阅相关资料。

修改驱动配置

修改文件 /etc/lirc/lirc_options.conf

sudo nano /etc/lirc/lirc_options.conf
# 把:
driver = devinput
device = auto

# 修改为:
driver = default
device = /dev/lirc1

最后,重启树莓派。

简单测试是否正常

# 必须停止lircd服务才能进入接收红外信号模式
sudo service lircd stop
mode2 -d /dev/lirc1

运行上述命令后,用遥控器对着接收管随便按一些按钮,如果出现形式如下的输出就表示正常:

space 16777215
pulse 8999
space 4457
pulse 680
space 1627
......

录入红外信号

解决方案来自:How to Control Your Air Conditioner with Raspberry Pi Board and ANAVI Infrared pHAT

lirc有一个自动录入红外信号、生成遥控器文件的功能。但此方法只适用于简单设备,比如风扇,这里就不记录过程了。有需要的直接运行 irrecord -d /dev/lirc0 --disable-namespace ,按提示做完后把生成的文件放到 /etc/lirc/lircd.conf.d/ 目录就行了。

这边就主要针对空调这种复杂设备,记录录入红外信号的过程。

另外,简单了解一下红外NEC协议可以帮助你理解配置的过程。


为什么无法直接录制复杂设备的红外控制信号?

因为空调遥控器每次发送的信号不是单纯的一个"byte",与其说它是“控制信号”,不如说是一个“状态”、“情景”。后文还会有实例帮助你理解。

生成遥控器配置文件的样板

空调这类复杂设备的遥控器配置文件,是需要自己手动输入的。但不可能整个文件都自己写——我们连格式都不知道。

所以我们需要用刚才提到的自动录入功能生成一个样板,但请记住,这个样板中记录的信号极可能是不正确的!我们只是通过它来了解配置内容的格式。

开始自动录制:

# 请cd到有读写权限的目录下,因为需要创建一个遥控器配置文件
# 参数-f --force 表示 Force raw mode
irrecord -f -d /dev/lirc1 --disable-namespace

认真阅读提示信息,根据提示按Enter、输入 遥控器名称 、按Enter、按照要求随机按遥控器、输入 按钮名称 、按对应的遥控器按钮。由于只是为了生成样板,所以录制一个按钮就够了。完成录制后,当前目录下会生成一个遥控器配置文件 遥控器名称.lircd.conf

如果发现录制过程十分缓慢,最后提示“未发现gap”之类的信息,请尝试跳过自动生成这一步,复制下面的配置文件当做生成的配置,直接进入下一步。(我在录制一些老式空调的命令时遇到了这种问题,只能这样解决,如果你有什么想法恳请提出)

我在录制时输入的 遥控器名称 是aircon,录制的一个按钮是on,所以配置文件的内容形式如下:

begin remote

  name  aircon
  flags RAW_CODES
  eps            30
  aeps          100

  gap          19991

      begin raw_codes

          name on
             9042    4438     700    1602     705     526
              678     528     681     531     674     527
              679     528     679     528     677     527
              677     528     679     528     678     528
              677    1632     676     529     676     531
              676     531     649     556     672     532
              650     558     654     552     652     553
              649     558     648    1661     650     558
              648     558     648    1661     649     562
              644     558     647     558     648    1657
              651     558     647    1659     650     557
              653     553     648    1660     648     557
              649

      end raw_codes

end remote

如果你阅读了红外NEC协议,就能马上意识到,这一串数字其实就是红外信号脉冲(pulse)、空白(space)的持续时间。

手动编辑遥控器配置文件

打开刚才生成的样板文件 遥控器名称.lircd.conf ,很容易发现 begin raw_codesend raw_codes 之间的内容就是需要我们手动修改的内容。刚才也提到过,样板中记录的信号极可能是不正确的,所以我们先把自动生成的 on 按钮下方的信号数据删除掉。

还记得刚才测试时使用的mode2命令吗。我们现在需要做的就是使用mode2命令接收遥控器发出的信号,然后将其加入到文件 遥控器名称.lircd.conf 中。首先,我们来录入正确on 按钮的信号数据:

# -m --mode 使用行列显示模式,不显示pulse、space
mode2 -m -d /dev/lirc1

按下遥控器上的“开”按钮,得到形式如下的输出:

 16777215

     9059     4432      706     1604      706      528
      679      524      681     1603      703      526
      680     1602      715     1596      704      526
      679      527      679      527      680      527
      679     1604      705      530      673      530
      674      529      682      529      675      530
      674      532      674      532      650      557
      648      556      654     1653      676      533
      649      559      647     1667      639      559
      648      558      656      553      647     1658
      648      558      650     1659      649      559
      647      559      648     1659      648      558
      646    19991

      648      558      648      558      650      567
      638      557      648     1668      640      557
      649      558      650      558      646     1660
      650      556      649      557      649      559
      654      552      648     1657      651      558
      647      554      660      549      649      559
      647      557      649      559      648      559
      647      557      644      561      648      559
      648      556      647      560      648      556
      652      563      642     1658      648     1661
      649     1660      646     1658      650

除去第一行很大的那个数,把其他数据全部复制,粘贴到配置文件的 name on 下方。例如现在我必须删除“16777215”这个数,剩下的内容粘贴到配置文件的 name on 下方。

重复上述操作,增加更多的按钮,例如 name offname 26C 等。最后我录制了3个按钮,配置文件编辑成了这样:

begin remote

  name  aircon
  flags RAW_CODES
  eps            30
  aeps          100

  gap          19991

      begin raw_codes

          name on
             9059     4432      706     1604      706      528
              679      524      681     1603      703      526
              680     1602      715     1596      704      526
              679      527      679      527      680      527
              679     1604      705      530      673      530
              674      529      682      529      675      530
              674      532      674      532      650      557
              648      556      654     1653      676      533
              649      559      647     1667      639      559
              648      558      656      553      647     1658
              648      558      650     1659      649      559
              647      559      648     1659      648      558
              646    19991

              648      558      648      558      650      567
              638      557      648     1668      640      557
              649      558      650      558      646     1660
              650      556      649      557      649      559
              654      552      648     1657      651      558
              647      554      660      549      649      559
              647      557      649      559      648      559
              647      557      644      561      648      559
              648      556      647      560      648      556
              652      563      642     1658      648     1661
              649     1660      646     1658      650
          
          name off
             9029     4432      715     1594      706      526
              682      523      681      525      680      526
              681     1601      708     1607      699      524
              688      519      682      526      678      527
              681     1601      708      524      687      520
              682      525      677      527      677      529
              675      531      676      531      674      532
              651      558      646     1659      650      557
              648      557      650     1659      653      554
              650      559      647      558      649     1657
              649      558      648     1661      648      557
              646      562      645     1666      643      558
              649    19992

              651      555      650      558      648      562
              645      557      648     1661      653      552
              646      560      650      557      648     1657
              649      561      647      557      647      558
              650      556      650     1659      649      559
              647      557      649      558      648      559
              647      557      651      564      642      559
              646      557      649      557      657      552
              647      557      648      558      650      557
              645      560      653     1653      646     1661
              650     1659      648      558      647

          name 26C
             9026     4430      705     1604      706      528
              679      535      670     1604      705      527
              675      532      679     1607      702      530
              673      531      683     1625      672      535
              672     1633      676      530      673      534
              649      558      648      563      642      556
              651      556      650      558      672      532
              649      556      652     1659      648      558
              656      551      646     1659      650      558
              648      558      648      558      649     1658
              649      561      648     1659      647      559
              650      556      648     1660      646      559
              647    19990

              648     1659      649      558      648      558
              647      558      650     1658      650      557
              650      555      650      558      648      558
              649      555      652      561      667      534
              648      559      648     1658      656      550
              650      557      672      533      649      555
              650      559      649      558      647      559
              648      558      648      566      641      558
              647      558      648      558      650      558
              648      558      648     1660      646      558
              648      558      646      562      647

      end raw_codes

end remote

是的,如果你想要实现完整的控制,你就需要把所有按钮都录制一遍。如果你对配置文件中开头的eps、aeps等参数感兴趣,或者最后遥控不太正常,阅读lircd.conf manual或许能帮到你。我使用的是默认的数值,一切工作正常。

(2020 年 12 月 1 日修改)
这里插入一步,由于软硬件更新,较新版本的树莓派内核和 lircd 软件,需要修改这个配置文件:

sudo nano /etc/lirc/lirc_options.conf
# device          = /dev/lirc1 修改为
device          = /dev/lirc0

最后,把配置文件复制到指定目录 /etc/lirc/lircd.conf/ 并重启lircd服务:

sudo cp aircon.lircd.conf /etc/lirc/lircd.conf.d/
sudo service lircd restart

*后续步骤出现问题的同学可以使用service lircd status查看服务启动的log,帮助定位bug。

发射信号

终于,我们可以尝试着使用树莓派控制空调了。如果你没有使用开关三极管,你可能需要把树莓派拿到靠近空调的地方,并且把红外发射管对准空调。如果你使用了三极管,那么注意树莓派和空调之间不要有明显的物体阻隔即可。

# 发射命令:irsend SEND_ONCE 遥控器名称 按钮名称
irsend SEND_ONCE aircon on

如果前面的步骤一切正常,但在发射信号时报错“transmission failed”。请检查生成的遥控器配置文件,查看flags项,若是flags RAW_CODES|CONST_LENGTH,请尝试将其修改成flags RAW_CODES并重启lircd服务。再测试能否发射信号。

按钮?不如说是情景

最后,我们来讨论一个比较有意思的东西。

考虑一下这种情况:我为了录入 + 按钮,运行mode2命令开始录制。在遥控器显示温度23℃时按 + ,然后按照前面的方法编辑配置文件,写入了按钮 name add

此时空调屏幕上显示温度是24℃。提问:如果我运行

irsend SEND_ONCE aircon add

空调会:

  1. 温度提升到25℃
  2. “滴”地响一声,然后什么都没发生,保持在24℃

很遗憾,后者发生了。

实际上遥控器每按下一次按钮发送的信息是一个“情景”,我刚才录制的 add 按钮实际上是表示“温度设为24℃、进入制冷模式、风速设为自动...”这样的一个“情景”。如果你在空调温度20℃时运行add命令,那么它就会一次性提升到24℃!

这意味着,如果你想要设置任意温度,你需要把每一度都录制一遍,因为 +- 命令根本就不存在。

当然,这也不全是坏事。

我录制了一个按钮 26C ,功能是将温度调到26℃。然后我意识到, 26C 这个按钮同时包含了开关状态的信息。是的!在空调关闭的情况下,如果我直接发送命令:

irsend SEND_ONCE aircon 26C

那么空调会打开,并且调整到26℃!

于是,我录制了一个按钮 Sleep ,它将空调设置为“26℃、风速设为低、开启扫风、开启静音睡眠模式”。睡前运行一次 irsend SEND_ONCE aircon Sleep ,感觉离智能家居又近了一步 23333 (•̀ω•́)✧。

小结

其实写完这篇总结还是有点慌的,因为不管是树莓派版本、软件版本、红外管型号还是空调的型号,大家都是不一样的,说不准哪一步我这么做放别人那就是错的呢。事实上,我自己在做的过程中参考的一些博客就和我的实际情况有些出入了。只能希望这篇总结能够有一定的参考价值。最后,感谢你阅读文章!

查看原文

赞 16 收藏 7 评论 54

xddd 收藏了文章 · 2019-08-07

Go 译文之如何构建并发 Pipeline

本文首发于我的博客,如果觉得有用,欢迎点赞收藏,让更多的朋友看到。

作者:Sameer Ajmani | 原文:https://blog.golang.org/pipel...

译者前言

这篇文章来自 Go 官网,不愧是官方的博客,写的非常详细。在开始翻译这篇文章前,先简单说明两点。

首先,这篇文章我之前已经翻译过一遍,但最近再读,发现之前的翻译真是有点烂。于是,决定在完全不参考之前译文的情况下,把这篇文章重新翻译一遍。

其二,文章中有一些专有名字,计划还是用英文来表达,以保证原汁原味,比如 pipeline(管道)、stage (阶段)、goroutine (协程)、channel (通道)。

关于它们之间的关系,按自己的理解简单画了张草图,希望能帮助更好地理解它们之间的关系。如下:

强调一点,如果大家在阅读这篇文章时,感到了迷糊,建议可以回头再看一下这张图。

翻译的正文部分如下。


Go 的并发原语使我们非常轻松地就构建出可以高效利用 IO 和多核 CPU 的流式数据 pipeline。这篇文章将会此为基础进行介绍。在这个过程中,我们将会遇到一些异常情况,关于它们的处理方法,文中也会详细介绍。

什么是管道(pipeline)

关于什么是管道, Go 中并没有给出明确的定义,它只是众多并发编程方式中的一种。非正式的解释,我们理解为,它是由一系列通过 chanel 连接起来的 stage 组成,而每个 stage 都是由一组运行着相同函数的 goroutine 组成。每个 stage 的 goroutine 通常会执行如下的一些工作:

  • 从上游的输入 channel 中接收数据;
  • 对接收到的数据进行一些处理,(通常)并产生新的数据;
  • 将数据通过输出 channel 发送给下游;

除了第一个 stage 和最后一个 stage ,每个 stage 都包含一定数量的输入和输出 channel。第一个 stage 只有输出,通常会把它称为 "生产者",最后一个 stage 只有输入,通常我们会把它称为 "消费者"。

我们先来看一个很简单例子,通过它来解释上面提到那些与 pipeline 相关的概念和技术。了解了这些后,我们再看其它的更实际的例子。

计算平方数

一个涉及三个 stage 的 pipeline。

第一个 stage,gen 函数。它负责将把从参数中拿到的一系列整数发送给指定 channel。它启动了一个 goroutine 来发送数据,当数据全部发送结束,channel 会被关闭。

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二个 stage,sq 函数。它负责从输入 channel 中接收数据,并会返回一个新的 channel,即输出 channel,它负责将经过平方处理过的数据传输给下游。当输入 channel 关闭,并且所有数据都已发送到下游,就可以关闭这个输出 channel 了。

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main 函数负责创建管道并执行最后一个 stage 的任务。它将从第二个 stage 接收数据,并将它们打印出来,直到 channel 关闭。

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

既然,sq 的输入和输出的 channel 类型相同,那么我们就可以把它进行组合,从而形成多个 stage。比如,我们可以把 main 函数重写为如下的形式:

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

扇出和扇入(Fan-out and Fan-in)

当多个函数从一个 channel 中读取数据,直到 channel 关闭,这称为扇出 fan-out。利用它,我们可以实现了一种分布式的工作方式,通过一组 workers 实现并行的 CPU 和 IO。

当一个函数从多个 channel 中读取数据,直到所有 channel 关闭,这称为扇入 fan-in。扇入是通过将多个输入 channel 的数据合并到同一个输出 channel 实现的,当所有的输入 channel 关闭,输出的 channel 也将关闭。

我们来改变一下上面例子中的管道,在它上面运行两个 sq 函数试试。它们将都从同一个输入 channel 中读取数据。我们引入了一个新的函数,merge,负责 fan-in 处理结果,即 merge 两个 sq 的处理结果。

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    // 分布式处理来自 in channel 的数据
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    // 从 channel c1 和 c2 的合并后的 channel 中接收数据
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

merge 函数负责将从一系列输入 channel 中接收的数据合并到一个 channel 中。它为每个输入 channel 都启动了一个 goroutine,并将它们中接收到的值发送到惟一的输出 channel 中。在所有的 goroutines 启动后,还会再另外启动一个 goroutine,它的作用是,当所有的输入 channel 关闭后,负责关闭唯一的输出 channel 。

在已关闭的 channel 发送数据将导致 panic,因此要保证在关闭 channel 前,所有数据都发送完成,是非常重要的。sync.WaitGroup 提供了一种非常简单的方式来完成这样的同步。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    // 为每个输入 channel 启动一个 goroutine
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    // 启动一个 goroutine 负责在所有的输入 channel 关闭后,关闭这个唯一的输出 channel
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

中途停止

管道中的函数包含一个模式:

  • 当数据发送完成,每个 stage 都应该关闭它们的输入 channel;
  • 只要输入 channel 没有关闭,每个 stage 就要持续从中接收数据;

我们可以通过编写 range loop 来保证所有 goroutine 是在所有数据都已经发送到下游的时候退出。

但在一个真实的场景下,每个 stage 都接收完 channel 中的所有数据,是不可能的。有时,我们的设计是:接收方只需要接收数据的部分子集即可。更常见的,如果 channel 在上游的 stage 出现了错误,那么,当前 stage 就应该提早退出。无论如何,接收方都不该再继续等待接收 channel 中的剩余数据,而且,此时上游应该停止生产数据,毕竟下游已经不需要了。

我们的例子中,即使 stage 没有成功消费完所有的数据,上游 stage 依然会尝试给下游发送数据,这将会导致程序永久阻塞。

    // Consume the first value from the output.
    // 从 output 中接收了第一个数据
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
    // 我们并没有从 out channel 中接收第二个数据,
    // 所以上游的其中一个 goroutine 在尝试向下游发送数据时将会被挂起。
}

这是一种资源泄露,goroutine 是需要消耗内存和运行时资源的,goroutine 栈中的堆引用信息也是不会被 gc。

我们需要提供一种措施,即使当下游从上游接收数据时发生异常,上游也能成功退出。一种方式是,把 channel 改为带缓冲的 channel,这样,它就可以承载指定数量的数据,如果 buffer channel 还有空间,数据的发送将会立刻完成。

// 缓冲大小 2 buffer size 2 
c := make(chan int, 2)
// 发送立刻成功 succeeds immediately 
c <- 1
// 发送立刻成功 succeeds immediately
c <- 2 
//blocks until another goroutine does <-c and receives 1
// 阻塞,直到另一个 goroutine 从 c 中接收数据
c <- 3

如果我们在创建 channel 时已经知道将发送的数据量,就可以把前面的代码简化一下。比如,重写 gen 函数,将数据都发送至一个 buffer channel,这还能避免创建新的 goroutine。

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

译者按:channel 关闭后,不可再写入数据,否则会 panic,但是仍可读取已发送数据,而且可以一直读取 0 值。

继续往下游 stage,将又会返回到阻塞的 goroutine 中,我们也可以考虑给 merge 的输出 channel 加点缓冲。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup

    // enough space for the unread inputs
    // 给未读的输入 channel 预留足够的空间
    out := make(chan int, 1)    
    // ... the rest is unchanged ...

虽然通过这个方法,我们能解决了 goroutine 阻塞的问题,但是这并非一个优秀的设计。比如 merge 中的 buffer 的大小 1 是基于我们已经知道了接下来接收数据的大小,以及下游将能消费的数量。很明显,这种设计非常脆弱,如果上游多发送了一些数据,或下游并没接收那么多的数据,goroutine 将又会被阻塞。

因而,当下游不再准备接收上游的数据时,需要有一种方式,可以通知到上游。

明确的取消

如果 main 函数在没把 out 中所有数据接收完就退出,它必须要通知上游停止继续发送数据。如何做到?我们可以在上下游之间引入一个新的 channel,通常称为 done。

示例中有两个可能阻塞的 goroutine,所以, done 需要发送两个值来通知它们。

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    // 通知发送方,我们已经停止接收数据了
    done <- struct{}{}
    done <- struct{}{}
}

发送方 merge 用 select 语句替换了之前的发送操作,它负责通过 out channel 发送数据或者从 done 接收数据。done 接收的值是没有实际意义的,只是表示 out 应该停止继续发送数据了,用空 struct 即可。output 函数将会不停循环,因为上游,即 sq ,并没有阻塞。我们过会再讨论如何退出这个循环。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

这种方法有个问题,下游只有知道了上游可能阻塞的 goroutine 数量,才能向每个 goroutine 都发送了一个 done 信号,从而确保它们都能成功退出。但多维护一个 count 是很令人讨厌的,而且很容易出错。

我们需要一种方式,可以告诉上游的所有 goroutine 停止向下游继续发送信息。在 Go 中,其实可通过关闭 channel 实现,因为在一个已关闭的 channel 接收数据会立刻返回,并且会得到一个零值。

这也就意味着,main 仅需通过关闭 done channel,就可以让所有的发送方解除阻塞。关闭操作相当于一个广播信号。为确保任意返回路径下都成功调用,我们可以通过 defer 语句关闭 done。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    // 为每个输入 channel 启动一个 goroutine,将输入 channel 中的数据拷贝到
    // out channel 中,直到输入 channel,即 c,或 done 关闭。
    // 接着,退出循环并执行 wg.Done()
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...

同样地,一旦 done 关闭,sq 也将退出。sq 也是通过 defer 语句来确保自己的输出 channel,即 out,一定被成功关闭释放。

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

都这里,Go 中如何构建一个 pipeline,已经介绍的差不多了。

简单总结下如何正确构建一个 pipeline。

  • 当所有的发送已经完成,stage 应该关闭输出 channel;
  • stage 应该持续从输入 channel 中接收数据,除非 channel 关闭或主动通知到发送方停止发送。

Pipeline 中有量方式可以解除发送方的阻塞,一是发送方创建充足空间的 channel 来发送数据,二是当接收方停止接收数据时,明确通知发送方。

摘要树

一个真实的案例。

MD5,消息摘要算法,可用于文件校验和的计算。下面的输出是命令行工具 md5sum 输出的文件摘要信息。

$ md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们的例子和 md5sum 类似,不同的是,传递给这个程序的参数是一个目录。程序的输出是目录下每个文件的摘要值,输出的顺序按文件名排序。

$ go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

主函数,第一步调用 MD5All,它返回的是一个以文件名为 key,摘要值为 value 的 map,然后对返回结果进行排序和打印。

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

MD5All 函数将是我们接下来讨论的重点。串行版的实现没有并发,仅仅是从文件中读取数据再计算。

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

并行计算

并行版 中,我们会把 MD5All 的计算拆分开含有两个 stage 的 pipeline。第一个 stage,sumFiles,负责遍历目录和计算文件摘要值,摘要的计算会启动一个 goroutine 来执行,计算结果将通过一个类型 result 的 channel 发出。

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles 返回了 2 个 channel,一个用于接收计算的结果,一个用于接收 filepath.Walk 的 err 返回。walk 会为每个文件启动一个 goroutine 执行摘要计算和检查 done。如果 done 关闭,walk 将立刻停止。

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        // 不需要使用 select,因为 errc 是带有 buffer 的 channel
        errc <- err
    }()
    return c, errc
}

MD5All 将从 c channel 中接收计算的结果,如果发生错误,将通过 defer 关闭 done。

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

并行限制

并行版本 中,MD5All 为每个文件启动了一个 goroutine。但如果一个目录中文件太多,这可能会导致分配的内存过大以至于超过了当前机器的限制。

我们可以通过限制并行读取的文件数,限制内存分配。在 并发限制版本中,我们创建了固定数量的 goroutine 读取文件。现在,我们的 pipeline 涉及 3 个 stage:遍历目录、文件读取与摘要计算、结果收集。

第一个 stage,遍历目录并通过 paths channel 发出文件。

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

第二个 stage,启动固定数量的 goroutine,从 paths channel 中读取文件名称,处理结果发送到 c channel。

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

和之前的例子不同,digester 将不会关闭 c channel,因为多个 goroutine 共享这个 channel,计算结果都将发给这个 channel 上。

相应地,MD5All 会负责在所有摘要完成后关闭这个 c channel。

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

我们也可以为每个 digester 创建一个单独的 channel,通过自己的 channel 传输结果。但这种方式,我们还要再启动一个新的 goroutine 合并结果。

最后一个 stage,负责从 c 中接收处理结果,通过 errc 检查是否有错误发生。该检查无法提前进行,因为提前执行将会阻塞 walkFile 往下游发送数据。

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

总结

这篇文章介绍了,在 Go 中如何正确地构建流式数据 pipeline。它的异常处理非常复杂,pipeline 中的每个 stage 都可能导致上游阻塞,而下游可能不再关心接下来的数据。关闭 channel 可以给所有运行中的 goroutine 发送 done 信号,这能帮助我们成功解除阻塞。如何正确地构建一条流式数据 pipeline,文中也总结了一些指导建议。

查看原文

认证与成就

  • 获得 147 次点赞
  • 获得 23 枚徽章 获得 1 枚金徽章, 获得 4 枚银徽章, 获得 18 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2016-03-22
个人主页被 3k 人浏览