• 0
  • 新人请关照

logstash 同步一对多对多的关系数据如何处理啊

`input {

jdbc {
    jdbc_driver_library => "F:/ELK/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/newmedtiondb?useUnicode=true&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "rS3m$6Bp"
    jdbc_default_timezone => "Asia/Shanghai"

    jdbc_pool_timeout => 5
    jdbc_validate_connection => true
    jdbc_validation_timeout => 3600
    connection_retry_attempts => 3
    connection_retry_attempts_wait_time => 3

    id => "jdbc-meeting"
    schedule => "* * * * *"
    statement => "select * from xxx,xxx,xxx
    "
    lowercase_column_names => false

    # clean_run => true
    record_last_run => true
    last_run_metadata_path => "syncpoint_table/meeting"
    use_column_value => true
    tracking_column => "startTime"
    tracking_column_type => "timestamp"
}

}

filter {

aggregate {
    task_id => "%{id}"
    code => "
        map['id'] = event.get('id')
        map['meetingName'] = event.get('meetingName')
        map['meetingShortName'] = event.get('meetingShortName')
        map['meetingSchedule'] = event.get('meetingSchedule')
        map['meetingAddress'] = event.get('meetingAddress')
        map['keyDay'] = event.get('keyDay')
        map['priority'] = event.get('priority')
        map['startTime'] = event.get('startTime')
        map['endTime'] = event.get('endTime')
        map['publishStatus'] = event.get('publishStatus')
        map['recommend'] = event.get('recommend')
        map['latest'] = event.get('latest')
        map['applyProxy'] = event.get('applyProxy')
        map['hostOrganizer'] = event.get('hostOrganizer')
        map['holdOrganizer'] = event.get('holdOrganizer')
        map['remarks'] = event.get('remarks')
        map['description'] = event.get('description')
        map['creator'] = event.get('creator')
        map['createDate'] = event.get('createDate')
        map['updator'] = event.get('updator')
        map['updateDate'] = event.get('updateDate')
        map['stick'] = event.get('stick')
        map['vote'] = event.get('vote')
        map['canRegist'] = event.get('canRegist')
        map['provideAccommodation'] = event.get('provideAccommodation')
        map['keywords'] = event.get('keywords')
        map['meetingSpeciality'] = event.get('meetingSpeciality')
        map['qrcode'] = event.get('qrcode')
        map['isIntervention'] = event.get('isIntervention')
        map['siteType'] = event.get('siteType')
        map['code'] = event.get('code')
        map['hotIndex'] = event.get('hotIndex')
        map['hits'] = event.get('hits')
        map['province'] = event.get('province')
        map['city'] = event.get('city')
        
        map['views'] = event.get('views')

        map['meetingDetail'] = {
            'id' => event.get('meetingDetail.id'),
            'mainPic' => event.get('meetingDetail.mainPic'),
            'titlePic' => event.get('meetingDetail.titlePic'),
            'twoDimensionCode' => event.get('meetingDetail.twoDimensionCode'),
            'trafficInfo' => event.get('meetingDetail.trafficInfo'),
            'diningInfo' => event.get('meetingDetail.diningInfo'),
            'creditPoint' => event.get('meetingDetail.creditPoint'),
            'noticeInfo' => event.get('meetingDetail.noticeInfo'),
            'noticeAttach' => event.get('meetingDetail.noticeAttach'),
            'venuePic' => event.get('meetingDetail.venuePic'),
            'inviteAttach' => event.get('meetingDetail.inviteAttach'),
            'pptAuthorize' => event.get('meetingDetail.pptAuthorize'),
            'vedioAuthorize' => event.get('meetingDetail.vedioAuthorize'),            
            'remarks' => event.get('meetingDetail.remarks')
        }
        map['meetingAuthorIds'] ||= []    
        
        
        map['meetingAgendaIds'] ||= []                                    
        map['meetingFields.meetingAgenda'] ||= []
        
        map['meetingFieldsIds'] ||= []
        map['meetingFields'] ||= []
        
         map['fieldAgentAuthorIds'] ||= []
    
        if (event.get('meetingFields.id') != nil)
            if !(map['fieldAgentAuthorIds'].include? event.get('meetingFields.id'))                   
                   map['meetingFieldsIds'] << event.get('meetingFields.id')                                                
                map['meetingFields'] << {
                    'id' => event.get('meetingFields.id'),
                    'meetingInfoId' => event.get('meetingFields.meetingInfoId'),
                    'meetingSubject' => event.get('meetingFields.meetingSubject'),
                    'holder' => event.get('meetingFields.holder'),
                    'startTime' => event.get('meetingFields.startTime'),
                    'endTime' => event.get('meetingFields.endTime'),
                    'time' => event.get('time'),
                    'videoCode' => event.get('meetingFields.videoCode'),
                    'pushUrl' => event.get('meetingFields.pushUrl'),
                    'address' => event.get('meetingFields.address'),
                    'weight' => event.get('meetingFields.weight'),
                    'gdLng' => event.get('meetingFields.gdLng'),
                    'gdLat' => event.get('meetingFields.gdLat'),
                    'bdLng' => event.get('meetingFields.bdLng'),
                    'bdLat' => event.get('meetingFields.bdLat'),
                    'remarks' => event.get('meetingFields.remarks'),
                    'creator' => event.get('meetingFields.creator'),
                    'createDate' => event.get('meetingFields.createDate'),
                    'updator' => event.get('meetingFields.updator'),
                    'updateDate' => event.get('meetingFields.updateDate'),
                    'comments' => event.get('meetingFields.comments'),
                    'views' => event.get('meetingFields.views'),
                    'chatroomId' => event.get('meetingFields.chatroomId'),
                    'isLogin' => event.get('meetingFields.isLogin'),
                    'isRealnameAuthentication' => event.get('meetingFields.isRealnameAuthentication'),
                    'messageCount' => event.get('meetingFields.messageCount'),
                    'likes' => event.get('meetingFields.likes'),
                    'iscomment' => event.get('meetingFields.iscomment'),
                    'condition' => event.get('meetingFields.condition'),
                    'recordStatus' => event.get('meetingFields.recordStatus'),
                    'manualStatus' => event.get('meetingFields.manualStatus'),                        
                    'auditMethod' => event.get('meetingFields.auditMethod'),

                    'meetingAgenda' => [] << {
                        'id' => event.get('meetingAgenda.id'),
                        'meetingFieldsId' => event.get('meetingAgenda.meetingFieldsId'),
                        'parentId' => event.get('meetingAgenda.parentId'),
                        'type' => event.get('meetingAgenda.type'),
                        'authorIds' => event.get('meetingAgenda.authorIds'),
                        'holder' => event.get('meetingAgenda.holder'),
                        'theme' => event.get('meetingAgenda.theme'),
                        'startTime' => event.get('meetingAgenda.startTime'),
                        'endTime' => event.get('meetingAgenda.endTime'),
                        'vid' => event.get('meetingAgenda.vid'),
                        'links' => event.get('meetingAgenda.links'),
                        'seq' => event.get('meetingAgenda.seq'),
                        'creator' => event.get('meetingAgenda.creator'),
                        'createDate' => event.get('meetingAgenda.createDate'),
                        'updator' => event.get('meetingAgenda.updator'),
                        'updateDate' => event.get('meetingAgenda.updateDate'),
                        'comments' => event.get('meetingAgenda.comments'),
                        'views' => event.get('meetingAgenda.views'),
                        'agendaIslogin' => event.get('meetingAgenda.agendaIslogin'),
                        'agendaIsrealnameAuthentication' => event.get('meetingAgenda.agendaIsrealnameAuthentication'),
                        'condition' => event.get('meetingAgenda.condition'),
                        
                        'authors' => [] << {
                            'id' => event.get('author.id'),
                            'userId' => event.get('author.userId'),
                            'authorname' => event.get('author.authorname'),
                            'gender' => event.get('author.gender'),
                            'title' => event.get('author.title'),
                            'master' => event.get('author.master'),
                            'company' => event.get('author.company'),
                            'description' => event.get('author.description'),
                            'autorUrl' => event.get('author.autorUrl'),
                            'headImage' => event.get('author.headImage'),
                            'priority' => event.get('author.priority'),
                            'creator' => event.get('author.creator'),
                            'createDate' => event.get('author.createDate'),
                            'updator' => event.get('author.updator'),
                            'updateDate' => event.get('author.updateDate')
                        
                        }
                    }                                                                                        
                }
                
            end
            
        end
        map['meetingAgendaIds'] ||= []
        map['meetingAgenda'] ||= []
        if (event.get('meetingAgenda.id') != nil)
            if !(map['meetingAgendaIds'].include? event.get('meetingAgenda.id'))
                map['meetingAgendaIds'] << event.get('meetingAgenda.id')
                map['meetingAgenda'] << {
                    'id' => event.get('meetingAgenda.id'),
                    'meetingFieldsId' => event.get('meetingAgenda.meetingFieldsId'),
                    'parentId' => event.get('meetingAgenda.parentId'),
                    'type' => event.get('meetingAgenda.type'),
                    'authorIds' => event.get('meetingAgenda.authorIds'),
                    'holder' => event.get('meetingAgenda.holder'),
                    'theme' => event.get('meetingAgenda.theme'),
                    'startTime' => event.get('meetingAgenda.startTime'),
                    'endTime' => event.get('meetingAgenda.endTime'),
                    'vid' => event.get('meetingAgenda.vid'),
                    'links' => event.get('meetingAgenda.links'),
                    'seq' => event.get('meetingAgenda.seq'),
                    'creator' => event.get('meetingAgenda.creator'),
                    'createDate' => event.get('meetingAgenda.createDate'),
                    'updator' => event.get('meetingAgenda.updator'),
                    'updateDate' => event.get('meetingAgenda.updateDate'),
                    'comments' => event.get('meetingAgenda.comments'),
                    'views' => event.get('meetingAgenda.views'),
                    'agendaIslogin' => event.get('meetingAgenda.agendaIslogin'),
                    'agendaIsrealnameAuthentication' => event.get('meetingAgenda.agendaIsrealnameAuthentication'),
                    'condition' => event.get('meetingAgenda.condition')                                    
                }
            end
        end                    
        
        event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
}

mutate {
    # remove_field =>["@version", "meetingFieldsIds", "meetingAgendaIds", "authorIds"]
    remove_field =>["@version"]
}

}

output {

elasticsearch {
    id => "elasticsearch-meeting"
    hosts => ["127.0.0.1:9200"]
    index => "testmeeting"
    document_type => "testmeeting"
    document_id => "%{id}"
}

}`

这是我写的配置文件,但是无论怎么做,级联关系(二级数组)中的数据始终只有一条,我知道原因,是因为上面做了重复判断,但是不做重复判断的话又会出现重复情况,请问大神们这个地方怎么处理啊

数据表结构:
logstash处理宽表.png

阅读 639
评论
    0 个回答
    撰写回答

    登录后参与交流、获取后续更新提醒