Ethan

Ethan 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织 xiaobaiai.net 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

Ethan 发布了文章 · 3月19日

CMake---优雅地构建软件项目实践(1)

本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net 或者我的CSDN http://blog.csdn.net/freeape

[TOC]

首先说明的是本篇文章不从cmake的整个语法上去讲述,而是从一个实际项目的构建上入手,去了解如何优雅的去构建一个软件项目,搭建一个C/C++软件项目基本的依赖组件,最后形成一个构建C/C++软件项目的模板,方便后面新项目的重复使用。相信对我们日常的软件项目构建都会有很好的收获。废话不都说,开始。

1 我们需要知道的基础

首先熟悉cmake的一些基操,我们就可以信手捏来的、优雅去构建一个项目,避免踩到不必要的坑。涉及到的有:

  • cmake的变量作用域?
  • cmake中的数据结构?
  • 宏函数与函数?
  • 如何去构建动静态库和找到这些库?
  • 如何去实现支持多平台的项目构建?
  • 如何去构建一个应用?
  • 如何实现项目的最后install?
  • 如何很友好的去展示构建过程的各种级别信息?
  • 如何适配cmake-gui,采用友好的ccmake或者cmake-gui实现构建?

这里概括性说明下常用的cmake知识,总的来说cmake的作用就是让我们找到依赖的头文件和库文件,去编译源文件、链接目标文件(静态库也是目标文件的一个集合),最后生成可执行文件或动/静态库:

  • INCLUDE_DIRECTORIES 将给定的目录添加到编译器用于搜索包含文件(如头文件)的目录中,相对路径被解释为相对于当前源目录。注意目录仅是被添加到当前CMakeLists文件,作用于当前CMakeLists文件相关的库、可执行文件或者子模块编译,对于两个不同CMakeLists.cmake并列的作用是无效的。区别于TARGET_INCLUDE_DIRECTORIES,这个命令的作用只是作用于指定的目标,为指定的目标添加搜索路径。类似的还有TARGET_LINK_LIBRARIES命令(添加需要链接的库文件目录)。
  • PROJECT_SOURCE_DIR: 无疑只要是有包含最新PROJECT()命令声明的CMakeLists.txt,则都是相对当该CMakeLists.txt路径。
  • CMAKE_SOURCE_DIR: 构建整个项目时,可能你依赖的第三方项目,这个变量的值就是最顶层CMakeLists.txt的路径。
  • find_pathfind_library以及 find_package 时,会搜索一些默认的路径。当我们将一些lib安装在非默认搜索路径时,cmake就没法搜索到了,可设置:

    • SET(CMAKE_INCLUDE_PATH "include_path") // find_path,查找头文件
    • SET(CMAKE_LIBRARY_PATH "lib_path") // find_library,查找库文件
    • SET(CMAKE_MODULE_PATH "module_path") // find_package
  • 寻找3rdparty也不一定需要自己去编写FindXX.cmake,也可以直接用include(xxx.cmake)结合find_file命令实现寻找依赖库,find_file寻找到的结果存放到CACHE变量,示例:
# Once done, this will define                                                      
#                                                                                  
#  NANOMSG_INCLUDE_DIR - the NANOMSG include directory                             
#  NANOMSG_LIBRARY_DIR - the SPDLOG library directory                                                                                                                                          
#  NANOMSG_LIBS - link these to use NANOMSG                                        
#                                                                                  
#  SPDLOG_INCLUDE_DIR - the SPDLOG include directory                               
#  SPDLOG_LIBRARY_DIR - the SPDLOG library directory                               
#  SPDLG_LIBS - link these to use SPDLOG                                           
                                                                                   
MACRO(LOAD_LIBNANOMSG os arch)                                                     
    SET(3RDPARTY_DIR ${PROJECT_SOURCE_DIR}/3rdparty/target/${${os}}_${${arch}}) 
    MESSAGE(STATUS "3RDPARTY_DIR: ${3RDPARTY_DIR}")                                
    FIND_FILE(NANOMSG_INCLUDE_DIR include ${3RDPARTY_DIR} NO_DEFAULT_PATH)         
    FIND_FILE(NANOMSG_LIBRARY_DIR lib ${3RDPARTY_DIR} NO_DEFAULT_PATH)             
                                                                                   
    SET(NANOMSG_LIBS                                                               
        nanomsg                                                                    
        pthread                                                                    
        anl          
        PARENT_SCOPE
    )                                                                              
    IF(NANOMSG_INCLUDE_DIR)                                                        
        MESSAGE(STATUS "NANOMSG_LIBS : ${NANOMSG_LIBS}")                           
    ELSE()                                                                         
        MESSAGE(FATAL_ERROR "NANOMSG_LIBS not found!")                             
    ENDIF()                                                                        
ENDMACRO()
  • 条件控制切换示例:
# set target
if (NOT YOUR_TARGET_OS)
    set(YOUR_TARGET_OS linux)
endif()

if (NOT YOUR_TARGET_ARCH)
    set(YOUR_TARGET_ARCH x86_64)
endif()

if (NOT YOUR_BUILD_TYPE)
    set (YOUR_BUILD_TYPE Release)
endif()

......

if(${YOUR_TARGET_ARCH} MATCHES "(arm*)|(aarch64)")
    ......
elseif(${YOUR_TARGET_ARCH} MATCHES x86*)
    ......
  • 交叉编译: CMAKE_TOOLCHAIN_FILE变量,
MESSAGE(STATUS "Configure Cross Compiler")

IF(NOT TOOLCHAIN_ROOTDIR)                                                       
    MESSAGE(STATUS "Cross-Compiler defaut root path: $ENV{HOME}/Softwares/arm-himix200-linux")
    SET(TOOLCHAIN_ROOTDIR "$ENV{HOME}/Softwares/arm-himix200-linux")            
ENDIF()                                                                         

SET(CMAKE_SYSTEM_NAME Linux)                                                    
SET(CMAKE_SYSTEM_PROCESSOR arm)                                                 

SET(CMAKE_C_COMPILER       ${TOOLCHAIN_ROOTDIR}/bin/arm-himix200-linux-gcc)        
SET(CMAKE_CXX_COMPILER     ${TOOLCHAIN_ROOTDIR}/bin/arm-himix200-linux-g++)        

# set searching rules for cross-compiler                                        
SET(CMAKE_FIND_ROOT_PATH_MODE_PROGRAM NEVER)                                    
SET(CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY)                                     
SET(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY)                                     

SET(YOUR_TARGET_OS linux)                                                       
SET(YOUR_TARGET_ARCH armv7-a) 

SET(CMAKE_CXX_FLAGS "-std=c++11 -march=armv7-a -mfloat-abi=softfp -mfpu=neon-vfpv4 ${CMAKE_CXX_FLAGS}")
  • AUX_SOURCE_DIRECTORY 不会递归包含子目录,仅包含指定的dir目录
  • ADD_SUBDIRECTORY子模块的编译,可以将子文件夹中或者指定外部文件夹下CMakeLists.txt执行相关编译工作。
  • ADD_LIBRARY编译一个动/静态库或者模块,设定的名字需在整个工程中是独一无二的,而且在整个同一个工程中,跟父子文件夹路径无关,我们便可以通过TARGET_LINK_LIBRARIES依赖该模块。
  • ADD_DEFINITIONS(-DTEST -DFOO="foo")添加FOOTEST宏定义。

2 我们要优雅做到的构建

对于一个较大的软件项目,我们会依赖很多第三方的项目,包括源码依赖或者库依赖,然后完整的构建自己的软件项目,则需要去构建依赖项目或者找到我们所需要库;另外,软件项目会考虑到可移植性,即能够在不同的平台上也能够很好友的去构建项目以及将项目转移到另一个开发环境时能够快速的开始构建。

除了上面所说的,我们还需要考虑我们实际软件项目的架构结构,源码结构,可以让开发人员更清晰的、更快速的了解整个项目。

除此之外,C/C++ 程序员长期以来手动管理依赖,即手动查找、安装依赖,再配置构建工具(如 cmake)使用依赖。 cmake 还提供了一系列 find_package 方法帮助简化配置依赖, cmake 还支持多项目/模块管理,如果依赖源码同时被 cmake 管理构建,那么情况会简单很多,这种方式称为源码级依赖管理。 随着代码管理工具 git 出现并被广泛使用,git submodule 提供了一种不错的源码级依赖管理办法。

综上,优雅的构建软件项目,我们实现:

  • 软件项目源码依赖第三方项目
  • 软件项目库依赖第三方项目
  • 软件项目结构清晰
  • 软件项目构建在转换新环境下快速实现构建
  • 软件项目构建过程中的信息友好展示
  • 软件项目构建完成后打包发布
  • 软件项目支持跨平台构建
  • 软件项目支持交叉构建
  • git submodule & cmake管理/构建源码级依赖

另外,我们还实现一个可复用的C/C++最小开发框架(这个到后续文章中讲述):

  • 支持日志记录
  • 支持任务池/线程池
  • 支持常用相关基础操作组件

    • 时间日期操作
    • 文件读写操作
  • 支持valgrind内存泄露检查工具
  • 支持静态代码检查
  • 支持项目文档自动化
  • .....

3 优雅的软件项目结构模板

3.1 模板一

一个独立的应用,应用模块之间是相互联系的,彼此难以分开,这样简单的将所有源文件放一起,头文件放一起,这个对于不是很复杂的应用是很快速的去开始构建和源文件修改操作:

.
├── 3rdparty
├── cmake
├── include
├── src
├── doc
├── tests
├── benchmarks
├── docker
├── CMakeLists.txt

3.2 模板二

源文件与头文件分功能模块存放,这种方式是比较简单,但是如果成为其他项目的3rdparty,则需要在安装上将头文件分离出来,不能很方便的被其他项目直接引用,个人觉得适用于App类项目,而非SDK项目(比如nanomsg这个开源消息中间件库就是将头文件和源文件放一起,但是作为SDK供外部链接就不是很直接、很方便了,需要做install操作之后才可以或者是将头文件搜索范围设置到依赖项目的src级别,且src目录下模块分类很明确):

├── 3rdparty
    ├── submodule # 存放源码依赖
    ├── target # 存放库依赖
    ├── CMakeLists.txt
    ├── cmake # 存放 find_package cmake文件
├── cmake
├── platforms
│   └── linux
│       └── arm.toolchain.cmake
├── src
    ├── moudle1
        ├── source & include file 
    ├── moudle2
        ├── source & include file
    ├── ......
├── doc
├── tests
├── samples
├── benchmarks
├── docker
├── CMakeLists.txt

3.3 模板三

该软件项目可以分为很多模块,各个模块可以互相独立,也可以组合在一起,典型的如opencv项目,当然这个也适用于应用项目,但是应用项目的话目录结构太深,开发编辑上稍有不便:

├── 3rdparty
├── cmake
├── platforms
│   └── linux
│       └── arm.toolchain.cmake
├── include  该目录只是各功能模块头文件的一个汇总包含 
├── modules
    ├── moudle1
        ├── src
        ├── include
    ├── moudle2
    ├── ......
├── doc
├── tests
├── samples
├── benchmarks
├── docker
├── CMakeLists.txt

4 优雅的软件项目结构模板CMake实现

这里我们只去实现模板二,其他模板大同小异。如上面模板章节所述,我们

4.1 目录结构确定

.
├── 3rdparty                # 第三方库源码依赖和库依赖存放位置
│   ├── CMakeLists.txt      # 第三方库源码依赖编译CMakeLists文件
│   ├── spdlog              # 源码依赖示例项目spdlog(github可搜索)
│   └── target              # 库依赖存放目录
│       ├── linux_armv7-a   # 以平台和架构命名区分
│       │   ├── include     # 头文件存放目录
│       │   └── lib         # 库文件存放目录
│       └── linux_x86-64
│           ├── include
│           └── lib
├── cmake                   # 存放项目相关的cmakem模块文件
│   ├── load_3rdparty.cmake
│   ├── messagecolor.cmake
│   ├── toolchain_options.cmake
│   └── utils.cmake
├── CMakeLists.txt          # 项目根目录CMakeLists文件,cmake入口文件
├── conf                    # 项目配置文件存放目录
├── doc                     # 项目文档存放目录
├── platforms               # 项目平台性相关内容存放目录,包括交叉编译
│   └── linux
│       └── arm.himix200.cmake
├── README.md               # 项目说明
├── scripts                 # 相关脚本存放目录,包括持续集成和部署相关
├── src                     # 项目源码目录
│   ├── CMakeLists.txt
│   ├── common
│   ├── logger
│   └── main
└── tests                   # 测试示例源码存放目录
    ├── CMakeLists.txt
    └── test_logger.cpp

4.2 项目版本的管理

不管是SDK或者是APP项目,都会有一个版本,用来记录软件发布的每个节点。软件版本可以方便用户或者自己清楚的知道每个版本都有哪些内容的更新,可以对版本做出使用的选择或者解决版本中遇到的bug。实现版本的管理,需要能够在编译过程中清楚的体现当前版本号,在软件中也能够获取版本号。这里版本编号的管理使用常见的major.minor(.patch)格式,major是最大的版本编号,minor为其次,patch对应着小版本里的补丁级别。当有极大的更新时,会增加major的版号,而当有大更新,但不至于更新major时,会更新minor的版号,若更新比较小,例如只是bug fixing,则会更新patch的版号。版本号格式示例:v1.0v1.2.2等。

在优雅的构建软件模板中,我们将版本信息放置于src/common/version.hpp文件中:

注:所有的文件路径都是相对项目根目录而言。
#pragma once                                                                       

// for cmake
// 用于在CMakeLists文件中解析用
// 0.1.0                                                                 
#define HELLO_APP_VER_MAJOR 0                                                      
#define HELLO_APP_VER_MINOR 1                                                      
#define HELLO_APP_VER_PATCH 0                                                      

#define HELLO_APP_VERSION (HELLO_APP_VER_MAJOR * 10000 + HELLO_APP_VER_MINOR * 100 + HELLO_APP_VER_PATCH)

// for source code
// 用于在项目源码中获取版本号字符串
// v0.1.0                                                           
#define _HELLO_APP_STR(s) #s                                                       
#define HELLO_PROJECT_VERSION(major, minor, patch) "v" _HELLO_APP_STR(major.minor.patch)

在CMakeLists模块文件中我们去解析该文件获取版本号到CMake变量中,在cmake/utils.cmake添加宏函数:

FUNCTION(hello_app_extract_version)                                 
    FILE(READ "${CMAKE_CURRENT_LIST_DIR}/src/common/version.hpp" file_contents) 
    STRING(REGEX MATCH "HELLO_APP_VER_MAJOR ([0-9]+)" _  "${file_contents}")       
    IF(NOT CMAKE_MATCH_COUNT EQUAL 1)                                           
        MESSAGE(FATAL_ERROR "Could not extract major version number from version.hpp")
    ENDIF()                                                                     
    SET(ver_major ${CMAKE_MATCH_1})                                             

    STRING(REGEX MATCH "HELLO_APP_VER_MINOR ([0-9]+)" _  "${file_contents}")       
    IF(NOT CMAKE_MATCH_COUNT EQUAL 1)                                           
        MESSAGE(FATAL_ERROR "Could not extract minor version number from version.hpp")
    ENDIF()                                                                     
    SET(ver_minor ${CMAKE_MATCH_1})                                             
    STRING(REGEX MATCH "HELLO_APP_VER_PATCH ([0-9]+)" _  "${file_contents}")       
    IF(NOT CMAKE_MATCH_COUNT EQUAL 1)                                           
        MESSAGE(FATAL_ERROR "Could not extract patch version number from version.hpp")
    ENDIF()                                                                     
    SET(ver_patch ${CMAKE_MATCH_1})                                             

    SET(HELLO_APP_VERSION_MAJOR ${ver_major} PARENT_SCOPE)                      
    SET (HELLO_APP_VERSION "${ver_major}.${ver_minor}.${ver_patch}" PARENT_SCOPE)
ENDFUNCTION()

在根目录CMakeLists中调用版本宏:

CMAKE_MINIMUM_REQUIRED(VERSION 3.4)                                             

#--------------------------------------------                                   
# Project setting                                                               
#--------------------------------------------                                   
INCLUDE(cmake/utils.cmake)                                                      
HELLO_APP_EXTRACT_VERSION()                                                     

PROJECT(HelloApp VERSION ${HELLO_APP_VERSION} LANGUAGES CXX)                    

MESSAGE(INFO "--------------------------------")                                
MESSAGE(STATUS "Build HelloApp: ${HELLO_APP_VERSION}")

在后面的动静态库生成中就可以设定SOVERSION了,如:

SET_TARGET_PROPERTIES(MyLib PROPERTIES VERSION ${HELLO_APP_VERSION}
                                          SOVERSION ${HELLO_APP_VERSION_MAJOR})

这样就会生成一个liMyLibr.so => liMyLib.so.0 => libMyLib.so.0.1.1的库和相关软链接。不过这个操作谨慎使用,因为在android平台jni依赖带版本的库是无法找到的。

4.3 第三方库库依赖

第三方库依赖需要我们自己写库和头文件查找函数,三方库存放位置以平台和架构作为区分,目录结构随着工程的创建就基本不会改变了。库发现宏函数如下示例:

# Once done, this will define                                                                                                                                                                  
#                                                                               
#  SPDLOG_INCLUDE_DIR - the SPDLOG include directory                            
#  SPDLOG_LIBRARY_DIR - the SPDLOG library directory                            
#  SPDLG_LIBS - link these to use SPDLOG                                        
#                                                                               
#  ......                                                                       
                                                                                
MACRO(LOAD_LIBSPDLOG os arch)                                                   
    SET(3RDPARTY_DIR ${PROJECT_SOURCE_DIR}/3rdparty/target/${${os}}_${${arch}}) 
    MESSAGE(STATUS "3RDPARTY_DIR: ${3RDPARTY_DIR}")                             
    FIND_FILE(SPDLOG_INCLUDE_DIR include ${3RDPARTY_DIR} NO_DEFAULT_PATH)          
    FIND_FILE(SPDLOG_LIBRARY_DIR lib ${3RDPARTY_DIR} NO_DEFAULT_PATH)           
                                                                                
    SET(SPDLOG_LIBS                                                             
        spdlog          
        pthread
        #PARENT_SCOPE no parent                                                 
    )                                                                           
    IF(SPDLOG_INCLUDE_DIR)                                                      
        SET(SPDLOG_LIBRARY_DIR "${SPDLOG_LIBRARY_DIR}/spdlog")                  
        MESSAGE(STATUS "SPDLOG_INCLUDE_DIR : ${SPDLOG_INCLUDE_DIR}")            
        MESSAGE(STATUS "SPDLOG_LIBRARY_DIR : ${SPDLOG_LIBRARY_DIR}")            
        MESSAGE(STATUS "SPDLOG_LIBS : ${SPDLOG_LIBS}")                          
    ELSE()                                                                      
        MESSAGE(FATAL_ERROR "SPDLOG_LIBS not found!")                           
    ENDIF()                                                                     
ENDMACRO()
注意:如SPDLOG_LIBS变量如果宏函数在根目录CMakeLists中调用,所以变量作用域可以作用到所有子目录,如果不是在根目录调用,则需要设置PARENT_SCOPE属性。

在主CMakeLists中调用宏函数实现三方库的信息导入:

INCLUDE(cmake/load_3rdparty.cmake)                                              
                                                                                
IF(NOT YOUR_TARGET_OS)                                                          
    SET(YOUR_TARGET_OS linux)                                                   
ENDIF()                                                                         
IF(NOT YOUR_TARGET_ARCH)                                                        
    SET(YOUR_TARGET_ARCH x86-64)                                                
ENDIF()                                                                         
MESSAGE(STATUS "Your target os : ${YOUR_TARGET_OS}")                            
MESSAGE(STATUS "Your target arch : ${YOUR_TARGET_ARCH}")                        
                                                                                
LOAD_LIBSPDLOG(YOUR_TARGET_OS YOUR_TARGET_ARCH)

4.4 第三方库源码依赖

如果你想依赖第三方项目源码,一起编译,则我们可以通过git submodule来管理第三方源码,实现源码依赖和它的版本管理。当然你可以不用git submodule,直接将源码手动放入3rdparty目录中。

添加一个git submodule:

# url为git项目地址
# path为项目存放目录,可以多级目录,目录名一般为项目名称
# git add <url.git> <path>
# 示例,执行后,会直接拉取项目源码到3rdparty/spdlog目录下,并创建.gitmodule在仓库根目录下
$ git submodule add  https://github.com/gabime/spdlog.git 3rdparty/spdlog

还可以做到带指定分支进行添加操作:

# 注意:命令需要在项目根目录下执行,第一次会直接拉取源码,不用update
$ git submodule add -b v1.x   https://github.com/gabime/spdlog.git 3rdparty/spdlog
$ git submodule update --remote 

最后的.gitmodules文件为:

[submodule "3rdparty/spdlog"]
    path = 3rdparty/spdlog                                                   
    url = https://github.com/gabime/spdlog.git 
    branch = v1.x

实现三方项目源码编译(首先你依赖的三方项目源码是支持CMake构建方式的),在3rdparty/CMakeLists.txt中编写:

CMAKE_MINIMUM_REQUIRED(VERSION 3.4)                                             
PROJECT(HiApp3rdparty)

ADD_SUBDIRECTORY(spdlog) 

在根目录CMakeLists.txt中包含3rdparty中CMakeLists.txt,就可以编译第三方库了:

ADD_SUBDIRECTORY(3rdparty)

通过TARGET_LINK_LIBRARIES就可以指定第三方项目名称实现链接。

4.5 功能模块添加

4.5.1 功能模块编译

比如我们要添加一个日志模块,实现对spdlog项目的一个二次封装,更好的在自己的项目中使用,那么我们建立src/logger目录,里面新建logger.hpplogger.cppCMakeLists.txt三个文件,其中CMakeLists.txt内容是对该日志模块实现编译:

CMAKE_MINIMUM_REQUIRED(VERSION 3.4)

AUX_SOURCE_DIRECTORY(. CURRENT_DIR_SRCS)                                        
ADD_LIBRARY(module_logger ${CURRENT_DIR_SRCS})   
# SPDLOG_LIBS 为spdlog项目库名称
TARGET_LINK_LIBRARIES(module_logger ${SPDLOG_LIBS}) 

然后在src/CMakeLists.txt中包含该日志模块的编译:

ADD_SUBDIRECTORY(logger)

在根目录CMakeLists.txt中包含子目录src,从而实现功能模块的构建:

ADD_SUBDIRECTORY(src)
注: 为了演示,库依赖和源码依赖都是用的spdlog,这里实现日志模块的话需要选择其中一种方式。

4.5.2 可执行文件编译

如果我们需要实现可执行文件对日志模块的调用,我们可以添加src/main/main.cpp文件,在src/CMakeLists.txt中添加对可执行文件的编译:

# main app                                                                      
SET(SRC_LIST ./main/main.cpp)                                                   

ADD_EXECUTABLE(HiApp ${SRC_LIST})
# 配置可执行文件输出目录
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)                           
TARGET_LINK_LIBRARIES(HelloApp module_logger)     

当然,如果使用c++11的特性,我们可以专门创建一个cmake文件cmake/toolchain_options.cmake来配置编译选项,在其中配置c++11编译选项,并在主CMakeLists.txt中包含该cmake文件:

# compiler configuration            
# 从cmake3.1版本开始才支持CMAKE_CXX_STANDARD配置项
IF(CMAKE_VERSION VERSION_LESS "3.1")                                            
    IF(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")                                    
        SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=gnu++11")                  
    ENDIF()                                                                     
ELSE()                                                                          
    SET(CMAKE_CXX_STANDARD 11)                                                  
ENDIF() 

4.6 测试样例添加

测试样例放于tests目录,并在该目录下建立CMakeLists.txt文件用于构建所有测试demo,并在主CMakeLists.txt下包含tests目录:

CMAKE_MINIMUM_REQUIRED(VERSION 3.4)

PROJECT(Tests)                                                                  

INCLUDE_DIRECTORIES(                                                            
    ${SPDLOG_INCLUDE_DIR}                                                       
    ${CMAKE_SOURCE_DIR}/src                                                     
)                                                                               

LINK_DIRECTORIES(                                                               
    ${SPDLOG_LIBRARY_DIR}                                                       
)                                                                               

FILE(GLOB APP_SOURCES *.cpp)                                                    
FOREACH(testsourcefile ${APP_SOURCES})                                          
    STRING(REGEX MATCH "[^/]+$" testsourcefilewithoutpath ${testsourcefile})    
    STRING(REPLACE ".cpp" "" testname ${testsourcefilewithoutpath})             
    ADD_EXECUTABLE( ${testname} ${testsourcefile})                              
    SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin/tests)                   
    TARGET_LINK_LIBRARIES(${testname}                                           
        ${SPDLOG_LIBS}                                                          
        module_logger                                                           
        )                                                                       
ENDFOREACH(testsourcefile ${APP_SOURCES})

然后就可以在tests目录下添加测试程序了,比如test_logger.cpp或者更多的测试demo,tests/CMakeLists.txt会自动将tests目录下所有源文件逐个进行可执行文件生成构建。整个测试样例的构建就完成了。

4.7 交叉编译配置

CMake给我们提供了交叉编译的变量设置,即CMAKE_TOOLCHAIN_FILE这个变量,只要我们指定交叉编译的cmake配置文件,那么cmake会导入该配置文件的中编译器配置,编译选项配置等。我们设计的交叉编译工具链配置文件存放目录在platforms/下,这里我们使用华为海思的一个编译工具,我们按类别命名,创建一个工具栏cmake配置文件platforms/linux/arm.himix200.cmake:

MESSAGE(STATUS "Configure Cross Compiler") 
SET(CMAKE_SYSTEM_NAME Linux)                                                    
SET(CMAKE_SYSTEM_PROCESSOR arm)                                                 

SET(CMAKE_C_COMPILER       arm-himix200-linux-gcc)                              
SET(CMAKE_CXX_COMPILER     arm-himix200-linux-g++)                              

# set searching rules for cross-compiler                                        
SET(CMAKE_FIND_ROOT_PATH_MODE_PROGRAM NEVER)                                    
SET(CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY)                                     
SET(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY)                                     

SET(YOUR_TARGET_OS linux)                                                       
SET(YOUR_TARGET_ARCH armv7-a)                                                   

SET(CMAKE_SKIP_BUILD_RPATH TRUE)                                                
SET(CMAKE_SKIP_RPATH TRUE)                                                      

# set ${CMAKE_C_FLAGS} and ${CMAKE_CXX_FLAGS}flag for cross-compiled process       
#SET(CROSS_COMPILATION_ARM himix200)                                            
#SET(CROSS_COMPILATION_ARCHITECTURE armv7-a)                                    

# set g++ param                                                                 
# -fopenmp link libgomp                                                         
SET(CMAKE_CXX_FLAGS "-std=c++11 -march=armv7-a -mfloat-abi=softfp -mfpu=neon-vfpv4 \
    -ffunction-sections \                                                       
    -fdata-sections -O2 -fstack-protector-strong -lm -ldl -lstdc++\             
    ${CMAKE_CXX_FLAGS}")
注意:交叉编译工具链是需要在编译主机上安装好的。另外第三方库库依赖也需要对应编译出工具链版本(源码依赖除外)。

命令行执行交叉编译:

$ mkdir build
$ cd build
$ cmake .. -DCMAKE_TOOLCHAIN_FILE=../platforms/linux/arm.himix200.cmake
$ make -j

这样就实现了交叉编译,你也可以配置其他的交叉编译工具链。

4.8 其他

4.8.1 cmake message命令颜色凸显

我们还可以自定义初始化cmake构建的message命令打印颜色,可以方便快速的凸显出错误信息,我们可以创建一个文件cmake/messagecolor.cmake

IF(NOT WIN32)                                
    STRING(ASCII 27 Esc)                                                        
    SET(ColourReset "${Esc}[m")                                                 
    SET(ColourBold  "${Esc}[1m")                                                
    SET(Red         "${Esc}[31m")                                               
    SET(Green       "${Esc}[32m")                                               
    SET(Yellow      "${Esc}[33m")                                               
    SET(Blue        "${Esc}[34m")                                               
    SET(MAGENTA     "${Esc}[35m")                                               
    SET(Cyan        "${Esc}[36m")                                               
    SET(White       "${Esc}[37m")                                               
    SET(BoldRed     "${Esc}[1;31m")                                             
    SET(BoldGreen   "${Esc}[1;32m")                                             
    SET(BoldYellow  "${Esc}[1;33m")                                             
    SET(BOLDBLUE    "${Esc}[1;34m")                                             
    SET(BOLDMAGENTA "${Esc}[1;35m")                                             
    SET(BoldCyan    "${Esc}[1;36m")                                             
    SET(BOLDWHITE   "${Esc}[1;37m")                                             
ENDIF()                                                                         

FUNCTION(message)                                                               
    LIST(GET ARGV 0 MessageType)                                                
    IF(MessageType STREQUAL FATAL_ERROR OR MessageType STREQUAL SEND_ERROR)        
        LIST(REMOVE_AT ARGV 0)                                                  
        _message(${MessageType} "${BoldRed}${ARGV}${ColourReset}")              
    ELSEIF(MessageType STREQUAL WARNING)                                        
        LIST(REMOVE_AT ARGV 0)                                                  
        _message(${MessageType}                                                 
        "${BoldYellow}${ARGV}${ColourReset}")                                   
    ELSEIF(MessageType STREQUAL AUTHOR_WARNING)                                 
        LIST(REMOVE_AT ARGV 0)                                                  
        _message(${MessageType} "${BoldCyan}${ARGV}${ColourReset}")             
    ELSEIF(MessageType STREQUAL STATUS)                                         
        LIST(REMOVE_AT ARGV 0)                                                  
        _message(${MessageType} "${Green}${ARGV}${ColourReset}")                
    ELSEIF(MessageType STREQUAL INFO)                                           
        LIST(REMOVE_AT ARGV 0)                                                  
        _message("${Blue}${ARGV}${ColourReset}")                                
    ELSE()                                                                      
        _message("${ARGV}")                                                     
ENDIF()   

在主CMakeLists.txt中导入该cmake文件,则可以改变message命令各个级别打印的颜色显示。

4.8.2 Debug与Release构建

为了方便debug,我们在开发过程中一般是编译Debug版本的库或者应用,可以利用gdb调试很轻松的就可以发现错误具体所在。在主cmake文件中我们只需要加如下设置即可:

IF(NOT CMAKE_BUILD_TYPE)                                                        
    SET(CMAKE_BUILD_TYPE "Debug" CACHE STRING "Choose Release or Debug" FORCE)  
ENDIF()                                                                         

MESSAGE(STATUS "Build type: " ${CMAKE_BUILD_TYPE})

在执行cmake命令的时候可以设置CMAKE_BUILD_TYPE变量值切换Debug或者Release版本编译:

$ cmake .. -DCMAKE_BUILD_TYPE=Release

4.8.3 构建后安装

对于SDK项目,我们需要对外提供头文件和编译完成后的库文件,就需要用到cmake提供的install命令了。

我们安装需求是:

  • src目录下的每个模块头文件都能够安装,并按原目录存放安装
  • 库文件安装放于lib目录下
  • 可执行文件包括测试文件放于bin目录

首先模块头文件的安装实现均在src/{module}/CMakeLists.txt中实现,这里是安装目录,并过滤掉.cpp或者.c文件以及CMakeLists.txt文件,以logger模块为例:

INSTALL(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
    DESTINATION ${CMAKE_INSTALL_PREFIX}/include
    FILES_MATCHING 
    PATTERN "*.h"
    PATTERN "*.hpp"
    PATTERN "CMakeLists.txt" EXCLUDE
    )
注意:在UNIX系统上,CMAKE_INSTALL_PREFIX变量默认指向/usr/local,在Windows系统上,默认指向c:/Program Files/${PROJECT_NAME}

然后是库文件的安装,也相关ADD_LIBRARY命令调用后中实现:

INSTALL(TARGETS module_logger
    ARCHIVE DESTINATION lib                                                     
    LIBRARY DESTINATION lib                                                     
    RUNTIME DESTINATION bin)

最后是可执行文件的安装,跟安装库是一样的,添加到ADD_EXECUTABLE命令调用的后面,只是因为是可执行文件,属于RUNTIME类型,cmake会自动安装到我们设置的bin目录,这里以HelloApp为例:

INSTALL(TARGETS HelloApp
    ARCHIVE DESTINATION lib                                                     
    LIBRARY DESTINATION lib                                                     
    RUNTIME DESTINATION bin)

执行安装命令:

$ make install DESTDIR=$PWD/install

则会在相对当前目录install/usr/local目录下生成:

.
├── bin
│   ├── HelloApp
│   └── test_logger
├── include
│   ├── common
│   │   ├── common.hpp
│   │   └── version.hpp
│   └── logger
│       └── logger.hpp
└── lib
    └── libmodule_logger.a

至此,安装完成。

5 总结

“工欲善其事,必先利其器”,把基础筑好,在软件开发过程中也是很重要的,就如项目中需求明确一样,本篇文章我把C/C++项目开发的整体框架形成一个模板,不断总结改进,方便后续类似项目的快速开发。本篇文章也主要实现项目构建方面的内容,下一篇准备实现一个基本C/C++框架所必须的基础模块,包括日志模块、线程池、常用基础功能函数模块、配置导入模块、单元测试、内存泄露检查等。如有问题或者改进,一起来交流学习,最后欢迎大家关注我的公众号小白AI,不打广告,不为了写而写,只为了分享自己的学习过程^_^。

整个构建模板源码可以在我的github上找到,欢迎star: https://github.com/yicm/CMakeCppProjectTemplate

6 参考资料

查看原文

赞 0 收藏 0 评论 1

Ethan 赞了回答 · 2月24日

vue axios 请求 https 需要手工处理证书?

可能因为你的证书是自签发的。浏览器不承认

关注 3 回答 1

Ethan 关注了专栏 · 2019-12-26

阿里云栖号

汇集阿里技术精粹-yq.aliyun.com

关注 11808

Ethan 发布了文章 · 2019-12-26

Spring Boot Kafka概览、配置及优雅地实现发布订阅

本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net

[TOC]

1 前言

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章主要介绍Spring Kafka的常用配置、主题自动创建、发布消息到集群、订阅消息(群组)、流处理配置以及嵌入式Kafka做测试配置相关内容,最后通过两种方式去实现消息的发布和订阅功能,其中一种是基于Spring Integration方式。本文内容基于Spring Kafka2.3.3文档Spring Boot Kafka相关文档,Spring创建了一个名为Spring kafka的项目,它封装了Apache的kafka客户端部分(生产者/消费者/流处理等),以便在Spring项目中快速集成kafka,Spring-Kafka项目提供了Apache Kafka自动化配置,通过Spring Boot的简化配置(以spring.kafka.*作为前缀的配置参数),在Spring Boot中使用Kafka特别简单。并且Spring Boot还提供了一个嵌入式Kafka代理方便做测试。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

实现下面的所涉及到的功能实现,需要有如下环境:

  • Java运行或开发环境(JRE/JDK)
  • Kafka安装成功

更多的配置可以参考《Kafka,ZK集群开发或部署环境搭建及实验》这一篇文章。

本文尽量做到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring Boot对Spring Kafka进一步简化配置,通过Spring Boot中的Kafka几大注解实现发布订阅功能,同时通过Spring Integration + 自定义Kafka配置方式实现一个较为复杂的Kafka发布订阅功能,本文通过自己实验和整理了较久的时间,涵盖了Spring Kafka大部分内容,希望大家耐心读下来,有什么问题随时反馈,一起学习。

2 Spring Kafka功能概览

Spring Kafka、Spring Integration和Kafka客户端版本联系或者兼容性如下(截至2019年12月9日):

Spring for Apache KafkaSpring Integration for Apache Kafka Versionkafka-clients
2.3.x3.2.x2.3.1
2.2.x3.1.x2.0.1, 2.1.x, 2.2.x
2.1.x3.0.x1.0.x, 1.1.x, 2.0.0
1.3.x2.3.x0.11.0.x, 1.0.x
具体更多版本特点可以看官网,spring kafka当前最新为2.3.4版本。

Spring Kafka相关的注解有如下几个:

注解类型描述
EnableKafka启用由AbstractListenerContainerFactory在封面(covers)下创建的Kafka监听器注解端点,用于配置类;
EnableKafkaStreams启用默认的Kafka流组件
KafkaHandler在用KafkaListener注解的类中,将方法标记为Kafka消息监听器的目标的注解
KafkaListener将方法标记为指定主题上Kafka消息监听器的目标的注解
KafkaListeners聚合多个KafkaListener注解的容器注解
PartitionOffset用于向KafkaListener添加分区/初始偏移信息
TopicPartition用于向KafkaListener添加主题/分区信息

如使用@EnableKafka可以监听AbstractListenerContainerFactory子类目标端点,如ConcurrentKafkaListenerContainerFactoryAbstractKafkaListenerContainerFactory的子类。

public class ConcurrentKafkaListenerContainerFactory<K,V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration
 @EnableKafka
 public class AppConfig {
        @Bean
        public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
                ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
                factory.setConsumerFactory(consumerFactory());
                factory.setConcurrency(4);
                return factory;
        }
        // other @Bean definitions
 }
@EnableKafka并不是在Spring Boot中启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,因此不需要使用显式的@EnableKafka。如果想要自己实现Kafka配置类,则需要加上@EnableKafka,如果你不想要Kafka自动配置,比如测试中,需要做的只是移除KafkaAutoConfiguration
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")

2.1 自动创建主题

💡 要在应用启动时就创建主题,可以添加NewTopic类型的Bean。如果该主题已经存在,则忽略Bean。

2.2 发送消息

Spring的KafkaTemplate是自动配置的,你可以直接在自己的Bean中自动连接它,如下例所示:

@Component
public class MyBean {

    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}

KafkaTemplate包装了一个生产者,并提供了向kafka主题发送数据的方便方法。提供异步和同步(发送阻塞)方法,异步(发送非阻塞)方法返回ListenableFuture,以此监听异步发送状态,成功还是失败,KafkaTemplate提供如下接口:

ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
    T doInKafka(Producer<K, V> producer);
}

sendDefault API 要求已向模板提供默认主题。部分API接受一个时间戳作为参数,并将该时间戳存储在记录中,如何存储用户提供的时间戳取决于Kafka主题上配置的时间戳类型,如果主题配置为使用CREATE_TIME,则记录用户指定的时间戳(如果未指定则生成)。如果将主题配置为使用LOG_APPEND_TIME,则忽略用户指定的时间戳,并且代理将添加本地代理时间。metricspartitionsFor方法委托给底层Producer上的相同方法。execute方法提供对底层生产者的直接访问

要使用模板,可以配置一个生产者工厂并在模板的构造函数中提供它。下面的示例演示了如何执行此操作:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    // KafkaTemplate构造函数中输入生产者工厂配置
    return new KafkaTemplate<Integer, String>(producerFactory());
}

然后,要使用模板,可以调用其方法之一发送消息。

当你使用包含Message<?>参数的方法时,主题、分区和键信息在消息头中提供,有如下子项:

KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
KafkaHeaders.TIMESTAMP

如访问头部信息中某一项信息:

public void handleMessage(Message<?> message) throws MessagingException {
    LOGGER.debug("===Received Msg Topic: {}",  message.getHeaders().get(KafkaHeaders.TOPIC));
}

可选的功能是,可以使用ProducerListener配置KafkaTemplate,以获得带有发送结果(成功或失败)的异步回调,而不是等待将来完成。以下列表显示了ProducerListener接口的定义:

public interface ProducerListener<K, V> {
    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
    void onError(String topic, Integer partition, K key, V value, Exception exception);
    boolean isInterestedInSuccess();
}

默认情况下,模板配置有LoggingProducerListener,它只记录错误,在发送成功时不执行任何操作。只有当isInterestedInSuccess返回true时才调用onSuccess
为了方便起见,如果你只想实现其中一个方法,那么将提供抽象ProducerListenerAdapter。对于isInterestedInSuccess,它返回false。下面演示了异步结果回调:

public void sendMessage(String msg) {
    LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
    ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            LOGGER.info("===Producing message success");  
        }

        @Override
        public void onFailure(Throwable ex) {
            LOGGER.info("===Producing message failed");  
        }

    });
}

如果希望阻止式发送线程等待结果,可以调用futureget()方法。你可能希望在等待之前调用flush(),或者为了方便起见,模板有一个带有autoFlush参数的构造函数,该构造函数在每次发送时都会导致模板flush()。不过,请注意,刷新可能会显著降低性能:

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

使用DefaultKafkaProducerFactory:

如上面使用KafkaTemplate中所示,ProducerFactory用于创建生产者。默认情况下,当不使用事务时,DefaultKafkaProducerFactory会创建一个供所有客户机使用的单例生产者,如KafkaProducer javadocs中所建议的那样。但是,如果对模板调用flush(),这可能会导致使用同一个生产者的其他线程延迟。从2.3版开始,DefaultKafkaProducerFactory有一个新属性producerPerThread。当设置为true时,工厂将为每个线程创建(和缓存)一个单独的生产者,以避免此问题。

producerPerThread为true时,当不再需要生产者时,用户代码必须在工厂上调用closeThreadBoundProducer()。这将实际关闭生产者并将其从ThreadLocal中移除。调用reset()或destroy()不会清理这些生产者。

创建DefaultKafkaProducerFactory时,可以通过调用只接受属性映射的构造函数(请参阅使用KafkaTemplate中的示例)从配置中获取键和/或值序列化器类,或者序列化程序实例可以传递给DefaultKafkaProducerFactory构造函数(在这种情况下,所有生产者共享相同的实例)。或者,可以提供Supplier<Serializer> s(从版本2.3开始),用于为每个生产者获取单独的Serializer实例:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

使用ReplyingKafkaTemplate:

版本2.1.3引入了KafkaTemplate的一个子类来提供请求/应答语义。这个类名为ReplyingKafkaTemplate,并且有一个方法(除了超类中的那些方法之外)。下面的列表显示了方法签名:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

结果是一个ListenableFuture,它被结果异步填充(或者超时时出现异常)。结果还有一个sendFuture属性,这是调用KafkaTemplate.send()的结果。你可以使用此Future确定发送操作的结果。这里就不展开了。

2.3 接收消息

可以通过配置MessageListenerContainer并提供消息监听器或使用@KafkaListener注解来接收消息。

2.3.1 消息监听器

使用消息监听器容器(message listener container)时,必须提供监听器才能接收数据。目前有八个消息监听器支持的接口。下面的列表显示了这些接口:

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 作接收的单个ConsumerRecord实例
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例
public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。
public interface BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。
public interface BatchAcknowledgingMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。提供对使用者对象的访问。
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。提供对使用者对象的访问。
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
上述消费者对象不是线程安全的。只能在调用侦听器的线程上调用其方法。
2.3.1.1 消息监听器容器

提供了两个MessageListenerContainer的实现:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer从单个线程上的所有主题或分区接收所有消息(即一个分区只能分配到一个消费者,一个消费者可以被分配多个分区)。ConcurrentMessageListenerContainer委托给一个或多个KafkaMessageListenerContainer实例,以提供多线程使用,从多线程上去处理主题或分区的所有消息。

从Spring Kafka2.2.7版开始,你可以将RecordInterceptor添加到侦听器容器中;在调用侦听器以允许检查或修改记录之前,将调用它。如果拦截器返回null,则不调用侦听器。侦听器是批处理侦听器时不调用侦听器。从2.3版开始,CompositeRecordInterceptor可用于调用多个拦截器。

默认情况下,使用事务时,侦听器在事务启动后调用。从2.3.4版开始,你可以设置侦听器容器的interceptBeforeTx属性,以便在事务启动之前调用侦听器。没有为批处理侦听器提供侦听器,因为Kafka已经提供了ConsumerInterceptor

2.3.1.2 使用KafkaMessageListenerContainer

有如下构造函数可用:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionOffset... topicPartitions)

每个都获取一个ConsumerFactory以及有关主题和分区的信息,以及ContainerProperties对象中的其他配置。ConcurrentMessageListenerContainer(稍后介绍)使用第二个构造函数跨使用者实例分发TopicPartitionOffsetContainerProperties具有以下构造函数:

public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)

第一个构造函数接受一个TopicPartitionOffset参数数组来显式地指示容器要使用哪些分区(使用消费者的 assign()方法)和可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值是相对于分区内的当前最后偏移量。提供了TopicPartitionOffset的构造函数,该构造函数接受一个附加的布尔参数。如果是true,则初始偏移(正偏移或负偏移)相对于该消耗器的当前位置。容器启动时应用偏移量。第二个是主题数组,Kafka基于group.id属性:在组中分布分区来分配分区。第三个使用regex表达式来选择主题。

要将MessageListener分配给容器,可以在创建容器时使用ContainerProps.setMessageListener方法。下面的示例演示了如何执行此操作:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

注意当创建一个Defaultkafkafkaconsumerfactory时,使用构造器,该构造器仅以其特性为基础,就意味着从配置中获取了key/value的Deserializer类别。或者,反序列化程序实例可以传递给key/value的DefaultKafkaConsumerFactory构造函数,在这种情况下,所有消费者共享相同的实例。另一个选项是提供Supplier<Deserializer>s(从版本2.3开始),用于为每个使用者获取单独的反序列化程序实例:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有关可以设置的各种属性的更多信息,请参阅Javadoc 中ContainerProperties

从版本Spring Kafka 2.1.1开始,一个名为logContainerConfig的新属性就可用了。当启用true和INFO日志记录时,每个侦听器容器都会写入一条日志消息,总结其配置属性。

例如,要将日志级别更改为INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO)

从版本Spring Kafka 2.2开始,添加了名为missingtopicsfailal的新容器属性(默认值:true)。如果代理上不存在任何客户端发布或订阅涉及到的主题,这将阻止容器启动。如果容器配置为侦听主题模式(regex),则不适用。以前,容器线程在consumer.poll()方法中循环,等待在记录许多消息时出现主题。除了日志,没有迹象表明有问题。要恢复以前的行为,可以将属性设置为false,这个时候,Broker设置项allow.auto.create.topics=true,且这个容器属性为false,则会自动创建不存在的topic。

2.3.1.3 使用 ConcurrentMessageListenerContainer

单个构造函数类似于第一个KafkaListenerContainer构造函数。下面的列表显示了构造函数的签名:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个并发属性。例如,container.setConcurrency(3)即表示创建三个KafkaMessageListenerContainer实例。对于第一个构造函数,Kafka使用它的组管理功能将分区分布到消费者之间。

当监听多个主题时,默认的分区分布可能不是你期望的那样。例如,如果你有三个主题,每个主题有五个分区,并且希望使用concurrency=15,那么你只看到五个活动的消费者,每个消费者从每个主题中分配一个分区,其他十个消费者处于空闲状态。这是因为默认的Kafka PartitionAssignorRangeAssignor(参见其Javadoc)。对于这种情况,你可能需要考虑改用RoundRobinAssignor,它将分区分布到所有使用者。然后,为每个使用者分配一个主题或分区。若要更改PartitionAssignor,你可以在提供给DefaultKafkaConsumerFactory的属性中设置partition.assignment.strategy消费者配置参数(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

使用Spring Boot时,可以按如下方式分配设置策略:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

对于第二个构造函数,ConcurrentMessageListenerContainerTopicPartition实例分布在委托KafkaMessageListenerContainer实例上。

例如,如果提供了六个TopicPartition实例,并发性为3;每个容器得到两个分区。对于五个TopicPartition实例,两个容器得到两个分区,第三个容器得到一个分区。如果并发性大于TopicPartitions的数量,则会向下调整并发性,以便每个容器获得一个分区。调整分区的方式可以使用命令行工具kafka-topics.sh查询和调整主题上的分区数。还可以添加一个NewTopic Bean,如果NewTopic设定的数目大于当前数目,spring boot的自动配置的KafkaAdmin将向上调整分区。

client.id属性(如果已设置)将附加-n,其中n是对应于并发的消费者实例。当启用JMX时,这是为MBeans提供唯一名称所必需的。

从版本Spring Kafka 1.3开始,MessageListenerContainer提供了对底层KafkaConsumer的度量的访问。对于ConcurrentMessageListenerContainermetrics()方法返回所有目标KafkaMessageListenerContainer实例的度量(metrics)。根据为底层KafkaConsumer提供的client-id度量被分组到Map<MetricName, ?extends Metric>

从2.3版开始,ContainerProperties提供了一个idleBetweenPolls选项,允许侦听器容器中的主循环在KafkaConsumer.poll()调用之间睡眠。从提供的选项中选择实际睡眠间隔作为最小值,并且选择max.poll.interval.ms 消费者配置和当前记录批处理时间之间的差异。

2.3.1.4 提交偏移量

提供了几个提交偏移量的选项。如果enable.auto.commit使用者属性为true,则Kafka将根据其配置自动提交偏移量。如果为false,则容器支持多个AckMode设置(在下一个列表中描述)。默认的确认模式是批处理。从2.3版开始,框架将enable.auto.commit设置为false,除非在配置中显式设置。以前,如果未设置属性,则使用Kafka默认值(true)。消费者 poll()方法返回一个或多个ConsumerRecords。为每个记录调用MessageListener。以下列表描述了容器对每个AckMode采取的操作:

  • RECORD: 当侦听器在处理记录后返回时提交偏移量。
  • BATCH: 处理完poll()返回的所有记录后提交偏移量。
  • TIME: 在处理完poll()返回的所有记录后提交偏移量,只要超过上次提交后的ackTime
  • COUNT: 在处理完poll()返回的所有记录后提交偏移量,只要上次提交后收到ackCount记录。
  • COUNT_TIME: 类似于TIMECOUNT,但如果两个条件都为true,则执行提交。
  • MANUAL: 消息侦听器负责acknowledge()Acknowledgment。之后,应用与BATCH相同的语义。
  • MANUAL_IMMEDIATE: 侦听器调用Acknowledgement.acknowledge()方法时立即提交偏移量。
MANUAL和MANUAL_IMMEDIATE 要求侦听器是AcknowledgingMessageListenerBatchAcknowledgingMessageListener。请参见消息侦听器。

根据syncCommits容器属性,使用消费者上的commitSync()commitAsync()方法。默认情况下,syncCommits为true;另请参阅setSyncCommitTimeout。请参阅setCommitCallback以获取异步提交的结果;默认回调是LoggingCommitCallback,它记录错误(以及调试级别的成功)。

因为侦听器容器有自己的提交偏移的机制,所以它希望Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG为false。从2.3版开始,除非在使用者工厂或容器的使用者属性重写中特别设置,否则它将无条件地将其设置为false。

Acknowledgment有以下方法:

public interface Acknowledgment {
    void acknowledge();
}

此方法使侦听器可以控制何时提交偏移。

从版本2.3开始,确认接口有两个附加方法nack(long sleep)nack(int index, long sleep)。第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将引发IllegalStateException

nack()只能在调用侦听器的消费者线程上调用。

使用批处理侦听器时,可以在发生故障的批内指定索引。调用nack()时,将在对失败和丢弃的记录的分区执行索引和查找之前提交记录的偏移量,以便在下次poll()时重新传递这些偏移量。这是对SeekToCurrentBatchErrorHandler的改进,SeekToCurrentBatchErrorHandler只能查找整个批次以便重新交付。

注意:通过组管理使用分区分配时,确保sleep参数(加上处理上一次轮询记录所花费的时间)小于consumer max.poll.interval.ms属性非常重要。
2.3.1.5 侦听器容器自动启动和手动启动

侦听器容器实现了SmartLifecycle(通过SmartLifecycle在Spring加载和初始化所有bean后,接着执行一些任务或者启动需要的异步服务),默认情况下autoStartuptrue。容器在后期启动(Integer.MAX-VALUE - 100)。实现SmartLifecycle以处理来自侦听器的数据的其他组件应该在较早的阶段启动。-100为以后的阶段留出了空间,使组件能够在容器之后自动启动。比如我们通过@Bean将监听器容器交给Spring管理,这个时候通过SmartLifecycle自动执行了初始化的任务,但是当我们手动通过new监听器容器实例,则后初始化则不会执行,比如KafkaMessageListenerContainer实例需要手动执行start()

autoStartup在手动执行start中设置true与false没有作用,可以参见@KafkaListener声明周期管理这一小节。

2.3.2 @KafkaListener注解

2.3.2.1 Record Listeners

@KafkaListener注解用于将bean方法指定为侦听器容器的侦听器。bean包装在一个MessagingMessageListenerAdapter中,该适配器配置有各种功能,如转换器,用于转换数据(如有必要)以匹配方法参数。通过使用属性占位符(${…}),或者可以使用SpEL(#{…​})配置注释上的大多数属性。有关更多信息,请参阅Javadoc。

@KafkaListener:

  • id:listener唯一id,当GroupId没有被配置的时候,默认id为自动产生,此值指定后会覆盖group id。
  • containerFactory:上面提到了@KafkaListener区分单数据还是多数据消费只需要配置一下注解的containerFactory属性就可以了,这里面配置的是监听容器工厂,也就是ConcurrentKafkaListenerContainerFactory,配置Bean名称
  • topics:需要监听的Topic,可监听多个,可以是表达式或者占位符关键字或者直接是主题名称,如多个主题监听:{"topic1" , "topic2"}
  • topicPattern: 此侦听器的主题模式。条目可以是“主题模式”、“属性占位符键”或“表达式”。框架将创建一个容器,该容器订阅与指定模式匹配的所有主题,以获取动态分配的分区。模式匹配将针对检查时存在的主题周期性地执行。表达式必须解析为主题模式(支持字符串或模式结果类型)。这使用组管理,Kafka将为组成员分配分区。
  • topicPartitions:用于使用手动主题/分区分配时
  • errorHandler:监听异常处理器,配置Bean名称,默认为空
  • groupId:消费组ID
  • idIsGroup:id是否为GroupId
  • clientIdPrefix:消费者Id前缀
  • beanRef:真实监听容器的Bean名称,需要在 Bean名称前加 "__"

@KafkaListener注解为简单的POJO侦听器提供了一种机制。下面的示例演示如何使用它:

public class Listener {
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}

此机制生效需要@Configuration类之一上的@EnableKafka注解和用于配置基础ConcurrentMessageListenerContainer的侦听器容器工厂。默认情况下,需要名为kafkaListenerContainerFactory的bean。以下示例演示如何使用ConcurrentMessageListenerContain:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

注意,要设置容器属性,必须在工厂上使用getContainerProperties()方法。它用作注入容器的实际属性的模板。

从版本2.1.1开始,现在可以为注解创建的消费者设置client.id属性。clientdprefix的后缀是-n,其中n是一个整数,表示使用并发时的容器号。

从2.2版开始,现在可以通过使用批注本身的属性来重写容器工厂的并发性和自动启动属性。属性可以是简单值、属性占位符或SpEL表达式。下面的示例演示了如何执行此操作:

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

你还可以使用显式主题和分区(以及可选的初始偏移量)配置POJO侦听器。下面的示例演示了如何执行此操作:

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

你可以在partitionspartitionOffsets属性中指定每个分区,但不能同时指定两者。

使用手动AckMode时,还可以向侦听器提供Acknowledgment。下面的示例还演示了如何使用不同的容器工厂:

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

最后,可以从消息头获得有关消息的元数据。你可以使用以下头名称来检索消息头内容:

KafkaHeaders.OFFSET
KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE

示例:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
2.3.2.2 批处理侦听器

从版本1.1开始,可以配置@KafkaListener方法来接收从消费者接收的整批消费者记录。要将侦听器容器工厂配置为创建批处理侦听器,可以设置batchListener属性。下面的示例演示了如何执行此操作:

@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

以下示例显示如何接收有效载荷列表:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

主题、分区、偏移量等在与有效负载并行的头中可用。下面的示例演示如何使用标题:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

或者,您可以接收消息列表Message<?>对象,其中包含每个偏移量和每个消息中的其他详细信息,但它必须是唯一的参数(除了使用手动提交时的Acknowledgment和/Consumer<?, ?>参数)。下面的示例演示如何执行此操作:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

在这种情况下,不会对有效载荷执行转换。如果BatchMessagingMessageConverter配置了RecordMessageConverter,则还可以向消息参数添加泛型类型,并转换有效负载。有关详细信息,请参阅使用批处理侦听器的负载转换。

你还可以收到一个ConsumerRecord<?, ?>对象,但它必须是唯一的参数(当使用手动提交或Consumer<?, ?>参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操作:

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

从版本2.2开始,侦听器可以接收poll()方法返回的完整的ConsumerRecords<?, ?>对象,允许侦听器访问其他方法,例如partitions()(返回列表中的TopicPartition实例)和records(TopicPartition)(获取选择性记录)。同样,这必须是唯一的参数(当使用手动提交或Consumer<?, ?>参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操作:

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}

2.3.3 @KafkaListener@Payload验证

从2.2版开始,现在更容易添加验证程序来验证@KafkaListener`@Payload参数。以前,你必须配置一个自定义的DefaultMessageHandlerMethodFactory`并将其添加到注册器中。现在,你可以将验证器添加到注册器本身。以下代码说明了如何执行此操作:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(new MyValidator());
    }
}

当你在Spring Boot使用validation starter,会自动配置LocalValidatorFactoryBean,如下例所示:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

以下示例演示如何验证:

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }

}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

2.3.4 重新平衡监听者

ContainerProperties有一个名为consumerRebalanceListener的属性,该属性接受Kafka客户端的consumerRebalanceListener接口的实现。如果未提供此属性,则容器将配置日志侦听器,该侦听器将在信息级别记录重新平衡事件。该框架还添加了一个子接口ConsumerRawareRebalanceListener。以下列表显示了ConsumerRawareRebalanceListener接口定义:

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}

2.3.5 转发监听者消息

从2.0版开始,如果还使用@SendTo注解注释@KafkaListener,并且方法调用返回结果,则结果将转发到@SendTo指定的主题。如:

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}

2.3.6 @KafkaListener生命周期管理

@KafkaListener注解创建的侦听器容器不是应用程序上下文中的bean。相反,它们是用KafkaListenerEndpointRegistry类型的基础设施bean注册的。这个bean由框架自动声明并管理容器的生命周期;它将自动启动任何autoStartup设置为true的容器。所有容器工厂创建的所有容器必须处于同一phase。有关详细信息,请参阅侦听器容器自动启动。你可以使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。或者,可以通过使用单个容器的id属性来获取对该容器的引用。可以在批注上设置autoStartup ,这将覆盖容器工厂中配置的默认设置(setAutoStartup(true))。你可以从应用程序上下文中获取对bean的引用,例如自动连接,以管理其注册的容器。以下示例说明了如何执行此操作:

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

注册表只维护其管理的容器的生命周期;声明为bean的容器不受注册表管理,可以从应用程序上下文中获取。可以通过调用注册表的getListenerContainers()方法来获取托管容器的集合。Spring Kafka版本2.2.5添加了一个方便方法getAllListenerContainers(),它返回所有容器的集合,包括由注册表管理的容器和声明为bean的容器。返回的集合将包括任何已初始化的原型bean,但它不会初始化任何延迟bean声明。

2.4 流处理

Spring for Apache Kafka提供了一个工厂bean来创建StreamsBuilder对象并管理其流的生命周期。只要kafka流在classpath上并且kafka流通过@EnableKafkaStreams注解开启,Spring Boot就会自动配置所需的KafkaStreamsConfiguration bean。

启用Kafka流意味着必须设置应用程序id和引导服务器(bootstrap servers)。前者可以使用spring.kafka.streams.application-id配置,如果未设置,则默认为spring.application.name。后者可以全局设置,也可以专门为流覆写。

使用专用属性可以使用其他几个属性;可以使用spring.Kafka.streams.properties命名空间设置其他任意Kafka属性。有关详细信息,Additional Kafka Properties

默认情况下,由它创建的StreamBuilder对象管理的流将自动启动。可以使用spring.kafka.streams.auto-startup属性自定义此行为。

要使用工厂bean,只需将StreamsBuilder连接到@bean,如下例所示:

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public static class KafkaStreamsExampleConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
                Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

}

默认情况下,由它创建的StreamBuilder对象管理的流将自动启动。可以使用spring.kafka.streams.auto-startup属性自定义此行为。

2.5 附加配置

自动配置支持的属性显示在公用应用程序属性中。注意,在大多数情况下,这些属性(连字符或驼峰样式)直接映射到Apache Kafka点式属性。有关详细信息,请参阅Apache Kafka文档。

前面提到的几个属性应用于所有组件(生产者、消费者、管理员和流),但如果希望使用不同的值,则可以在组件级别指定。Apache Kafka指定重要性为HIGHMEDIUMLOW的属性。Spring Boot自动配置支持所有高重要性属性、某些选定的中、低属性以及任何没有默认值的属性。

只有Kafka支持的属性的一个子集可以通过KafkaProperties类直接使用,如果要使用不直接支持的其他属性配置生产者或消费者,请使用以下属性:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

上面的参数设置示例将公共prop.oneKafka属性设置为first(适用于生产者、消费者和管理员),prop.two admin属性设置为secondprop.three consumer属性设置为thirdprop.four producer属性设置为fourthprop.five streams属性设置为fifth

你还可以配置Spring Kafka JsonDeserializer,如下所示:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

类似地,可以禁用JsonSerializer在头中发送类型信息的默认行为:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
注意: 以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。

2.6 使用Embdded Kafka做测试

Spring for Apache Kafka提供了一种使用嵌入式Apache Kafka代理测试项目的便捷方法。要使用此功能,请使用Spring Kafka测试模块中的@EmbeddedKafka注解测试类。有关更多信息,请参阅Spring For Apache Kafka参考手册。

要使Spring Boot自动配置与前面提到的嵌入式Apache Kafka代理一起工作,需要将嵌入式代理地址(由EmbeddedKafkaBroker填充)的系统属性重新映射到Apache Kafka的Spring Boot配置属性中。有几种方法可以做到这一点:

  • 提供系统属性以将嵌入的代理地址映射到测试类中的spring.kafka.bootstrap-servers:
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
  • @EmbeddedKafka注解上配置属性名:
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  • 在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

2.7 Spring Integration支持

Spring Integration也有Kafka的适配器,因此我们可以很方便的采用Spring Integration去实现发布订阅,当然你也可以不使用Spring Integration。

Spring Integration是什么,具体有什么作用,可以参考另一篇文章《Spring Integration最详解》。

3 Spring Kafka配置参数

这里对所有配置做个说明的是,spring kafka配置分全局配置和子模块配置,子模块配置会复写全局配置,比如SSL认证可以全局配置,但是也可以在每个子模块,如消费者、生产者、流式处理中都可以单独配置SSL(可能是微服务部署,消费者和生产者不在同一个应用中)。这里重点介绍生产者和消费者配置吧,其他就不展开了,用到的时候再去查找和补充。

3.1 全局配置

# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性
spring.kafka.bootstrap-servers
# 在发出请求时传递给服务器的ID。用于服务器端日志记录
spring.kafka.client-id,默认无
# 用于配置客户端的其他属性,生产者和消费者共有的属性
spring.kafka.properties.*
# 消息发送的默认主题,默认无
spring.kafka.template.default-topic

3.2 生产者

Spring Boot中,Kafka 生产者相关配置(所有配置前缀为spring.kafka.producer.):

# 生产者要求Leader在考虑请求完成之前收到的确认数
spring.kafka.producer.acks
# 默认批量大小。较小的批处理大小将使批处理不太常见,并可能降低吞吐量(批处理大小为零将完全禁用批处理)
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
# 生产者可用于缓冲等待发送到服务器的记录的总内存大小。
spring.kafka.producer.buffer-memory
# 在发出请求时传递给服务器的ID。用于服务器端日志记录。
spring.kafka.producer.client-id
# 生产者生成的所有数据的压缩类型
spring.kafka.producer.compression-type
# 键的序列化程序类
spring.kafka.producer.key-serializer
spring.kafka.producer.properties.*
# 大于零时,启用失败发送的重试次数
spring.kafka.producer.retries
spring.kafka.producer.ssl.key-password
spring.kafka.producer.ssl.key-store-location
spring.kafka.producer.ssl.key-store-password
spring.kafka.producer.ssl.key-store-type
spring.kafka.producer.ssl.protocol
spring.kafka.producer.ssl.trust-store-location
spring.kafka.producer.ssl.trust-store-password
spring.kafka.producer.ssl.trust-store-type
# 非空时,启用对生产者的事务支持
spring.kafka.producer.transaction-id-prefix
spring.kafka.producer.value-serializer

3.3 消费者

Spring Boot中,Kafka 消费者相关配置(所有配置前缀为spring.kafka.consumer.):

# 如果“enable.auto.commit”设置为true,设置消费者偏移自动提交到Kafka的频率,默认值无,单位毫秒(ms)
spring.kafka.consumer.auto-commit-interval
# 当Kafka中没有初始偏移或服务器上不再存在当前偏移时策略设置,默认值无,latest/earliest/none三个值设置
# earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset
# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性
spring.kafka.consumer.bootstrap-servers
# 在发出请求时传递给服务器的ID,用于服务器端日志记录
spring.kafka.consumer.client-id
# 消费者的偏移量是否在后台定期提交
spring.kafka.consumer.enable-auto-commit
# 如果没有足够的数据来立即满足“fetch-min-size”的要求,则服务器在取回请求之前阻塞的最大时间量
spring.kafka.consumer.fetch-max-wait
# 服务器应为获取请求返回的最小数据量。
spring.kafka.consumer.fetch-min-size
# 标识此消费者所属的默认消费者组的唯一字符串
spring.kafka.consumer.group-id
# 消费者协调员的预期心跳间隔时间。
spring.kafka.consumer.heartbeat-interval
# 用于读取以事务方式写入的消息的隔离级别。
spring.kafka.consumer.isolation-level
# 密钥的反序列化程序类
spring.kafka.consumer.key-deserializer
# 在对poll()的单个调用中返回的最大记录数。
spring.kafka.consumer.max-poll-records
# 用于配置客户端的其他特定于消费者的属性。
spring.kafka.consumer.properties.*
# 密钥存储文件中私钥的密码。
spring.kafka.consumer.ssl.key-password
# 密钥存储文件的位置。
spring.kafka.consumer.ssl.key-store-location
# 密钥存储文件的存储密码。
spring.kafka.consumer.ssl.key-store-password
# 密钥存储的类型,如JKS
spring.kafka.consumer.ssl.key-store-type
# 要使用的SSL协议,如TLSv1.2, TLSv1.1, TLSv1
spring.kafka.consumer.ssl.protocol
# 信任存储文件的位置。
spring.kafka.consumer.ssl.trust-store-location
# 信任存储文件的存储密码。
spring.kafka.consumer.ssl.trust-store-password
# 信任存储区的类型。
spring.kafka.consumer.ssl.trust-store-type
# 值的反序列化程序类。
spring.kafka.consumer.value-deserializer

3.4 监听器

Spring Boot中,Kafka Listener相关配置(所有配置前缀为spring.kafka.listener.):

# ackMode为“COUNT”或“COUNT_TIME”时偏移提交之间的记录数
spring.kafka.listener.ack-count=
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.client-id
spring.kafka.listener.concurrency
spring.kafka.listener.idle-event-interval
spring.kafka.listener.log-container-config
# 如果Broker上不存在至少一个配置的主题(topic),则容器是否无法启动,
# 该设置项结合Broker设置项allow.auto.create.topics=true,如果为false,则会自动创建不存在的topic
spring.kafka.listener.missing-topics-fatal=true
# 非响应消费者的检查间隔时间。如果未指定持续时间后缀,则将使用秒作为单位
spring.kafka.listener.monitor-interval
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.poll-timeout
spring.kafka.listener.type

3.5 管理

spring.kafka.admin.client-id
# 如果启动时代理不可用,是否快速失败
spring.kafka.admin.fail-fast=false
spring.kafka.admin.properties.*
spring.kafka.admin.ssl.key-password
spring.kafka.admin.ssl.key-store-location
spring.kafka.admin.ssl.key-store-password
spring.kafka.admin.ssl.key-store-type
spring.kafka.admin.ssl.protocol
spring.kafka.admin.ssl.trust-store-location
spring.kafka.admin.ssl.trust-store-password
spring.kafka.admin.ssl.trust-store-type

3.6 授权服务(JAAS)

spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=false
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule
spring.kafka.jaas.options.*

3.7 SSL认证

spring.kafka.ssl.key-password
spring.kafka.ssl.key-store-location
spring.kafka.ssl.key-store-password
spring.kafka.ssl.key-store-type
spring.kafka.ssl.protocol
spring.kafka.ssl.trust-store-location
spring.kafka.ssl.trust-store-password
spring.kafka.ssl.trust-store-type

3.8 Stream流处理

spring.kafka.streams.application-id
spring.kafka.streams.auto-startup
spring.kafka.streams.bootstrap-servers
spring.kafka.streams.cache-max-size-buffering
spring.kafka.streams.client-id
spring.kafka.streams.properties.*
spring.kafka.streams.replication-factor
spring.kafka.streams.ssl.key-password
spring.kafka.streams.ssl.key-store-location
spring.kafka.streams.ssl.key-store-password
spring.kafka.streams.ssl.key-store-type
spring.kafka.streams.ssl.protocol
spring.kafka.streams.ssl.trust-store-location
spring.kafka.streams.ssl.trust-store-password
spring.kafka.streams.ssl.trust-store-type
spring.kafka.streams.state-dir

4 Kafka订阅发布基本特性回顾

  • 同一消费组下所有消费者协同消费订阅主题的所有分区

    • 同消费组,多消费者订阅单主题单分区,则分区只会分配给其中一个消费者,除非这个消费者挂掉,才会分配给其他一个消费者消费消息,意思就是其他消费者在旁边看着吃东西
    • 同消费组,N个消费者订阅单主题N个分区,则默认每个消费者都会被分配一个分区
    • 同消费组,N个消费者订阅单主题M个分区,当M > N时,则会有消费者多分配多于一个分区的情况;当M < N时,则会有空闲消费者,类似第一条
    • 所有上面所说的消费者实例可以是线程方式或者是进程方式存在,所说的分区分配机制叫做重平衡(rebalance)
    • 当消费者内成员个数发生变化会触发重平衡;订阅的主题个数发生变化会触发重平衡;订阅的主题分区个数发生变化会触发重平衡;
    • 总之就是一个分区只能分配到一个消费者,一个消费者可以被分配多个分区
  • 消费者offset管理机制

    • 每个主题分区中的消息都有一个唯一偏移值,具有先后顺序,与消费者具有对应关系,消费者每消费一条消息,偏移量加1,并记录在消费者本地,并定期的将记录同步到服务端(Broker),这里的同步机制是可以设置的
    • 消息是被持久化的,当组内所有消费者重新订阅主题时,可以设置是否从头开始消费消息或者是从最后记录的偏移值位置开始消费
  • 分区和消费者个数如何设置

    • 我们知道主题分区是分布在不同的Broker上的,每个分区对应一个消费者,从而具有消息处理具有很高的吞吐量
    • 分区是调优Kafka并行度的最小单元,多线程消费者连接多分区消费消息,在实现上,通过socket连接,因此也会占用文件句柄个数
    • 创建分区都是会占用一定内存的,并不是分区越多越好,当然现在kafka社区在优化这一部分,让分区数达到更大,性能也不会有所影响

具体怎么调优副本、分区、消费者等这里就不展开了,后面专门来研究这个问题。

5 发布订阅示例

实现下面的示例需要的环境:

  • Kafka + Zookeeper单点服务器或集群已配置好(如果环境搭建不熟悉,可以去翻看前面写的关于Kafka的环境搭建和测试那一篇),或者是使用Spring-kafka-testembedded Kafka Server
  • Spring Boot开发环境(2.2.1)

    • JDK(1.8或以上)
    • STS(4.4.RELEASE)
    • MARVEN构建方式

5.1 使用Embedded Kafka Server

我们知道Kafka是Scala+Zookeeper构建的,可以从官方网站下载部署包并在本地部署。不过,Spring Kafka Test已经封装了Kafka测试的带注解的一键式功能,以打开Kafka服务器,从而简化了验证Kafka相关功能的开发过程,使用起来也非常简单。

添加依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

启动服务,下面使用Junit测试用例直接启动Kafka服务器服务,包括四个代理节点,Run as JUnit Test。:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
    @Test
    public void contextLoads()throws IOException {
        System.in.read();
    }
}

@EmbeddedKafka中可以设置相关参数:

  • value: 设置创建代理的个数
  • count: 同value
  • ports: 代理端口号列表
  • brokerPropertiesLocation:指定配置文件,如 "classpath:application.properties"
注意:EmbeddedKafka这样默认是没有创建主题的。会提示Topic(s) [test] is/are not present and missingTopicsFatal is true错误。@EmbeddedKafka默认情况是创建一个代理,该代理具有一个不带任何参数的随机端口,它将在启动日志中输出特定端口和默认配置项。

5.2 简单的发布订阅实现(无自定义配置)

下面实现一个简单发布订阅功能,通过前端WEB调用一个API,然后在该API控制器中得到请求后生产者开始发送消息,消费者后台监听消息,如果收到消费者消息,则打印出来。

5.2.1 添加依赖及配置Kafka

添加Kafka依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>            
</dependency>

配置Kafka,这里消费者和生产者在同一应用中,我们只需要配置Kafka Brokers的服务地址+端口:

server: 
  port: 9000
spring:
  kafka:
    bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094
    listener:
        # 设置不监听主题错误,false时,如果broker设置了llow.auto.create.topics = true,生产者发送到未创建主题时,会默认自动创建主题
        # 且默认创建的主题是单副本单分区的
        missing-topics-fatal: false
    consumer:
        # 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)
        auto-offset-reset: earliest

5.2.2 添加生产者

@Service
public class Producer {

    private static final Logger LOGGER = LogManager.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        LOGGER.info(String.format("===Producing message: {}", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

5.2.3 添加消费者

@Service
public class Consumer {

    private static final Logger LOGGER = LogManager.getLogger(Consumer.class);

    @KafkaListener(topics = "test", groupId = "group_test")
    public void consume(String message) throws IOException {
        LOGGER.info(String.format("#### -> Consumed message -> %s", message));
    }
   
}

5.2.4 添加WEB控制器

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @GetMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}

5.2.5 测试

添加Spring Boot Application:

@SpringBootApplication
public class TestKafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestKafkaApplication.class, args);
    }

}

启动Kafka Brokers后,需要手动创建主题(如果想自动创建,则需要借助KafkaAdmin,或者是Kafka Broker设置了allow.auto.create.topics=true且应用设置了listener.missing-topics-fatal=false):

# 如果对kafka-topics.sh这里不熟悉,可以去翻看前面写的关于Kafka的相关文章(环境搭建和测试那一篇)
# 创建test主题
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test

打开浏览器测试:

http://localhost:9000/kafka/publish?message=hello

则应用控制台会打印hello。整个发布订阅的实现只使用了跟Kafka相关的@KafkaListener注解接收消息和KafkaTemplate模板发送消息,很是简单。

5.3 基于自定义配置发布订阅实现

上面是简单的通过Spring Boot依赖的Spring Kafka配置即可快速实现发布订阅功能,这个时候我们是无法在程序中操作这些配置的,因此这一小节就是利用我们之前《Spring Boot从零入门7_最新配置文件配置及优先级详细介绍》文章中讲述的自定义配置文件方式去实现发布订阅功能。

实现内容有:

  • 自定义Kafka配置参数文件(非application.properties/yml)
  • 可实现多生产者(每个生产者为单服务单线程),多消费者(非@KafkaListener实现消息监听)
  • 支持SSL安全配置
  • 监听生产者
源码不会直接贴,只给出主体部分。

配置文件:

@Configuration
@ConfigurationProperties(prefix = "m2kc")
@PropertySource("classpath:kafka.properties")
@Validated
public class M2KCKafkaConfig {

    @Value("${m2kc.kafka.bootstrap.servers}")
    private String kafkaBootStrapServers;

    @Value("${m2kc.kafka.key.serializer.class}")
    private String kafkaKeySerializerClass;

    ......
    ......
}

生产者:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaProducer {
    private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class);
    private String mTopic = "test";
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaTemplate<String, String> mKafkaTemplate;
   
    @Autowired
    public KafkaProducer(M2KCKafkaConfig kafkaConfig) {
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;     
        mKafkaTemplate = getKafkaTemplate();
    }

    public KafkaTemplate<String, String> getKafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
        return kafkaTemplate;
    }

    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass());        
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());            
        }

        return new DefaultKafkaProducerFactory<String, String>(properties);
    }
    
    public void sendMessage(String msg) {
        LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
        ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.info("===Producing message success");  
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.info("===Producing message failed");  
            }

        });
    }
}

消费者:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaConsumer implements InitializingBean {
    private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class);

    private String mTopic;
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer; 

    @Autowired
    public KafkaConsumer(M2KCKafkaConfig kafkaConfig) {
        LOGGER.info("===KafkaConsumer construct");
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;
    }
    
    @PostConstruct
    public void start(){
        LOGGER.info("===KafkaConsumer start");        
    }
    
    @Override  
    public void afterPropertiesSet() throws Exception {          
        LOGGER.info("===afterPropertiesSet is called");      
        createContainer();
    }  

    private void createContainer() {
        mKafkaMessageListenerContainer =  createKafkaMessageListenerContainer();
        mKafkaMessageListenerContainer.setAutoStartup(false);;
        mKafkaMessageListenerContainer.start();
        LOGGER.info("===", mKafkaMessageListenerContainer);
    }
    
    private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() {
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(),
                createContainerProperties());
        LOGGER.info("===createKafkaMessageListenerContainer");
        return container;
    }
   
    private ContainerProperties createContainerProperties() {
        ContainerProperties containerProps = new ContainerProperties(mTopic);
        containerProps.setMessageListener(createMessageListener());
        return containerProps;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID());
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());
        }

        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }

    private MessageListener<String, String> createMessageListener() {
        return new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> data) {
                // TODO Auto-generated method stub
                LOGGER.info("===Consuming msg: {}", data.value());
            }

        };
    }
}

继承InitializingBean只是为了初始化,也可以去掉,将初始化写入了构造函数中。这里的消费者和生产者都使用@Scope,所以需要手动获取实例,通过context去调用getBean()。另外配置文件没有写全,这里需要注意。

5.3 基于Spring Integration发布订阅实现

Spring Integration也有对Kafka支持的适配器,采用Spring Integration,我们也能够快速的实现发布订阅功能,且实现群组多消费者批量消费功能:

  • 实现Kafka自定义配置类
  • 采用Spring Integration
  • 发布订阅
  • 群组多消费者批量消费
  • 采用DSL特定领域语法去编写
  • 生产者发布成功与失败异常处理

我们可以先看看整体的Kafka消息传递通道:

  • 出站通道中KafkaProducerMessageHandler用于将消息发送到主题
  • KafkaMessageDrivenChannelAdapter用于设置入站通道和消息处理

具体的Demo可以参考Github中的一个sample :

6 总结

本篇文章详细介绍了Spring Kafka的发送消息和接收消息功能,其他包括Spring Kafka Stream的简单介绍、Spring Kafka参数配置,以及在Spring Boot中如何通过三种方式去实现Kafka的发布订阅功能,涉及了Kafka的多消费者多订阅者,SSL安全传输,Spring Integration Kafka等。文章很长,把握总体,结合实际,差不多基本内容都有所涉及了。

7 知识扩展

Spring Expression Language(简称SpEL),在Spring中,不同于属性占位符${...},而SpEL表达式则要放到#{...}中(除代码块中用Expression外)。如配置文件中有topics参数spring.kafka.topics,则可以将配置文件中参数传入注解@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")

SpEL表达式常用示例:

// 字面量
#{3.1415926}    // 浮点数
#{9.87E4}       // 科学计数法表示98700
#{'Hello'}      // String 类型
#{false}        // Boolean 类型
// 引用Bean、属性和方法
#{sgtPeppers}                                   // 使用这个bean
#{sgtPeppers.artist}                            // 引用bean中的属性
#{sgtPeppers.selectArtist()}                    // 引用bean中的方法
#{sgtPeppers.selectArtist().toUpperCase()}      // 方法返回值的操作
#{sgtPeppers.selectArtist()?.toUpperCase()}     // 防止selectArtist()方法返回null,?表示非null则执行toUpperCase()
// 访问类作用域的方法和常量的话,使用T()这个关键的运算符
#{T(java.lang.Math)}   
#{T(java.lang.Math).PI}             // 引用PI的值
#{T(java.lang.Math).random()}       // 获取0-1的随机数
#{T(System).currentTimeMillis()}    // 获取时间到当前的毫秒数
// 替代属性占位符获取配置文件属性值
@Value("#{表达式}" 
private String variable;

8 参考资料

查看原文

赞 0 收藏 0 评论 0

Ethan 发布了文章 · 2019-12-22

Spring Boot 基于Spring Integration 实现MQTT客户端简单订阅发布功能

本文属于翻译,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net

[TOC]

1 简介

Spring Integration 提供入站(inbound)和出站(outbound)通道适配器,以支持MQTT消息协议。使用这两适配器,需要加入依赖:

<!-- Maven -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.2.1.RELEASE</version>
</dependency>
// Gradle
compile "org.springframework.integration:spring-integration-mqtt:5.2.1.RELEASE"

当前的MQTT Integration实现使用的是Eclipse Paho MQTT客户端库。两个适配器的配置都是使用DefaultMqttPahoClientFactory实现的。有关配置选项的更多信息,请参阅Eclipse Paho MQTT文档定义。

建议配置MqttConnectOptions对象并将其注入工厂(factory),而不是在工厂本身里设置(不推荐使用)MQTT连接选项。

2 Inbound(消息驱动)通道适配器

入站通道适配器由MqttPahoMessageDrivenChannelAdapter实现。常用的配置项有:

  • 客户端ID
  • MQTT Broker URL
  • 待订阅的主题列表
  • 带订阅的主题QoS值列表
  • MqttMessageConverter(可选)。默认情况下,默认的DefaultPaHomeMessageConverter生成一条带有字符串有效负载的消息,其头部内容如下:

    • mqtt_topic: 接收消息的主题
    • mqtt_duplicate: 如果消息是重复的,则为true
    • mqtt_qos: 服务质量,你可以将DefaultPahoMessageConverter声明为<bean />并将payloadAsBytes属性设置为true,从而将DefaultPahoMessageConverter返回有效负载中的原始byte[]
  • 客户端工厂
  • 发送超时。仅当通道可能阻塞(例如当前已满的有界队列通道)时才适用。
  • 错误通道。下游异常将以错误消息的形式发送到此通道(如果提供)。有效负载是包含失败消息和原因的MessagingException
  • 恢复间隔。它控制适配器在发生故障后尝试重新连接的时间间隔。默认为10000毫秒(10秒)。
从Spring 4.1版开始,可以省略URL。相反,你可以在DefaultMqttPahoClientFactoryserver URIs属性中提供服务器uri。例如,这样做允许连接到高可用(HA)集群。

Spring 4.2.2开始,当适配器成功订阅到主题了,MqttSubscribedEvent事件就会被触发。当连接失败或者订阅失败,MqttConnectionFailedEvent事件会被触发。这两个事件都能够被一个Bean通过实现ApplicationListener而接收到。另外,名为recoveryInterval的新属性控制适配器在失败后尝试重新连接的时间间隔。默认为10000毫秒(10秒)。

@Component
public class MQTTSubscribedListener implements ApplicationListener<MqttSubscribedEvent> {
    private static final Logger LOGGER = LogManager.getLogger(MQTTSubscribedListener.class);

    @Override
    public void onApplicationEvent(MqttSubscribedEvent event) {
        LOGGER.debug("Subscribed Success: " + event.getMessage());
    }
}
在版本Spring 4.2.3之前,当适配器停止时,客户端总是取消订阅。这是不正确的,因为如果客户端QOS大于0,我们需要保持订阅处于活动状态,以便在下次启动时传递适配器停止时到达的消息。这还需要将客户机工厂上的cleanSession属性设置为false。默认为true。从4.2.3版开始,如果cleanSession属性为false,则适配器不会取消订阅(默认情况下),这个默认行为可以通过在工厂上设置consumerCloseAction属性来重写此行为。它可以有以下值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN,最后一项(默认设置)仅在cleanSession属性为true时取消订阅。若要还原到4.2.3之前的行为,请始终使用“取消订阅”设置项。

注意:从Spring 5.0开始,topic、qos和retained属性映射到.RECEIVED_…headers(MqttHeaders.RECEIVED_topic、MqttHeaders.RECEIVED_qos和MqttHeaders.RECEIVED_retained),以避免意外传播到(默认情况下)使用MqttHeaders.topic、MqttHeaders.qos和MqttHeaders.retained headers的出站消息。

public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            LOGGER.debug("===Received Msg(topic {}): {}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getPayload());
        }
    };
}

2.1 在运行时添加和删除主题

Spring4.1开始,你可以通过编程更改适配器订阅的主题。Spring Integration提供了addTopic()removeTopic()方法。添加主题时,可以选择指定QoS值(默认是1)。你还可以通过向具有适当有效负载的<control bus/>发送适当的消息来修改主题。示例:

myMqttAdapter.addTopic('foo', 1)

停止和启动适配器对主题列表(topics设置项)没有影响(它不会还原到配置中的原始设置)。这些更改不会保留到应用程序上下文的生命周期之外。新的应用程序上下文将还原为配置的设置。

在适配器停止(或与代理断开连接)时更改主题列表(topics)将在下次建立连接时生效。

2.2 使用Java配置配置

以下Spring Boot应用程序显示了如何使用Java配置配置入站(inbound)适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }
}

2.3 使用Java DSL配置

下面的Spring Boot应用程序提供了使用Java DSL配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlows.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2");)
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

3 出站通道适配器

出站通道适配器由MqttPahoMessageHandler实现,MqttPahoMessageHandler包装在ConsumerEndpoint中。为了方便起见,可以使用名称空间配置它。

从Spring 4.1开始,适配器支持异步发送操作,在确认交付之前避免阻塞。如果需要,可以发出应用程序事件以使应用程序确认传递。

以下列表显示出站通道适配器可用的属性:

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  
    url="tcp://localhost:1883"  
    converter="myConverter"  
    client-factory="clientFactory"  
    default-qos="1"  
    qos-expression="" 
    default-retained="true"  
    retained-expression="" 
    default-topic="bar"  
    topic-expression="" 
    async="false"  
    async-events="false"  
    channel="target" />
  • MQTT Client ID
  • MQTT Broker URL
  • Converter(MqttMessageConver,可选的),默认的DefaultPaHomeMessageConverter可识别以下标题:

    • mqtt_topic: 消息将发送到的主题
    • mqtt_retained: 如果要保留消息,则为true
    • mqtt_qos:消息服务质量
  • 客户端工厂
  • default-qos,默认的服务质量。如果找不到mqtt_qos头或qos表达式返回空值,则使用它。如果提供自定义转换器,则不使用它。
  • 用于计算以确定qos的表达式。缺省值是headers[mqtt_qos]
  • 保留标志的默认值。如果找不到mqtt_retained头,则使用它。如果提供了自定义转换器,则不使用它。
  • 要计算以确定保留布尔值的表达式。默认为headers[mqtt_retained]
  • 消息发送到的默认主题(如果找不到mqtt_topic头,则使用)
  • 要计算以确定目标主题的表达式。默认为headers['mqtt_topic']
  • async如果为true,则调用方不会阻塞。而是在发送消息时等待传递确认。默认值为false(发送将阻塞,直到确认发送)
  • async-events,当async和async事件(async-events)都为true时,将发出MqttMessageSentEvent。它包含消息、主题、客户端库生成的消息id、clientId和clientInstance(每次连接客户端时递增)。当客户端库确认传递时,将发出MqttMessageDeliveredEvent。它包含messageId、clientId和clientInstance,使传递与发送相关。任何ApplicationListener或事件入站通道适配器都可以接收这些事件。请注意,MqttMessageDeliveredEvent可能在MqttMessageSentEvent之前收到。默认值为false
注意,同样地,从Spring 4.1开始,可以省略URL。相反,可以在DefaultMqttPahoClientFactorserver URIs属性中提供服务器uri。例如,这允许连接到高可用(HA)集群。

3.1 使用Java配置配置

下面的Spring Boot应用程序展示了如何使用Java配置配置出站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

3.2 使用Java DSL配置

下面的Spring Boot应用程序提供了使用Java DSL配置出站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

       @Bean
       public IntegrationFlow mqttOutboundFlow() {
           return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

4 参考资料

本文属于原创,转载注明出处,欢迎关注CSDNfreeape或微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net

查看原文

赞 0 收藏 0 评论 0

Ethan 发布了文章 · 2019-12-15

Kafka,ZK集群开发或部署环境搭建及实验

本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net 或者我的CSDN http://blog.csdn.net/freeape

[TOC]

1 前言

Kafka其他特点这里就不罗嗦了,本篇主要是集群开发环境的搭建。具体Kafka相关介绍内容和周边分析可以见另一篇文章《Kafka及周边深度了解》。Kafka是依赖Zookeeper的,单机单服务也需要。Apacke Kafka维护团队开始讨论去除Zookeeper了(2019年11月6日),目前,Kafka使用ZooKeeper来存储分区和代理的元数据,并选择一个Broker作为Kafka控制器,而希望通过删除对ZooKeeper的依赖,将使Kafka能够以一种更具伸缩性和健壮性的方式管理元数据,实现对更多分区的支持,它还将简化Kafka的部署和配置。但是目前我们还是需要Zookeeper。

Zookeeper是由Java编写的,所以需要先安装JDK。

本篇文章主要实现以下内容:

  • 单机单Kafka Broker和单Zookeeper(默认安装好后启动ZK和Kafka就是了)
  • 单机Kafka Broker集群和Zookeeper集群
  • 多机Kafka Broker集群和Zookeeper集群
另,Zookeeper在Kafka中是自带的,这里就不另外安装Zookeeper了,目的只是构建开发环境。安装环境为Ubuntu16.04。

2 安装

2.1 OpenJDK

这里选择OpenJDK 8安装:

# 软件源用的是清华的
$ sudo apt-get update
$ sudo apt-get install openjdk-8-jdk
# 多版本JDK管理和切换,怎么设置更加参考update-alternatives的用法
sudo update-alternatives --config java
# 查看安装后版本(openjdk version 1.8.0_222)
$ java -version

2.2 Kafka版本和安装

2.2.1 版本说明

Kafka也与Hadoop类似,也有着不同的发行版:

  • Apache Kafka : 迭代速度快,社区响应高,缺乏高级功能,没有提供任何监控框架或工具,有开源监控框架如Kafka manager,连接器(connector)也比较单一,没有与其他外部系统交互的连接器,需要自行编码;
  • Confluent Kafka : LinkedIn离职员工创办了Confluent,专注于提供基于Kafka的企业级流处理解决方案,比如跨数据中心备份、Schema;Confluent Kafka目前分为免费版和企业版两种,免费版包含Schema注册中心、REST proxy访问Kafka的各种功能、更多连接器,但是没有集群监控、跨数据中心备份等高级功能;
  • Cloudera/Hortonworks Kafka : 安装、部署、管理、监控方便,但是降低了kafka的掌控,全由界面操作,而且版本相对社区版滞后;

Kafka1.0和2.0这两个大版本主要还是Kafka Streams的各种改进,在消息引擎方面并未引入太多的重大功能特性。0.11版本在消息引擎处理方面是比较稳定的一个版本。

Apache官网上发布的Kafka版本有如下信息:

因为Kafka是由Scala语言编写的,图中的2.112.12指的是Scala的版本,意思就是由Scala2.11或2.12编译出的Kafka2.3.1二进制版本,而2.3.1才是Kafka的版本。

2.2.2 下载安装

从官方网站 https://kafka.apache.org/downloads 下载Kafka,或者从Github上项目Releases下下载,目前为止最新版本为2.3.1(2019年10月24号发布)。这里我们直接下载二进制版本,如果下载源码版本需要自己去编译。

解压后就可以了,里面是支持LinuxWindows启动脚本的。

3 集群及配置

3.1 默认相关配置(单机单Kafka Broker和单ZK)

  • Kafka服务默认配置文件./config/server.properties

    • broker id配置
    • 日志文件输出目录/tmp/kafka-logs
    • 每个主题的默认日志分区数为1
    • 相关的线程数配置
    • 相关的IO接收发送缓存大小设置
    • ZK服务地址配置,默认为localhost:2181
    • 日志文件保留的最短时限设置,默认为168小时
    • 服务默认监听端口为9092
    • ......
  • ZK的默认配置文件./config/zookeeper.properties

    • 快照数据目录/tmp/zookeeper
    • 服务端口2181
    • 每个客户端IP使用的连接数限制为不限(0)
    • ......
  • 其他配置文件(这里就不展开了,具体使用的时候再来了解)

    • ./config/log4j.properties
    • ./config/connect-log4j.properties
    • ./config/producer.properties
    • ./config/consumer.properties
    • ./config/connect-console-sink.properties
    • ./config/connect-console-source.properties
    • ......

启动Zookeeper,守护进程状态运行: ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

启动Kafka,后台运行:./bin/kafka-server-start.sh config/server.properties &

注意:启动Kafka之前是一定要先启动Zookeeper的,如开始前言所说,Kafka是依赖ZK的。

3.2 单机Kafka Broker集群和Zookeeper集群配置

单机Kafka Broker集群和Zookeeper集群配置的实现,只需要启动多个多个Broker和ZK,每个服务设置不同的监听端口就好了,并设置不同的日志目录(这里举例三个broker):

# Kafka 3个broker配置
# kafka broker1
broker.id=1
listeners = PLAINTEXT://localhost:9092
log.dir=/data/kafka/logs-1
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# kafka broker2
broker.id=2
listeners = PLAINTEXT://localhost:9093
log.dir=/data/kafka/logs-2
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# kafka broker3
broker.id=3
listeners = PLAINTEXT://localhost:9094
log.dir=/data/kafka/logs-3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# Zookeeper 3个节点配置
# zk 1
dataDir=/tmp/zookeeper-1
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668
# zk 2
dataDir=/tmp/zookeeper-2
clientPort=2182
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668
# zk 3
dataDir=/tmp/zookeeper-3
clientPort=2183
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668

说明:ZK的配置中server.x,这个x是Kafka的Broker ID。而Zookeeper有三个服务,其中一个是主服务,其他两个是从服务,如果有一台服务器关闭,则Zookeeper会自动选择leader。服务器监听三个端口,如上举例中:2181用于客户端连接;2666用于从服务器连接(如果它是领导者);3666用于leader选举阶段的其他服务器连接;ZooKeeper服务器以两种模式运行:独立和复制模式(或叫仲裁模式,复制模式常用于生产环境),独立模式就是只有一台服务器,或者说是只有一个服务。此外,复制模式下initLimit是集群中的follower(从)服务器与leader(主)服务器之间初始连接 时能容忍的最多心跳数(tickTime的数量),而tickTime是Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,默认值3000,也就是每个 tickTime 时间就会发送一个心跳,单位毫秒;syncLimit是集群中的从服务器与主服务器之间请求和应答之间能容忍的最多心跳数(tickTime的数量)。

一一启动ZK服务:

$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper1.properties
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper2.properties
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper3.properties

但是会报/tmp/zookeeper-1/myid file is missing异常,这是因为我们现在是单机多服务部署,要想ZK能够识别每一个服务,需要在dataDir这个参数对应目录创建并设置myid文件,里面填一个数字,就是server.xx这个对应数字。

启动好所有ZK服务后,一一启动Kafka Broker:

$ ./bin/kafka-server-start.sh  config/server1.properties &
$ ./bin/kafka-server-start.sh  config/server2.properties &
$ ./bin/kafka-server-start.sh  config/server3.properties &

3.3 多机Kafka Broker集群和Zookeeper集群配置

多级Kafka Broker集群和Zookeeper集群配置跟单机多Kafka Broker集群+Zookeeper集群是一样的。不过ZK端口和Kafka端口可以设置成一样,注意此时的连接ip都是各个主机的ip地址。如果没有多机,可以用Docker去模拟实现。

4 日志配置

$KAFKA_HOME/bin/kafka-run-class.sh

5 实验

5.1 消息发布订阅

接下来我们用Kafka里提供了脚本工具来测试主题的发布/订阅,只有单机单Broker和单ZK时,--zookeeperbroker-listbootstrap-server指定单个即可。

  1. 使用 kafka-topics.sh 创建单分区单副本的主题users
# 创建后,主题会持久化到本地,重启服务后还有,需要用--delete选项删除
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 1 --partitions 1 --topic users
# 删除主题(主题已经在订阅的过程中是无法删除的)
$ ./bin/kafka-topics.sh --delete --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic users
  1. 查看我们创建的主题
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181,localhost:2182,localhost:2183
  1. 往主题users发送消息
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic users
> hello
> world
  1. 接收消息并在终端打印
# --from-beginning 是指将历史未消费消息开始消费(针对同一个消费者,不同的消费者都会从最早的消息开始消费)
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic users --from-beginning

然后就可以收到生产者发送的消息了。

注意:Kafka 从 2.2 版本开始将 kafka-console-consumer.shkafka-topics.sh等脚本中的 −−zookeeper (用该参数,则消息由ZK管理)参数标注为 “过时”,推荐使用 −−bootstrap-server 参数,−−bootstrap-server指定的不是zookeeper的服务地址,而是Kafka的服务地址,消息由Kafka管理。很显然,zookeeper就会慢慢被取代了。另副本和分区的详细理解可以参考《Kafka及周边深度了解》。

5.2 Kafka Connect导入导出数据

Kafka Connect的作用我们从上图中可以看到,可以从本地数据库或者文件中导入数据,通过Kafka Connect Source到Brokers集群再到Kafka Connect Sink(或者到指定的Topic,图中没有示意出来),再到消费者或者其他目标数据库。这里我们展示通过本地一个文本文件中写入数据,然后实现上面的数据通路。需要用到connect-standalone.sh或者connect-distributed.sh(Kafka Connect集群)。

  1. 配置connect-standalone.sh的配置文件./config/connect-standalone.properties,单Broker可以不用配置
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
  1. 配置connect-file-source.properties参数(没做任何修改,保持默认配置)
# 默认输入是文件流类型,这里主要是配置输入的文件名,和创建的主题
name=local-file-source                                                                                                                      
connector.class=FileStreamSource                                                                                                                             
tasks.max=1                                                                                                                                                  
file=test.txt                                                                                                                                                
topic=connect-test
  1. 配置connect-file-sink.properties参数(没做任何修改,保持默认配置)
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1    
file=test.sink.txt                                                                                                                                           topics=connect-test   
  1. 本地创建配置中输入源test.txt
echo -e "xiaobaiai.net\nethan" > test.txt
  1. 开始导入
$ ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  1. 查看结果,本地会生成一个sink配置文件中生成的文件test.sink.txt,另外如果继续往test.txt中输入内容,则这个生成文件里也会接收到内容,或者直接消费connect-test主体,也可以获取内容:
xiaobaiai.net
ethan
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"xiaobaiai.net"}
{"schema":{"type":"string","optional":false},"payload":"ethan"}

5.3 Kafka Streams实验

具体参见 https://kafka.apache.org/23/d... ,这里暂时不展开了。

6 Kafka其他配置说明

示例:

# ---------------------------
# ./config/server.properties
# ---------------------------
# 默认设置创建主题时副本为1
default.replication.factor=1
# 指定创建主题时默认分区数为3
num.partitions=3
配置项类型默认值示例描述
broker.id整型00kafka broker的id
num.network.threads整型33kafka接受和发送消息的线程个数,跟机器cpu核数有关系
num.io.threads整型88服务器用于处理IO请求的(可能包括磁盘I/O)的线程数
socket.send.buffer.bytes整型102400102400linux os的SO_SNDBUF配置参数,如果-1用OS默认配置。
socket.receive.buffer.bytes整型102400102400linux os的SO_RCVBUF配置参数,如果-1用OS默认配置。
socket.request.max.bytes整型104857600104857600套接字消息最大多少字节
log.dirs字符串/tmp/kafka-logs/tmp/kafka-logskafka的数据目录
num.partitions整型11默认partition个数
num.recovery.threads.per.data.dir整型11启动时用于日志恢复和关闭时刷新的每个数据目录的线程数
offsets.topic.replication.factor整型11自动创建topic的时候,当可用节点个数小于这个数字时候,会创建失败直到有充足的节点可用
transaction.state.log.replication.factor整型11transaction topic的复制因子数
transaction.state.log.min.isr整型11覆盖min.insync.replicas 的配置
log.retention.hours整型168168数据最久保存7天
log.segment.bytes整型10737418241073741824kafka一个topic有多个partition组成,一个partition一个segment文件存储,当达到log.segment.bytes或者log.roll.hours(log.roll.ms)阈值的时候会新建一个新的segment文件
log.retention.check.interval.ms整型300000300000log clean检查的interval, 默认5分钟
zookeeper.connect字符串localhost:2181localhost:2181zookeeper服务连接配置
zookeeper.connection.timeout.ms整型60006000zk连接超时设置
group.initial.rebalance.delay.ms整型00更多的group JVM消费进程加入进来的时候,距离上次rebalance的时间间隔

更多可以参见:https://kafka.apache.org/docu...

7 扩展

Kafka是一个具有多种代理架构的高可用性服务。一个主题对应于多个分区,一个分区可以有多个副本。这些副本存储在多个代理中以获得高可用性。但是,尽管有多个分区副本集,但只有一个工作的副本集。默认情况下,第一个分配的副本集(首选副本)是负责写入和读取数据的Leader。当我们升级代理或更新代理配置时,我们需要重新启动服务,然后我们需要将分区转移到可用的代理。这里有三种情况:

  • 直接关闭Broker:当Broker关闭时,Broker集群将重新选择一个新的Broker作为分区领导,并且Broker上的分区在选举期间将短期不可用
  • 打开controlledShutdown:当代理关闭时,代理本身将首先尝试将领导角色转移到其他可用的代理
  • 使用命令行工具:使用bin/kafka-preferred-replica-election.sh手动触发分区负责人角色转换

8 总结

本篇是实践的第一环节,实现了Kafka的集群开发环境搭建,并做了主题创建、消息发布、订阅的实验,下一篇将实现Spring Boot集成Kafka,继续!

9 参考资料

本文属于原创,转载注明出处,欢迎关注CSDNfreeape或微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net

查看原文

赞 0 收藏 0 评论 0

Ethan 发布了文章 · 2019-12-08

Kafka及周边深度了解

本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net 或者我的CSDN http://blog.csdn.net/freeape

[TOC]

0 前言

文章有点长,但是写的都挺直白的,慢慢看下来还是比较容易看懂,从Kafka的大体简介到Kafka的周边产品比较,再到Kafka与Zookeeper的关系,进一步理解Kafka的特性,包括Kafka的分区和副本以及消费组的特点及应用场景简介。

1 简介

Apache Kafka 是一个分布式流处理平台,注意是平台:

  • 发布 & 订阅,类似消息系统,并发能力强,通过集群可以实现数据总线作用,轻轻松松实现流式记录数据分布式读写
  • 以高容错的方式存储海量流式数据
  • 可以在流式记录数据产生时就进行处理

从上面的一个Kafka小型应用架构图可以了解Kafka周边及它的实际能扮演的角色,图中Kafka集群连接了六个数据输入输出部分,分别是Kafka ProducerKafka Connect Source Kafka Streams/KSQLKafka ConsumerKafka Connect Sink。而这些数据的输入输出都可以通过Kafka提供的四个核心API组去解决(除Kafka AdminClient API外):

  • Kafka Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka主题(Topic)
  • Kafka Consumer API 允许一个应用程序订阅一个或多个主题(Topic) ,并且对接收到的流式数据进行处理
  • Kafka Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个主题(Topic)产生的输入流,然后生产一个输出流到一个或多个主题(Topic)中去,在输入输出流中进行有效的转换
  • Kafka Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka Topics连接到已存在的应用程序或者数据库系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

我们对Kafka的发布 & 订阅功能的作用比较清楚,而图中的KSQL和Kafka Streams是怎么个回事呢?

首先我们需要清楚什么是流处理?流处理可以认为是消息的实时处理,比如在一个时间段内,源源不断地有数据信息进来,而每时每刻都能够对这些数据有一个最后的结果处理,那么这就是流处理,而如果是每隔一个小时或者更久处理一次,那叫大数据分析或者批处理。它的特点更多是实时性的分析,在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算,同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。类似的比较有:HadoopStorm以及Spark StreamingFlink是常用的分布式计算组件,其中Hadoop是对非实时数据做批量处理的组件;StormSpark StreamingFlink是针对实时数据做流式处理的组件,而Kafka Streams是后起之秀。

关于KSQL呢?

  • KSQL 是 Apache Kafka 的数据流 SQL 引擎,它使用 SQL 语句替代编写大量代码去实现流处理任务,而Kafka Streams是Kafka中专门处理流数据的
  • KSQL 基于 Kafka 的 Stream API 构建,它支持过滤(filter)、转换(map)、聚合(aggregations)、连接(join)、加窗操作和 Sessionization(即捕获单一会话期间的所有的流事件)等流处理操作,简化了直接使用Stream API编写 Java 或者 Scala 代码,只需使用简单的 SQL 语句就可以开始处理流处理
  • KSQL 语句操作实现上都是分布式的、容错的、弹性的、可扩展的和实时的
  • KSQL 的用例涉及实现实时报表和仪表盘、基础设施和物联网设备监控、异常检测和欺骗行为报警等

2 相关概念简介

  • Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic
  • Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition
  • Replication:副本,一个partition可以设置一个或者多个副本,副本主要保证系统能够持续不丢失地对外提供服务。在创建topic的时候可以设置partition的replication数
  • Segment:段文件,kafka中最小数据存储单位,kafka可以存储多个topic,各个topic之间隔离没有影响,一个topic包含一个或者多个partition,每个partition在物理结构上是一个文件夹,文件夹名称以topic名称加partition索引的方式命名,一个partition包含多个segment,每个segment以message在partition中的起始偏移量命名以log结尾的文件,producer向topic中发布消息会被顺序写入对应的segment文件中。kafka为了提高写入和查询速度,在partition文件夹下每一个segment log文件都有一个同名的索引文件,索引文件以index结尾。
  • Offset:消息在分区中偏移量,用来在分区中唯一地标识这个消息。
  • Producer:消息生产者,负责发布消息到Kafka broker
  • Consumer:消息消费者,向Kafka broker读取消息的客户端
  • Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

3 Kafka与ActiveMQ、ZeroMQ、RabbitMQ、RocketMQ、Redis

这个主要是针对消息中间件的选型评估,这里我们讲述一些概念。其他更详细的这里有篇文章讲了: https://juejin.im/post/5b3204...

3.1 消息队列、点对点和PUB/SUB

在开始之前,我们也需要稍微了解下JMS(Java Messaging System),是一个Java平台中关于面向消息中间件(MOM)的API。JMS支持两种消息模式,一个就是P2P模式,一个就是发布订阅模式。后面会说到哪些消息件支持JMS。

消息队列有两种消息模型:点对点(Point to Point,PTP)和发布/订阅(PUB/SUB)模式。

消息队列点对点,顾名思义,是一个队列,信息只能一对一,一个消息被一个消费者使用完了,那么就不会存在队列中了,就像邮差给别人投递邮件,不可能这封信还有副本,而且还能保证这封信安全送到指定的人手里(这是框架赋予的能力)。

PUB/SUB消息订阅发布就不一样了,它的特征就是支持多对一,一对一,一对多,就像期刊报社一样,出版的期刊或者报纸,需要可以传递到不同人手里,而且还可以拿到以前日期的期刊或者报纸(这是框架赋予的能力)。

  • RabbitMQ是消息代理,持多种消息传递协议,如AMQP,MQTT3.1,XMPP, SMTP, STOMP,HTTP, WebSockets协议,由内在高并发的Erlanng语言开发,用在实时的对可靠性要求比较高的消息传递上。它既支持消息队列点对点,也支持PUB/SUB。RabbitMQ对JMS所有特性并不完全支持(https://www.rabbitmq.com/jms-...
  • Redis以内存数据库而闻名。但是,也可以将其用作消息队列点对点和PUB/SUB管理工具,不过因为内存缓冲区的效率,如果消费者失去了与队列的连接,那么很有可能在连接丢失时丢失消息。另外,在实现消息队列点对点功能上,至少要创建3个队列:主队列、工作队列、被拒绝队列,实现有点复杂。
  • Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件,PUB/SUB就是基本功能了,支持消息优先级、消息有序保证、消息过滤,保证每个消息至少投递一次。RocketMQ的集群消费功能大等于PTP模型。因为RocketMQ单个Consumer Group内的消费者类似于PTP,单个Consumer Group里面的消费者均摊消息,等于实现点对点功能,接收者单位是Group。
  • Apache ActiveMQ支持点对点和PUB/SUB,支持多种跨语言客户端和协议,具有易于使用的企业集成模式和许多高级功能,同时完全支持JMS 1.1和j2ee1.4
  • ZeroMQ是用C实现的,性能高、轻量级自然是它的特点。ZeroMQ 并非严格意义上的 at least once 或者 at most once,以其 Pub/Sub 模式来说,ZeroMQ 构建了消息确认和重传机制,却未对消息进行持久化,那么内存耗尽或者进程崩溃都会造成消息丢失,而重传则可能会造成消息被发送 1 到 n 次。当然,在企业级WEB服务中,尤其是微服务中我们对ZeroMQ的选择是偏少的。
  • Kafka更多的是作为发布/订阅系统,结合Kafka Stream,也是一个流处理系统

3.2 关于持久化

  • ZeroMQ支持内存、磁盘,不支持数据库持久化
  • Kafka支持内存、磁盘(主),支持数据库持久化,支持大量数据堆积
  • RabbitMQ支持内存、磁盘,支持数据堆积,但是数据堆积影响生产效率
  • ActiveMQ支持内存、磁盘,自持数据库持久化
  • RocketMQ的所有消息都是持久化的,先写入系统 pagecache(页高速缓冲存储器),然后刷盘,可以保证内存与磁盘都有一份数据,访问时,直接从内存读取

3.3 关于吞吐量

  • RabbitMQ在吞吐量方面稍逊于Kafka,他们的出发点不一样,RabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操作;基于存储的可靠性的要求存储可以采用内存或者硬盘。
  • Kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高
  • ZeroMQ也具有很高的吞吐量
  • RocketMQ相比RabbitMQ的吞吐量要大,但是没有Kafka的大
  • ActiveMQ相对RabbitMQ而言要弱

3.4 关于集群

  • Kafka:天然的'Leader-Slave'无状态集群,每台服务器既是Master也是Slave
  • ZeroMQ:去中心化,不支持集群
  • RabbitMQ:支持简单集群
  • RocketMQ:支持集群,常用多对'Master-Slave' 模式
  • ActiveMQ:支持简单集群模式,比如'主-备',对高级集群模式支持不好。

3.5 关于负载均衡

  • Kafka:支持负载均衡,结合内置Zookeeper,有效的实现Kafka集群的Load Balancer
  • ZeroMQ:去中心化,不支持负载均衡,本身只是一个多线程网络库
  • RocketMQ:支持负载均衡
  • RabbitMQ:对负载均衡的支持不好
  • ActiveMQ:支持负载均衡,可以基于Zookeeper实现负载均衡

3.6 关于单机队列数

单机队列数越大,单机可以创建更多主题,因为每个主题都是由一批队列组成,消费者的集群规模和队列数成正比,队列越多,消费类集群可以越大。

  • Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长。Kafka分区数无法过多的问题
  • RocketMQ单机支持最高5万个队列,负载不会发生明显变化

4 Kafka Streams与Storm、Spark Streaming、Flink

4.1 流处理框架特点和处理方式

上面我们说过了流处理就是对数据集进行连续不断的处理,聚合,分析的过程,它的延迟要求尽可能的低(毫秒级或秒级),从流处理的几个重要方面来讲述,分布式流处理框架需要具有如下特点:

  • 消息传输正确性保证,保证区分有:

    • 消息At Most Once,即消息可以丢失或者传递一次
    • 消息At Least Once,即消息至少一次,存在重复传递的情况
    • 消息Exactly Once,即消息不会丢失也不会重复传递
  • 高容错性:在发生诸如节点故障、网络故障等故障时,框架应该能够恢复,并且应该从它离开的地方重新开始处理。这是通过不时地检查流到某个持久性存储的状态来实现的。
  • 状态管理:绝大部分分布式系统都需要保持状态处理的逻辑。流处理平台应该提供存储,访问和更新状态信息的能力
  • 高性能:这包括低延迟(记录处理的时间)、高吞吐量(throughput,记录处理/秒)和可伸缩性。延迟应尽可能短,吞吐量应尽可能多,不过这很难同时兼顾到这两者,需要做一个平衡
  • 高级特性Event Time Processing(事件时间处理)、水印、支持窗口,如果流处理需求很复杂,则需要这些特性。例如,基于在源代码处生成记录的时间来处理记录(事件时间处理)
  • 成熟度:如果框架已经被大公司证明并在大规模上进行了测试,这就很好。更有可能在论坛或者其他地方获得良好的社区支持和帮助

流处理的方式有两种:

  • Native Streaming

    • 每一个传入的记录一到达就被处理,而不必等待其他记录。有一些持续运行的进程(我们称之为operators/tasks/bolts,命名取决于框架)会永远运行,并且每个记录都会经过这些进程来进行处理,示例:Storm、Flink、Kafka Streams。
  • Micro-batching

    • 快速批处理,这意味着每隔几秒钟传入的记录都会被批处理在一起,然后以几秒的延迟在一个小批中处理,例如: Spark Streaming

这两种方法都有一些优点和缺点。当每个记录一到达就被处理时,处理结果就感觉很自然,允许框架实现尽可能最小的延迟。但这也意味着在不影响吞吐量的情况下很难实现容错,因为对于每个记录,我们需要在处理后跟踪和检查点。此外,状态管理也很容易,因为有长时间运行的进程可以轻松地维护所需的状态;而小批处理方式,则完全相反,容错是附带就有了,因为它本质上是一个批处理,吞吐量也很高,因为处理和检查点将一次性完成记录组。但它会以一定的延迟为代价,让人感觉不像是自然的流处理。同时,高效的状态管理也将是一个挑战。

4.2 主流流处理框架比对

流处理框架特点缺点
Strom是流处理界的hadoop。它是最古老的开源流处理框架,也是最成熟、最可靠的流处理框架之一非常低的延迟,真正的流处理,成熟和高吞吐量;非常适合不是很复杂流式处理场景;消息至少一次保证机制;没有高级功能,如事件时间处理、聚合、窗口、会话、水印;
Spark Streaming支持Lambda架构,免费提供Spark;高吞吐量,适用于许多不需要子延迟的场景;简单易用的高级api;社区支持好;此外,结构化流媒体更为抽象,在2.3.0版本中可以选择在微批处理和连续流媒体模式之间切换;保证消息恰好传递一次;不是真正的流媒体,不适合低延迟要求;参数太多,很难调参;在许多高级功能上落后于Flink;
Flink支持Lambda架构;开源流媒体领域的创新领导者;第一个真正的流式处理框架,具有所有高级功能,如事件时间处理、水印等;低延迟,高吞吐量,可根据需要配置;自动调整,没有太多参数需要调整;保证消息恰好传递一次;在像Uber、阿里巴巴这样的规模大公司接受。进入流处理界晚,还没被广泛接受;社区支持相对较少,不过在蓬勃发展;
Kafka Streams非常轻量级的库,适用于微服务和物联网应用;不需要专用群集;继承了卡夫卡所有的优良品质;支持流连接,内部使用rocksDb来维护状态。保证消息恰好传递一次;与卡夫卡紧密结合,否则无法使用;刚刚起步,还未有大公司选择使用;不合适重量级的流处理;

总的来说,Flink作为专门流处理是一个很好的选择,但是对于轻量级并且和Kafka一起使用时,Kafka Streaming是个不错的选择。

5 Zookeeper & Kafka?

Zookeeper在Kafka集群中主要用于协调管理,主要作用:

  • Kafka将元数据信息保存在Zookeeper中
  • 通过Zookeeper的协调管理来实现整个kafka集群的动态扩展
  • 实现整个集群的负载均衡
  • Producer通过 Zookeeper 感知 partition 的Leader
  • 保存Consumer消费的状态信息。
  • 通过 ZK 管理集群配置,选举 Kafka Leader,以及在 Consumer Group 发生变化时进行 Rebalance
Zookeeper是由java编写的,所以需要先安装JDK。

5.1 Zookeeper是必须要有的吗?

是的,在Kafka中,尽管你只想使用一个代理、一个主题和一个分区,其中有一个生产者和多个消费者,不希望使用Zookeeper,浪费开销,但是这情况也需要Zookeeper,协调分布式系统中的任务、状态管理、配置等,而且使用单节点的场景显然没有利用到Kafka的优点。

另外,Apacke Kafka维护团队开始讨论去除Zookeeper了(2019年11月6日),目前,Kafka使用ZooKeeper来存储分区和代理的元数据,并选择一个Broker作为Kafka控制器,而希望通过删除对ZooKeeper的依赖,将使Kafka能够以一种更具伸缩性和健壮性的方式管理元数据,启用对更多分区的支持,它还将简化Kafka的部署和配置,因为ZooKeeper是一个单独的系统,具有自己的配置文件语法,管理工具和部署模式。另外Kafka和ZooKeeper配置是分开的,所以很容易出错。例如,管理员可能在Kafka上设置了SASL,并且错误地认为他们已经保护了通过网络传输的所有数据。实际上,这样做还必须在单独的外部ZooKeeper系统中配置安全性。统一两个系统将提供统一的安全配置模型。将来Kafka可能希望支持单节点Kafka模式,这对于想要快速测试Kafka而无需启动多个守护程序的人很有用,删除掉ZooKeeper的依赖关系使之成为可能。

5.2 Zookeeper在Kafka中是自带的,可以使用自定义安装的ZK吗?

这个当然是可以的,你可以不启动Kafka自带的ZK。

6 理解Kafka数据模型: Topics、Partitions及Replication

Kafka的分区机制实现了Topic的水平扩展和顺序性保证。这一节我们深度了解下是怎么回事?

Topic在逻辑上可以被认为是一个队列。每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个队列里。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件,比如我们创建了一个主题叫xiaobiao,然后Kafka有三个Brokers,结合《Kafka,ZK集群开发或部署环境搭建及实验》这一篇文章中的实验环节,我们创建主题的时候需要指定:

# 利用Kafka提供的命令行脚本,创建两分区两副本的主题xiaobiao
./bin/kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 2 --partitions 2 --topic xiaobiao

两分区,两副本,如何理解呢?我们指定了三个服务,我们将xiaobiao主题分为两个子部分,可以认为是两个子队列,对应的在物理上,我们可以在log.dir参数设定的目录下看到两个文件夹xiaobiao-0xiaobiao-1,不过根据Kafka的分区策略,对于多个Kafka Brokers,分区(多个文件夹)一般会分散在不同的Broker上的log.dir设定的目录下,当只有一个Broker时,所有的分区就只分配到该Broker上,消息会通过负载均衡发布到不同的分区上,消费者会监测偏移量来获取哪个分区有新数据,从而从该分区上拉取消息数据。这是分区的表现。不过分区数越多,在一定程度上会提升消息处理的吞吐量,因为Kafka是基于文件进行读写,因此也需要打开更多的文件句柄,也会增加一定的性能开销,但是Kafka社区已经在制定解决方案,实现更多的分区,而性能不会受太多影响。

如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机 I/O 这个时候对性能影响很大。所以一般来说 Kafka 不能有太多的 Partition。

那副本呢?顾名思义,即主题的副本个数,即我们上面有两个主题分区,即物理上两个文件夹,那么指定副本为2后,则会复制一份,则会有两个xiaobai-0两个xiaobai-1,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时就会失败。那么副本有什么用呢?当Kafka某个代理(Broker)出现故障且无法为请求(Consumer)提供服务时,为了达到可用性的唯一目的而设置有多个数据副本,这样就不担心集群中某个Broker挂掉了,这里也进一步可以知道,达到这个作用,那么一个主题的分区副本是需要在不同的Broker上的,而且对应副本分区是保持数据同步的。不可避免地,副本越多,那么对Kafka的吞吐量是会造成影响的。下图就是Replication Factor等于2时数据同步示意图:

分区Leader: 对于每个分区,都有一个副本被指定为Leader。Leader负责发送和接收该分区的数据,所有其他副本都称为分区的同步副本(或跟随者)。

In sync replicas是分区的所有副本的子集,该分区与主分区具有相同的消息。

比如当Broker2 挂掉后,由于broker 2是分区1的负责人(Leader),因此现在无法访问分区1。发生这个情况的时候Kafka会自动选择一个同步副本(在上图中只有一个副本)并使它成为领导者(Leader)。现在,当broker 2重新上线时,broker 2中分区1可以再次尝试成为Leader。

当然,上面所说副本和分区没有具体深入到内部机制是怎么实现的,怎么保证的,这里就先不展开了。

7 Kafka的Consumer Group

Consumer Group:每一个消费者实例都属于一个消费Group,每一条消息只会被同一个消费Group里的一个消费者实例消费(不同消费Group可以同时消费同一条消息)。不同于一般的队列,Kafka实现了消息被消费完后也不会将消息删除的功能,即我们能够借助Kafka实现离线处理和实时处理,跟Hadoop和Flink这两者特性可以对应起来,因此可以分配两个不同消费组分别将数据送入不同处理任务中。

8 总结

这一篇文章让我们对Kafka有了个基本的认识,可以做消息订阅/发布系统,可以做实时流处理,对Kafka的分区和副本有了一定的认识,对Kafka的消费组的特性也有了个基本了解,接下来就进入实践,实践过后,我们再深入探讨Kafka的内部原理和实现机制。

9 参考资料

本文属于原创,转载注明出处,欢迎关注CSDNfreeape或微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net

查看原文

赞 0 收藏 0 评论 0

Ethan 回答了问题 · 2019-11-26

org.bouncycastle.openssl.PEMParser 需要导入jar包

maven方式导入需要三个依赖,拿走不谢:

        <!-- Oct, 2019 -->
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.64</version>
        </dependency>
        <!-- Feb, 2011 -->
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcpg-jdk16</artifactId>
            <version>1.46</version>
        </dependency>
        <!-- Oct, 2019 -->
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcpkix-jdk15on</artifactId>
            <version>1.64</version>
        </dependency>

关注 2 回答 1

Ethan 发布了文章 · 2019-11-22

Spring Boot从零入门7_最新配置文件配置及优先级详细介绍

本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net 或者我的CSDN http://blog.csdn.net/freeape

[TOC]

0 前言

进入实际项目开发中,我们不仅仅是靠着默认的全局配置文件application.properties来配置我们的项目了,Spring Boot中的配置文件也有不少需要注意的地方,掌握后,可以方便的让我们在做项目中游刃于各种配置了,让配置一目了然,层次清楚,模块清晰,写成总结,另外方便以后参考查阅。该文章的内容是最新的配置文件介绍内容,全部参考最新的官方文档所写,截至2019年11月21日,此时Spring是5.2.1版本,Spring Boot是2.2.1版本。

1 我们需要了解

Spring Boot提供了一个spring-boot-devtoolsjar包,提供了一些方便程序开发的功能,主要是监控程序的变化,然后进行自动重新启动。使用spring-boot-devtools需要在pom.xml中添加依赖项,同时需要设置<optional>true</optional>spring-boot-devtools默认将只在开发环境生效,通过Spring Boot插件打包时默认是不会包含spring-boot-devtools

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <optional>true</optional>
</dependency>

2 简介和全局概述

创建一个Spring Boot项目后,会在src/main/resources目录下默认生成一个全局配置文件application.properties,不过里面啥内容也没有。Spring Boot已经将所有默认配置参数都自动配置好了(https://docs.spring.io/spring...,如果我们在外部配置文件中修改配置,则默认配置参数就会被修改(Externalized Configuration,配置外部化),配置外部化的方式有好几种,可以使用属性文件(properties file)、YAML文件、环境变量和命令行参数将配置外部化,下面内容会详细介绍。

比如配置文件可以是application.properties或者是application.yml,都可以配置,只是里面的配置语法不一样而已,yml格式层次相比properties格式要分明些。

.properties写法:

server.port = 9090
spring.application.name = demoservice

.yml写法:

spring:
    application:
        name: demoservice
   server:
port: 9090
注意:yml格式中一定不要用制表符tab,冒号和值之间一定要有空格。

Spring Boot对参数的重写(覆盖)有一个顺序,这是我们需要注意的,这里概况如下:

  1. 当使用了Devtools时,$HOME/.config/spring-boot文件夹中的Devtools全局设置属性
  2. @TestPropertySource针对对测试的注解
  3. 测试的properties。在@SpringBootTest和测试注释中提供,用于测试应用程序的特定部分
  4. 命令行参数
  5. 来自SPRING_APPLICATION_JSON(内嵌在环境变量或系统属性中的JSON)的属性
  6. ServletConfig初始化参数
  7. ServletContext初始化参数
  8. JNDI属性:java:comp/env
  9. Java系统属性: System.getProperties()
  10. 操作系统环境变量
  11. RandomValuePropertySource,其属性仅在random.*
  12. 打包jar之外的特定于概要文件的应用程序属性(如application-{profile}.properties和对应的YAML变量)
  13. 打包在jar中的特定于概要文件的应用程序属性(如application-{profile}.properties和YAML变量)
  14. 打包jar之外的应用程序属性(application.properties和YAML变量)
  15. 打包在jar中的应用程序属性(application.properties和YAML变量)
  16. @Configuration类上的@PropertySource注解
  17. 默认属性(通过设置SpringApplication.setDefaultProperties指定)

举一个具体的例子来说明上述的顺序是如何生效的:

import org.springframework.stereotype.*;
import org.springframework.beans.factory.annotation.*;

@Component
public class MyBean {

    @Value("${name}")
    private String name;

    // ...
}

比如在应用程序类路径(例如,打包在jar内)上,可以有一个application.properties文件,该文件为name属性设置了默认属性值。在新环境中运行时,可以在jar外部提供application.properties文件,该文件覆盖会覆盖在jar内application.properties。又如对于一次性测试,可以使用特定的命令行开关启动(例如,java -jar app.jar --name="Spring")也可以覆盖name属性值。又如可以JSON格式环境变量$ java -Dspring.application.json='{"name":"test"}' -jar myapp.jar来覆盖。其他方式就不一一举例了。

3 配置参数

3.1 参数设置及加载目录的顺序及作用优先级

参数设置有两种方式语法,一个properties,一个是ymlSpringApplicationapplication.properties文件加载以下位置的属性,并将它们添加到Spring环境中。

  1. 当前项目目录的config子目录
  2. 当前项目根目录
  3. classpath设定目录下的config子目录
  4. classpath设定目录下

上述列表按优先级排序(在列表中较高位置定义的属性将覆盖在较低位置定义的属性,如1中设置的属性值将覆盖2中同属性的属性值)。

注意:用maven构建项目时,src/main/resources目录就是默认的classpath

另外这里说下yml的注意点和特殊用法。

  • yml格式中一定不要用制表符tab,冒号和值之间一定要有空格一定要有空格一定要有空格
  • yml的双引号不会转义字符串里面的特殊字符,特殊字符按本身功能输出,比如
  • yml的单引号会转义字符串里面的特殊字符,输出原来的字符
# 下面会输出得到hello换行xiaobaiai.net
name: "hello\nxiaobaiai.net"
# 下面会输出得到hello\nxiaobaiai.net
name: 'hello\nxiaobaiai.net'
  • yml支持对象、数组、纯量(字符串、布尔值true/false、整数、浮点数、null、时间、日期new Date('1976-07-31'))
# 对象行内写法
students: { name: Steve, age: 22 }
# 数组行内写法
animal: [Cat, Dog]
# 或者数组非行内写法
animal:
    - Cat
    - Dog

3.2 生成配置参数随机值

生成配置参数随机值在测试或者某些场景下是非常有用的。

如我们配置:

#random int
app.location-x=${random.int}
app.location-y=${random.int}

#random int with max
app.user-age=${random.int(100)}

#random int range
app.max-users=${random.int[1,10000]}

#random long with max
app.refresh-rate-milli=${random.long(1000000)}

#random long range
app.initial-delay-milli=${random.long[100,90000000000000000]}

#random 32 bytes
app.user-password=${random.value}

#random uuid. Uses java.util.UUID.randomUUID()
app.instance-id=${random.uuid}

最后输出:locationX=-449689812, locationY=-2048116572, userAge=88, maxUsers=8927, refreshRateMilli=418859, initialDelayMilli=12542150790115755, userPassword=95ea8b43fd16dc26aad0030c1340e723, instanceId=bd252902-54e9-47b3-bebf-a81b1300ff69

3.3 参数间引用(占位符)

在配置参数中可以通过占位符来实现引用之前定义的参数值,如:

app.name=MyApp
app.description=${app.name} is a Spring Boot application

有些人喜欢(例如)使用--port=9000而不是--server.port=9000在命令行上设置配置属性。可以通过在application.properties中使用占位符来启用此行为:

server.port=${port:8080}
注意:如果继承自spring-boot-starter-parentPOM,则maven资源插件的默认筛选标记已从${*}更改为@(即,@maven.token@而不是${maven.token}),以防止与spring样式占位符冲突。如果直接为application.properties启用了Maven筛选,则可能还需要将默认筛选标记更改为其他分隔符,而不是@

3.4 自定义配置文件

3.4.1 方式一

如果不喜欢将application.properties作为配置文件名,可以通过指定spring.config.name环境属性切换到另一个文件名。还可以使用spring.config.location环境属性(目录位置或文件路径的逗号分隔列表)指定配置文件位置。以下示例演示如何指定其他文件名:

$ java -jar myproject.jar --spring.config.name=myConfig

下面的示例演示如何指定两个位置:

$ java -jar myproject.jar --spring.config.location=classpath:/default.properties,classpath:/override.properties

如果spring.config.location包含目录(而不是文件),则需要以/结尾,并且,运行的时候,在加载配置之前,应该附加从spring.config.name配置的名称或者默认配置名称。默认不配置spring.config.location则搜索配置文件顺序为:

file:./config/
file:./
classpath:/config/
classpath:/

使用时配置自定义配置位置时spring.config.location,它们会替换默认位置。例如,如果spring.config.location配置了值classpath:/custom-config/,file:./custom-config/,则搜索顺序将变为:

file:./custom-config/
classpath:custom-config/

当使用配置自定义配置位置时spring.config.additional-location,除了额外配置路径外,还会使用默认位置。在默认位置之前搜索其他位置。

注意:在编程环境中,直接去application.properties中设置spring.config.name是无法生效的,只有在命令行或者设置环境变量export SPRING_CONFIG_NAME=myConfig,或则在代码中去手动编码导入指定路径中的配置文件。

3.4.2 方式二

直接通过编码加载指定配置文件去实现,这种方式其实跟后面小节讲到的自定义属性是相通的,只多了一个指定文件名的注解,更详细的可以看后面操作。比如我们创建test.properties,路径跟也放在src/main/resources下面:

my.app.name=hello
my.app.func=test

然后新建一个参数Bean:

@Configuration
@ConfigurationProperties(prefix="my.app") 
@PropertySource("classpath:test.properties")
public class ConfigTestBean {
    private String name;
    private String func;
    // 省略getter和setter
}

这样就Ok了,怎么是验证可以看自定义配置参数小节。

3.5 命令行配置参数

默认情况下,SpringApplication将任何命令行选项参数(即以--开头的参数,例如--server.port=9000)转换为属性,并将它们添加到Spring环境中。如前所述,命令行属性顺序排在第四,始终优先于其下面属性源。

如果不希望命令行属性添加到Spring环境中,可以在程序中使用SpringApplication.setAddCommandLineProperties(false)禁用它们。

3.6 特定于配置文件的属性(激活profile)

除了application.properties文件外,还可以使用以下命名约定定义特定于配置文件的属性:application-{profile}.properties。环境有一组默认配置文件(默认情况下profile为default,即application-default.properties),如果未设置活动配置文件,则使用默认的application-default.properties文件,加载顺序和优先级还是与application.properties一样的。Spring可使用Profile决定程序在不同环境下执行情况,包含配置、加载Bean、依赖等,Spring的Profile一般项目包含:dev(开发), test(单元测试), qa(集成测试), prod(生产环境)。同样地,Maven也有Profile配置,可在构建过程中针对不同的Profile环境执行不同的操作,包含配置、依赖、行为等,每个Profile中可设置:id(唯一标识), properties(配置属性), activation(自动触发的逻辑条件), dependencies(依赖)等。

说到这里,如何激活profile呢?下面介绍三种方式。

注意:application-{profile}.properties的重写优先级是要高于application.properties的,这个跟配置文件加载顺序没有关系。另外创建application-{profile}.yml文件跟properties是一样的。

3.6.1 方式一

在配置文件中设置,这种方式不灵活,实际开发中不不太会用到

spring.profiles.active=test

3.6.2 方式二

使用占位符,在打包时替换,以Maven为例

第一步在properties中添加(package.target是自定义的参数):

spring.profiles.active=@package.target@

第二步在pom.xml中增加不同环境打包的配置:

<!-- 与Maven build标记并列 -->
<profiles>
    <!-- 开发环境 -->
    <profile>
        <id>dev</id>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
        <properties>
            <package.target>dev</package.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.tomcat</groupId>
                <artifactId>tomcat-jdbc</artifactId>
            </dependency>
        </dependencies>
    </profile>
    <!-- 生产环境 -->
    <profile>
        <id>prod</id>
        <properties>
            <package.target>prod</package.target>
        </properties>
    </profile>
</profiles>

从上面的配置可以看出,Maven的Profile配置了两个:dev和prod,并且在dev中使用了内嵌Tomcat,而 prod 中没有(这种配置场景如生产环境下使用外部Tomcat,开发时使用内部Tomcat)。

第三步再添加资源过滤实现在构建时修改以“@xxx@”表示的属性,利用Maven的resources插件:

...

<plugins>
    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
</plugins>

<!-- Maven build标记内 -->
<resources>
    <!-- 不加该resource也可以 -->
    <resource>
        <directory>src/main/resources</directory>
        <excludes>
            <!-- 排除掉src/main/resources下的所有application*.properties文件 -->
            <exclude>application*.properties</exclude>
        </excludes>
    </resource>
    <resource>
        <!-- 指定要处理的资源目录 -->
        <directory>src/main/resources</directory>
        <!-- 是否替换@xx@表示的maven properties属性值 -->
        <filtering>true</filtering>
        <includes>
            <!-- 将文件内容的“@xx@”替换为相应的变量,即package.target -->
            <include>application-${package.target}.properties</include>
        </includes>
    </resource>
</resources>

第四步就是编译打包了:

# 根据Maven Profile的 dev 构建环境包
$ mvn clean package -Dmaven.test.skip=true -Pdev

3.6.3 方式三

通过设置系统环境变量:

export SPRING_PROFILES_ACTIVE=dev

3.6.4 其他方式

Java命令行设定方式:

# 方式一
$ java -jar app.jar --spring.profiles.active=dev
# 方式二
$ java -jar -Dspring.profiles.active=dev demo-0.0.1-SNAPSHOT.jar

注解方式(@ActiveProfiles是Spring Boot的Test starter提供的注解,在单元测试中就比较有用了,只能在/src/test/java中使用):

@ActiveProfiles("dev")

3.6.5 YML特殊方式

YAML文件实际上可以是由---行分隔的一系列文档,每个文档被分别解析为一个展开的配置映射。然后激活dev还是production的方式同样的也有指定环境变量或者命令行方式之类的。

# 通用属性
server:
    port: 9000
---
# dev环境下配置
spring:
    profiles: dev
server:
    port: 9001

---
# production生产环境下配置
spring:
    profiles: production
server:
    port: 0

3.7 自定义属性

Spring已经为我们提供很多的默认参数,不过我们也可以创建自己的配置参数。比如我们在application.properties中创建下面的自定义属性:

#random int
app.location-x=${random.int}
app.location-y=${random.int}

#random int with max
app.user-age=${random.int(100)}

#random int range
app.max-users=${random.int[1,10000]}

#random long with max
app.refresh-rate-milli=${random.long(1000000)}

#random long range
app.initial-delay-milli=${random.long[100,90000000000000000]}

#random 32 bytes
app.user-password=${random.value}

#random uuid. Uses java.util.UUID.randomUUID()
app.instance-id=${random.uuid}

然后创建对应的Java参数组件MyAppProperties.java

@Component
@ConfigurationProperties("app")
    public class MyAppProperties {
    private int locationX;
    private int locationY;
    private int userAge;
    private int maxUsers;
    private long refreshRateMilli;
    private long initialDelayMilli;
    private String userPassword;
    private UUID instanceId;
    
    public int getLocationX() {
        return locationX;
    }
    public void setLocationX(int locationX) {
        this.locationX = locationX;
    }
    public int getLocationY() {
        return locationY;
    }
    public void setLocationY(int locationY) {
        this.locationY = locationY;
    }
    public int getUserAge() {
        return userAge;
    }
    public void setUserAge(int userAge) {
        this.userAge = userAge;
    }
    public int getMaxUsers() {
        return maxUsers;
    }
    public void setMaxUsers(int maxUsers) {
        this.maxUsers = maxUsers;
    }
    public long getRefreshRateMilli() {
        return refreshRateMilli;
    }
    public void setRefreshRateMilli(long refreshRateMilli) {
        this.refreshRateMilli = refreshRateMilli;
    }
    public long getInitialDelayMilli() {
        return initialDelayMilli;
    }
    public void setInitialDelayMilli(long initialDelayMilli) {
        this.initialDelayMilli = initialDelayMilli;
    }
    public String getUserPassword() {
        return userPassword;
    }
    public void setUserPassword(String userPassword) {
        this.userPassword = userPassword;
    }
    public UUID getInstanceId() {
        return instanceId;
    }
    public void setInstanceId(UUID instanceId) {
        this.instanceId = instanceId;
    }
    
    @Override
    public String toString() {
        return "MyAppProperties [locationX=" + locationX + ", locationY=" + locationY + ", userAge=" + userAge
                + ", maxUsers=" + maxUsers + ", refreshRateMilli=" + refreshRateMilli + ", initialDelayMilli="
                + initialDelayMilli + ", userPassword=" + userPassword + ", instanceId=" + instanceId + "]";
    }
}

@ConfigurationProperties 注解向Spring Boot声明该类中的所有属性和配置文件中相关的配置进行绑定。prefix = "app"prefix=可省略) : 声明配置前缀,将该前缀下的所有属性进行映射。@Component 或者@Configuration:将该组件加入Spring Boot容器,只有这个组件是容器中的组件,配置才生效。

提示:也可以通过 @Value("${key}") 读取配置文件中的属性,key = properties文件等号左边的key部分。在我们定义的 Java 参数组件中,还可以对具体的参数进行注解断言,如@Email加到邮件的变量上,则如果注入的不是一个合法的邮件地址则会抛出异常。这类注解还有@AssertFalse 校验false、@AssertTrue 校验true、@DecimalMax(value=10,inclusive=true) 小于等于10,inclusive=true,是小于等于、@DecimalMin(value=,inclusive=)@Max(value=) 小于等于value、@Min(value=) 大于等于value、@Past 检查日期、@Pattern(regex=,flag=) 正则、@Validate 对po实体类进行校验等。

最后还需要加入依赖spring-boot-configuration-processor

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- 加入spring-boot-configuration-processor -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

校验我们自定义的配置:

@SpringBootApplication
public class Test07HelloworldApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(Test07HelloworldApplication.class, args);
        MyAppProperties bean = context.getBean(MyAppProperties.class);
        System.out.println(bean.toString());
    }
}

// output
// MyAppProperties [locationX=1329054364, locationY=1100464591, userAge=99, maxUsers=8007, refreshRateMilli=733281, initialDelayMilli=54489880705550952, userPassword=76aebd15270f7065adf3d31b5a790829, instanceId=681ed3a4-a561-460c-b826-58229c31b055]

4 总结

上述的内容大部分经过实际Demo验证,示例代码可以在https://github.com/yicm/SpringBootExamples上找到。配置文件这一章细节内容比较多,但是我们把我几个点就好了,这个总结下:

  • Spring Boot为我们提供了大量的默认配置,我们可以重写这些配置参数的值,并提供了多种方式去重写(覆盖),且重写方式之间是有优先级的
  • Spring Boot应用可以在不同的位置加载配置文件application.properties(yml),并且这些位置是有顺序、优先级的
  • Spring Boot的参数之间可以通过占位符引用,而且还可以通过占位符实现命令行参数名字的简化
  • Spring Boot可以支持自定义参数
  • Spring Boot可以支持自定义配置文件名
  • Spring Boot可以支持多配置文件的切换,通过application-{profile}.properties(yml)激活profile,且有多种方式去激活profile

如果你知道了上述总结的具体内容,那么这一博文你也差不多都了解了。

5 参考资料

本文属于原创,转载注明出处,欢迎关注CSDNfreeape或微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net

查看原文

赞 0 收藏 0 评论 0

Ethan 回答了问题 · 2019-11-21

新版kafka消费者、生产者配置为何使用bootstrap-servers而不是zookeeper服务器地址?

因为zookeeper要被替代了,kafka团队不想再依赖zk了,目前,Kafka使用ZooKeeper来存储分区和代理的元数据,并选择一个Broker作为Kafka控制器,而希望通过删除对ZooKeeper的依赖,将使Kafka能够以一种更具伸缩性和健壮性的方式管理元数据,实现对更多分区的支持,它还将简化Kafka的部署和配置。但是目前我们还是需要Zookeeper(最新版kafka是2.3.1)。 参见 https://cwiki.apache.org/conf...

关注 5 回答 2

认证与成就

  • 获得 4 次点赞
  • 获得 4 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 4 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

  • WxComment

    🏷️WxComment是一个微信小程序(Mini Program of WeChat)的评论插件,结合BaaS提供商https://leancloud.cn ,无需其他另外的个人或者云服务器,就可以免费使用。解决了需要个人去注册域名、备案、购买云服务器的繁杂问题。适用于个人博客或者个人小程序需要评论、点赞、阅读量统计的用户。📑

注册于 2019-09-27
个人主页被 161 人浏览