请教一下,如果我们订阅的level2行情流表中的成交量和成交额都是累计值,用时间序列聚合引擎如何得到每分钟内的成交额和成交量呢?有什么比较方便的方法吗?
比如我输入下面数据:
sym timestamp cumVolume cumAmount
IBM 2020.06.10T09:30:00.000 100 1,000
MSFT 2020.06.10T09:30:00.000 120 1,200
GOOG 2020.06.10T09:30:00.000 100 1,000
IBM 2020.06.10T09:30:03.000 150 1,600
MSFT 2020.06.10T09:30:03.000 220 2,200
GOOG 2020.06.10T09:30:03.000 200 1,900
IBM 2020.06.10T09:30:06.000 200 2,100
MSFT 2020.06.10T09:30:06.000 300 3,100
GOOG 2020.06.10T09:30:06.000 250 2,500
IBM 2020.06.10T09:30:09.000 240 2,500
MSFT 2020.06.10T09:30:09.000 370 3,700
GOOG 2020.06.10T09:30:09.000 310 3,200
得到下面结果:
sym timestamp volume amount
GOOG 2020.06.10T09:30:00.000
IBM 2020.06.10T09:30:00.000
MSFT 2020.06.10T09:30:00.000
GOOG 2020.06.10T09:30:03.000 100 900
IBM 2020.06.10T09:30:03.000 50 600
MSFT 2020.06.10T09:30:03.000 100 1,000
GOOG 2020.06.10T09:30:06.000 50 600
GOOG 2020.06.10T09:30:09.000 60 700
IBM 2020.06.10T09:30:06.000 50 500
IBM 2020.06.10T09:30:09.000 40 400
MSFT 2020.06.10T09:30:06.000 80 900
MSFT 2020.06.10T09:30:09.000 70 600
//步骤一
//累积量实时预处理,此函数定义放到createSubStreamingEnv函数的前面
def calcTradeVolumeAndAmount(mutable dictVolume, mutable dictAmount, mutable tsAggrKline, msg){
}
//步骤二
//定义一个字典用于保存每一个symbol前一次的聚合值,在预处理函数中用到
dictVol = dict(STRING, DOUBLE)
dictAmount = dict(STRING, DOUBLE)
//步骤三
//在订阅写入聚合引擎部分,调用预处理函数写入聚合引擎,增加一个参数传入字典
def createSubStreamingEnv(dbPath,tbName,userName,userPass,mutable dictVol){
}
//步骤四
//在建立订阅环境的入口函数最后增加一个参数传入字典
createSubStreamingEnv(dbPath, tbName, userName, userPass, dictVol, dictAmount)