genericStateIterate
New in version 1.30.21.
语法
genericStateIterate(X, initial, window, func)
参数
X 表中的字段或对其应用向量函数的计算结果。若不指定,需要置为[];若需要输入多个列变量,需要用元组表示。
initial 用于初始化的列字段,作为初始化窗口内元素的输出,初始化窗口(以元素个数衡量)为 [1, window]。initial 可以是输入表中的字段或对其应用向量函数的计算结果。
window 大于 1 的正整数,表示初始化窗口和历史窗口的长度。
func 无状态函数,为用户自定义函数,其返回值必须是标量。func 参数个数为 1(历史窗口内的数据)+ X 指定的列数。第一个参数为历史窗口的元素值,之后的参数依次为 X 指定列的元素值。除前述参数外,若 func 包含其他固定的常量参数,则需以部分应用的形式指定。
详情
基于以元素个数衡量的窗口进行迭代计算。
假设 X 指定为 [X1, X2, …, Xn],该函数计算结果对应输出表中的列为 factor,初始化字段为 initial,window 为 w,迭代函数为 func。
对于第 k 条记录(k = 1, 2 …),其计算逻辑为:
k <= w:factor[k] = initial[k]
k > w:factor[k] = func(factor[(k-w):k], X1[k], X2[k], … , Xn[k])
注意:数据对用于索引时,不包含右边界的值,即 (k-w):k 的范围是 [k-w, k)。
例子
// define a function
$ def myfunc(x, w){
$ re = sum(x*w)
$ return re
$ }
$ dateTime = 2021.09.09T09:30:00.000 2021.09.09T09:31:00.000 2021.09.09T09:32:00.000 2021.09.09T09:33:00.000 2021.09.09T09:34:00.000
$ securityID = `600021`600021`600021`600021`600021
$ volume = 310 280 300 290 240
$ price = 1.5 1.6 1.7 1.6 1.5
$ t = table(1:0, `dateTime`securityID`volume`price, [TIMESTAMP, SYMBOL, INT, DOUBLE])
$ tableInsert(t, dateTime, securityID, volume, price)
$ output = table(100:0, `securityID`dateTime`factor1, [SYMBOL, TIMESTAMP, DOUBLE])
$ engine = createReactiveStateEngine(name="test", metrics=[<dateTime>, <genericStateIterate(volume,price,3,myfunc{,})>], dummyTable=t, outputTable=output, keyColumn=`SecurityID, keepOrder=true)
$ engine.append!(t)
$ dropAggregator(`test)
securityID | dateTime | factor1 |
---|---|---|
600021 | 2021.09.09T09:30:00.000 | 1.5 |
600021 | 2021.09.09T09:31:00.000 | 1.6 |
600021 | 2021.09.09T09:32:00.000 | 1.7 |
600021 | 2021.09.09T09:33:00.000 | 1,392 |
600021 | 2021.09.09T09:34:00.000 | 334,872 |
上例计算过程如下:
由于窗口为 3,因此对于前 3 条数据,以 price 的值作为 factor1 的输出;
第 4 条数据到来时,历史窗口的数据为 [1.5, 1.6, 1.7],当前 volume 的值为 290,因此调用自定义函数 myfunc([1.5, 1.6, 1.7], 290) = 1392;
第 5 条数据到来时,历史窗口的数据为 [1.6, 1.7, 1392],当前 volume 的值为 240,因此调用自定义函数 myfunc([1.6, 1.7, 1392], 240) = 334872;
若之后继续有数据注入,其计算过程以此类推。
相关函数:genericTStateIterate