在使用DolphinDB流数据横截面聚合引擎时用interval模式触发计算,我的代码如下:
//清理环境
def clearEnv(){
unsubscribeTable(server="", tableName="trades", actionName="tradesCrossAggregator") //取消订阅
dropAggregator("CrossSectionalDemo") //取消聚合引擎调用
undef("trades", SHARED) //删除共享表
undef("outputTable", SHARED)
}
//创建横截面引擎并订阅
def createSub(){
share(streamTable(10:0, `time`sym`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT]), "trades") //创建共享的流数据表
share(table(1:0, `updateTime`maxQty`maxDollarVolume`sumDollarVolume`count, [TIMESTAMP, INT, DOUBLE, DOUBLE, INT]), "outputTable") //输出表,保存计算结果
tradesCrossAggregator = createCrossSectionalAggregator(name="CrossSectionalDemo", metrics=<[max(qty), max(price*qty), sum(price*qty), count(price)]>, dummyTable=objByName("trades"), outputTable=objByName("outputTable"), keyColumn="sym", triggeringPattern="interval", triggeringInterval=1000, useSystemTime=true) //创建横截面引擎
subscribeTable(server="", tableName="trades", actionName="tradesCrossAggregator", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true)
}
//定时向表trades写入数据,总记录为3*n
def writeData(n, trades){
for (i in 0:n) {
timev = take(now(), 3)
symv = rand(`A`B`C, 3)
pricev = rand(10.0, 3)
qtyv = rand(10 20 30 , 3)
insert into trades values(timev, symv, pricev, qtyv)
sleep(1000)
}
}
login("admin", "123456")
//clearEnv()
//undef all
createSub()
submitJob("jobId", "writeDataTrades", writeData{20, objByName("trades")})
select * from outputTable
为什么已经停止向trades表写入数据,outputTable表还会不断有新的计算结果进来?
在"interval"模式下,无论是否有新的数据写入,计算均会定时触发。
可以通过取消聚合引擎调用停止触发计算: