replay

根据指定的回放模式,将一个或多个数据表(数据源)的数据以一定的速率回放到数据表中,以模拟流数据实时注入。

语法

replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [absoluteRate=true], [parallelLevel=1])

参数

inputTables 是一个未分区的内存表或 replayDS 函数生成的数据源。

  • 如果 inputTables 是单个或多个结构相同的表对象或数据源, 则用标量或元组表示。

  • 如果 inputTables 是多个结构不同的表对象或数据源,则用字典表示。字典的 key 是用户自定义的字符串,表示输入表的唯一标识,value 是表对象或数据源。

outputTables 内存表或流数据表对象。若输出表是共享表,也可以指定为共享表的表名。根据回放形式的不同(参考详情部分的说明),outputTables 的取值有所区别:

(1)一对一回放、同构回放时,outputTables 是一个标量 ;输出表的 schema 必须与输入表的 schema 相同。

(2)多对多回放,outputTables 是一个向量或元组,且它的长度必须与 inputTables 相同。输出表和输入表两两对应,对应的两表的 schema 必须相同。

(3)异构回放时 outputTables 是一个标量,至少包含三列:

  • 第一列用于存储时间戳,为 dateColumn 和 timeColumn 指定的回放时间;

  • 第二列为 SYMBOL 或 STRING 类型,对应 inputTables 字典的 key;

  • 第三列必须指定为 BLOB 类型,用于存储被回放的每条记录序列化后的结果。

此外,还支持指定输出各输入表的公共列(列名和类型一致的列)。

dateColumntimeColumn 表示时间列的列名。必须至少指定其中一列。根据回放形式的不同(参考详情部分的说明),dateColumn 和 timeColumn 的取值有所区别:

  • 一对一回放、同构回放时,由于输入输出表的所有时间列的列名必须相同,因此可以指定为字符串标量;

  • 输入表时间列的列名不同:

    • 多对多回放:可以指定为字符串向量;

    • 异构回放:可以指定为一个字典,其中 key 为输入表的标识符,value 是对应表时间列的列名。

只指定 dateColumntimeColumn 中的一个参数,或者 dateColumntimeColumn 指定为同一列时,对指定的时间列的类型没有限制。若指定 dateColumntimeColumn 为不同列,dateColumn 必须是 DATE 类型,timeColumn 只能是 SECOND, TIME 或 NANOTIME 类型。

replayRate 整数,和参数 absoluteRate 共同决定了回放的速率。

absoluteRate 布尔值。默认值为 true,表示每秒按 replayRate 指定的记录数回放。若为 false,表示依照数据中的时间戳加速 replayRate 倍回放。

parallelLevel 正整数,表示将数据从数据源加载到内存的工作线程数量,默认值为1。如果 inputTables 不是数据源,无需指定该参数。

详情

按照给定的回放模式,按时间顺对数据进行回放。

根据输入表和输出表的映射关系(map),replay 支持一对一,多对一,多对多三种回放形式。

对于多对一回放,1.30.17 / 2.00.5 版本前,输入表结构必须相同,称为同构回放。该版本后,replay 支持以字典的形式输入结构不同的表,称为异构回放。

根据不同的场景需求,replay 提供以下三种回放模式:

  • 匀速回放:如果 replayRate 为正整数,且 absoluteRate 为 true,则回放的速率基于记录数计算,按照每秒 replayRate 条记录进行回放。假设输入表的总记录条数为 total(多表回放时以记录数最多的表为基准),那么回放该表所需的时间大致为 total/replayRate 秒。

  • 倍速回放:如果 replayRate 为正整数,并且 absoluteRate 为 false,则将输入表中的数据按照时间加速 replayRate 倍回放。假设输入表中 dateColumntimeColumn 的最大时间戳和最小时间戳相差 n 秒,数据总数为 s,则系统每秒回放 replayRate*s/n 条记录(每秒至少回放 1 条),即回放时间大致为 n/replayRate 秒。

  • 极速回放:如果 replayRate 未指定或者为负,无论 absoluteRate 为 false 还是 true,系统都将以最快的速率进行回放。回放所需的时间与 DolphinDB 所在的服务器性能有关。

上述回放耗时主要基于同构回放估算,异构回放的耗时开销会略高于上述估算的回放耗时。

如需回放历史数据, 可通过 replayDS 函数直接生成数据源来作为 replay 的输入。

注意:进行多对一回放时,若 inputTables 为数据源,各数据源 size 以及其 dateColumn 列对应的时间范围必须保持一致。

异构回放时,replay 的输出结果可以通过 streamFilter 进行反序列化,并进行数据过滤和分离,详情请参考 streamFilter

使用 cancelJobcancelConsoleJob 命令可以终止回放。

例子

例1. 回放一张表

$ n=1000
$ sym = take(`IBM,n)
$ timestamp= take(temporalAdd(2012.12.06T09:30:12.000,1..500,'s'),n)
$ volume = rand(100,n)
$ trades=table(sym,timestamp,volume)
$ trades.sortBy!(`timestamp)
$ share streamTable(100:0,`sym`timestamp`volume,[SYMBOL,TIMESTAMP,INT]) as st
  • 每秒回放100条数据:

$ timer replay(inputTables=trades, outputTables=st, dateColumn=`timestamp, timeColumn=`timestamp,replayRate=100, absoluteRate=true);
Time elapsed: 10001.195 ms

表 trades 中一共有1000条数据,每秒回放100条耗时大约10秒。

  • 加速100倍时间回放:

$ timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp,replayRate=100,absoluteRate=false);
Time elapsed: 5001.909 ms

表 trades 中的最大时间与最小时间相差500秒,加速 100 倍时间回放耗时大约5秒。

  • 以最快的速率回放:

$ timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp);
Time elapsed: 2.026 ms

例2. 可以将多个数据源回放到一个或多个输出表中。

$ n=5000000
$ sym = rand(symbol(`IBM`APPL`MSFT`GOOG`GS),n)
$ date=take(2012.06.12..2012.06.16,n)
$ time=rand(13:30:11.008..13:30:11.012,n)
$ volume = rand(100,n)
$ t=table(sym,date,time,volume)
$ if(existsDatabase("dfs://test_stock")){
$ dropDatabase("dfs://test_stock")
$ }
$ db=database("dfs://test_stock",VALUE,2012.06.12..2012.06.16)
$ trades=db.createPartitionedTable(t,`trades,`date)
$ trades.append!(t)

$ share streamTable(100:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st1
$ share streamTable(100:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st2

$ ds1=replayDS(sqlObj=<select * from trades where date=2012.06.12>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:30:11.008,13:30:11.010,13:30:11.013])
$ ds2=replayDS(sqlObj=<select * from trades where date=2012.06.16>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:30:11.008,13:30:11.010,13:30:11.013])

$ replay(inputTables=[ds1,ds2],outputTables=[st1,st2],dateColumn=`date,timeColumn=`time);       //将ds1和ds2分别回放到st1和st2中
$ select * from st1;

sym

date

time

price

GOOG

2012.06.12

13:30:11.008

80.257105

MSFT

2012.06.12

13:30:11.008

19.509125

MSFT

2012.06.12

13:30:11.008

16.856134

GS

2012.06.12

13:30:11.008

77.784879

MSFT

2012.06.12

13:30:11.008

33.182977

GOOG

2012.06.12

13:30:11.008

0.411022

IBM

2012.06.12

13:30:11.008

64.201467

GS

2012.06.12

13:30:11.008

2.961122

GOOG

2012.06.12

13:30:11.008

19.06283

IBM

2012.06.12

13:30:11.008

9.793819

...

$ select * from st2;

sym

date

time

price

GS

2012.06.16

13:30:11.008

65.34449

GS

2012.06.16

13:30:11.008

69.313035

GOOG

2012.06.16

13:30:11.008

83.343602

IBM

2012.06.16

13:30:11.008

1.09428

APPL

2012.06.16

13:30:11.008

37.420217

APPL

2012.06.16

13:30:11.008

29.142734

IBM

2012.06.16

13:30:11.008

81.364092

GS

2012.06.16

13:30:11.008

94.248142

GS

2012.06.16

13:30:11.008

35.037498

GS

2012.06.16

13:30:11.008

19.514272

...

$ share streamTable(200:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st3
$ replay(inputTables=[ds1,ds2],outputTables=st3,dateColumn=`date,timeColumn=`time);     //将 ds1 和 ds2 都回放到 st3 中
$ select count(*) from st3
2000000

例3. 异构回放

 $ n=1000
 $ sym = take(take(`IBM,n).join(take(`GS,n)), n)
 $ myDate=take(2021.01.02..2021.01.06, n).sort!()
 $ myTime=take(09:30:00..15:59:59,n)
 $ vol = rand(100, n)
 $ t=table(sym,myDate,myTime,vol)


 $ n=1000
 $ sym = take(take(`IBM,n).join(take(`GS,n)), n)
 $ date=take(2021.01.02..2021.01.06, n).sort!()
 $ time=take(09:30:00..15:59:59,n)
 $ vol = rand(100, n)
 $ t1=table(sym, date,time,vol)

 $ if(existsDatabase("dfs://test_stock")){
   dropDatabase("dfs://test_stock")
 }
 $ db1=database("",RANGE, 2021.01.01..2021.01.12)
 $ db2=database("",VALUE,`IBM`GS)
 $ db=database("dfs://test_stock",COMPO,[db1, db2])
 $ trades=db.createPartitionedTable(t,`trades,`myDate`sym)
 $ trades.append!(t);
 $ trades1=db.createPartitionedTable(t1,`trades1,`date`sym)
 $ trades1.append!(t1);

 //获取数据源

 $ ds = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`myDate, timeColumn=`myTime)
 $ ds.size();

 $ ds1 = replayDS(sqlObj=<select * from loadTable(db, `trades1)>, dateColumn=`date, timeColumn=`time)
 $ ds1.size();

 $ input_dict  = dict(["msg1", "msg2"], [ds, ds1])
 $ date_dict = dict(["msg1", "msg2"], [`myDate, `date])
 $ time_dict = dict(["msg1", "msg2"], [`myTime, `time])
 $ opt = streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT])
 $ replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict,  replayRate=1000, absoluteRate=false);
 $ select top 10 * from opt;