appendMsg

语法

appendMsg(engine, msgBody, msgId)

参数

engine 是内置流数据引擎,即 createReactiveStateEngine 等函数返回的抽象表对象。

msgBody 是将要写入流数据引擎的消息。

msgId 是写入数据之前,流数据引擎已接收到的的最后一条消息的ID。ID从订阅发布的第一条消息开始计数。

详情

当流数据引擎启用快照机制(snapshot)且未开启RaftGroup时,订阅函数( subscribeTable )的handler参数必须为appendMsg函数,将数据写入流数据引擎。

例子

$ share streamTable(10000:0,`time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE]) as trades
$ output1 =table(10000:0, `time`sym`avgPrice, [TIMESTAMP,SYMBOL,DOUBLE]);

$ engine1 = createTimeSeriesEngine(name=`engine1, windowSize=100, step=50, metrics=<avg(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, keyColumn=`sym, snapshotDir="C:/DolphinDB/Data/snapshotDir", snapshotIntervalInMsgCount=100)
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=appendMsg{engine1}, msgAsTable=true, handlerNeedMsgId=true)

$ n=500
$ timev=2021.03.12T15:00:00.000 + (1..n join 1..n)
$ symv = take(`A, n) join take(`B, n)
$ pricev = (100+cumsum(rand(1.0,n)-0.5)) join (200+cumsum(rand(1.0,n)-0.5))
$ t=table(timev as time, symv as sym, pricev as price).sortBy!(`time)
$ trades.append!(t)

$ select * from output1