当用户执行Query时,Query被翻译成执行计划,物理执行计划其实就是ExecNode组成的DAG图。这其中扫描数据由ScanNode完成。 节点之间交互数据由ExchangeNode完成;
在节点之间交互的数据流采用推的方式,具体而言:
Impalad底层的读取文件的IO模型是典型的生产者-消费者模型,Scanner是消费者,而DiskIoMgr线程是生产者.
总结起来HIVE是算子树内部求值是推的方式,Task之间是拉的方式。而Impala则算子树求值是拉的方式,PlanFragment之间是推送的方式。 这样的优势我能想到两个明显的优势:
ImpalaServer在初始化环境时,创建ExecEnv对象
ExecEnv的构造函数 {
新建各种对象,与IO相关的,主要包括DataStreamMgr,DiskIoMgr;
DiskIoMgr的构造函数 {
设置每个disk对应一个DiskQueue;
设置每个disk读取的线程数目为FLAGS_num_threads_per_disk;
}
调用disk_io_mgr的Init() {
For 每个disk_queue:
构造DiskQueue对象;
每个DiskQueue启动num_threads_per_disk个线程;
线程的主循环是ReadLoop :{
while (true) {
调用GetNextScanRange获取下一个要读的ScanRange: {
DiskQueue采用Round-Robined轮转算法,取出队首ReaderContext;
获取ReaderContext的要读取的ScanRange;
判断进程是否超内存限制;
判断Reader是否超内存限制;
If 进程超内存限制,调用GcIoBuffers();
If 依然超内存限制,就Reader->Cancel()
If reader是Canceled状态:
continue;
If reader下一个ScanRange为空并且unstarted_ranges队列非空:
将reader的unstarted_ranges队列取出一个ScanRange放入到>
ready_to_start_ranges_队列;
// 这样做的主要目的在于节省GetNextScanRange等待时间
唤醒一个阻塞在读GetNextScanRange的线程;
将reader重新放回disk_queue的队列;这样线程可以处理其他ScanRange
}
GetFreeBuffer(reader);//分配max_reader_size_ byte
GetBufferDesc;
range->OpenScanRange //打开文件,然后fseek到要读取的offset;(如果hdfs_connection_为空,是本地读取)
更新Counter;
range->ReadFromScanRange //从打开的文件句柄中读取数据;
} // while true
}
}//Init
}
Prepare
tuple_desc_=state->desc_tbl().GetTupleDescriptor();
collect all materialized (partition key and not) slots
// Finally, populate materialized_slots_ and partition_key_slots_ in the order that
the slots appear in the file.
Codegen Scanner需要的函数
设置当thread可用时启用的callback函数HdfsScanNode::ThreadTokenAvailableCb,至少要求一个线程
ThreadTokenAvaliableCb的实现 {
只要有线程可启用,就启动Scanner线程。不过以下几种情况不用启动新线程:
1. ScanNode已经完成
2. 所有的Ranges已经被其他线程拿走 (?)
3. 剩余的Ranges < 活跃的线程数
线程的主循环 {
while (true) {
disk_io_mgr获取下一个ScanRange;
创建新的Scanner对象来处理这个ScanRange;
scanner->Prepare;
scanner->ProcessSplit;
scanner->Close;
}
}
}
//Open时初始化hdfs的连接,同时会启用Scanner线程. Scanner将向DiskIOMgr的队列添加Splits
Open(RuntimeState* state):
向disk_io_mgr注册Reader,传递hdfs_connections
调用ScanRange的每个Partition的PrepareExprs
初始化所有Scanner的Ranges
ProcessSplit:{
//对于每一个ScanRange,需要InitNewRange, ProcessRange
header = scan_node_->GetFileMetadata();
if (header_ == NULL)
ReadFileHeader(); // parsing header
scan_node_->SetFileMetadata();
scan_node_->AddDiskIoRanges();
InitNewRange;
// 第一条记录
if stream_->scan_range->offset() == 0 :
skip header;
else
skip to sync
do {
//处理当前的Range直到出错或者结束
//[要求在Data block的开始位置,这个通过sync marker来保证]
ProcessRange() {
while (!eof) {
GetRecord;
GetMemory;
如果当前slot需要物化,解析当前Record
否则,直接写入
}
}
如果不允许错误,退出;
否则,跳到下一个Sync
} while (!stream_->eosr());
}