Documentation
¶
Overview ¶
Arrow record batch flattener transforms complex types unsupported by Perspective (Struct, List, Map, Union) into simple columns. Structs are recursively flattened with dot-separated names; List, Map, and Union values are serialized to JSON strings.
Index ¶
- Variables
- func ColumnValue(a arrow.Array, i int) any
- func ContextWithValidateOnly(ctx context.Context) context.Context
- func DataClose(data any)
- func ExtractResponseData(path string, data map[string]any) any
- func FlattenRecord(rec arrow.RecordBatch, mem memory.Allocator) arrow.RecordBatch
- func FlattenSchema(schema *arrow.Schema) *arrow.Schema
- func IsValidateOnlyContext(ctx context.Context) bool
- func NeedsFlatten(schema *arrow.Schema) bool
- func ParseJsonValue(v any) (map[string]interface{}, error)
- func RecordToJSON(rec arrow.RecordBatch, asArray bool, w io.Writer) error
- func RecordsColNums(rr []arrow.RecordBatch) int64
- func RecordsRowNums(rr []arrow.RecordBatch) int64
- func ReleaseRecords(rr []arrow.RecordBatch)
- func RetainRecords(rr []arrow.RecordBatch)
- func WarpGraphQLError(err error) gqlerror.List
- type ArrowTable
- type ArrowTableChunked
- func (t *ArrowTableChunked) Append(rec arrow.RecordBatch)
- func (t *ArrowTableChunked) Chunk(i int) arrow.RecordBatch
- func (t *ArrowTableChunked) DecodeMsgpack(dec *msgpack.Decoder) error
- func (t *ArrowTableChunked) EncodeMsgpack(enc *msgpack.Encoder) error
- func (t *ArrowTableChunked) Info() string
- func (t *ArrowTableChunked) MarshalJSON() ([]byte, error)
- func (t *ArrowTableChunked) NumChunks() int
- func (t *ArrowTableChunked) NumCols() int
- func (t *ArrowTableChunked) NumRows() int
- func (t *ArrowTableChunked) Reader(retain bool) (array.RecordReader, error)
- func (t *ArrowTableChunked) Records() ([]arrow.RecordBatch, error)
- func (t *ArrowTableChunked) Release()
- func (t *ArrowTableChunked) Retain()
- func (t *ArrowTableChunked) RowData(i int) (map[string]any, bool)
- func (t *ArrowTableChunked) SetInfo(info string)
- type ArrowTableStream
- func (t *ArrowTableStream) DecodeMsgpack(dec *msgpack.Decoder) error
- func (t *ArrowTableStream) EncodeMsgpack(enc *msgpack.Encoder) error
- func (t *ArrowTableStream) Info() string
- func (t *ArrowTableStream) MarshalJSON() ([]byte, error)
- func (t *ArrowTableStream) Reader(retain bool) (array.RecordReader, error)
- func (t *ArrowTableStream) Records() ([]arrow.RecordBatch, error)
- func (t *ArrowTableStream) Release()
- func (t *ArrowTableStream) Retain()
- func (t *ArrowTableStream) SetInfo(info string)
- type CatalogSource
- type CatalogSourceType
- type DataSource
- type DataSourceType
- type DateTime
- type JQRequest
- type JsonValue
- type OperationResult
- type Querier
- type Request
- type Response
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoData = errors.New("no data") ErrWrongDataPath = errors.New("wrong data path") )
Functions ¶
func FlattenRecord ¶ added in v0.3.6
func FlattenRecord(rec arrow.RecordBatch, mem memory.Allocator) arrow.RecordBatch
FlattenRecord transforms a record batch by recursively flattening Struct fields and converting List/Map/Union fields to JSON strings. If the record has no complex types, it is returned as-is (with Retain). The caller must Release the returned record.
func FlattenSchema ¶ added in v0.3.6
FlattenSchema returns the flattened schema without processing data.
func IsValidateOnlyContext ¶
func NeedsFlatten ¶ added in v0.3.6
NeedsFlatten returns true if the schema contains any complex types (Struct, List, Map, Union) that need transformation.
func ParseJsonValue ¶
func RecordToJSON ¶
func RecordsColNums ¶
func RecordsColNums(rr []arrow.RecordBatch) int64
func RecordsRowNums ¶
func RecordsRowNums(rr []arrow.RecordBatch) int64
func ReleaseRecords ¶
func ReleaseRecords(rr []arrow.RecordBatch)
func RetainRecords ¶
func RetainRecords(rr []arrow.RecordBatch)
func WarpGraphQLError ¶
Types ¶
type ArrowTable ¶
type ArrowTableChunked ¶
type ArrowTableChunked struct {
// contains filtered or unexported fields
}
func NewArrowTable ¶
func NewArrowTable() *ArrowTableChunked
func NewArrowTableFromReader ¶
func NewArrowTableFromReader(reader array.RecordReader) (*ArrowTableChunked, error)
func (*ArrowTableChunked) Append ¶
func (t *ArrowTableChunked) Append(rec arrow.RecordBatch)
func (*ArrowTableChunked) Chunk ¶
func (t *ArrowTableChunked) Chunk(i int) arrow.RecordBatch
func (*ArrowTableChunked) DecodeMsgpack ¶
func (t *ArrowTableChunked) DecodeMsgpack(dec *msgpack.Decoder) error
func (*ArrowTableChunked) EncodeMsgpack ¶
func (t *ArrowTableChunked) EncodeMsgpack(enc *msgpack.Encoder) error
func (*ArrowTableChunked) Info ¶
func (t *ArrowTableChunked) Info() string
func (*ArrowTableChunked) MarshalJSON ¶
func (t *ArrowTableChunked) MarshalJSON() ([]byte, error)
func (*ArrowTableChunked) NumChunks ¶
func (t *ArrowTableChunked) NumChunks() int
func (*ArrowTableChunked) NumCols ¶
func (t *ArrowTableChunked) NumCols() int
func (*ArrowTableChunked) NumRows ¶
func (t *ArrowTableChunked) NumRows() int
func (*ArrowTableChunked) Reader ¶
func (t *ArrowTableChunked) Reader(retain bool) (array.RecordReader, error)
func (*ArrowTableChunked) Records ¶
func (t *ArrowTableChunked) Records() ([]arrow.RecordBatch, error)
func (*ArrowTableChunked) Release ¶
func (t *ArrowTableChunked) Release()
func (*ArrowTableChunked) Retain ¶
func (t *ArrowTableChunked) Retain()
func (*ArrowTableChunked) RowData ¶
func (t *ArrowTableChunked) RowData(i int) (map[string]any, bool)
func (*ArrowTableChunked) SetInfo ¶
func (t *ArrowTableChunked) SetInfo(info string)
type ArrowTableStream ¶
type ArrowTableStream struct {
// contains filtered or unexported fields
}
func NewArrowTableStream ¶
func NewArrowTableStream(reader array.RecordReader) *ArrowTableStream
func (*ArrowTableStream) DecodeMsgpack ¶
func (t *ArrowTableStream) DecodeMsgpack(dec *msgpack.Decoder) error
func (*ArrowTableStream) EncodeMsgpack ¶
func (t *ArrowTableStream) EncodeMsgpack(enc *msgpack.Encoder) error
func (*ArrowTableStream) Info ¶
func (t *ArrowTableStream) Info() string
func (*ArrowTableStream) MarshalJSON ¶
func (t *ArrowTableStream) MarshalJSON() ([]byte, error)
func (*ArrowTableStream) Reader ¶
func (t *ArrowTableStream) Reader(retain bool) (array.RecordReader, error)
func (*ArrowTableStream) Records ¶
func (t *ArrowTableStream) Records() ([]arrow.RecordBatch, error)
func (*ArrowTableStream) Release ¶
func (t *ArrowTableStream) Release()
func (*ArrowTableStream) Retain ¶
func (t *ArrowTableStream) Retain()
func (*ArrowTableStream) SetInfo ¶
func (t *ArrowTableStream) SetInfo(info string)
type CatalogSource ¶
type CatalogSource struct {
Name string `json:"name"`
Type CatalogSourceType `json:"type"`
Path string `json:"path"`
Description string `json:"description"`
}
type CatalogSourceType ¶
type CatalogSourceType string
type DataSource ¶
type DataSource struct {
Name string `json:"name"`
Description string `json:"description"`
Type DataSourceType `json:"type"`
Prefix string `json:"prefix"`
Path string `json:"path"`
AsModule bool `json:"as_module"`
SelfDefined bool `json:"self_defined"`
ReadOnly bool `json:"read_only"`
Disabled bool `json:"disabled"`
Sources []CatalogSource `json:"catalogs"`
}
type DataSourceType ¶
type DataSourceType string
type DateTime ¶ added in v0.3.11
DateTime represents a naive date-time value WITHOUT timezone. Used by the DateTime scalar to distinguish from Timestamp (which uses time.Time directly). Engine SQLValue dispatches on this type to cast as TIMESTAMP instead of TIMESTAMPTZ.
type OperationResult ¶
type OperationResult struct {
Succeed bool `json:"success"`
Msg string `json:"message"`
Rows int `json:"affected_rows"`
LastId int `json:"last_id"`
}
func ErrResult ¶
func ErrResult(err error) *OperationResult
func Result ¶
func Result(msg string, rows, lastId int) *OperationResult
func SQLError ¶
func SQLError(msg string, err error) *OperationResult
func (*OperationResult) CollectSQL ¶
func (r *OperationResult) CollectSQL(res sql.Result)
func (*OperationResult) ToDuckdb ¶
func (r *OperationResult) ToDuckdb() map[string]any
type Querier ¶
type Querier interface {
Query(ctx context.Context, query string, vars map[string]any) (*Response, error)
RegisterDataSource(ctx context.Context, ds DataSource) error
LoadDataSource(ctx context.Context, name string) error
UnloadDataSource(ctx context.Context, name string) error
DataSourceStatus(ctx context.Context, name string) (string, error)
DescribeDataSource(ctx context.Context, name string, self bool) (string, error)
}