createDailyTimeSeriesEngine

New in version 1.30.9.

语法

createDailyTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill=’none’], [sessionBegin], [sessionEnd], [mergeSessionEnd=false], [forceTriggerTime], [raftGroup], [forceTriggerSessionEndTime], [keyPurgeFreqInSec], [closed=’left’], [outputElapsedMicroseconds=false])

详情

创建流数据日级时间序列引擎。日级时间序列引擎和时间序列引擎窗口划分和计算规则基本一致,但在此基础上做了如下拓展:

  • 该引擎只能在一个自然日的指定时间段内(以下统称为 session)进行窗口的聚合计算。一个自然日内,可以指定多个 session, 如 9:00-12:00,13:00-15:00 等。

  • 出现在一个 session 开始之前的数据,日级时序引擎规定将该部分数据合入该 session 的第一个窗口进行计算。

  • 当日最后一个 session 后到来的数据将被丢弃,不会计入第二天的第一个窗口中。

若指定了 keyColumn 进行分组,则上述计算将在各分组内独立进行。

更多流数据引擎的应用场景说明可以参考 流数据引擎

参数

该引擎是基于时间序列引擎进行的扩展,继承了 createTimeSeriesEngine 所有的参数,请参照 createTimeSeriesEngine 中参数介绍。这里仅介绍与时间序列引擎不同的参数:

sessionBegin 为可选参数,可以是与时间列的数据类型对应的 SECOND、TIME 或 NANOTIME 类型的标量或向量,表示每个时间段的起始时刻。如果 sessionBegin 是一个向量,它必须是递增的。

sessionEnd 为可选参数,可以是与时间列的数据类型对应的 SECOND、TIME 或 NANOTIME 类型的标量或向量,表示每个时间段的结束时刻。可在 sessionEnd 中指定 00:00:00 表示的次日的零点(即当日的 24:00:00)。

mergeSessionEnd 为可选参数,是一个布尔值。当 close = ‘left’ 时,表示每个 session 结束时刻(规整后)的数据是否合入最后一个窗口。默认值为 false,此时该条数据不会合入当前 session 的最后一个窗口,但可以触发最后一个窗口的计算;如果当前 session 不是该自然日内最后一个 session,则该数据会合入下个 session 的第一个窗口。

New in version 1.30.17: 参数 forceTriggerSessionEndTime

forceTriggerSessionEndTime 可选参数,正整数,单位与 timeColumn 的时间精度一致。若 sessionEnd 时刻对应的窗口数据长时间未发生计算,通过该参数可以设置系统经过多少时间后触发计算并输出。若不指定 fill ,未包含在该窗口内的分组不会输出结果;若指定了 fill ,未包含在该窗口内的分组会按照 fill 指定的方式输出结果。

注:

  • 系统会对数据窗口的起始时间(sessionBeginsessionEnd)进行规整。

例子

$ share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
$ output1 = keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT])
$ engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=2, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true)
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

$ insert into trades values(2018.10.08,09:25:31,`A,8)
$ insert into trades values(2018.10.08,09:26:01,`B,10)
$ insert into trades values(2018.10.08,09:30:02,`A,26)
$ insert into trades values(2018.10.08,09:30:10,`B,14)
$ insert into trades values(2018.10.08,11:29:46,`A,30)
$ insert into trades values(2018.10.08,11:29:50,`B,11)
$ insert into trades values(2018.10.08,11:30:00,`A,14)
$ insert into trades values(2018.10.08,11:30:00,`B,4)
$ insert into trades values(2018.10.08,13:00:10,`A,16)
$ insert into trades values(2018.10.08,13:00:12,`B,9)
$ insert into trades values(2018.10.08,14:59:56,`A,20)
$ insert into trades values(2018.10.08,14:59:58,`B,20)
$ insert into trades values(2018.10.08,15:00:00,`A,10)
$ insert into trades values(2018.10.08,15:00:00,`B,29)

$ sleep(1000)
$ select * from output1

time

sym

sumVolume

2018.10.08T09:31:00

A

34

2018.10.08T09:31:00

B

24

2018.10.08T11:30:00

A

44

2018.10.08T11:30:00

B

15

2018.10.08T13:01:00

A

16

2018.10.08T13:01:00

B

9

2018.10.08T15:00:00

A

30

2018.10.08T15:00:00

B

49

$ share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
$ output1 = keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT])
$ engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true,forceTriggerSessionEndTime=10)
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

$ insert into trades values(date(now()),09:25:31,`A,8)
$ insert into trades values(date(now()),09:26:01,`B,10)
$ insert into trades values(date(now()),09:30:02,`A,26)
$ insert into trades values(date(now()),09:30:10,`B,14)
$ insert into trades values(date(now()),11:29:46,`A,30)
$ insert into trades values(date(now()),11:29:50,`B,11)
$ insert into trades values(date(now()),11:30:00,`B,14)
$ insert into trades values(date(now()),11:30:01,`A,4)

$ select * from output1

time

sym

sumVolume

2022.03.24T09:31:00

A

34

2022.03.24T09:31:00

B

24

2022.03.24T11:30:00

A

30

设置 forceTriggerSessionEndTime = 10,则系统到达 11:30:00 后,再经过 10s 就会触发右边界为11:30:00的窗口内数据的计算。

$ sleep(10000)
$ select * from output1

time

sym

sumVolume

2022.03.24T09:31:00

A

34

2022.03.24T09:31:00

B

24

2022.03.24T11:30:00

A

30

2022.03.03T11:30:00

B

25