
Commit

small fixes
zehiko authored and teh-cmc committed Oct 21, 2024
1 parent ac5e9e2 commit 0d658fe
Showing 3 changed files with 28 additions and 21 deletions.
@@ -65,6 +65,9 @@ message QueryRequest {
}

message QueryResponse {
+// TODO(zehiko) we need to expand this to become something like 'encoder options'
+// as we will need to specify additional options like compression, including schema
+// in payload, etc.
EncoderVersion encoder_version = 1;
// payload is raw bytes that the relevant codec can interpret
bytes payload = 2;
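
As an aside, the new TODO hints at replacing the bare `encoder_version` field with richer "encoder options". Purely as an illustration, the prost-generated Rust for such a message might look like the sketch below; `EncoderOptions`, `compression`, and `include_schema_in_payload` are hypothetical names that do not exist in this commit.

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EncoderOptions {
    /// Same role as today's `encoder_version` field.
    #[prost(enumeration = "EncoderVersion", tag = "1")]
    pub encoder_version: i32,
    /// Hypothetical compression choice (e.g. 0 = none, 1 = lz4).
    #[prost(int32, tag = "2")]
    pub compression: i32,
    /// Hypothetical flag: embed the Arrow schema in every payload?
    #[prost(bool, tag = "3")]
    pub include_schema_in_payload: bool,
}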
43 changes: 22 additions & 21 deletions crates/store/re_remote_store_types/src/codec.rs
@@ -96,12 +96,12 @@ impl TransportMessageV0 {
MessageHader::RECORD_BATCH => {
let metadata = read::read_stream_metadata(&mut reader)
.map_err(CodecError::ArrowSerialization)?;
-let mut sr = read::StreamReader::new(&mut reader, metadata, None);
+let mut stream = read::StreamReader::new(&mut reader, metadata, None);

-let schema = sr.schema().clone();
+let schema = stream.schema().clone();
// there should be at least one record batch in the stream
-// TODO(zehiko) isn't there a "read one record batch from bytes" function??
-let stream_state = sr
+// TODO(zehiko) isn't there a "read one record batch from bytes" arrow2 function??
+let stream_state = stream
.next()
.ok_or(CodecError::MissingRecordBatch)?
.map_err(CodecError::ArrowSerialization)?;
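
For context on the code above: decoding pulls exactly one record batch out of an Arrow IPC byte stream, which is why the reader is advanced a single step. A minimal self-contained sketch of that pattern, assuming arrow2's stream API (`read_stream_metadata`, `StreamReader`, `StreamState`), could look like this; `read_one_batch` is an illustrative helper, not code from this commit.

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::io::ipc::read;

// Illustrative helper: read a single record batch from in-memory IPC bytes.
fn read_one_batch(data: &[u8]) -> Option<Chunk<Box<dyn Array>>> {
    let mut reader = std::io::Cursor::new(data);
    let metadata = read::read_stream_metadata(&mut reader).ok()?;
    let mut stream = read::StreamReader::new(&mut reader, metadata, None);
    match stream.next()?.ok()? {
        read::StreamState::Some(batch) => Some(batch),
        // `Waiting` signals that more bytes are needed; with a complete
        // in-memory buffer we treat it as "no batch found".
        read::StreamState::Waiting => None,
    }
}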
@@ -123,14 +123,18 @@ impl TransportMessageV0 {
}
}

+// TODO(zehiko) add support for separately encoding schema from the record batch to get rid of overhead
+// of sending schema in each transport message for the same stream of batches. This will require codec
+// to become stateful and keep track if schema was sent / received.
/// Encode a transport chunk into a byte stream.
pub fn encode(version: EncoderVersion, chunk: TransportChunk) -> Result<Vec<u8>, CodecError> {
match version {
EncoderVersion::V0 => TransportMessageV0::RecordBatch(chunk).to_bytes(),
}
}

-/// Encode a `NoData` message into a byte stream.
+/// Encode a `NoData` message into a byte stream. This can be used by the remote store
+/// (i.e. data producer) to signal back to the client that there's no data available.
pub fn no_data(version: EncoderVersion) -> Result<Vec<u8>, CodecError> {
match version {
EncoderVersion::V0 => TransportMessageV0::NoData.to_bytes(),
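
The new comment above sketches a follow-up: send the schema once per stream rather than once per batch, which would make the codec stateful. One possible shape for that, shown only as a hedged sketch; the schema-less variant does not exist yet, hence the `todo!`, and every name here is hypothetical.

// Illustrative only: a stateful encoder that embeds the schema with the
// first batch and would omit it afterwards.
pub struct StatefulEncoder {
    schema_sent: bool,
}

impl StatefulEncoder {
    pub fn encode_next(&mut self, chunk: TransportChunk) -> Result<Vec<u8>, CodecError> {
        if !self.schema_sent {
            self.schema_sent = true;
            // First message: today's path, schema embedded in the IPC stream.
            TransportMessageV0::RecordBatch(chunk).to_bytes()
        } else {
            // Hypothetical schema-less encoding; the TODO above is about
            // introducing exactly this.
            todo!("encode the record batch without re-sending the schema")
        }
    }
}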
@@ -152,18 +156,15 @@ pub fn decode(version: EncoderVersion, data: &[u8]) -> Result<Option<TransportCh

#[cfg(test)]
mod tests {
-use re_dataframe::{
-    external::re_chunk::{Chunk, RowId},
-    TransportChunk,
-};
+use re_dataframe::external::re_chunk::{Chunk, RowId};
use re_log_types::{example_components::MyPoint, Timeline};

use crate::{
codec::{decode, encode, CodecError, TransportMessageV0},
v0::EncoderVersion,
};

-fn get_test_chunk() -> TransportChunk {
+fn get_test_chunk() -> Chunk {
let row_id1 = RowId::new();
let row_id2 = RowId::new();

@@ -179,13 +180,11 @@ mod tests {
let points1 = &[MyPoint::new(1.0, 1.0)];
let points2 = &[MyPoint::new(2.0, 2.0)];

-let chunk = Chunk::builder("mypoints".into())
+Chunk::builder("mypoints".into())
.with_component_batches(row_id1, timepoint1, [points1 as _])
.with_component_batches(row_id2, timepoint2, [points2 as _])
.build()
-.unwrap();
-
-chunk.to_transport().unwrap()
+.unwrap()
}

#[test]
@@ -198,19 +197,18 @@ mod tests {

#[test]
fn test_message_v0_record_batch() {
-let transport = get_test_chunk();
-let expected_chunk = Chunk::from_transport(&transport).unwrap();
+let expected_chunk = get_test_chunk();

-let msg = TransportMessageV0::RecordBatch(transport);
+let msg = TransportMessageV0::RecordBatch(expected_chunk.clone().to_transport().unwrap());
let data = msg.to_bytes().unwrap();
let decoded = TransportMessageV0::from_bytes(&data).unwrap();

-#[allow(clippy::match_wildcard_for_single_variants)]
match decoded {
TransportMessageV0::RecordBatch(transport) => {
let decoded_chunk = Chunk::from_transport(&transport).unwrap();
assert_eq!(expected_chunk, decoded_chunk);
}
+#[allow(clippy::match_wildcard_for_single_variants)]
_ => panic!("unexpected message type"),
}
}
@@ -240,10 +238,13 @@ mod tests {

#[test]
fn test_v0_codec() {
-let transport_chunk = get_test_chunk();
-let expected_chunk = Chunk::from_transport(&transport_chunk).unwrap();
+let expected_chunk = get_test_chunk();

-let encoded = encode(EncoderVersion::V0, transport_chunk.clone()).unwrap();
+let encoded = encode(
+    EncoderVersion::V0,
+    expected_chunk.clone().to_transport().unwrap(),
+)
+.unwrap();
let decoded = decode(EncoderVersion::V0, &encoded).unwrap().unwrap();
let decoded_chunk = Chunk::from_transport(&decoded).unwrap();

@@ -272,6 +272,9 @@ pub struct QueryRequest {
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueryResponse {
+/// TODO(zehiko) we need to expand this to become something like 'encoder options'
+/// as we will need to specify additional options like compression, including schema
+/// in payload, etc.
#[prost(enumeration = "EncoderVersion", tag = "1")]
pub encoder_version: i32,
/// payload is raw bytes that the relevant codec can interpret
