createLookupJoinEngine

New in version 1.30.16.

语法

createLookupJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [rightTimeColumn], [checkTimes])

详情

创建流数据表的 lookup join 引擎,该引擎以 matchingColumn 作为连接列将两个流数据表进行实时 left join/left outer join, 或者将流数据表和非流数据表进行 left join(此时需要定时刷新非流表)。lookup join 引擎常用于右表更新不频繁的场景(如保存了日频指标的维度表)。

工作机制:

1. 仅当左表有新数据流入时才会触发左、右表的 left join 输出。

2. 引擎仅保留右表根据 matchingColumn 分组后各分组内的最新一条数据。当右表是订阅的流表时,数据流入右表的同时会更新数据; 若右表为内存表或分布式表(目前仅支持维度表),系统会根据 checkTimes 定时刷新右表数据。

lookup join 引擎实际采用 left join 的连接方式,但与 left join 不完全等价,区别在于:如果右表中有多条匹配记录,left join 会返回右表所有匹配的记录,但 lookup join 引擎只返回右表最新一条匹配的记录。

lookup join 引擎与 asof join 引擎很相似,它们之间的区别如下:

lookup join 引擎输出表的第一列可以不是时间列,而 asof join 引擎输出表第一列必须是时间列。

lookup join 引擎当左表有新数据流入便会触发 join 输出,因此无需考虑数据延迟,也无需缓存左表数据。而 asof join 引擎,当指定 timeColumn 时,需要考虑左右表的数据延时。

更多流数据引擎的应用场景说明可以参考 流数据引擎

参数

name 必选参数,表示流数据 lookup join 引擎的名称,作为其在一个数据节点/计算节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。

leftTable 表对象。可以不包含数据,但结构必须与订阅的流数据表相同。

rightTable 表对象,可以是内存表、流数据表或维度表。请注意,如果 rightTable 没有被订阅,但 rightTable 会定期更新,则必须设置 checkTimes 来定时刷新右表数据。

outputTable 必选参数,为计算结果的输出表。在使用 createLookupJoinEngine 函数之前,需要将输出表预先设立为一个空表,并指定各列列名以及数据类型。

输出表的各列的顺序如下:

(1) 连接列。与 matchingColumn 中的列以及其顺序一致,可为多列。

(2) 计算结果列。可为多列。

metrics 以元代码的格式表示计算指标,支持输入元组。有关元代码的更多信息可参考 元编程

  • 计算指标可以是一个或多个表达式、系统内置或用户自定义函数,但不能是聚合函数。

  • metrics 内支持调用具有多个返回值的函数,且必须指定列名,例如 <func(price) as `col1`col2>。

若在 metrics 指定了 leftTablerightTable 中具有相同名称的列,默认取左表的列,可以通过 “tableName.colName” 指定该列来自哪个表。

注意:metrics 中使用的列名大小写敏感,其必须与输入表的列名大小写保持一致。

matchingColumn 表示连接列的字符串标量/向量或字符串向量组成的 tuple,支持 Integral, Temporal 或 Literal(UUID 除外)类型。

matchingColumn 指定规则:

1. 只有一个连接列。当左表和右表的连接列名相同时,matchingColumn 是一个字符串标量,否则是一个长度为 2 的 tuple。例如:左表连接列名为 sym,右表连接列名为 sym1,则 matchingColumn = [[`sym],[`sym1]]。

2. 有多个连接列。当左表和右表的连接列名相同时,matchingColumn 是一个字符串向量,否则是一个长度为 2 的 tuple。例如:左表连接列名为 timestamp, sym,右表连接列名为 timestamp, sym1,则 matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]]。

rightTimeColumn 是字符串标量,表示右表的时间列名称。若设置该参数,右表会根据指定的时间列的时间戳保留最新的数据(若有多行,则取其中最后一行)。若不指定该参数,则根据数据注入系统的时间保留最新数据。

checkTimes 是一个时间类型向量或 DURATION 的标量。设置后,系统会定时更新 rightTable 的数据(只保留 rightTable 的最新数据),并将更新后的数据追加到引擎中。当无需更新 rightTable 时,则不用设置该参数,但需要在引擎创建后,手动将 rightTable 注入到引擎中。

  • checkTimes 是时间类型向量时,只能为SECOND, TIME 或 NANOTIME 类型。 lookup join 引擎每天根据向量内各元素指定的时间定时更新右表。

  • checkTimes 是 DURATION 标量时,表示更新右表的时间间隔。

例子

例1:

$ login(`admin, `123456)
$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])

$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)

$ n = 15
$ tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n  = 10
$ tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output

sym

factor1

factor2

factor3

A

10.1

13

131.3

B

11.1

14

155.4

C

12.1

15

181.5

A

13.1

13

170.3

B

14.1

14

197.4

C

15.1

15

226.5

A

16.1

13

209.3

B

17.1

14

239.4

C

8.1

15

271.5

A

19.1

13

248.3

例2:

$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)

$ n = 15
$ tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n  = 10
$ tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output

sym

factor1

factor2

factor3

A

10.1

10

101

B

11.1

11

122.1

C

12.1

12

145.2

A

13.1

10

131

B

14.1

11

155.1

C

15.1

12

181.2

A

16.1

10

161

B

17.1

11

188.1

C

18.1

12

217.2

A

19.1

10

191

例3:右表是内存表,需设置checkTimes

$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ prices=table(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT])
$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps, checkTimes=1s)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)

$ n = 15
$ tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n  = 10
$ tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output

sym

factor1

factor2

factor3

A

10.1

10

101

B

11.1

11

122.1

C

12.1

12

145.2

A

13.1

10

131

B

14.1

11

155.1

C

15.1

12

181.2

A

16.1

10

161

B

17.1

11

188.1

C

18.1

12

217.2

A

19.1

10

191

例4:左表为一个实时的交易表与右表(相对稳定的维度表)做连接。

$ share streamTable(1000:0, `time`volume`id, [TIMESTAMP, INT,INT]) as trades
$ dbPath="dfs://testlj"
$ if(existsDatabase(dbPath)){
$    dropDatabase(dbPath)
$ }
$ rt=table(1000:0, `time`price`id, [TIMESTAMP, DOUBLE, INT])
$ db=database(dbPath, VALUE, `A`B`C)
$ prices=db.createTable(rt,`rightTable)
$ outputTable = table(10000:0, `id`volume`price`prod, [INT,INT,DOUBLE,DOUBLE])

$ tradesLookupJoin = createLookupJoinEngine(name="streamLookup1", leftTable=trades, rightTable=prices, outputTable=outputTable, metrics=<[volume,price,volume*price]>, matchingColumn=`id, rightTimeColumn=`time,checkTimes=1s)
$ subscribeTable(tableName="trades", actionName="append_trades", offset=0, handler=appendForJoin{tradesLookupJoin, true}, msgAsTable=true)

$ def writeData(t,n){
$     timev = 2021.10.08T01:01:01.001 + timestamp(1..n)
$     volumev = take(1..n, n)
$     id = take(1 2 3, n)
$     insert into t values(timev, volumev, id)
$ }
$ //n=7
$ writeData(rt, 10)
$ prices.append!(rt)
$ sleep(2000)
$ writeData(trades, 6)
$ sleep(2)

$ select * from outputTable

id

volume

price

prod

1

1

10

10

2

2

8

16

3

3

9

27

1

4

10

40

2

5

8

40

3

6

9

54