PostgreSQL数据库FDW——Parquet S3 ParquetReader类

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

ParquetReader类定义和实现位于src/reader.cpp和src/reader.hpp下其主要作用是创建postgres列数据类型和parquet列数据类型的映射、获取arrow array元素值并包装为Datum、arrow nested list和map数据转为datum。首先从create_parquet_reader函数可以看出提供两种ParquetReaderDefaultParquetReader和CachingParquetReader。这两种ParquetReader都是ParquetReader类的子类。因此首先从ParquetReader类说起。

ParquetReader *create_parquet_reader(const char *filename, MemoryContext cxt, int reader_id, bool caching){
    if (!caching)
        return new DefaultParquetReader(filename, cxt, reader_id);
    else
        return new CachingParquetReader(filename, cxt, reader_id);
}

ParquetReader

构造函数/析构函数

ParquetReader类声明于src/reader.hpp中首先其构造函数ParquetReader::ParquetReader(MemoryContext cxt) : allocator(new FastAllocatorS3(cxt)) {}形参为内存上下文cxt用于构造FastAllocatorS3类初始化std::unique_ptr<FastAllocatorS3> allocator成员。由于FastAllocatorS3代码量较少这里先介绍一下该类FastAllocatorS3类包含构造函数、析构函数、fast_alloc和recycle函数其构造函数传入内存上下文并设置到segments_cxt成员中其他成员segment_start_ptr等设置为nullptr其析构函数就是调用recycle函数fast_alloc函数预分配一个大内存段并从中分配块当段耗尽时它将被添加到垃圾段列表中并在下一个执行器迭代时释放如果请求的大小大于SEGMENT_size则只使用palloc并将其添加到垃圾段列表中recycle函数用于将垃圾段列表中申请的内存进行pfree。

class FastAllocatorS3{
private: /* Special memory segment to speed up bytea/Text allocations. */
    MemoryContext       segments_cxt;
    char               *segment_start_ptr; // 当前还有剩余空间内存段起始位置
    char               *segment_cur_ptr;   // 当前还有剩余空间内存段已分配位置
    char               *segment_last_ptr;  // 当前还有剩余空间内存段结束位置
    std::list<char *>   garbage_segments;

public:
    FastAllocatorS3(MemoryContext cxt) : segments_cxt(cxt), segment_start_ptr(nullptr), segment_cur_ptr(nullptr), segment_last_ptr(nullptr), garbage_segments() {}
    ~FastAllocatorS3() { this->recycle(); }

    /* fast_alloc      Preallocate a big memory segment and distribute blocks from it. When      segment is exhausted it is added to garbage_segments list and freed on the next executor's iteration. If requested size is bigger that SEGMENT_SIZE then just palloc is used. */
    inline void *fast_alloc(long size) {
        void   *ret;
        
        if (size > SEGMENT_SIZE) /* If allocation is bigger than segment then just palloc */ /* #define SEGMENT_SIZE (1024 * 1024) */ {
            MemoryContext oldcxt = MemoryContextSwitchTo(this->segments_cxt); // 切换到内存上下文
            void *block = exc_palloc(size); // exc_palloc定义在src/common.cpp文件中利用postgres内存上下文context->methods->alloc函数分配size内存
            this->garbage_segments.push_back((char *) block); // 将其添加到垃圾段列表中
            MemoryContextSwitchTo(oldcxt);
            return block;
        }
        size = MAXALIGN(size);  // 需求的内存大小不大于SEGMENT_SIZE      
        if (this->segment_last_ptr - this->segment_cur_ptr < size) { /* If there is not enough space in current segment create a new one */   
            if (this->segment_start_ptr) /* Recycle the last segment at the next iteration (if there was one) */
                this->garbage_segments.push_back(this->segment_start_ptr); // 将其添加到垃圾段列表中
            MemoryContext oldcxt = MemoryContextSwitchTo(this->segments_cxt); // 重新分配SEGMENT_SIZE大小内存
            this->segment_start_ptr = (char *) exc_palloc(SEGMENT_SIZE);
            this->segment_cur_ptr = this->segment_start_ptr;
            this->segment_last_ptr = this->segment_start_ptr + SEGMENT_SIZE - 1;
            MemoryContextSwitchTo(oldcxt);
        }

        ret = (void *) this->segment_cur_ptr;
        this->segment_cur_ptr += size;   // 设置当前还有剩余空间内存段已分配位置
        return ret;
    }

    void recycle(void){ /* recycle old segments if any */
        if (!this->garbage_segments.empty()) {
            bool    error = false;
            PG_TRY();
            {
                for (auto it : this->garbage_segments) pfree(it);
            }
            PG_CATCH();
            {
                error = true;
            }
            PG_END_TRY();
            if (error) throw std::runtime_error("garbage segments recycle failed");
            this->garbage_segments.clear();
            elog(DEBUG1, "parquet_s3_fdw: garbage segments recycled");
        }
    }

    MemoryContext context() { return segments_cxt; }
};

reader_id作为ParquetReader类的成员主要用于在并行foreign scan时作为ParquetReader对象的标识在DefaultParquetReader和CachingParquetReader中初始化和使用ParquetReader父类不涉及仅提供getter函数int32_t ParquetReader::id() { return reader_id; }

column_mapping

column_mapping函数用于创建postgres列数据类型和parquet列数据类型的映射create_column_mapping函数创建postgresl tuple descriptor包含的列类型和parquet列之间的映射schemaless_create_column_mapping创建postgres jsonb和parquet列之间的映射。ParquetReader类有结构体成员TypeInfo其中arrow结构体代表parquet数据类型pg结构体代表postgres数据类型need_cast标志是否需要类型转换castfunc、outfunc、infunc就是其转换函数children就是list和map类型所包含成员的数据类型TypeInfo。

    struct TypeInfo{
        int             index; /* Column index in parquet schema. For complex types and children index is equal -1. Currently only used for checking column statistics. */
        struct{
            arrow::Type::type   type_id;
            std::string         type_name;
        } arrow;
        struct{
            Oid         oid;
            int16       len;    /*                         */
            bool        byval;  /* Only for array elements */
            char        align;  /*                         */
        } pg;

        /* Cast functions from dafult postgres type defined in `to_postgres_type` to actual table column type. */
        bool            need_cast;
        FmgrInfo       *castfunc;
        FmgrInfo       *outfunc; /* For cast via IO and for maps */
        FmgrInfo       *infunc;  /* For cast via IO              */
        
        std::vector<TypeInfo> children; /* Underlying types for complex types like list and map */

             
        TypeInfo() : arrow{}, pg{}, need_cast(false), castfunc(nullptr), outfunc(nullptr), infunc(nullptr), index(-1) {}
        TypeInfo(TypeInfo &&ti) : arrow(ti.arrow), pg(ti.pg), need_cast(ti.need_cast), castfunc(ti.castfunc), outfunc(ti.outfunc), infunc(ti.infunc), children(std::move(ti.children)), index(-1) {}

        TypeInfo(std::shared_ptr<arrow::DataType> arrow_type, Oid typid=InvalidOid) : TypeInfo(){
            arrow.type_id = arrow_type->id();
            arrow.type_name = arrow_type->name();
            pg.oid = typid; pg.len = 0; pg.byval = false; pg.align = 0;
        }
    };

preSortedColumnData结构体用于存放sorted column列的排序信息this->sorted_cols_data成员用于存放这些列的preSortedColumnData信息this->sorted_col_map成员存放的是列在sorted_cols中的位置。

    struct preSortedColumnData{
        bool        is_available;   /* true if column is existed */
        char       *col_name;       /* sorted column name */
        Datum       val;            /* sorted column actual data */
        bool        is_null;        /* true if sorted column is NULL */
        SortSupportData sortkey;    /* sortkey make from presorted column */
        preSortedColumnData() :is_available(false), is_null(true)
        {}
    };

set_schemaless_info函数设置Schemaless mode所需要的成员schemaless bool标识、list actual column和list sorted column。

void ParquetReader::set_schemaless_info(bool schemaless, std::set<std::string> slcols, std::set<std::string> sorted_cols){
    this->schemaless = schemaless;
    this->slcols = slcols;
    this->sorted_cols = sorted_cols;
}

上一节我们提及Schemaless mode涉及的点为No specific foreign foreign schema (column difinition) for each parquet file. 每个parquet文件没有特定的外来模式列定义。The schemaless foreign table has only one jsonb column to represent the data from the parquet file by following rule: Jsonb Key: parquet column name. Jsonb Value: parquet column data. schemaless 外部表只有一个jsonb列通过以下规则表示parquet文件中的数据JsonbKey: parquet列名。Jsonb值parquet列数据。从schemaless_create_column_mapping函数可以看出首先判定是否对列进行project确定结果集类型映射字典的大小然后遍历parquet file文件的schema中的列确定是需要的列target_col或sorted_col对arrow类类型向postgres类型转换使用TypeInfo结构体代表转换mapping结果所有结果存放到types成员中为sorted_col创建preSortedColumnData结构体并存放到this->sorted_cols_data成员并设置this->sorted_col_map成员(存放的是列在sorted_cols中的位置)。注意这里arrow提供的类parquet::arrow::SchemaManifest和成员schema_fields

/* schemaless_create_column_mapping
 *      - Create mapping between jsonb column and actual parquet columns.
 *      - Create sortkeys for sorted column if existed. */
void ParquetReader::schemaless_create_column_mapping(parquet::arrow::SchemaManifest  manifest) {
    std::set<std::string> slcols = this->slcols; // slcols为list actual column for schemaless mode 
    std::set<std::string> sorted_cols = this->sorted_cols; // sorted_cols为list sorted column for schemaless mode 
    bool                  is_select_all;
    if (slcols.size() > 0) { /* If slcols != NIL, get mapping for column existed in this list */
        is_select_all = false; // select涉及该foreign table的project需要指定列的映射
        this->sorted_col_map.resize(slcols.size()); // sorted_col_map Mapping between sorted column and arrow result set columns. Corresponds to 'sorted_cols' vector. 该成员为该函数的结果输出即postgresl列类型和parquet列类型的映射
    }else {
        /* Otherwise, get mapping for all column in the parquet file */
        is_select_all = true;  // select查询所有列schemaless模式下依据parquet file文件的schema来确定
        this->sorted_col_map.resize(manifest.schema_fields.size());
    }
    /* Create sortkeys list for column in sortted column list */
    if (sorted_cols.size() > 0) this->sorted_cols_data.resize(sorted_cols.size());

    for (auto &schema_field : manifest.schema_fields){ // 遍历parquet file文件的schema
        auto field_name = schema_field.field->name();  // 列名
        auto arrow_type = schema_field.field->type();  // 列类型
        char arrow_colname[255];
        if (field_name.length() > NAMEDATALEN) throw Error("parquet column name '%s' is too long (max: %d)",field_name.c_str(), NAMEDATALEN - 1);
        tolowercase(field_name.c_str(), arrow_colname);

        /* Find sorted column in parquet file */ // 找到该列在sorted column list的位置
        size_t sorted_col_idx = std::distance(sorted_cols.begin(), sorted_cols.find(arrow_colname));
        /* Column will be fetch if existed in slcol list or in select all column query */
        bool is_target_col = is_select_all || (slcols.find(arrow_colname) != slcols.end());

        /* Create mapping for target column, and get information for sorted column. */
        if (is_target_col || sorted_col_idx < sorted_cols.size()){
            TypeInfo        typinfo(arrow_type);
            bool            error(false);
            std::string     col_name = std::move(arrow_colname);     
            this->column_names.push_back(col_name); /* Found mapping! */

            switch (arrow_type->id()) {
                case arrow::Type::LIST: { // LIST类型转为postgres JSONB类型
                    Assert(schema_field.children.size() == 1);
                    Oid     elem_type; int16   elem_len; bool    elem_byval; char    elem_align;
                    PG_TRY();
                    {
                        auto child_arrow_type = schema_field.children[0].field->type(); // LIST成员的数据类型
                        elem_type = to_postgres_type(TypeInfo(child_arrow_type).arrow.type_id); // 转为postgres类型oid
                        if (OidIsValid(elem_type)) { get_typlenbyvalalign(elem_type, &elem_len, &elem_byval, &elem_align); }
                    }
                    PG_CATCH();
                    {
                        error = true;
                    }
                    PG_END_TRY();
                    
                    if (error) throw Error("parquet_s3_fdw: failed to get type length (column '%s')", col_name.c_str());
                    if (!OidIsValid(elem_type)) throw Error("parquet_s3_fdw: cannot convert parquet column of type LIST to scalar type of  postgres column '%s'", col_name.c_str());

                    auto     &child = schema_field.children[0];
                    typinfo.children.emplace_back(child.field->type(), elem_type);
                    typinfo.pg.oid = JSONBOID;

                    this->indices.push_back(child.column_index);  /* Arrow column indices that are used in query */
                    break;
                }
                case arrow::Type::MAP: { // MAP类型转为postgres JSONB类型
                    /*
                     * Map has the following structure:
                     *
                     * Type::MAP
                     * └─ Type::STRUCT
                     *    ├─  key type
                     *    └─  item type
                     */
                    Assert(schema_field.children.size() == 1);
                    auto &strct = schema_field.children[0];
                    Assert(strct.children.size() == 2);  // key type and item type
                    auto &key = strct.children[0]; auto &item = strct.children[1];
                    Oid pg_key_type = to_postgres_type(key.field->type()->id()); // 转换key type和item type到postgres类型
                    Oid pg_item_type = to_postgres_type(item.field->type()->id());
                    typinfo.pg.oid = JSONBOID;
                    typinfo.children.emplace_back(key.field->type(), pg_key_type);
                    typinfo.children.emplace_back(item.field->type(), pg_item_type);

                    PG_TRY();
                    {
                        typinfo.children[0].outfunc = find_outfunc(pg_key_type);
                        typinfo.children[1].outfunc = find_outfunc(pg_item_type);
                    }
                    PG_CATCH();
                    {
                        error = true;
                    }
                    PG_END_TRY();
                    if (error) throw Error("failed to initialize output function for Map column '%s'", col_name.c_str());
                    
                    this->indices.push_back(key.column_index);  /* Arrow column indices that are used in query */
                    this->indices.push_back(item.column_index);
                    break;
                }
                default: { // 其他arrow类型INT8、INT16、INT32、INT64、FLOAT、DOUBLE、TIMESTAMP、DATE32、STRING、BINARY                
                    typinfo.pg.oid = to_postgres_type(typinfo.arrow.type_id);
                    typinfo.index = schema_field.column_index;
                    this->indices.push_back(schema_field.column_index);
                }
            }
                     
            typinfo.need_cast = false; /* In schemaless mode, parquet data is read as mapped type, so, cast is not needed */
            this->types.push_back(std::move(typinfo));

            /* Create sortkey for sorted_col if this column existed in parquet file */
            if (sorted_col_idx < sorted_cols.size()){
                SortSupportData sort_key; Oid             sort_op;                
                std::string     error;
                
                preSortedColumnData sorted_col_data;   
                /* Init sorted col data */
                sorted_col_data.is_available = true; /* true if column is existed */
                sorted_col_data.col_name = pstrdup(col_name.c_str()); /* sorted column name */
                sorted_col_data.is_null = true; /* true if sorted column is NULL */
                memset(&sort_key, 0, sizeof(SortSupportData));  /* sortkey make from presorted column */
                /* Init sortkey data */
                sort_key.ssup_cxt = allocator->context();
                sort_key.ssup_collation = InvalidOid;
                sort_key.ssup_nulls_first = true;
                sort_key.ssup_attno = sorted_col_idx;
                sort_key.abbreviate = false;
                get_sort_group_operators(typinfo.pg.oid , true, false, false, &sort_op, NULL, NULL, NULL);
                PrepareSortSupportFromOrderingOp(sort_op, &sort_key);
                sorted_col_data.sortkey = sort_key;

                this->sorted_cols_data[sorted_col_idx] = sorted_col_data;
                this->sorted_col_map[this->column_names.size() - 1] = sorted_col_idx;
            }else{
                this->sorted_col_map[this->column_names.size() - 1] = -1;
            }
        }
    }
}

create_column_mapping函数创建tuple descriptor和parquet columns之间的映射首先获取SchemaManifest如果是schemaless则调用上述函数获取类型映射遍历tupleDesc中的列遍历parquet文件schemamanifest.schema_fields列比较postgres列名和arrow schema列名(Compare postgres attribute name to the column name in arrow schema)设置映射信息TypeInfo

/* create_column_mapping
 *      Create mapping between tuple descriptor and parquet columns. */
void ParquetReader::create_column_mapping(TupleDesc tupleDesc, const std::set<int> &attrs_used){
    parquet::ArrowReaderProperties  props;    
    parquet::arrow::SchemaManifest  manifest;      // 获取SchemaManifest
    // std::unique_ptr<parquet::arrow::FileReader> reader
    auto    p_schema = this->reader->parquet_reader()->metadata()->schema(); 
    if (!parquet::arrow::SchemaManifest::Make(p_schema, nullptr, props, &manifest).ok())
        throw std::runtime_error("parquet_s3_fdw: error creating arrow schema");
    
    if (this->schemaless) { /* get the column mapping for schemaless mode */
        schemaless_create_column_mapping(manifest); return;
    }

    this->map.resize(tupleDesc->natts); // map成员Mapping between slot attributes and arrow result set columns. Corresponds to 'indices' vector.
    for (int i = 0; i < tupleDesc->natts; i++) {
        AttrNumber  attnum = i + 1 - FirstLowInvalidHeapAttributeNumber;        
        const char *attname = NameStr(TupleDescAttr(tupleDesc, i)->attname);
        this->map[i] = -1;
        
        if (attrs_used.find(attnum) == attrs_used.end())  continue; /* Skip columns we don't intend to use in query */
        
        char        pg_colname[255];
        tolowercase(NameStr(TupleDescAttr(tupleDesc, i)->attname), pg_colname);

        for (auto &schema_field : manifest.schema_fields){
            auto field_name = schema_field.field->name();
            auto arrow_type = schema_field.field->type();            
            if (field_name.length() > NAMEDATALEN) throw Error("parquet column name '%s' is too long (max: %d)", field_name.c_str(), NAMEDATALEN - 1);
            char arrow_colname[255]; tolowercase(schema_field.field->name().c_str(), arrow_colname);

            /* Compare postgres attribute name to the column name in arrow schema. */
            if (strcmp(pg_colname, arrow_colname) == 0) {
                TypeInfo        typinfo(arrow_type);
                bool            error(false);           
                this->column_names.push_back(std::move(arrow_colname));  /* Found mapping! */                
                this->map[i] = this->column_names.size() - 1; /* index of last element */

                typinfo.pg.oid = TupleDescAttr(tupleDesc, i)->atttypid;
                switch (arrow_type->id()) {
                    case arrow::Type::LIST: {  // LIST类型转为postgres ARRAY类型
                        Assert(schema_field.children.size() == 1);
                        Oid     elem_type; int16   elem_len; bool    elem_byval; char    elem_align;
                        PG_TRY();
                        {
                            elem_type = get_element_type(typinfo.pg.oid); 
                            if (OidIsValid(elem_type)) { get_typlenbyvalalign(elem_type, &elem_len, &elem_byval, &elem_align); }
                        }
                        PG_CATCH();
                        {
                            error = true;
                        }
                        PG_END_TRY();
                        if (error) throw Error("failed to get type length (column '%s')", pg_colname);
                        if (!OidIsValid(elem_type)) throw Error("cannot convert parquet column of type LIST to scalar type of postgres column '%s'", pg_colname);

                        auto     &child = schema_field.children[0];
                        typinfo.children.emplace_back(child.field->type(), elem_type);
                        TypeInfo &elem = typinfo.children[0];
                        elem.pg.len = elem_len; elem.pg.byval = elem_byval;  elem.pg.align = elem_align;
                        initialize_cast(elem, attname); // Check wether implicit cast will be required and prepare cast function call

                        this->indices.push_back(child.column_index);
                        break;
                    }
                    case arrow::Type::MAP: {
                        /*  Map has the following structure:
                         * Type::MAP
                         * └─ Type::STRUCT
                         *    ├─  key type
                         *    └─  item type
                         */
                        Assert(schema_field.children.size() == 1);
                        auto &strct = schema_field.children[0];
                        Assert(strct.children.size() == 2);
                        auto &key = strct.children[0]; auto &item = strct.children[1];
                        Oid pg_key_type = to_postgres_type(key.field->type()->id());
                        Oid pg_item_type = to_postgres_type(item.field->type()->id());
                        typinfo.children.emplace_back(key.field->type(), pg_key_type);
                        typinfo.children.emplace_back(item.field->type(), pg_item_type);

                        PG_TRY();
                        {
                            typinfo.children[0].outfunc = find_outfunc(pg_key_type);
                            typinfo.children[1].outfunc = find_outfunc(pg_item_type);
                        }
                        PG_CATCH();
                        {
                            error = true;
                        }
                        PG_END_TRY();
                        if (error) throw Error("failed to initialize output function for Map column '%s'", attname);

                        this->indices.push_back(key.column_index);
                        this->indices.push_back(item.column_index);
                        /* JSONB might need cast (e.g. to TEXT) */
                        initialize_cast(typinfo, attname);
                        break;
                    }
                    default:
                        initialize_cast(typinfo, attname);
                        typinfo.index = schema_field.column_index;
                        this->indices.push_back(schema_field.column_index);
                }
                this->types.push_back(std::move(typinfo));

                break;
            }
        }
    }
}

do_cast函数依据TypeInfo提供的arrow类型向postgres类型转换的信息对Datum进行数据类型转换。

Datum ParquetReader::do_cast(Datum val, const TypeInfo &typinfo){
    MemoryContext   ccxt = CurrentMemoryContext;
    bool            error = false;
    char            errstr[ERROR_STR_LEN];
    
    PG_TRY(); /* du, du cast, du cast mich... */
    {
        if (typinfo.castfunc != NULL){  // 首先使用cast函数
            val = FunctionCall1(typinfo.castfunc, val);
        } else if (typinfo.outfunc && typinfo.infunc) { // 然后使用output和Input函数
            char *str = OutputFunctionCall(typinfo.outfunc, val);
            /* TODO: specify typioparam and typmod */
            val = InputFunctionCall(typinfo.infunc, str, 0, 0);
        }
    }
    PG_CATCH();
    {
        ErrorData *errdata;
        MemoryContextSwitchTo(ccxt);
        error = true; errdata = CopyErrorData();
        FlushErrorState();
        strncpy(errstr, errdata->message, ERROR_STR_LEN - 1);
        FreeErrorData(errdata);
    }
    PG_END_TRY();
    if (error)  throw std::runtime_error(errstr);
    return val;
}

read_primitive_type

read_primitive_type函数从arrow array中返回第i个元素的primitive type值其实就是获取array元素值并包装为Datum。

/* read_primitive_type Returns primitive type value from arrow array */
Datum ParquetReader::read_primitive_type(arrow::Array *array,  const TypeInfo &typinfo, int64_t i){
    Datum   res;    
    switch (typinfo.arrow.type_id){ /* Get datum depending on the column type */
        case arrow::Type::BOOL: {
            arrow::BooleanArray *boolarray = (arrow::BooleanArray *) array;
            res = BoolGetDatum(boolarray->Value(i));
            break;
        }
        case arrow::Type::INT8:{
            arrow::Int8Array *intarray = (arrow::Int8Array *) array;
            int value = intarray->Value(i);
            res = Int8GetDatum(value);
            break;
        }
        case arrow::Type::INT16: {
            arrow::Int16Array *intarray = (arrow::Int16Array *) array;
            int value = intarray->Value(i);
            res = Int16GetDatum(value);
            break;
        }
        case arrow::Type::INT32: {
            arrow::Int32Array *intarray = (arrow::Int32Array *) array;
            int value = intarray->Value(i);
            res = Int32GetDatum(value);
            break;
        }
        case arrow::Type::INT64: {
            arrow::Int64Array *intarray = (arrow::Int64Array *) array;
            int64 value = intarray->Value(i);
            res = Int64GetDatum(value);
            break;
        }
        case arrow::Type::FLOAT: {
            arrow::FloatArray *farray = (arrow::FloatArray *) array;
            float value = farray->Value(i);
            res = Float4GetDatum(value);
            break;
        }
        case arrow::Type::DOUBLE: {
            arrow::DoubleArray *darray = (arrow::DoubleArray *) array;
            double value = darray->Value(i);
            res = Float8GetDatum(value);
            break;
        }
        case arrow::Type::STRING:
        case arrow::Type::BINARY: {
            arrow::BinaryArray *binarray = (arrow::BinaryArray *) array;
            int32_t vallen = 0;
            const char *value = reinterpret_cast<const char*>(binarray->GetValue(i, &vallen));         
            int64 bytea_len = vallen + VARHDRSZ; /* Build bytea */
            bytea *b = (bytea *) this->allocator->fast_alloc(bytea_len);
            SET_VARSIZE(b, bytea_len);
            memcpy(VARDATA(b), value, vallen);
            res = PointerGetDatum(b);
            break;
        }
        case arrow::Type::TIMESTAMP:{ /* TODO: deal with timezones */
            TimestampTz ts;
            arrow::TimestampArray *tsarray = (arrow::TimestampArray *) array;
            auto tstype = (arrow::TimestampType *) array->type().get();
            to_postgres_timestamp(tstype, tsarray->Value(i), ts);
            res = TimestampGetDatum(ts);
            break;
        }
        case arrow::Type::DATE32: {
            arrow::Date32Array *tsarray = (arrow::Date32Array *) array;
            int32 d = tsarray->Value(i);
            /* Postgres date starts with 2000-01-01 while unix date (which* Parquet is using) starts with 1970-01-01. So we need to do simple calculations here. */
            res = DateADTGetDatum(d + (UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE));
            break;
        }
        /* TODO: add other types */
        default: throw Error("parquet_s3_fdw: unsupported column type: %s", typinfo.arrow.type_name.c_str());
    }    
    if (typinfo.need_cast) res = do_cast(res, typinfo); /* Call cast function if needed */
    return res;
}

GetPrimitiveValues函数从arrow::Array中获取C语言T类型的数组。copy_to_c_array函数将arrow::Array中elem_size个元素的数据拷贝到T类型数组中。

/* copy_to_c_array      memcpy plain values from Arrow array to a C array. */
template<typename T> inline void ParquetReader::copy_to_c_array(T *values, const arrow::Array *array, int elem_size) {
    const T *in = GetPrimitiveValues<T>(*array);
    memcpy(values, in, elem_size * array->length());
}

/* GetPrimitiveValues      Get plain C value array. Copy-pasted from Arrow. */
template <typename T> inline const T*ParquetReader::GetPrimitiveValues(const arrow::Array& arr) {
    if (arr.length() == 0) { return nullptr; }
    const auto& prim_arr = arrow::internal::checked_cast<const arrow::PrimitiveArray&>(arr);
    const T* raw_values = reinterpret_cast<const T*>(prim_arr.values()->data());
    return raw_values + arr.offset();
}

nested list和map数据转为datum

nested_list_to_datum函数将arrow array元素组装成postgres array仅支持一维array。通过ListArray和pos获取指定位置的Array进行元素获取和组装成postgres array。

/* nested_list_to_datum  Returns postgres array build from elements of array. Only one dimensional arrays are supported. */
Datum ParquetReader::nested_list_to_datum(arrow::ListArray *larray, int pos, const TypeInfo &typinfo){
    MemoryContext oldcxt;
    bool       *nulls = NULL;
    bool        error = false;

    std::shared_ptr<arrow::Array> array = larray->values()->Slice(larray->value_offset(pos), larray->value_length(pos));
    const TypeInfo &elemtypinfo = typinfo.children[0];
    Datum      *values = (Datum *) this->allocator->fast_alloc(sizeof(Datum) * array->length()); // 分配空间

#if SIZEOF_DATUM == 8  /* Fill values and nulls arrays */
    if (array->null_count() == 0 && typinfo.arrow.type_id == arrow::Type::INT64) {
        /* Ok, there are no nulls, so probably we could just memcpy the entire array.
         * Warning: the code below is based on the assumption that Datum is 8 bytes long, which is true for most contemporary systems but this will not work on some exotic or really old systems. */
        copy_to_c_array<int64_t>((int64_t *) values, array.get(), elemtypinfo.pg.len);
        goto construct_array;
    }
#endif
    for (int64_t i = 0; i < array->length(); ++i){
        if (!array->IsNull(i)) values[i] = this->read_primitive_type(array.get(), elemtypinfo, i);
        else {
            if (!nulls) {
                Size size = sizeof(bool) * array->length();
                nulls = (bool *) this->allocator->fast_alloc(size);
                memset(nulls, 0, size);
            }
            nulls[i] = true; // 对应位置设置null为true表明数据为null值
        }
    }

construct_array:  /* Construct one dimensional array. We have to use PG_TRY / PG_CATCH to prevent any kind leaks of resources allocated by c++ in case of errors. */
    int         dims[1]; int         lbs[1];
    dims[0] = array->length(); lbs[0] = 1;
    PG_TRY();
    {
        oldcxt = MemoryContextSwitchTo(allocator->context());
        ArrayType  *res = construct_md_array(values, nulls, 1, dims, lbs, elemtypinfo.pg.oid, elemtypinfo.pg.len, elemtypinfo.pg.byval, elemtypinfo.pg.align);
        MemoryContextSwitchTo(oldcxt);
    }
    PG_CATCH();
    {
        error = true;
    }
    PG_END_TRY();
    if (error) throw std::runtime_error("failed to constuct an array");
    
    return PointerGetDatum(res);
}

nested_list_to_jsonb_datum函数将arrow array元素组装成postgres jsonb仅支持一维array。通过ListArray和pos获取指定位置的Array进行元素获取和组装成postgres jsonb。

/* nested_list_to_jsonb_datum  Returns postgres JSONB array build from elements of array. Only one dimensional arrays are supported. */
Datum ParquetReader::nested_list_to_jsonb_datum(arrow::ListArray *larray, int pos, const TypeInfo &typinfo){
    JsonbParseState *parseState = NULL;
    Datum      *values;
    bool       *nulls = NULL;
    bool        error = false;

    std::shared_ptr<arrow::Array> array = larray->values()->Slice(larray->value_offset(pos), larray->value_length(pos));
    const TypeInfo &elemtypinfo = typinfo.children[0];
    values = (Datum *) this->allocator->fast_alloc(sizeof(Datum) * array->length());
    nulls = (bool *) this->allocator->fast_alloc(sizeof(bool) * array->length());
    memset(nulls, 0, sizeof(bool) * array->length());

#if SIZEOF_DATUM == 8 /* Fill values and nulls arrays */
    if (array->null_count() == 0 && typinfo.arrow.type_id == arrow::Type::INT64){
        /* Ok, there are no nulls, so probably we could just memcpy the entire array. Warning: the code below is based on the assumption that Datum is 8 bytes long, which is true for most contemporary systems but this will not work on some exotic or really old systems. */
        copy_to_c_array<int64_t>((int64_t *) values, array.get(), elemtypinfo.pg.len);
        goto construct_jsonb;
    }
#endif
    for (int64_t i = 0; i < array->length(); ++i) {
        if (!array->IsNull(i)) values[i] = this->read_primitive_type(array.get(), elemtypinfo, i);
        else nulls[i] = true;
    }

construct_jsonb: /* Construct one dimensional jsonb array. We have to use PG_TRY / PG_CATCH to prevent any kind leaks of resources allocated by c++ in case of errors. */
    PG_TRY();
    {
        JsonbValue *res = pushJsonbValue(&parseState, WJB_BEGIN_ARRAY, NULL);
        for (int64_t i = 0; i < array->length(); ++i) {
            datum_to_jsonb(values[i], elemtypinfo.pg.oid, nulls[i], elemtypinfo.outfunc, parseState, WJB_ELEM);
        }
        res = pushJsonbValue(&parseState, WJB_END_ARRAY, NULL);
    }
    PG_CATCH();
    {
        error = true;
    }
    PG_END_TRY();
    if (error) throw std::runtime_error("failed to constuct an jsonb");

    return JsonbPGetDatum(JsonbValueToJsonb(res));
}

map_to_datum函数将arrow MapArray类型中相应位置pos的map key item转换为postgres jsonb。

Datum ParquetReader::map_to_datum(arrow::MapArray *maparray, int pos, const TypeInfo &typinfo){
	JsonbParseState *parseState = NULL;
    JsonbValue *jb;
    auto keys = maparray->keys()->Slice(maparray->value_offset(pos), maparray->value_length(pos));
    auto values = maparray->items()->Slice(maparray->value_offset(pos), maparray->value_length(pos));
    Assert(keys->length() == values->length());
    Assert(typinfo.children.size() == 2);

    jb = pushJsonbValue(&parseState, WJB_BEGIN_OBJECT, NULL);
    for (int i = 0; i < keys->length(); ++i) {
        Datum   key = (Datum) 0, value = (Datum) 0;
        bool    isnull = false;
        const TypeInfo &key_typinfo = typinfo.children[0];
        const TypeInfo &val_typinfo = typinfo.children[1];
        
        if (keys->IsNull(i)) throw std::runtime_error("key is null");
        if (!values->IsNull(i)){
            key = this->read_primitive_type(keys.get(), key_typinfo, i);
            value = this->read_primitive_type(values.get(), val_typinfo, i);
        } else  isnull = true;

        /* TODO: adding cstring would be cheaper than adding text */
        datum_to_jsonb(key, key_typinfo.pg.oid, false, key_typinfo.outfunc, parseState, WJB_KEY);
        datum_to_jsonb(value, val_typinfo.pg.oid, isnull, val_typinfo.outfunc, parseState, WJB_VALUE);
    }
    jb = pushJsonbValue(&parseState, WJB_END_OBJECT, NULL);
    Datum res = JsonbPGetDatum(JsonbValueToJsonb(jb));

    if (typinfo.need_cast) res = do_cast(res, typinfo);

    return res;
}

DefaultParquetReader

CachingParquetReader

https://github.com/adjust/parquet_fdw

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: 数据库