流数据

参考教程:流数据教程

流数据订阅

DolphinDB 集群中对流数据的处理采用发布-订阅的模式。流数据首先注入流数据表中,在发布端通过流数据表来发布数据,供其它数据节点或 API 订阅消费。若要开启流数据,需要在集群配置文件 cluster.cfg 为发布节点和订阅节点分别进行以下配置。

注意:为某个节点指定某个配置参数的格式为 alias.item,如:nodes1.subPort。(详情参考 集群路径配置说明

发布节点

发布节点支持配置一些发布数据相关的信息,如发布的消息块大小,消息队列深度。此外还需指定可以连接的订阅节点的连接数上限 maxPubConnections,由于该参数默认为 0,因此若启用流数据必须指定该参数为一个正数。

配置参数

解释

maxMsgNumPerBlock=1024

一个消息块中最多的记录条数。默认值为1024。

maxPersistenceQueueDepth=10000000

把流数据表保存到磁盘时,消息队列的最大深度(记录条数)。默认值为10,000,000。

maxPubQueueDepthPerSite=10000000

发布节点的消息队列的最大深度(记录条数)。默认值为10,000,000。

maxPubConnections=0

发布节点可以连接的订阅节点数量上限,默认值为0。只有指定 maxPubConnections 为正整数后,该节点才可作为发布节点。

DolphinDB 提供了流数据持久化的功能,其作用主要为:

  • 备份恢复流数据表,避免发布节点宕机,造成流数据表数据丢失。

  • 避免流表过大造成内存不足。

  • 支持从任意位置开始重新订阅。

开启持久化只需为发布节点的配置以下选项:

配置参数

解释

persistenceDir=/home/DolphinDB/Data/Persistence

共享流数据表的保存路径。如果要将流数据表保存到磁盘上,必须指定 persistenceDir。在集群模式中,需要保证同一机器上的数据节点配置了不同的 persistenceDir。

persistenceWorkerNum=1

负责以异步模式保存流数据表的工作线程数。若为高可用流数据表,该参数的默认值为 1;否则默认值为 0。

2.00.7 新版功能: 参数 persistenceWorkerNum 支持高可用流表

订阅节点

订阅节点订阅流数据表数据。其同发布节点一样,支持指定订阅消息的队列深度以及连接发布节点的数量上限(可以选择不指定,按默认值即可)。此外,订阅节点还可以对流数据进行消费,因此还支持对消息处理的线程数、消息处理时间间隔等进行配置。

配置参数

解释

subPort=8000

订阅线程监听的端口号。若要该节点作为订阅节点,必须指定该参数。

maxSubConnections=64

该订阅节点可以连接的的发布节点数量上限。默认值为64。

maxSubQueueDepth=10000000

该订阅节点的消息队列的最大深度(记录条数)。

subExecutorPooling=false

表示流计算线程是否为 pooling 模式。默认值为 false。注意:使用响应式状态引擎时,必须设置该参数为 false。

subExecutors=1

该订阅节点中消息处理线程的数量。只有当启用订阅功能时,该参数才有意义。默认值为1。如果 subExecutors = 0,表示该线程既可以进行消息转换也可以处理消息。

subThrottle

非负整数,单位为毫秒,默认值为 1000。系统检查订阅函数(subscribeTable)消息处理情况的时间间隔。

若 subscribeTable 的 throttle 参数指定了小于配置参数 subThrottle的值,则触发消息处理的时间间隔为 subThrottle。若要设置订阅函数消息处理的时间间隔小于1秒,则需要先修改配置项 subThrottle。

例如:要使 throttle=0.001 秒生效,需设置 subThrottle =1。(注意:只有 subscribeTable 函数指定参数 batchSize 后,该参数才有效。)

若订阅节点消费流数据时发生宕机,重启后可能会无法获知之前消费的进度。DolphinDB 支持将订阅消费数据的偏移量进行持久化,以避免此类情况的发生。

配置参数

解释

persistenceOffsetDir=/home/DolphinDB/streamlog

持久化订阅端消费数据偏移量的保存路径,用于保存订阅消费数据的偏移量。

若没有指定 persistenceOffsetDir,但指定了 persistenceDir,则会保存至 persistenceDir 目录;

如果既没指定 persistenceOffsetDir 也没指定 persistenceDir,会在节点目录下生成 streamlog 目录。

流数据高可用

参考教程: DolphinDB教程:流数据高可用

流数据高可用和集群高可用一样采用 raft 机制,不同的是集群高可用是控制节点的高可用,而流数据高可用为数据节点的高可用。流数据高可用分为发布端、订阅端、流数据计算引擎高可用三种,其高可用的 raft 组都通过 streamingRaftGroups 参数进行配置。

  • 发布端高可用(高可用流表):开启发布端高可用后,高可用流表自动在 raft 组内的节点进行同步。订阅端只需向 leader 节点订阅高可用流数据即可。若发布端 raft 组 leader 宕机,系统也可以迅速重新选举出新的 leader,供订阅端继续订阅。

  • 订阅端高可用:需在订阅函数 subscribeTable 中设置 reconnect=true,并指定 raftGroup。若订阅端 raft 组 leader 宕机,系统也可以迅速重新选举出新的 leader,继续从发布端订阅数据。

  • 流数据计算引擎高可用:通过配置引擎创建函数的参数 snapshot 和 raftGroup 实现高可用。参考流计算引擎详情页:流数据引擎

注意:启用高可用流表,必须开启发布节点的流表持久化。

配置参数

解释

streamingHAMode=raft

高可用功能采用的协议,目前固定配置为 raft,表明流数据高可用功能采用了 raft 协议。

streamingRaftGroups=2:NODE1:NODE2:NODE3,3:NODE3:NODE4:NODE5

raft 组信息,包含 ID 和组成 raft 组的数据节点别名,使用冒号分隔。raft 组的 ID 必须是大于1的整数,一个 raft 组至少包含3个不同的数据节点。如果有多个 raft 组,使用逗号分隔每个 raft 组的信息。

streamingHADir=/home/DolphinDB/Data/NODE1/log/streamLog

流数据 raft 日志文件的存储目录。如果没有指定,默认值为 <HomeDir>/log/streamLog 。每个数据节点应当配置不同的 streamingHADir。

streamingHAPurgeInterval=300

raft 日志垃圾回收周期。默认值300,单位为秒。