我用eggjs 尝试连接kafka?

我用eggjs 尝试连接kafka,做到topic对应app/kafka下的文件名,最终执行文件名下的所有的方法。,想在app目录下独立一个目录去处理业务,最开始我设想用loadController,kafka在触发onmessage的时候找不到对应的controller方法,最后用的比较粗暴的方式解决。不知道有没有问题,有没有可以指点一下的。代码如下:

load kafka:

//kafka-load
import * as path from 'path'
export default app => {
    let dirs = app.loader
        .getLoadUnits()
        .map(unit => path.join(unit.path, 'app', 'kafka'))

    app.kafka = app.kafka || {}
    new app.loader.FileLoader({
        directory: dirs,
        target: app.kafka,
        initializer: (kafka, opts) => {
            const fileName = path.basename(opts.path, path.extname(opts.path))
            Object.keys(kafka).map(action => {
                if (!app.kafka[fileName]) {
                    app.kafka[fileName] = {}
                }
                app.kafka[fileName][action] = kafka[action]
            })
            return null
        }
    }).load()
}


kafka connect

//kafka connect
import * as Kafka from 'kafka-node'
import { KafkaConfig } from '../config/config.d'
import load from './kafkaLoad'

export default app => {
    load(app)
    const config: KafkaConfig = app.config.kafka
    const zookeepers = config.host.join(',')
    const client = new Kafka.Client(zookeepers, config.clientId)
    const consumer = new Kafka.Consumer(client, config.topics, config.options)
    const topics = config.topics.map(item => item.topic)

    consumer.on('message', message => {
        const topicConsumers = app.kafka[message.topic]
        if (topicConsumers) {
            Object.keys(topicConsumers).map(name =>
                topicConsumers[name].call(app, message.value)
            )
        }
        app.logger.info(
            `[egg-kafka] Receive producer message`,
            JSON.stringify(message)
        )
    })

    consumer.on('error', error => {
        app.coreLogger.error(`[egg-kafka] init instance error`, error)
    })

    app.beforeStart(() => {
        app.coreLogger.info(
            `[egg-kafka] init instance success ,host@${zookeepers} -----> topic@${topics}`
        )
    })
}

kafka controller

export interface TopicNodejsMethods {
    test1(message: { [key: string]: any }): Promise<any>
    test2(message: { [key: string]: any }): Promise<any>
}

export type TopicNodejs = Application & TopicNodejsMethods

export default {
    async test1(message) {

        this.io.of('/').emit('passAlarm',message)
    },
    async test2(message) {
        console.log(message)
    }
} as TopicNodejs

阅读 3.6k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题