createCrossSectionalAggregator

Parent Previous Next


语法


createCrossSectionalAggregator(name, [metrics], dummyTable, [outputTable], keyColumn, [triggeringPattern="perBatch"], [triggeringInterval=1000])


参数


name是一个字符串,表示横截面聚合引擎的名称,是横截面聚合引擎的唯一标识。它可以包含字母,数字和下划线,但必须以字母开头。


metrics是元代码。它可以是系统内置或用户自定义的函数,如<[sum(qty), avg(price)]>,可以对聚合结果使用表达式,如<[avg(price1)-avg(price2)]>,也可以对计算列进行聚合运算,如<[std(price1-price2)]>。详情可参考元编程


dummyTable是表对象,它可以不包含数据,但它的结构必须与订阅的流数据表相同。


outputTable是表对象,用于保存计算结果。输出表的列数为metrics数量+1,第一列为TIMESTAMP类型,用于存放发生计算的时间戳,,其他列的数据类型必须与metrics返回结果的数据类型一致。


keyColumn是一个字符串,指定dummyTable某列的值为横截面聚合引擎的key。keyColumn指定列中的每一个key对应表中的唯一一行。


triggeringPattern是一个字符串,表示触发计算的方式。它可以是以下取值:





triggeringInterval是一个整数。只有当triggeringPattern的取值为interval时才生效,表示触发计算的时间间隔。默认值为1000毫秒。


详情


返回一个包含最新记录的表。keyColumn中的值对应表中唯一一行,如果新插入数据中keyColumn的值已经存在,那么将更新记录,反之将在表的末尾插入一行新的记录。




例子


下面的例子使用createCrossSectionalAggregator函数创建了一个表tradesCrossAggregator,然后通过订阅流数据表trades把数据写入到tradesCrossAggregator中,每插入一行数据触发一次计算,并把结果保存在表outputTable中。



share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades

outputTable = table(1:0, `time`recordTime`avgPrice`sumQty`total, [TIMESTAMP,TIMESTAMP,DOUBLE,INT,DOUBLE])

CSAggregator1=createCrossSectionalAggregator("CSAggregatorDemo1", <[max(time), avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `perRow)


subscribeTable(,"trades","tradesStats",-1,append!{CSAggregator1},true)


def writeData(n){

   timev = 2000.10.08T01:01:01.001 + timestamp(1..n)

   symv =take(`A`B, n)

   pricev=take(102.1 33.4 73.6 223,n)

   qtyv = take(60 74 82 59, n)

   insert into trades values(timev, symv, pricev,qtyv)

}

writeData(4);


>select * from trades;

time                    sym price qty

----------------------- --- ----- ---

2000.10.08T01:01:01.002 A   102.1 60

2000.10.08T01:01:01.003 B   33.4  74

2000.10.08T01:01:01.004 A   73.6  82

2000.10.08T01:01:01.005 B   223   59


>select recordTime, avgPrice, sumQty, total from outputTable;

recordTime              avgPrice sumQty total  

----------------------- -------- ------ -------

2000.10.08T01:01:01.002 102.1    60     6126  

2000.10.08T01:01:01.003 67.75    134    8597.6

2000.10.08T01:01:01.004 53.5     156    8506.8

2000.10.08T01:01:01.005 148.3    141    19192.2



下面的例子中,横截面聚合引擎触发计算的方式为"perBatch"。



share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades

outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE])

tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `perBatch)

subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)

def writeData(n){

  timev  = 2000.10.08T01:01:01.001 + timestamp(1..n)

  symv   = take(`A`B, n)

  pricev = take(102.1 33.4 73.6 223,n)

  qtyv   = take(60 74 82 59, n)

  insert into trades values(timev, symv, pricev,qtyv)

}

//写入三批数据,预期会触发三次计算,输出三次聚合结果。

writeData(4);

writeData(4);

writeData(4);


>select * from outputTable;

time                    avgPrice sumqty Total  

----------------------- -------- ------ -------

2019.07.08T10:14:54.446 148.3    141    19192.2

2019.07.08T10:14:54.446 148.3    141    19192.2

2019.07.08T10:14:54.446 148.3    141    19192.2



把横截面聚合引擎触发计算的方式设置为"interval",每1000毫秒触发一次计算。往流数据表中写入6次数据,每次写入2条数据,间隔500毫秒。



share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades

outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE])

tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `interval,1000)

subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)

def writeData(n){

  timev  = 2000.10.08T01:01:01.001 + timestamp(1..n)

  symv   = take(`A`B, n)

  pricev = take(102.1 33.4 73.6 223,n)

  qtyv   = take(60 74 82 59, n)

  insert into trades values(timev, symv, pricev,qtyv)

}

a = now()

writeData(2);

sleep(500)

writeData(2);

sleep(500)

writeData(2);

sleep(500)

writeData(2);

sleep(500)

writeData(2);

sleep(500)

writeData(2);

sleep(500)

b = now()


>select count(*) from outputTable;

3



如果再次执行select count(*) from outputTable,会发现随着时间的推移,输出表的记录数会不断增长。这是因为在interval模式下,计算是按照现实时间定时触发,并不依赖于是否有新的数据进来。