createReactiveStateEngine

New in version 1.30.2.

语法

createReactiveStateEngine(name, metrics, dummyTable, outputTable, keyColumn, [filter], [snapshotDir], [snapshotIntervalInMsgCount], [keepOrder], [keyPurgeFilter], [keyPurgeFreqInSecond=0], [raftGroup], [outputElapsedMicroseconds=false])

详情

创建响应式状态引擎。返回一个表对象,向该表写入数据意味着这些数据进入响应式状态引擎进行计算。

下列状态函数在 DolphinDB 的响应式状态引擎中的实现均得到了优化。目前,状态引擎不允许使用未经优化的状态函数,且需避免使用聚合函数。

注意,talib 作为状态函数时,第一个参数 func 只能是响应式状态引擎支持的状态函数。

更多流数据引擎的应用场景说明可以参考 流数据引擎

计算规则

每注入一条数据都会计算并产生一条结果。用户可以通过设置参数 filter 过滤计算结果后输出。 若指定了 keyColumn 进行分组,则计算将在组内进行。

注意:状态引擎的输出结果可能和输入顺序不一致,建议配置参数 keepOrder = true,保持输出结果的有序性。

引擎的其它功能

  • 支持数据/状态清理:状态引擎内部的状态数据是按分组保存的,为避免分组过多,导致引擎内部内存开销过大,可以将历史分组数据进行清理。用户可以通过配置参数 keyPurgeFilter 设置清理条件,配置 keyPurgeFreqInSecond 设置清理时间间隔。(详情请参考 keyPurgeFilterkeyPurgeFreqInSecond 的参数说明)

  • 快照机制:启用快照机制之后,系统若出现异常,可及时将流数据引擎恢复到最新的快照状态。(详情请参考 snapshotDirsnapshotIntervalInMsgCount 的参数说明)

  • 流数据引擎高可用:若要启用引擎高可用,需在订阅端 raft 组的 leader 节点创建引擎并通过 raftGroup 参数开启高可用。开启高可用后,当 leader 节点宕机时,会自动切换新 leader 节点重新订阅流数据表。(详情请参考 raftGroup 的参数说明)

参数

name 字符串标量,表示响应式状态引擎的名称,作为其在一个数据节点/计算节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。

metrics 以元代码的形式表示计算公式。有关元代码的详情可参考 元编程。若需使用用户自定义函数,请注意以下两点:

  1. 需在定义前添加声明 “@state”。状态函数只能包含赋值语句和 return 语句。自 1.30.21 版本起,支持使用 if-else 条件语句,且条件只能是标量。

  2. 若赋值语句的右值是一个多返回值的函数,则需要将多个返回值同时赋予多个变量。例如:两个返回值的函数 linearTimeTrend 应用于自定义状态函数中,正确写法为:

$ @state
$ def forcast2(S, N){
$       linearregIntercept, linearregSlope = linearTimeTrend(S, N)
$       return (N - 1) * linearregSlope + linearregIntercept
$ }

注意:metrics 中使用的列名大小写不敏感,不要求与输入表的列名大小写保持一致。

dummyTable 一个表对象,和订阅的流数据表的 schema 一致,可以含有数据,亦可为空表。

outputTable 计算结果的输出表,可以是内存表或分布式表。使用 createReactiveStateEngine 函数之前,需要将输出表预先设立为一个空表,并指定各列列名以及数据类型。响应式状态引擎会将计算结果注入该表。

输出表的各列的顺序如下:

(1) 分组列。根据 keyColumn 的设置,输出表的前几列必须和 keyColumn 设置的列及其顺序保持一致。

(2) 耗时列。若指定 outputElapsedMicroseconds = true,则需要增加一个 LONG 类型和一个 INT 类型的列,分别用于存储引擎内部每个 batch 的数据耗时(单位:微秒)和记录数。

(3) 计算结果列。可为多列。

keyColumn 字符串标量或向量表示分组列名。计算将在各分组进行。

filter 可选参数。以元代码的形式表示过滤条件。只有符合过滤条件的结果才被输出。它只能是一个表达式。设置多个条件时,用逻辑运算符(and, or)连接。

若要开启快照机制 (snapshot),必须指定 snapshotDirsnapshotIntervalInMsgCount

snapshotDir 可选参数,字符串,表示保存引擎快照的文件目录。

  • 指定的目录必须存在,否则系统会提示异常。

  • 创建流数据引擎时,如果指定了 snapshotDir,会检查该目录下是否存在快照。如果存在,会加载该快照,恢复引擎的状态。

  • 多个引擎可以指定同一个目录存储快照,用引擎的名称来区分快照文件。

  • 一个引擎的快照可能会使用三个文件名:

    • 临时存储快照信息:文件名为 <engineName>.tmp;

    • 快照生成并刷到磁盘:文件保存为 <engineName>.snapshot;

    • 存在同名快照:旧快照自动重命名为 <engineName>.old。

snapshotIntervalInMsgCount 可选参数,为整数类型,表示每隔多少条数据保存一次流数据引擎快照。

New in version 1.30.8: 参数 keepOrder

keepOrder 可选参数,表示输出表数据是否按照输入时的顺序排序。设置 keepOrder = true,表示输出表按照输入时的顺序排序。当 keyColumn 包含有时间列时,keyOrder 默认值为 true,否则默认值为 false。

New in version 1.30.13: 参数 keyPurgeFilterkeyPurgeFreqInSecond

keyPurgeFilter 可选参数,是一个由布尔表达式组成的元代码,表示清理条件。各表达式只能引用 outputTable 中的字段。

keyPurgeFreqInSecond 正整数,表示触发数据清理需要满足的时间间隔(以秒为单位)。

响应式状态引擎提供了 keyPurgeFilter, keyPurgeFreqInSecond 两个参数,用来清理不再需要的分组数据。

每次数据注入时,引擎会依次根据以下条件决定是否触发数据清理:

(1) 检测系统时间与上一次清理的时间间隔是否大于等于 keyPurgeFreqInSecond (第一次数据注入时,会检测系统时间和引擎创建时间的间隔);

(2) 若满足上述条件,引擎将根据 keyPurgeFilter 指定的条件,过滤出待清理的数据;

(3) 若待清理的数据所属的分组数大于等于所有分组数的 10%,则触发清理。

若需要查看清理前后的状态,可以通过调用 getStreamEngineStat 函数查看 ReactiveStreamEngine 引擎状态的 numGroups 列,来对比响应式状态引擎清理前后分组数的变化。

New in version 1.30.14: 参数 raftGroup

raftGroup 是流数据高可用订阅端 raft 组的 ID (大于1的整数,由流数据高可用相关的配置项 streamingRaftGroups 指定)。设置该参数表示开启计算引擎高可用。在 leader 节点创建流数据引擎后,会同步在 follower 节点创建该引擎。每次保存的 snapshot 也会同步到 follower。当 raft 组的 leader 节点宕机时,会自动切换新 leader 节点重新订阅流数据表。请注意,若要指定 raftGroup,必须同时指定 snapshotDir

New in version 1.30.21: 参数 outputElapsedMicroseconds

若同时有一批数据注入响应式状态引擎,则引擎内部数据是分批进行计算的,每个批次的数据称为一个 batch,每个 batch 包含记录数由系统决定。

outputElapsedMicroseconds 布尔值,表示是否输出每个 batch 中数据从注入引擎到计算输出的总耗时,以及每个 batch 包含的总记录数,默认为 false。指定参数 outputElapsedMicroseconds 后,在定义 outputTable 时需要在 keyColumn 列后增加两列,详见 outputTable 参数说明。

例子

$ def sum_diff(x, y){
$  return (x-y)/(x+y)
$ }
$ factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>
$ share streamTable(1:0, `sym`time`price, [STRING,DATETIME,DOUBLE]) as tickStream
$ result = table(1000:0, `sym`time`factor1, [STRING,DATETIME,DOUBLE])
$ rse = createReactiveStateEngine(name="reactiveDemo", metrics =[<time>, factor1], dummyTable=tickStream, outputTable=result, keyColumn="sym", filter=<sym in ["000001.SH", "000002.SH"]>)
$ subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert{rse})

$ data1 = table(take("000001.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 10+cumsum(rand(0.1, 100)-0.05) as price)
$ data2 = table(take("000002.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 20+cumsum(rand(0.2, 100)-0.1) as price)
$ data3 = table(take("000003.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 30+cumsum(rand(0.3, 100)-0.15) as price)
$ data = data1.unionAll(data2).unionAll(data3).sortBy!(`time)

$ replay(inputTables=data, outputTables=tickStream, timeColumn=`time)

查看结果表result,可见只有过滤条件中的”000001.SH”与”000002.SH”这两只股票的计算结果被输出。

若要重复调试以上代码,需要先执行以下代码。

$ unsubscribeTable(tableName=`tickStream, actionName="factors")
$ dropStreamEngine(`reactiveDemo)
$ undef(`tickStream, SHARED)

keyColumn 设置以股票和日期进行分组计算,并设置输出时间在”2012.01.01”和”2012.01.03”之间的计算结果。

$ share streamTable(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT]) as trades
$ outputTable = table(100:0, `date`sym`factor1, [DATE, STRING, DOUBLE])
$ engine = createReactiveStateEngine(name="test", metrics=<mavg(price, 3)>, dummyTable=trades, outputTable=outputTable, keyColumn=["date","sym"], filter=<date between 2012.01.01 : 2012.01.03>, keepOrder=true)
$ subscribeTable(tableName=`trades, actionName="test", msgAsTable=true, handler=tableInsert{engine})

$ n=100
$ tmp = table(rand(2012.01.01..2012.01.10, n) as date, rand(09:00:00.000..15:59:59.999, n) as time, rand("A"+string(1..10), n) as sym, rand(['B', 'S'], n) as market, rand(100.0, n) as price, rand(1000..2000, n) as qty)
$ trades.append!(tmp)
$ select * from outputTable

//若要重复调试以上代码,需要先执行以下代码
$ unsubscribeTable(tableName=`trades, actionName="test")
$ dropStreamEngine(`test)
$ undef(`trades, SHARED)