使用 logstash-input-jdbc 同步 mysql 数据 至 ES 最后一条数据未能保存

问题描述

我在使用 logstash-input-jdbc 插件 同步 MySQL 数据至 ES 的时候 SQL 查询满足的条件的数据有 13 条
但是保存至 ES 时 只有 12 条了 我在表里新增一条数据同步的是上次最后一条没有保存的数据,还有我在终端退出 (Ctrl + c) 到时候 它也会把最后一条记录先写入 ES 再退出进程

执行输出的日志:

clipboard.png

当我 Ctrl + c 退出 logstash 时产生一条日志

[2018-10-31T11:40:07,422][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}
{"name":"Kimi","id":14,"todos":[],"todo_list":[],"tags":["_aggregatefinalflush"]}

这条日志说明 logstash 又帮我把那条没有存入的记录写入ES才退出的 但是数据上多了一个 tags 字段 标注为 _aggregatefinalflush

问题出现的平台版本

ElasticSearch 版本 -> 5.5.2
Logstash 版本 -> 5.5.2

开发测试平台 MacOS Mojave 10.14

相关代码

logstash配置:

input {
    stdin{
    }
    jdbc {
        # Mysql数据库地址
        jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
        
        # 开启连接验证
        jdbc_validate_connection => true
        
        # 数据库用户
        jdbc_user => "root"
        
        # 数据库密码
        jdbc_password => "123456"
        
        # JDBC驱动库
        jdbc_driver_library => "/Users/simon/logstash/mysql-connector-java-5.1.36.jar"
        
        # JDBC 驱动类
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        
        # 声明要导入数据库 SQL 语句 如果要将 SQL 独立存放则使用 statement_filepath 参数
        # statement => "select * from users"
        
        # 存放导入数据 SQL 的 SQL文件
        statement_filepath => "/Users/simon/logstash/mysql_users.sql"
        
        # CronJob 自动定时执行默认一分钟执行一次 类似 Crontab 定时规则
        # 定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
        schedule => "* * * * *"

        # 启用后将导致将 sql 语句分解为多个查询。每个查询将使用限制和偏移量来共同检索完整的结果集。限制大小是用 jdbc_page_size 设置的。
        # jdbc_paging_enabled => true
        # 每页处理条数 jdbc_paging_enabled = true 时生效 默认 100000
        # jdbc_page_size => 1
        
        # ElasticSearch Domcument type 自 ES 6.x 版本开始 output 中不再支持 document_type 配置 7.x版本将彻底废弃
        #type => "users"

        # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
        record_last_run => "true"
        last_run_metadata_path => "/Users/simon/logstash/sync_last_id"

        # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        # clean_run => "false"

        # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值
        use_column_value => true

        # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
        tracking_column => "id"

        # 是否将 字段(column) 名称转小写
        #lowercase_column_names => "false"
    }
}

filter {
    aggregate {
        task_id => "%{id}"
        code => "
            #代码注释
            map['id'] = event.get('id')
            map['name'] = event.get('name')
            map['todo_list'] ||=[]
            map['todos'] ||=[]

            if (event.get('todo_id') != nil)
                if !(map['todo_list'].include? event.get('todo_id'))
                    map['todo_list'] << event.get('todo_id')        
                    map['todos'] << {
                        'todo_id' => event.get('todo_id'),
                        'title' => event.get('text'),
                    }
                end
            end

            event.cancel()
        "
        push_previous_map_as_event => true
    }
    
    json {
        source => "message"
        remove_field => ["message"]
        #remove_field => ["message", "type", "@timestamp", "@version"]
    }
    mutate  {
        #将不需要的JSON字段过滤,且不会被存入 ES 中
        remove_field => ["@timestamp", "@version"]
    }
}

# 从 MySQL导入数据到 ElasticSearch 保存
output {
    elasticsearch {
        # ES 服务URL地址
        hosts => ["127.0.0.1:9200"]
        
        # ES 索引名称
        index => "mysql_users"

        # document_type 自 ES 6.x 版本开始 output 中不再支持 document_type 配置 7.x版本将彻底废弃
        document_type => "users"
        
        # 文档ID 对应数据库 ID
        document_id => "%{id}"

        codec => "json"
    }
    stdout {
        codec => json_lines
    }
}

执行的 SQL mysql_users.sql 内容:

SELECT 
`users`.`id` AS `id`,
`users`.`name` AS `name`,
`todo`.`id` AS `todo_id`,
IFNULL(`todo`.`text`, "") AS `text`,
IFNULL(`todo`.`is_done`, 0) AS `is_done`,
`todo`.`user_id` AS `user_id`
FROM `users` 
LEFT JOIN `todo` ON  `users`.`id` = `todo`.`user_id`
WHERE `users`.`id` > :sql_last_value
ORDER BY `id` ASC

数据库建表脚本以及测试数据:

DROP TABLE IF EXISTS `todo`;

CREATE TABLE `todo` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `text` varchar(255) NOT NULL DEFAULT '' COMMENT '任务文本',
  `is_done` tinyint(3) DEFAULT '0' COMMENT '是否完成',
  `user_id` int(11) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

LOCK TABLES `todo` WRITE;
/*!40000 ALTER TABLE `todo` DISABLE KEYS */;

INSERT INTO `todo` (`id`, `text`, `is_done`, `user_id`)
VALUES
    (3,'bbbbb',0,1),
    (4,'cccccc',0,1),
    (5,'人生自古谁无死,留取丹心照汗青',0,2),
    (6,'来自Vue的问候',0,2),
    (7,'Hello world',0,11),
    (8,'举头望明月,低头思故乡',0,11),
    (10,'我欲乘风归去,又恐琼楼玉宇',0,1),
    (11,'朝辞白帝彩云间,千里江陵一日还',0,1),
    (12,'在天愿作比翼鸟,在地愿做连理枝',0,9),
    (13,'天长地久有时尽,此恨绵绵无绝期',0,9);

/*!40000 ALTER TABLE `todo` ENABLE KEYS */;
UNLOCK TABLES;

DROP TABLE IF EXISTS `users`;

CREATE TABLE `users` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT '',
  `version` int(11) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

LOCK TABLES `users` WRITE;
/*!40000 ALTER TABLE `users` DISABLE KEYS */;

INSERT INTO `users` (`id`, `name`, `version`)
VALUES
    (1,'Simon',0),
    (2,'Jerry',0),
    (4,'Jim',0),
    (5,'Mary',0),
    (6,'Amy',0),
    (7,'Kaiven',0),
    (8,'Bell',0),
    (9,'Sky',0),
    (10,'Sam',0),
    (11,'Lily',0),
    (12,'Lucy',0),
    (13,'David',0),
    (14,'Kimi',0);

/*!40000 ALTER TABLE `users` ENABLE KEYS */;
UNLOCK TABLES;

你期待的结果是什么?实际看到的错误信息又是什么?

我希望最后那条数据也能试试同步进入 ES 而不是要等进程结束再写入

执行过程中正常,没有错误提示与警告

阅读 6.4k
2 个回答
push_previous_map_as_event => false

再次感谢 @博弈 大神的解答。
这里对该问题进行一个简要的总结,filter aggregate 创建中 event map 并不知道我这次事件是不是应该结束,也就是它也不知道我到那一条才是最后一条, 因此 设置一个 timeout 告诉它 这个时间执行多少秒就结束继续执行第二个 但这样并不是很严谨 因为你也不确定你的 event map 到底要执行多久 因此我返回去看了下官方的文档 最好的方式是 我们应该给定一个 task end 的条件 ES官网关于 aggregate 的说明

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
宣传栏