createLookupJoinEngine
New in version 2.00.4.
语法
createLookupJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [rightTimeColumn], [checkTimes])
详情
创建流数据表的 lookup join 引擎,该引擎以 matchingColumn 作为连接列将两个流数据表进行实时 left join/left outer join, 或者将流数据表和非流数据表进行 left join(此时需要定时刷新非流表)。lookup join 引擎常用于右表更新不频繁的场景(如保存了日频指标的维度表)。
工作机制:
1. 仅当左表有新数据流入时才会触发左、右表的 left join 输出。
2. 引擎仅保留右表根据 matchingColumn 分组后各分组内的最新一条数据。当右表是订阅的流表时,数据流入右表的同时会更新数据; 若右表为内存表或分布式表(目前仅支持维度表),系统会根据 checkTimes 定时刷新右表数据。
lookup join 引擎实际采用 left join 的连接方式,但与 left join 不完全等价,区别在于:如果右表中有多条匹配记录,left join 会返回右表所有匹配的记录,但 lookup join 引擎只返回右表最新一条匹配的记录。
lookup join 引擎与 asof join 引擎很相似,它们之间的区别如下:
lookup join 引擎输出表的第一列可以不是时间列,而 asof join 引擎输出表第一列必须是时间列。
lookup join 引擎当左表有新数据流入便会触发 join 输出,因此无需考虑数据延迟,也无需缓存左表数据。而 asof join 引擎,当指定 timeColumn 时,需要考虑左右表的数据延时。
更多流数据引擎的应用场景说明可以参考 流数据引擎。
参数
name 必选参数,表示流数据 lookup join 引擎的名称,作为其在一个数据节点/计算节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。
leftTable 表对象。可以不包含数据,但结构必须与订阅的流数据表相同。
rightTable 表对象,可以是内存表、流数据表或维度表。请注意,如果 rightTable 没有被订阅,但 rightTable 会定期更新,则必须设置 checkTimes 来定时刷新右表数据。
outputTable 必选参数,为计算结果的输出表。在使用 createLookupJoinEngine
函数之前,需要将输出表预先设立为一个空表,并指定各列列名以及数据类型。
输出表的各列的顺序如下:
(1) 连接列。与 matchingColumn 中的列以及其顺序一致,可为多列。
(2) 计算结果列。可为多列。
metrics 以元代码的格式表示计算指标,支持输入元组。有关元代码的更多信息可参考 元编程。
计算指标可以是一个或多个表达式、系统内置或用户自定义函数,但不能是聚合函数。
metrics 内支持调用具有多个返回值的函数,且必须指定列名,例如 <func(price) as `col1`col2>。
若在 metrics 指定了 leftTable 和 rightTable 中具有相同名称的列,默认取左表的列,可以通过 “tableName.colName” 指定该列来自哪个表。
注意:metrics 中使用的列名大小写敏感,其必须与输入表的列名大小写保持一致。
matchingColumn 表示连接列的字符串标量/向量或字符串向量组成的 tuple,支持 Integral, Temporal 或 Literal(UUID 除外)类型。
matchingColumn 指定规则:
1. 只有一个连接列。当左表和右表的连接列名相同时,matchingColumn 是一个字符串标量,否则是一个长度为 2 的 tuple。例如:左表连接列名为 sym,右表连接列名为 sym1,则 matchingColumn = [[`sym],[`sym1]]。
2. 有多个连接列。当左表和右表的连接列名相同时,matchingColumn 是一个字符串向量,否则是一个长度为 2 的 tuple。例如:左表连接列名为 timestamp, sym,右表连接列名为 timestamp, sym1,则 matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]]。
rightTimeColumn 是字符串标量,表示右表的时间列名称。若设置该参数,右表会根据指定的时间列的时间戳保留最新的数据(若有多行,则取其中最后一行)。若不指定该参数,则根据数据注入系统的时间保留最新数据。
checkTimes 是一个时间类型向量或 DURATION 的标量。设置后,系统会定时更新 rightTable 的数据(只保留 rightTable 的最新数据),并将更新后的数据追加到引擎中。当无需更新 rightTable 时,则不用设置该参数,但需要在引擎创建后,手动将 rightTable 注入到引擎中。
checkTimes 是时间类型向量时,只能为SECOND, TIME 或 NANOTIME 类型。 lookup join 引擎每天根据向量内各元素指定的时间定时更新右表。
checkTimes 是 DURATION 标量时,表示更新右表的时间间隔。
例子
例1:
$ login(`admin, `123456)
$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)
$ n = 15
$ tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n = 10
$ tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output
sym |
factor1 |
factor2 |
factor3 |
---|---|---|---|
A |
10.1 |
13 |
131.3 |
B |
11.1 |
14 |
155.4 |
C |
12.1 |
15 |
181.5 |
A |
13.1 |
13 |
170.3 |
B |
14.1 |
14 |
197.4 |
C |
15.1 |
15 |
226.5 |
A |
16.1 |
13 |
209.3 |
B |
17.1 |
14 |
239.4 |
C |
8.1 |
15 |
271.5 |
A |
19.1 |
13 |
248.3 |
例2:
$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)
$ n = 15
$ tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n = 10
$ tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output
sym |
factor1 |
factor2 |
factor3 |
---|---|---|---|
A |
10.1 |
10 |
101 |
B |
11.1 |
11 |
122.1 |
C |
12.1 |
12 |
145.2 |
A |
13.1 |
10 |
131 |
B |
14.1 |
11 |
155.1 |
C |
15.1 |
12 |
181.2 |
A |
16.1 |
10 |
161 |
B |
17.1 |
11 |
188.1 |
C |
18.1 |
12 |
217.2 |
A |
19.1 |
10 |
191 |
例3:右表是内存表,需设置checkTimes
$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ prices=table(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT])
$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps, checkTimes=1s)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
$ n = 15
$ tem1 = table( (2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3)as timestamps,take(`A`B`C, n) as sym,take(1..15,n) as val,1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n = 10
$ tem2 = table( 2019.10.08T01:01:01.001 + 1..n as timestamps,take(`A`B`C, n) as sym,take(0.1+10..20,n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output
sym |
factor1 |
factor2 |
factor3 |
---|---|---|---|
A |
10.1 |
10 |
101 |
B |
11.1 |
11 |
122.1 |
C |
12.1 |
12 |
145.2 |
A |
13.1 |
10 |
131 |
B |
14.1 |
11 |
155.1 |
C |
15.1 |
12 |
181.2 |
A |
16.1 |
10 |
161 |
B |
17.1 |
11 |
188.1 |
C |
18.1 |
12 |
217.2 |
A |
19.1 |
10 |
191 |
例4:左表为一个实时的交易表与右表(相对稳定的维度表)做连接。
$ share streamTable(1000:0, `time`volume`id, [TIMESTAMP, INT,INT]) as trades
$ dbPath="dfs://testlj"
$ if(existsDatabase(dbPath)){
$ dropDatabase(dbPath)
$ }
$ rt=table(1000:0, `time`price`id, [TIMESTAMP, DOUBLE, INT])
$ db=database(dbPath, VALUE, `A`B`C)
$ prices=db.createTable(rt,`rightTable)
$ outputTable = table(10000:0, `id`volume`price`prod, [INT,INT,DOUBLE,DOUBLE])
$ tradesLookupJoin = createLookupJoinEngine(name="streamLookup1", leftTable=trades, rightTable=prices, outputTable=outputTable, metrics=<[volume,price,volume*price]>, matchingColumn=`id, rightTimeColumn=`time,checkTimes=1s)
$ subscribeTable(tableName="trades", actionName="append_trades", offset=0, handler=appendForJoin{tradesLookupJoin, true}, msgAsTable=true)
$ def writeData(t,n){
$ timev = 2021.10.08T01:01:01.001 + timestamp(1..n)
$ volumev = take(1..n, n)
$ id = take(1 2 3, n)
$ insert into t values(timev, volumev, id)
$ }
$ //n=7
$ writeData(rt, 10)
$ prices.append!(rt)
$ sleep(2000)
$ writeData(trades, 6)
$ sleep(2)
$ select * from outputTable
id |
volume |
price |
prod |
---|---|---|---|
1 |
1 |
10 |
10 |
2 |
2 |
8 |
16 |
3 |
3 |
9 |
27 |
1 |
4 |
10 |
40 |
2 |
5 |
8 |
40 |
3 |
6 |
9 |
54 |