How to convert cumulative values into per-period values in a streaming engine

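Question: in the Level 2 market-data stream table we subscribe to, the trade volume and turnover are cumulative values. How can we use the time-series aggregation engine to get the volume and turnover within each minute? Is there a convenient way to do this?
For example, given the following input: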

sym     timestamp                cumVolume    cumAmount
IBM     2020.06.10T09:30:00.000    100        1,000
MSFT    2020.06.10T09:30:00.000    120        1,200
GOOG    2020.06.10T09:30:00.000    100        1,000
IBM     2020.06.10T09:30:03.000    150        1,600
MSFT    2020.06.10T09:30:03.000    220        2,200
GOOG    2020.06.10T09:30:03.000    200        1,900
IBM     2020.06.10T09:30:06.000    200        2,100
MSFT    2020.06.10T09:30:06.000    300        3,100
GOOG    2020.06.10T09:30:06.000    250        2,500
IBM     2020.06.10T09:30:09.000    240        2,500
MSFT    2020.06.10T09:30:09.000    370        3,700
GOOG    2020.06.10T09:30:09.000    310        3,200

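I want to get the following result (each row's increment over the previous row of the same sym; the first row of each sym has no previous value, so it is left empty):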

sym     timestamp                volume     amount
GOOG    2020.06.10T09:30:00.000        
IBM     2020.06.10T09:30:00.000        
MSFT    2020.06.10T09:30:00.000        
GOOG    2020.06.10T09:30:03.000    100        900
IBM     2020.06.10T09:30:03.000    50         600
MSFT    2020.06.10T09:30:03.000    100        1,000
GOOG    2020.06.10T09:30:06.000    50         600
GOOG    2020.06.10T09:30:09.000    60         700
IBM     2020.06.10T09:30:06.000    50         500
IBM     2020.06.10T09:30:09.000    40         400
MSFT    2020.06.10T09:30:06.000    80         900
MSFT    2020.06.10T09:30:09.000    70         600
    1 answer

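    Convert the cumulative values to per-tick increments in the subscription handler before they reach the engine, then let the time-series aggregation engine sum the increments in each window.

    //Step 1
    //Preprocess the cumulative values in real time. Define this function before createSubStreamingEnv.
    def calcTradeVolumeAndAmount(mutable dictVolume, mutable dictAmount, mutable tsAggrKline, msg){
        //previous cumulative values per sym; prev() only sees rows inside the current msg batch
        t = select sym, timestamp, cumVolume, cumAmount, prev(cumVolume) as prevCumVolume, prev(cumAmount) as prevCumAmount from msg context by sym
        //the first row of each sym in a batch falls back to the last value stored from earlier batches
        update t set prevCumVolume = dictVolume[sym], prevCumAmount = dictAmount[sym] where isNull(prevCumVolume)
        //update dictVolume and dictAmount with the most recent values
        snapshot = select sym, cumVolume, cumAmount from msg context by sym limit -1
        dictVolume[snapshot.sym] = snapshot.cumVolume
        dictAmount[snapshot.sym] = snapshot.cumAmount
        //append the per-tick increments to tsAggrKline
        tsAggrKline.append!(select sym, timestamp, cumVolume - prevCumVolume as volume, cumAmount - prevCumAmount as amount from t)
    }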
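For readers who want to check the logic of Step 1 outside DolphinDB, here is a minimal Python sketch of what the handler computes. It is an illustration, not DolphinDB code: it walks rows one at a time instead of using the vectorized `prev` with `context by sym`, and the names `calc_trade_volume_and_amount`, `last_volume` and `last_amount` are hypothetical stand-ins for the handler and the two dictionaries.

```python
from typing import Dict, List, Optional, Tuple

# Input row: (sym, timestamp, cumVolume, cumAmount)
Row = Tuple[str, str, float, float]
# Output row: (sym, timestamp, volume, amount); None when no earlier value exists
OutRow = Tuple[str, str, Optional[float], Optional[float]]


def calc_trade_volume_and_amount(last_volume: Dict[str, float],
                                 last_amount: Dict[str, float],
                                 batch: List[Row]) -> List[OutRow]:
    """Convert one batch of cumulative rows into per-tick increments.

    last_volume / last_amount play the role of dictVolume / dictAmount:
    they hold the latest cumulative value per sym and are updated in place,
    so the state carries over from one batch to the next.
    """
    out: List[OutRow] = []
    for sym, ts, cum_vol, cum_amt in batch:
        prev_vol = last_volume.get(sym)  # None for the first tick ever seen for sym
        prev_amt = last_amount.get(sym)
        vol = None if prev_vol is None else cum_vol - prev_vol
        amt = None if prev_amt is None else cum_amt - prev_amt
        out.append((sym, ts, vol, amt))
        # Remember the latest cumulative values for the next tick or batch
        last_volume[sym] = cum_vol
        last_amount[sym] = cum_amt
    return out


# Sample data from the question, delivered as two batches
D = "2020.06.10T"
batch1: List[Row] = [
    ("IBM", D + "09:30:00.000", 100, 1000), ("MSFT", D + "09:30:00.000", 120, 1200),
    ("GOOG", D + "09:30:00.000", 100, 1000), ("IBM", D + "09:30:03.000", 150, 1600),
    ("MSFT", D + "09:30:03.000", 220, 2200), ("GOOG", D + "09:30:03.000", 200, 1900),
]
batch2: List[Row] = [
    ("IBM", D + "09:30:06.000", 200, 2100), ("MSFT", D + "09:30:06.000", 300, 3100),
    ("GOOG", D + "09:30:06.000", 250, 2500), ("IBM", D + "09:30:09.000", 240, 2500),
    ("MSFT", D + "09:30:09.000", 370, 3700), ("GOOG", D + "09:30:09.000", 310, 3200),
]

last_vol: Dict[str, float] = {}
last_amt: Dict[str, float] = {}
out1 = calc_trade_volume_and_amount(last_vol, last_amt, batch1)
out2 = calc_trade_volume_and_amount(last_vol, last_amt, batch2)
for row in out1 + out2:
    print(row)
```

The values match the expected table in the question; only the row order differs, because `context by sym` groups rows by sym while this sketch keeps arrival order. Splitting the data into two batches exercises the dictionary fallback: the first IBM row of the second batch takes its previous value from the stored state, not from within the batch.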

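    //Step 2
    //Define dictionaries that hold each sym's most recent cumulative volume and turnover; the preprocessing function reads and updates them
    dictVol = dict(STRING, DOUBLE)
    dictAmount = dict(STRING, DOUBLE)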

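    //Step 3
    //Where the stream is subscribed and written into the aggregation engine, use the preprocessing function as the handler, and add parameters for passing in the dictionaries
    def createSubStreamingEnv(dbPath, tbName, userName, userPass, mutable dictVol, mutable dictAmount){
        //create the time-series aggregation engine tsAggrKline
        ......
        //subscribe to the stream table; every message batch goes through the preprocessing function before reaching tsAggrKline
        //the last argument (msgAsTable=true) delivers each batch as a table, which the handler expects
        subscribeTable(, "Trade", "MinuteK", 0, calcTradeVolumeAndAmount{dictVol, dictAmount, tsAggrKline}, true)
        ......
    }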
    //Step 4
    //When calling the entry function that sets up the subscription, pass both dictionaries as the last arguments
    createSubStreamingEnv(dbPath, tbName, userName, userPass, dictVol, dictAmount)
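Once the handler feeds increments into tsAggrKline, the engine only has to sum them within each window. The engine's creation is elided (`......`) above, so the sketch below simply assumes one-minute windows per sym with a sum over `volume` and `amount`. It is a plain Python illustration of that arithmetic, not the engine's API; real window boundaries and output timing are set by the engine's own parameters, and `minute_bars` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Increment row: (sym, timestamp, volume, amount); None marks the first tick of a sym
Inc = Tuple[str, str, Optional[float], Optional[float]]


def minute_bars(rows: List[Inc]) -> Dict[Tuple[str, str], Tuple[float, float]]:
    """Sum volume and amount per (sym, minute), skipping None increments.

    Assumes timestamps look like "2020.06.10T09:30:03.000", so the first
    16 characters identify the minute ("2020.06.10T09:30").
    """
    sums: Dict[Tuple[str, str], List[float]] = defaultdict(lambda: [0, 0])
    for sym, ts, vol, amt in rows:
        acc = sums[(sym, ts[:16])]
        if vol is not None:
            acc[0] += vol
        if amt is not None:
            acc[1] += amt
    return {key: (acc[0], acc[1]) for key, acc in sums.items()}


# Per-tick increments from the question's expected result
D = "2020.06.10T"
increments: List[Inc] = [
    ("GOOG", D + "09:30:00.000", None, None), ("IBM", D + "09:30:00.000", None, None),
    ("MSFT", D + "09:30:00.000", None, None), ("GOOG", D + "09:30:03.000", 100, 900),
    ("IBM", D + "09:30:03.000", 50, 600), ("MSFT", D + "09:30:03.000", 100, 1000),
    ("GOOG", D + "09:30:06.000", 50, 600), ("GOOG", D + "09:30:09.000", 60, 700),
    ("IBM", D + "09:30:06.000", 50, 500), ("IBM", D + "09:30:09.000", 40, 400),
    ("MSFT", D + "09:30:06.000", 80, 900), ("MSFT", D + "09:30:09.000", 70, 600),
]

bars = minute_bars(increments)
for (sym, minute), (vol, amt) in sorted(bars.items()):
    print(sym, minute, vol, amt)
```

Each minute's sum equals the last cumulative value minus the first one observed (IBM: 240 - 100 = 140), so whatever traded before a sym's first observed snapshot is not counted. If the feed's counters are known to start from zero at the open, seeding the dictionaries with 0 for every sym would include it; that is an assumption about the data feed, not something the answer states.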
