pipeline

Parent Previous Next


语法


pipeline(initTasks, followers, [queueDepth=2])


参数


initTasks是所有任务初始步骤的集合,其中每个任务都是由无参数的函数表示。例如,我们有10个任务,那么initTasks是一个包含10个无参数函数的元组。


followers是一元函数的集合,每个函数代表初始步骤之后的一个步骤。如果一个任务有N个步骤,followers具有N-1个一元函数。followers的输出是下一个followers的输入。最后一个followers有可能返回一个对象。任务的初始步骤是在主线程(接受任务的线程)中执行的,剩下的步骤在单独的线程中执行。如果pipeline函数用于执行N个步骤的任务,系统会创建N-1个线程并且这些线程会在工作完成后销毁。


queueDepth是队列的最大长度。每个步骤的中间结果保存在队列中,用于下一个步骤。当队列满了,执行会中止,直到下一个步骤使用了队列中的数据。队列的长度越长,下一个步骤的等待时间越短。但是,长的队列占用更多内存。queueDepth的默认值是2。


详情


pipeline函数用于处理特殊类型的计算任务,这种类型任务包含多个步骤,并且每个步骤连续执行。


如果任务的最后一个步骤返回一个对象,pipeline函数返回一个元组。否则不返回任何内容。


例子


例1. 假设我们需要执行具有两个步骤的任务:生成随机数,然后使用这些数据执行耗时较长的计算。假设每个步骤需要1秒,那么整个任务需要2秒。如果使用单线程执行10次这样的任务,并且第一个任务需要连续地执行,那个需要耗费20秒。如果使用pipeline函数和2个线程,执行10次这样的任务只需11秒。



def dataGen(n): sort(rand(1.0, n))

def dataProc(v): std sin sqrt pow(deltas v,2);


>timer pipeline(each(partial{dataGen}, take(10000000,10)), dataProc);

Time elapsed: 3048.15 ms


>timer(10) dataProc(dataGen(10000000));

Time elapsed: 5276.07 ms



例2. 分区表stockData包含了2008年到2018年的数据。我们想要把它转换成单个csv文件。但是,该表的大小超过了系统的可用内存,因此我们不能把整个表加载到内存后,再转换成csv文件。我们把它分成两个步骤:一次加载一个月的数据到内存,然后把内存中的数据转换为csv文件。



v = 2000.01M..2018.12M

def queryData(m){

return select * from loadTable("dfs://stockDB", "stockData") where TradingTime between datetime(date(m)) : datetime(date(m+1))

}

def saveData(tb){

tb.saveText("/hdd/hdd0/data/stockData.csv",',', true)

}

pipeline(each(partial{queryData}, v),saveData);