replay

根据指定的回放模式,将一个或多个数据表(数据源)的数据以一定的速率回放到数据表中,以模拟流数据实时注入。

语法

replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [absoluteRate=true], [parallelLevel=1])

详情

按时间顺序将一个或多个数据表或数据源回放到数据表,以模拟流数据实时注入。如需回放历史数据, 可通过 replayDS 函数生成数据源来输入 replay 。

根据输入表和输出表的映射关系(map),replay 支持一对一,N 对一,N 对 N 三种回放形式。注意 N 对 N 回放时,若多表时间段存在重合,不能保证该段时间完全按时间序回放。

对于 N 对一回放,1.30.17 版本前,输入表结构必须相同,称为同构回放。该版本后,replay 支持以字典的形式输入结构不同的表,称为异构回放。异构回放时,replay 的输出结果可以通过 streamFilter 进行反序列化,并进行数据过滤和分离,详情请参考 streamFilter

注意:进行多对一回放时,若 inputTables 为数据源,各数据源 size 以及其 dateColumn 列对应的时间范围必须保持一致。

如需终止回放,使用 cancelJobcancelConsoleJob 命令可以终止回放。

1.30.12 新版功能: N对一同构回放

1.30.17 新版功能: 异构流表回放

参数

inputTables 是未分区的内存表或 replayDS 函数生成的数据源。

  • 如果 inputTables 是单个或多个结构相同的表对象或数据源, 用标量或元组表示。

  • 如果 inputTables 是多个结构不同的表对象或数据源,用字典表示。字典的 key 是用户自定义的字符串,表示输入表或数据源的唯一标识,value 是表对象或数据源。

outputTables 是内存表或流数据表对象。若输出表是共享表,可以指定为共享表的表名。根据回放形式的不同,outputTables 的取值有所区别:

(1)一对一回放或 N 对一同构回放时,outputTables 是一个表对象或字符串标量,必须与输入表的 schema 相同。

(2)N 对 N 回放,outputTables 是字符串向量或表单对象组成的元组,且它的长度必须与 inputTables 相同。输出表和输入表一一对应,且表的 schema 必须相同。

(3)N 对一异构回放时 outputTables 是一个标量,至少包含三列:

  • 第一列为 dateColumntimeColumn 指定的回放时间的时间戳;

  • 第二列为 SYMBOL 或 STRING 类型,对应 inputTables 字典的 key;

  • 第三列为 BLOB 类型,用于存储被回放的每条记录序列化后的结果。

此外,可输出各输入表的公共列(列名和类型一致的列)。

dateColumntimeColumn 为时间列的列名,至少指定其中一个参数。根据回放形式的不同,dateColumntimeColumn 的取值有所区别:

  • 一对一回放或 N 对一同构回放: 输入表与输出表的时间列列名必须相同,指定为字符串标量;

  • N 对 N 回放:输入表时间列的列名不同,指定为字符串向量;

  • N 对一异构回放:输入表时间列的列名不同,指定为字典,其中 key 为输入表的唯一标识符,value 是对应表时间列的列名。

只指定 dateColumntimeColumn 中的一个参数,或者 dateColumntimeColumn 指定为同一列时,对指定的时间列的类型没有限制。

若指定 dateColumntimeColumn 为不同列,dateColumn 必须是 DATE 类型,timeColumn 只能是 SECOND, TIME 或 NANOTIME 类型。

replayRate 整数,和参数 absoluteRate 共同决定了回放的速率。

absoluteRate 布尔值。默认值为 true,表示每秒按 replayRate 指定的记录数回放。若为 false,表示依照数据中的时间戳加速 replayRate 倍回放。

根据不同的场景需求,replay 提供以下三种回放模式:

  • 匀速回放:如果 replayRate 为正整数,且 absoluteRate 为 true,则回放的速率基于记录数计算,按照每秒 replayRate 条记录进行回放。假设输入表的总记录条数为 total(多表回放时为所有表的总记录数),那么回放该表所需的时间大致为 total/replayRate 秒。

  • 倍速回放:如果 replayRate 为正整数,并且 absoluteRate 为 false,则将输入表中的数据按照时间加速 replayRate 倍回放。假设输入表中 dateColumntimeColumn 的最大时间戳和最小时间戳相差 n 秒,数据总数为 s,则系统每秒回放 replayRate*s/n 条记录(每秒至少回放 1 条),即回放时间大致为 n/replayRate 秒。

  • 极速回放:如果 replayRate 未指定或者为负,无论 absoluteRate 为 false 还是 true,系统都将以最快的速率进行回放。回放所需的时间与 DolphinDB 所在的服务器性能有关。

上述回放耗时主要基于同构回放估算,异构回放的耗时开销会略高于上述估算的回放耗时。

parallelLevel 正整数,表示从数据源加载数据到内存的工作线程数量,默认值为 1。如果 inputTables 不是数据源,无需指定该参数。

例子

例1. 回放一张表

$ n=1000
$ sym = take(`IBM,n)
$ timestamp= take(temporalAdd(2012.12.06T09:30:12.000,1..500,'s'),n)
$ volume = rand(100,n)
$ trades=table(sym,timestamp,volume)
$ trades.sortBy!(`timestamp)
$ share streamTable(100:0,`sym`timestamp`volume,[SYMBOL,TIMESTAMP,INT]) as st
  • 每秒回放100条数据:

$ timer replay(inputTables=trades, outputTables=st, dateColumn=`timestamp, timeColumn=`timestamp,replayRate=100, absoluteRate=true);
Time elapsed: 10001.195 ms

表 trades 中一共有1000条数据,每秒回放100条耗时大约10秒。

  • 加速100倍时间回放:

$ timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp,replayRate=100,absoluteRate=false);
Time elapsed: 5001.909 ms

表 trades 中的最大时间与最小时间相差500秒,加速 100 倍时间回放耗时大约5秒。

  • 以最快的速率回放:

$ timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp);
Time elapsed: 2.026 ms

例2. 可以将多个数据源回放到一个或多个输出表中。

$ n=5000000
$ sym = rand(symbol(`IBM`APPL`MSFT`GOOG`GS),n)
$ date=take(2012.06.12..2012.06.16,n)
$ time=rand(13:30:11.008..13:30:11.012,n)
$ volume = rand(100,n)
$ t=table(sym,date,time,volume)
$ if(existsDatabase("dfs://test_stock")){
$ dropDatabase("dfs://test_stock")
$ }
$ db=database("dfs://test_stock",VALUE,2012.06.12..2012.06.16)
$ trades=db.createPartitionedTable(t,`trades,`date)
$ trades.append!(t)

$ share streamTable(100:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st1
$ share streamTable(100:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st2

$ ds1=replayDS(sqlObj=<select * from trades where date=2012.06.12>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:30:11.008,13:30:11.010,13:30:11.013])
$ ds2=replayDS(sqlObj=<select * from trades where date=2012.06.16>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:30:11.008,13:30:11.010,13:30:11.013])

$ replay(inputTables=[ds1,ds2],outputTables=[st1,st2],dateColumn=`date,timeColumn=`time);       //将ds1和ds2分别回放到st1和st2中
$ select * from st1;

sym

date

time

price

GOOG

2012.06.12

13:30:11.008

80.257105

MSFT

2012.06.12

13:30:11.008

19.509125

MSFT

2012.06.12

13:30:11.008

16.856134

GS

2012.06.12

13:30:11.008

77.784879

MSFT

2012.06.12

13:30:11.008

33.182977

GOOG

2012.06.12

13:30:11.008

0.411022

IBM

2012.06.12

13:30:11.008

64.201467

GS

2012.06.12

13:30:11.008

2.961122

GOOG

2012.06.12

13:30:11.008

19.06283

IBM

2012.06.12

13:30:11.008

9.793819

...

$ select * from st2;

sym

date

time

price

GS

2012.06.16

13:30:11.008

65.34449

GS

2012.06.16

13:30:11.008

69.313035

GOOG

2012.06.16

13:30:11.008

83.343602

IBM

2012.06.16

13:30:11.008

1.09428

APPL

2012.06.16

13:30:11.008

37.420217

APPL

2012.06.16

13:30:11.008

29.142734

IBM

2012.06.16

13:30:11.008

81.364092

GS

2012.06.16

13:30:11.008

94.248142

GS

2012.06.16

13:30:11.008

35.037498

GS

2012.06.16

13:30:11.008

19.514272

...

$ share streamTable(200:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st3
$ replay(inputTables=[ds1,ds2],outputTables=st3,dateColumn=`date,timeColumn=`time);     //将 ds1 和 ds2 都回放到 st3 中
$ select count(*) from st3
2000000

例3. 异构回放

$ n=1000
$ sym = take(take(`IBM,n).join(take(`GS,n)), n)
$ myDate=take(2021.01.02..2021.01.06, n).sort!()
$ myTime=take(09:30:00..15:59:59,n)
$ vol = rand(100, n)
$ t=table(sym,myDate,myTime,vol)


$ n=1000
$ sym = take(take(`IBM,n).join(take(`GS,n)), n)
$ date=take(2021.01.02..2021.01.06, n).sort!()
$ time=take(09:30:00..15:59:59,n)
$ vol = rand(100, n)
$ t1=table(sym, date,time,vol)

$ if(existsDatabase("dfs://test_stock")){
$    dropDatabase("dfs://test_stock")
$  }
$ db1=database("",RANGE, 2021.01.01..2021.01.12)
$ db2=database("",VALUE,`IBM`GS)
$ db=database("dfs://test_stock",COMPO,[db1, db2])
$ trades=db.createPartitionedTable(t,`trades,`myDate`sym)
$ trades.append!(t);
$ trades1=db.createPartitionedTable(t1,`trades1,`date`sym)
$ trades1.append!(t1);

//获取数据源

$ ds = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`myDate, timeColumn=`myTime)
$ ds.size();

$ ds1 = replayDS(sqlObj=<select * from loadTable(db, `trades1)>, dateColumn=`date, timeColumn=`time)
$ ds1.size();

$ input_dict  = dict(["msg1", "msg2"], [ds, ds1])
$ date_dict = dict(["msg1", "msg2"], [`myDate, `date])
$ time_dict = dict(["msg1", "msg2"], [`myTime, `time])
$ opt = streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT])
$ replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict,  replayRate=1000, absoluteRate=false);
$ select top 10 * from opt;