createTimeSeriesEngine

语法

createTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill=’none’], [forceTriggerTime], [raftGroup], [keyPurgeFreqInSec], [closed=’left’], [outputElapsedMicroseconds=false])

别名:createTimeSeriesAggregator

详情

创建流数据时间序列引擎,以实现基于时间的滑动窗口或滚动窗口进行实时计算。返回一个表对象,通过向该表对象写入,将数据注入时间序列引擎进行计算。

时序引擎的聚合计算的算子分为增量计算和全量计算两种。增量计算算子不会保留历史数据,每次有数据进来就会进行增量计算;而全量计算算子(例如自定义的聚合函数,或未经优化的内置聚合函数,亦或是嵌套了状态函数的函数)会保留窗口内完整的数据,待触发输出时进行全量计算。

时序引擎对以下聚合计算算子进行了优化,实现了增量计算,显著提升了性能:corr, covar, first, last, max, med, min, percentile, quantile, std, var, sum, sum2, sum3, sum4, wavg, wsum, count, firstNot, ifirstNot, lastNot, ilastNot, imax, imin, nunique, prod, sem, mode, searchK。

更多流数据引擎的应用场景说明可以参考 流数据引擎

窗口

  • 窗口边界规整:对起始窗口左边界进行规整。(详情请参考 steproundTime 的参数说明以及 规整规则表

  • 窗口确定:

    • windowSize 确定窗口长度。

    • closed 确定窗口左闭右开或左开右闭。

    • step 确定滑动的步长。

其中,windowSizestep 的单位取决于 useSystemTime 参数。若 useSystemTime = true, windowSizestep 的单位是毫秒。若 useSystemTime = false,windowSizestep 的单位同 timeColumn 列的精度一致。

  • 窗口截取:useSystemTime 参数决定了如何截取窗口,可以基于数据的时间列,亦可以基于数据注入系统的时间进行截取。

计算

  • 计算前提:若指定了 timeColumn,则 timeColumn 的时间必须递增;若同时指定了 keyColumn,则按照分组分别进行滑动窗口计算,timeColumn 在每组内的时间必须递增。否则,时间乱序的数据在计算中会被直接丢弃。

  • 触发规则:若指定了 timeColumn,当前窗口的计算将由该窗口结束后收到的第一条数据触发;若设置 useSystemTime = true,窗口结束后会自动触发计算。

  • 强制触发规则:对长时间未触发计算的窗口数据,可以通过设置 updateTimeforceTriggerTime 强制触发计算。具体规则请参考参数说明。

  • 窗口填充规则:未指定 fill 或指定 fill = “none” 时,只会输出计算结果不为空的窗口;若指定了 fill,则会输出所有窗口,且根据 fill 规则对结果为空的窗口进行填充。

其它功能

  • 支持数据/状态清理:清理引擎中堆积的数据和不再需要的状态信息。(详情请参考 garbageSizekeyPurgeFreqInSec 的参数说明)

  • 快照机制:启用快照机制之后,系统若出现异常,可及时将流数据引擎恢复到最新的快照状态。(详情请参考 snapshotDirsnapshotIntervalInMsgCount 的参数说明)

  • 流数据引擎高可用:若要启用引擎高可用,需在订阅端 raft 组的 leader 节点创建引擎并通过 raftGroup 参数开启高可用。开启高可用后,当 leader 节点宕机时,会自动切换新 leader 节点重新订阅流数据表。(详情请参考 raftGroup 的参数说明)

参数

name 字符串标量,表示时间序列引擎的名称,作为其在一个数据节点/计算节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。

windowSize 正整数标量或向量,表示滑动窗口的长度。

step 正整数标量,表示滑动窗口移动的步长。 windowSize 必须是 step 的整数倍,否则会抛出异常。

windowSizestep 的单位与 useSystemTime 有关:

  • useSystemTime 为 true,则 windowSizestep 的单位为毫秒。

  • useSystemTime 为 false,则 windowSizestep 的单位与 timeColumn 的时间精度一致。

metrics 以元代码的格式表示计算指标,支持输入元组。有关元代码的更多信息可参考 元编程

  • 计算指标可以是一个或多个系统内置或用户自定义的聚合函数(使用 defg 关键字定义),如 <[sum(volume), avg(price)]>;可以对聚合结果使用表达式,如 <[avg(price1)-avg(price2)]>;也可对列与列的计算结果进行聚合计算,如 <[std(price1-price2)]>。

  • metrics 内支持调用具有多个返回值的函数,例如 <func(price) as `col1`col2>(可不指定列名)。

  • windowSize 为向量, windowSize 每个值可对应 metrics 中多个计算指标。例如,windowSize 为[10,20]时,metrics可为 (<[min(volume), max(volume)]>, <sum(volume)>)。 metrics 也可以嵌套输入元组向量。例如:[[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]]

注意:metrics 中使用的列名大小写不敏感,不要求与输入表的列名大小写保持一致。然而,如果在 metrics 中有多个相同的列名,则它们的大小写必须保持一致。

dummyTable 一个表对象,和订阅的流数据表的 schema 一致,可以含有数据,亦可为空表。

outputTable 计算结果的输出表,可以是内存表或者分布式表。在使用 createTimeSeriesEngine 函数之前,需要将输出表预先设立为一个空表,并指定各列列名以及数据类型。时间序列引擎会将计算结果插入该表。

输出表的列顺序如下:

(1) 时间列。其中:

  • useSystemTime = true,为 TIMESTAMP 类型;反之,该列数据类型与 timeColumn 列一致。

  • useWindowStartTime = true,显示时间为数据窗口起始时间;反之,显示时间为数据窗口终止时间。

(2) 分组列。如果 keyColumn 不为空,则其后几列和 keyColumn 设置的列及其顺序保持一致。

(3) 耗时列。如果指定 outputElapsedMicroseconds = true,则指定一个 LONG 类型的列用于存储耗时(单位:微秒)。

(4) 计算结果列。可为多列。

timeColumn 可选参数,字符串标量或向量。当 useSystemTime = false 时,必须指定该参数。 该参数用于指定订阅的流数据表中时间列的名称。

请注意,字符串向量,必须是 date 和 time 组成的向量,date 类型为 DATE,time 类型为 TIME, SECOND 或 NANOTIME。此时,输出表第一列的时间类型必须与 concatDateTime(date, time) 的类型一致。

useSystemTime 可选参数,布尔值,表示是否使用数据注入引擎时的系统时间作为时间列进行计算。

  • useSystemTime = true时,时间序列引擎会按照数据注入时间序列引擎的时刻(毫秒精度的本地系统时间,与数据中的时间列无关),每隔固定时间截取固定长度窗口的流数据进行计算。只要一个数据窗口中含有数据,数据窗口结束后就会自动进行计算。结果中的第一列为计算发生的时间戳,与数据中的时间无关。

  • useSystemTime = false(缺省值)时,时间序列引擎根据流数据中的 timeColumn 列来截取数据窗口。一个数据窗口结束后的第一条新数据才会触发该数据窗口的计算。请注意,触发计算的数据并不会参与该次计算。

例如,一个数据窗口从 10:10:10 到 10:10:19。若 useSystemTime = true,则只要该窗口中至少有一条数据,该窗口的计算会在窗口结束后的 10:10:20 触发。若 useSystemTime = false,且 10:10:19 后的第一条数据为 10:10:25,则该窗口的计算会在 10:10:25 触发。

keyColumn 可选参数,字符串标量或向量,表示分组列名。若设置,则分组进行聚合计算,例如以每支股票为一组进行聚合计算。

garbageSize 可选参数,正整数,默认值是 50,000(单位为行)。随着订阅的流数据不断积累,注入时间序列引擎,存放在内存中的数据会越来越多,这时需要清理不再需要的历史数据。当内存中历史数据行数超过 garbageSize 值时,系统会清理本次计算不需要的历史数据。如果指定了 keyColumn,内存清理是各组内独立进行的。当一个组在内存中的历史数据记录数超出 garbageSize 时,会清理该组中本次计算中不需要的历史数据。

注意:对于增量计算算子,系统会自动清理不再需要的历史数据。而对于全量计算算子,需要指定该参数来触发清理历史数据。

updateTime 可选参数,正整数,单位与 timeColumn 的时间精度一致。用于指定比 step 更短的计算时间间隔。step 必须是 updateTime 的整数倍。要设置 updateTimeuseSystemTime 必须设为 false。

如果没有指定 updateTime,一个数据窗口结束前,不会发生对该数据窗口数据的计算。若一个窗口长时间未触发计算,可以指定 updateTime,分多次触发当前窗口数据的计算。

计算触发的规则为:

从当前窗口的左边界开始,每隔 updateTime 时间,若有新的数据到来,则对当前窗口内该数据之前的所有数据进行计算。另外,如果系统经过 2 * updateTime (至少2秒)后仍有未被处理的数据,则触发对当前窗口内所有数据的计算。

若分组计算,则每组内进行上述操作。

指定 updateTime 后,建议使用键值内存表作为输出表。因为每次计算均会增加一条记录,输出表若使用普通内存表或流数据表,则会产生大量带有相同时间戳的结果。因为键值流数据表不可更新记录,输出表亦不推荐使用键值流数据表。

useWindowStartTime 可选参数,布尔值,表示输出表中的时间是否为数据窗口起始时间。默认值为 false,表示输出表中的时间为数据窗口起始时间 + windowSize 。若 windowSize 是向量, useWindowStartTime 必须为 false。

roundTime 可选参数,布尔值,表示若数据时间精度为毫秒或者秒且 step > 一分钟,如何对窗口边界值进行规整处理。默认值为 true,表示按照既定的多分钟规则进行规整。若为 false,则按一分钟规则进行窗口规整(详见 规整规则表)。

若要开启快照机制 (snapshot),必须指定 snapshotDirsnapshotIntervalInMsgCount

snapshotDir 可选参数,字符串,表示保存引擎快照的文件目录。

  • 指定的目录必须存在,否则系统会提示异常。

  • 创建流数据引擎时,如果指定了 snapshotDir,会检查该目录下是否存在快照。如果存在,会加载该快照,恢复引擎的状态。

  • 多个引擎可以指定同一个目录存储快照,用引擎的名称来区分快照文件。

  • 一个引擎的快照可能会使用三个文件名:

    • 临时存储快照信息:文件名为 <engineName>.tmp;

    • 快照生成并刷到磁盘:文件保存为 <engineName>.snapshot;

    • 存在同名快照:旧快照自动重命名为 <engineName>.old。

snapshotIntervalInMsgCount 可选参数,为整数类型,表示每隔多少条数据保存一次流数据引擎快照。

New in version 1.30.6: 参数 fill

fill 可选参数,一个标量或向量,表示若(某个分组的)某个窗口无数据时,如何处理。可取以下值:

  • ‘none’: 不输出结果。

  • ‘null’: 输出结果为 NULL。

  • ‘ffill’: 输出上一个有数据的窗口的结果。

  • ‘具体数值’:该值的数据类型需要和对应的 metrics 计算结果的类型保持一致。

fill 可以输入向量,长度与 metrics 元素个数保持一致,表示为每个 metrics 指定不同的 fill 方式。若为向量,向量中各项只能是 ‘null’, ‘ffill’ 或一个数值,不能是 ‘none’。

New in version 1.30.13: 参数 forceTriggerTime

forceTriggerTime 可选参数,是非负整数,单位与 timeColumn 的时间精度一致。用于强制触发各个分组未计算的窗口进行计算。要设置 forceTriggerTimeuseSystemTime 必须设置为 false,且不能指定 updateTime。 强制触发计算及输出规则如下:

1. 未被触发计算的窗口结束后(窗口结束时刻为 t),若收到了其他分组的数据(时间戳为 t1),且满足 t1-t ≥ forceTriggerTime,则该窗口将被触发计算。

2. 如果某个分组在最后一个窗口被强制触发计算后,没有再收到新的数据,但其他分组仍然收到了新的数据,那么通过 fill 来填充该分组的所有缺失窗口,可以确保在最新时间截面上仍然输出该分组的窗口。如果不指定 fill,则最后一个窗口被触发计算后,该分组不会产生新的窗口。

设置 forceTriggerTimeupdateTime 时需注意以下几点:

  • 设置 updateTime,计算发生后仍有属于当前窗口的数据到达时,当前窗口计算结果会再次更新;

  • 设置 forceTriggerTime,则触发计算之后收到的数据会被丢弃,建议不要设置太小的 forceTriggerTime

New in version 1.30.14: 参数 raftGroup

raftGroup 可选参数,表示流数据高可用订阅端 raft 组的 ID (大于1的整数,由流数据高可用相关的配置项 streamingRaftGroups 指定)。设置该参数表示开启计算引擎高可用。在 leader 节点创建流数据引擎后,会同步在 follower 节点创建该引擎。每次保存的 snapshot 也会同步到 follower。当 raft 组的 leader 节点宕机时,会自动切换新 leader 节点重新订阅流数据表。请注意,若要指定 raftGroup,必须同时指定 snapshotDir

New in version 1.30.18: 参数 keyPurgeFreqInSecclosed

keyPurgeFreqInSec 正整数,表示清理窗口数据为空的分组的时间间隔,单位为秒。指定该参数后,若当前数据注入时间与上一次清理时间的间隔大于等于 keyPurgeFreqInSec,则触发对当前窗口数据为空的分组信息的清理。

注意:若需指定该参数,必须指定 forceTriggerTime,且不能指定 fill

若需要查看清理前后的状态,可以通过调用 getStreamEngineStat 函数查看 TimeSeriesEngine 引擎状态的 numGroups 列,来对比响应式状态引擎清理前后分组数的变化。

closed 字符串,用于确定滑动窗口边界的开闭情况。可选值为 ‘left’ 或 ‘right’,默认值为 ‘left’。

  • closed = ‘left’: 窗口左闭右开。

  • closed = ‘right’: 窗口左开右闭。

New in version 1.30.21: 参数 outputElapsedMicroseconds

outputElapsedMicroseconds 布尔值,表示是否输出每个窗口数据注入到计算完成的耗时(若指定了 keyColumn 则包含数据分组的耗时),默认为 false。指定参数 outputElapsedMicroseconds 后,在定义 outputTable 时需要在时间列和分组列后增加一个 LONG 类型的列,详见 outputTable 参数说明。

规整规则

为了便于观察和对比计算结果,系统会对第一个数据窗口的起始时间进行规整,根据 step 参数、数据的时间精度,以及 roundTime 参数来确定整数类型的规整尺度 alignmentSize。当时间序列引擎使用分组计算时,所有分组的窗口均进行统一的规整。相同时刻的数据窗口在各组均有相同的边界。

  • 若数据的时间精度为分钟,如 MINUTE(HH:mm) 类型,alignmentSize 取值如下:

    若 roundTime = false

    step

    alignmentSize

    0~2

    2

    3

    3

    4~5

    5

    6~10

    10

    11~15

    15

    16~20

    20

    21~30

    30

    >30

    60 (1小时)

    若 roundTime = true:

    当 step <= 30 时,alignmentSize 取值同上表。当 step > 30 时,alignmentSize 取值见下表:

    step

    alignmentSize

    31~60

    60 (1小时)

    61~120

    120 (2小时)

    121~180

    180 (3小时)

    181~300

    300 (5小时)

    301~600

    600 (10小时)

    601~900

    900 (15小时)

    901~1200

    1200 (20小时)

    1201~1800

    1800 (30小时)

    >1800

    3600 (60小时)

  • 若数据的时间精度为秒,如 DATETIME(yyyy-MM-dd HH:mm:ss) 与 SECOND(HH:mm:ss) 类型,alignmentSize 取值如下:

    若 roundTime = false

    step

    alignmentSize

    0~2

    2

    3

    3

    4~5

    5

    6~10

    10

    11~15

    15

    16~20

    20

    21~30

    30

    >30

    60 (1分钟)

    若 roundTime = true:

    当 step <= 30 时,alignmentSize 取值同上表。当 step > 30 时,alignmentSize 取值见下表:

    step

    alignmentSize

    31~60

    60 (1分钟)

    61~120

    120 (2分钟)

    121~180

    180 (3分钟)

    181~300

    300 (5分钟)

    301~600

    600 (10分钟)

    601~900

    900 (15分钟)

    901~1200

    1200 (20分钟)

    1201~1800

    1800 (30分钟)

    >1800

    3600 (1小时)

  • 若数据的时间精度为毫秒,如 TIMESTAMP(yyyy-MM-dd HH:mm:ss.mmm) 与 TIME(HH:mm:ss.mmm) 类型,alignmentSize 的取值如下:

    若 roundTime = false:

    step

    alignmentSize

    0~2

    2

    3~5

    5

    6~10

    10

    11~20

    20

    21~25

    25

    26~50

    50

    51~100

    100

    101~200

    200

    201~250

    250

    251~500

    500

    501~1000

    1000(1秒)

    1001~2000

    2000(2秒)

    2001~5000

    5000(5秒)

    5001~10000

    10000(10秒)

    10001~15000

    15000(15秒)

    15001~20000

    20000(20秒)

    20001~30000

    30000(30秒)

    >30000

    60000(1分钟)

    若 roundTime = true:

    step <= 30000,alignmentSize 取值同上表;若 step > 30000,alignmentSize 取值见下表:

    step

    alignmentSize

    30001~60000

    60000(1分钟)

    60001~120000

    120000(2分钟)

    120001~300000

    300000(5分钟)

    300001~600000

    600000(10分钟)

    600001~900000

    900000(15分钟)

    900001~1200000

    1200000(20分钟)

    1200001~1800000

    1800000(30分钟)

    >1800000

    3600000(1小时)

  • 若数据的时间精度为纳秒,如 NANOTIMESTAMP(yyyy-MM-dd HH:mm:ss.nnnnnnnnn) 与 NANOTIME(HH:mm:ss.nnnnnnnnn) 类型,alignmentSize 的取值如下:

    若 roundTime = false:

    step

    alignmentSize

    0~2ns

    2ns

    3ns~5ns

    5ns

    6ns~10ns

    10ns

    11ns~20ns

    20ns

    21ns~25ns

    25ns

    26ns~50ns

    50ns

    51ns~100ns

    100ns

    101ns~200ns

    200ns

    201ns~250ns

    250ns

    251ns~500ns

    500ns

    >500ns

    1000ns

    若 roundTime = true:

    step

    alignmentSize

    1000ns~1ms

    1ms

    1ms~10ms

    10ms

    10ms~100ms

    100ms

    100ms~1s

    1s

    1s~2s

    2s

    2s~3s

    3s

    3s~5s

    5s

    5s~10s

    10s

    10s~15s

    15s

    15s~20s

    20s

    20s~30s

    30s

    >30s

    1min

假设第一条数据的时间为 x, 那么根据其类型,第一个数据窗口的左边界的计算规则为:timeType_cast(x/alignmentSize*alignmentSize+step-windowSize)。其中,timeType_cast 表示依据时间精度,需要强制转换的时间类型;’/’ 表示整除。例如,第一条数据的时间为 2018.10.08T01:01:01.365,windowSize 为 120000,step 为 60000,那么 alignmentSize 为 60000,第一个数据窗口的左边界为 timestamp(2018.10.08T01:01:01.365/60000*60000+60000-120000),即 2018.10.08T01:00:00.000。

例子

下例中,时间序列引擎 engine1 订阅流数据表 trades,实时计算表 trades 中过去1分钟内每只股票交易量之和。

$ share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
$ output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
$ engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

$ insert into trades values(2018.10.08T01:01:01.785,`A,10)
$ insert into trades values(2018.10.08T01:01:02.125,`B,26)
$ insert into trades values(2018.10.08T01:01:10.263,`B,14)
$ insert into trades values(2018.10.08T01:01:12.457,`A,28)
$ insert into trades values(2018.10.08T01:02:10.789,`A,15)
$ insert into trades values(2018.10.08T01:02:12.005,`B,9)
$ insert into trades values(2018.10.08T01:02:30.021,`A,10)
$ insert into trades values(2018.10.08T01:04:02.236,`A,29)
$ insert into trades values(2018.10.08T01:04:04.412,`B,32)
$ insert into trades values(2018.10.08T01:04:05.152,`B,23)

$ sleep(10)

$ select * from output1;

time

sym

sumVolume

2018.10.08T01:02:00.000

A

38

2018.10.08T01:02:00.000

B

40

2018.10.08T01:03:00.000

A

25

2018.10.08T01:03:00.000

B

9

下面详细解释时间序列引擎的计算过程。为简便起见,以下提到时间时,省略相同的日期部分,只列出(小时:分钟:秒.毫秒)部分。

首先,时间序列引擎对第一个数据窗口的起始时间进行规整。第一个数据窗口的时间范围是 01:01:00.000 到 01:02:00.000,只包含左边界,不包含右边界。当(01:02:10.789,`A,15)到达时,触发第一个窗口 A 组计算;当(01:02:12.005,`B,9)到达时,触发第一个窗口 B 组计算。

第二个数据窗口的时间范围是 01:02:00.000 到 01:03:00.000。当(01:04:02.236,`A,29)到达时,触发第二个窗口 A 组计算;当(01:04:04.412,`B,32)到达时,触发第二个窗口 B 组计算。

由于 01:05:00.000 及之后没有数据,因此没有对 01:04:00.000 到 01:05:00.000 之间的数据进行计算。

输出表 output1 保存了时间序列引擎的计算结果。由于 useWindowStartTime 为 false,因此输出表 output1 中的时间为窗口的结束时间。若将 useWindowStartTime 设为 true,则输出表中的时间为窗口的起始时间。例如:

$ output2 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
$ engine2 = createTimeSeriesEngine(name="engine2", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output2, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=true)
$ subscribeTable(tableName="trades", actionName="engine2", offset=0, handler=append!{engine2}, msgAsTable=true)

$ sleep(10)
$ select * from output2;

time

sym

sumVolume

2018.10.08T01:01:00.000

A

38

2018.10.08T01:01:00.000

B

40

2018.10.08T01:02:00.000

A

25

2018.10.08T01:02:00.000

B

9

下例中指定 updateTime 为 1000(毫秒):

$ output3 = keyedTable(`time`sym,10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
$ engine3 = createTimeSeriesEngine(name="engine3", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output3, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=1000, useWindowStartTime=false)
$ subscribeTable(tableName="trades", actionName="engine3", offset=0, handler=append!{engine3}, msgAsTable=true)

$ sleep(2001)
$ select * from output3;

time

sym

sumVolume

2018.10.08T01:02:00.000

A

38

2018.10.08T01:02:00.000

B

40

2018.10.08T01:03:00.000

A

25

2018.10.08T01:03:00.000

B

9

2018.10.08T01:05:00.000

B

55

2018.10.08T01:05:00.000

A

29

下面以最后一个数据窗口为例,介绍时间序列引擎指定 updateTime 时如何进行计算。假设 time 列时间亦为数据进入时间序列引擎的时刻。

(1) 在 01:04:04.236 时,A 分组的第一条记录到达后已经过 2000 毫秒,触发一次 A 组计算,输出表增加一条记录(01:05:00.000, `A, 29)。

(2) 在 01:04:05.152 时的 B 组记录为 01:04:04.412 所在小窗口[01:04:04.000, 01:04:05.000)之后第一条记录,触发一次 B 组计算,输出表增加一条记录(01:05:00,”B”,32)。

(3) 2 秒(2*updateTime个时间单位)后,在 01:04:07.152 时,由于 01:04:05.152 时的 B 组记录仍未参与计算,触发一次B组计算,输出一条记录(01:05:00,”B”,55)。由于输出表的主键为 time 和 sym,并且输出表中已有(01:05:00,”B”,32)这条记录,因此将该记录更新为(01:05:00,”B”,55)。

下例中,共享流数据表 “pubT” 包含两个时间列,类型分别时 DATE 和 SECOND,创建时间序列引擎时,通过设置 timeColumn 来将原来流数据表的两个时间列整合为输出表 streamMinuteBar_1min 中的一个类型为 DATETIME 的时间列。

$ colNames=`symbol`date`minute`price`type`volume
$ colTypes=`SYMBOL`DATE`SECOND`DOUBLE`STRING`INT
$ pubTable = streamTable(10000:0,colNames,colTypes)
$ share pubTable as pubT

$ colNames = `time`symbol`open`max`min`close`volume`amount`ret`vwap
$ colTypes = `DATETIME`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`DOUBLE`DOUBLE
$ streamMinuteBar_1min = streamTable(10000:0,colNames, colTypes)

$ tsAggrOHLC = createTimeSeriesEngine(name="subT", windowSize=60, step=60, metrics=<[first(price) as open ,max(price) as max,min(price) as min ,last(price) as close ,sum(volume) as volume ,wsum(volume, price) as amount ,(last(price)-first(price)/first(price)) as ret, (wsum(volume, price)/sum(volume)) as vwap]>, dummyTable=pubTable, outputTable=streamMinuteBar_1min, timeColumn=`date`minute, useSystemTime=false, keyColumn='symbol', fill=`none)
$ subscribeTable(tableName="pubT", actionName="subT", offset=-1, handler=append!{tsAggrOHLC}, msgAsTable=true)

$ insert into pubT values(`000001, 2021.04.05, 09:25:01, 1, 'B', 1)
$ insert into pubT values(`000001, 2021.04.05, 09:30:05, 2, 'B', 1)
$ insert into pubT values(`000001, 2021.04.05, 09:31:06, 3, 'B', 1)
$ insert into pubT values(`000001, 2021.04.05, 09:35:05, 4, 'S', 4)
$ insert into pubT values(`000001, 2021.04.05, 09:40:05, 5, 'S', 5)
$ insert into pubT values(`000001, 2021.04.06, 09:25:05, 6, 'S', 6)

symbol

date

minute

price

type

volume

000001

2021.04.05

09:25:01

1

B

1

000001

2021.04.05

09:30:05

2

B

1

000001

2021.04.05

09:31:06

3

B

1

000001

2021.04.05

09:35:05

4

S

4

000001

2021.04.05

09:40:05

5

S

5

000001

2021.04.06

09:25:05

6

S

6

$ select * from streamMinuteBar_1min

time

symbol

open

max

min

close

volume

amount

ret

vwap

2021.04.05T09:26:00

000001

1

1

1

1

1

1

0

1

2021.04.05T09:31:00

000001

2

2

2

2

1

2

1

2

2021.04.05T09:32:00

000001

3

3

3

3

1

3

2

3

2021.04.05T09:36:00

000001

4

4

4

4

4

16

3

4

2021.04.05T09:41:00

000001

5

5

5

5

5

25

4

5

$ share streamTable(1000:0, `time`sym`qty, [DATETIME, SYMBOL, INT]) as trades
$ output3 = table(10000:0, `time`sym`sumQty, [DATETIME, SYMBOL, INT])

$ engine = createTimeSeriesEngine(name="engine", windowSize=6, step=6, metrics=<sum(qty)>, dummyTable=trades, outputTable=output3, timeColumn=`time,keyColumn=`sym, forceTriggerTime=7,fill=1000)
$ subscribeTable(tableName="trades", actionName="engine", offset=0, handler=append!{engine}, msgAsTable=true)
$ sleep(1000)
$ insert into engine values(2018.08.01T14:05:43,`A,1)
$ insert into engine values(2018.08.01T14:05:43,`C,3)
$ sleep(10)
$ insert into engine values(2018.08.01T14:05:44,`B,1)
$ sleep(80)
$ insert into engine values(2018.08.01T14:05:52,`B,3)
$ sleep(20)
$ insert into engine values(2018.08.01T14:05:54,`A,3)
$ sleep(10)
$ insert into engine values(2018.08.01T14:05:55,`A,5)
$ sleep(20)
$ insert into engine values(2018.08.01T14:05:57,`B,5)
$ sleep(50)
$ insert into engine values(2018.08.01T14:06:12,`A,1)
$ sleep(50)
$ select * from output3 order by sym

time

sum

Qty

2018.08.01T14:05:46

A

1

2018.08.01T14:05:52

A

1,000

2018.08.01T14:05:58

A

8

2018.08.01T14:06:04

A

1,000

2018.08.01T14:06:10

A

1,000

2018.08.01T14:05:46

B

1

2018.08.01T14:05:52

B

1,000

2018.08.01T14:05:58

B

8

2018.08.01T14:05:46

C

3

2018.08.01T14:05:52

C

1,000