流数据引擎

使用 DolphinDB 处理历史数据时,可以通过 SQL 语句配合内置计算函数进行查询和计算(全量或增量)。但在实时数据流计算场景下,计算要求高效和即时,全量查询和计算则无法满足该场景的需求。因此, DolphinDB 精心研发了适合流计算场景的引擎,系统内部采用了增量计算,优化了实时计算的性能。

实际应用中,流数据引擎的计算结果可以输出到共享内存表、流数据表、消息中间件、数据库、API 等终端,以做进一步的处理。计算复杂表达式时,亦可将多个流数据引擎通过级联的方式合并成一个复杂的数据流拓扑。

DolphinDB 共提供了 10 种引擎以应对不同的计算场景:

流数据计算引擎

时间序列引擎

DolphinDB 提供了三种时间序列引擎,分别为时间序列引擎(createTimeSeriesEngine)、 日级时间序列引擎(createDailyTimeSeriesEngine) 和 会话窗口引擎(createSessionWindowEngine),它们都以时间度量窗口。

日级时序引擎和时序引擎功能相近,可以按照指定频率对时序数据进行滑动聚合计算,如计算 K 线等。

日级时序引擎在时序引擎的基础上进一步扩展,除了可以实现时序引擎的全部功能外,它还可以指定交易时间段,将一个自然日之内各个交易时段开始之前的所有未参与计算的数据,并入该交易时段的第一个窗口进行计算。

在物联网或数字货币,外汇等无固定交易时段的场景,适合使用时序引擎;但在股票、期货市场等有固定交易时段的场景,更适合使用日级时序引擎。

相关教程:DolphinDB教程:流数据时序引擎

会话窗口引擎(createSessionWindowEngine)

会话窗口可以理解为一个活动阶段(数据产生阶段)。其前后都是非活动阶段(无数据产生阶段)。

会话窗口引擎与时间序列引擎极为相似,它们计算规则和触发计算的方式相同。不同之处在于时间序列引擎具有固定的窗口长度和滑动步长,但会话窗口引擎的窗口不是按照固定的频率产生的,其窗口长度也不是固定的。以引擎收到的第一条数据的时间戳作为第一个会话窗口的起始时间。会话窗口收到某条数据之后,若在指定的等待时间内仍未收到下一条新数据,则(该数据的时间戳 + 等待时间)是该窗口的结束时间。窗口结束后收到的第一条新数据的时间戳是新的会话窗口的起始时间。

以物联网场景为例,根据设备在线时间段的不同,可能某些时间段有大量数据产生,而某些时间段完全没有数据。若对这类特征的数据进行滑动窗口计算,无数据的窗口会增加不必要的计算开销。因此 DolphinDB 开发了会话窗口引擎,以解决此类问题。

响应式状态引擎(createReactiveStateEngine)

DolphinDB 流数据引擎所计算的因子可分为无状态因子与有状态因子。无状态因子仅根据最新一条数据即可完成计算,不需要之前的数据,亦不依赖之前的计算结果。有状态因子计算除需要最新的数据,还需要历史数据或之前计算得到的中间结果,统称为状态。因此有状态因子计算需要存储状态,以供后续因子计算使用,且每次计算都会更新状态。

响应式状态引擎每输入一条数据引擎都将触发一条结果输出,因此输入和输出数据量一致。响应式状态引擎的算子中只能包含向量函数,DolphinDB 针对生产业务中的常见状态算子(滑动窗口函数、累积函数、序列相关函数和 topN 相关函数等)进行优化,大幅提升了这些算子在响应式状态引擎中的计算效率。注意,响应式状态引擎目前仅支持系统优化过的状态函数。此外,DolphinDB 支持用户使用自定义函数封装复杂的状态算子,但需通过 @state 进行声明。

响应式状态引擎应用广泛,例如金融场景下计算有状态的高频因子、主买成交量占比,以及大小单资金流等;物联网场景下检测传感器数据流中的温度是否在持续上升等。

相关教程:金融高频因子的流批统一计算:DolphinDB响应式状态引擎介绍

横截面引擎(createCrossSectionalEngine)

横截面引擎适用于对截面数据(如每只股票代码最新时间戳下的数据)进行实时计算。如:金融场景下,使用某个指数的所有成分股的最新价格计算该指数的内在价值;工业物联网场景下,对某批设备最新温度求最值等等。

相关教程:流数据横截面引擎

异常检测引擎(createAnomalyDetectionEngine)

异常检测引擎根据指定的条件指标,自动对数据进行筛查,只有符合指标条件的数据才会输出。该引擎通常用于对实时数据进行监控的场景,因此在物联网场景下有较广泛的应用,金融领域的风控场景也可以使用该引擎。

  • 物联网:对设备进行监测,例如监测设备温度、湿度;电力监控,例如电压,用电量等。

  • 金融:风控场景,例如根据指定规则过滤订单,监控股票成交量,设置超量预警信号等。

相关教程:流数据异常检测引擎

流数据连接引擎

通用 SQL 语句查询历史数据时,经常需要进行表关联操作。但在流数据场景下,若通过 SQL 进行表连接操作,就必须将流数据按固定的时间间隔保存快照后单独计算再合并计算结果,实现代码复杂且效率低下,达不到实时计算的需求。因此,DolphinDB 开发了针对流数据表间关联的连接引擎。

[ 参考附录表,快速了解引擎的关联机制 ]

Asof Join 引擎(createAsofJoinEngine)

asof join 是时间序列分析中常见的操作。在时间精度较高的场景下,通常两表的时间戳大部分不匹配,此时对于左表中新到来的一条记录,Asof Join 引擎将匹配当前右表中符合连接条件(例如同一支股票)且在该记录前时间戳最近的记录。

在处理美股数据以及国内逐笔数据时,可以使用 Asof Join 引擎关联交易数据表和报价数据表,用于计算交易成本。

等值连接引擎(createEquiJoinEngine)

equi Join 引擎通常用于关联两表公共列,用于将表信息汇总。例如:可以运用等值连接引擎关联日线行情数据和日行情指标数据,将多个指标关联在同一张表中,以进一步测试动态交易策略。

窗口连接引擎(createWindowJoinEngine)

window join 引擎以窗口聚合的结果替代单条记录的结果可以降低单条匹配结果的偶然性,如:在估计个股交易成本时,通常需要关联交易数据表和报价数据表,此时使用 Window Join 引擎可以降低单次报价的偶然性,通过窗口求均值或者中位数来获取一个更合理的报价基准。

以上三种引擎仅支持流数据表间的连接操作,为了关联流数据表和历史数据表(仅支持内存表与维度表),DolphinDB 提供了 Lookup Join 引擎。

Lookup Join 引擎(createLookupJoinEngine)

Lookup Join 引擎内部的连接机制为左连接(left join),该引擎用于将实时数据源与一张描述数据源固有属性的表(通常是维度表)关联,以实现表信息的补全功能。如在金融场景下,可通过 Lookup Join 引擎关联实时的交易价格表和保存汇率信息的维度表,以对交易进行进一步分析。

左半等值连接引擎(createLeftSemiJoinEngine)

left semi join 引擎通常对行情数据中的逐笔成交数据和逐笔委托数据进行关联。

拓展

某些特殊场景下,用户提供的因子可能涉及到多个不同的计算维度,即可能涉及到多种计算引擎的级联。此时,用户需要手动分解计算因子,将其拆分成不同的计算维度,然后分别在不同的流数据引擎中计算。通过将一种引擎作为另一种引擎的输入,可以将多个引擎串联起来。

注意:从 1.30.17 版本开始,DolphinDB 支持其他流数据引擎通过 getLeftStreamgetRightStream,为流数据 join 引擎注入数据,以实现非连接普通引擎和连接引擎间的级联。

为了带来更好的用户体验,减小用户开发脚本的复杂度,DolphinDB 为几种常见场景的计算引擎(时序引擎,响应式状态引擎,横截面引擎)研发了引擎流水线解析器(streamEngineParser)。该解析器支持直接输入涉及到以上三种引擎计算场景的复杂因子,引擎内部自动将其分解并分配给不同的引擎进行计算。

附录表

连接引擎

连接列

关联机制

createAsofJoinEngine

matchingColumn

左表每到来一条记录,匹配右表连接列一致且时间戳最近的一条记录。

createEquiJoinEngine

matchingColumn+timeColumn

左(右)表每到来一条记录,匹配右(左)表连接列一致的最新的一条记录。

createWindowJoinEngine

matchingColumn

左表每到来一条记录,匹配右表中连接列一致,且在由左表时间戳确定的窗口范围内的数据。

createLookupJoinEngine

matchingColumn

左表每到来一条记录,匹配右表连接列一致的最新的一条记录。

createLeftSemiJoinEngine

matchingColumn

对于左表的每一条记录,匹配右表连接列一致的第一条或最后一条记录。