Skip to content

Commit

Permalink
Introduce codec for the gRPC QueryResponse payload (#7733)
Browse files Browse the repository at this point in the history
Initial version of codec for gRPC query response payload

### What

gRPC layer ``QueryResponse`` contains raw bytes as the payload. We keep
interpretation of the payload to our own encoder which this PR
introduces. V0 codec is simple and stateless and doesn't deal with the
fact every serialized record batch has schema included.

As for the encoding, we simply rely on arrow2 serialization and
deserialization constructs for the record batch and we have our own
header to differentiate between "no data" and a "record batch" in the
QueryResponse.

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [ x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/7733?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/7733?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!
* [x] If have noted any breaking changes to the log API in
`CHANGELOG.md` and the migration guide

- [PR Build Summary](https://build.rerun.io/pr/7733)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.
  • Loading branch information
zehiko authored Oct 21, 2024
1 parent 1e529e3 commit 14a7dd4
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5626,6 +5626,7 @@ name = "re_remote_store_types"
version = "0.20.0-alpha.1+dev"
dependencies = [
"prost 0.13.3",
"re_arrow2",
"re_dataframe",
"re_log_types",
"thiserror",
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_dataframe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub use self::query::QueryHandle;
#[doc(no_inline)]
pub use self::external::arrow2::chunk::Chunk as ArrowChunk;
#[doc(no_inline)]
pub use self::external::re_chunk::util::concatenate_record_batches;
pub use self::external::re_chunk::{util::concatenate_record_batches, TransportChunk};
#[doc(no_inline)]
pub use self::external::re_chunk_store::{
ColumnSelector, ComponentColumnSelector, Index, IndexRange, IndexValue, QueryExpression,
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_remote_store_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ re_log_types.workspace = true
re_dataframe.workspace = true

# External
arrow2 = { workspace = true, features = ["io_ipc"] }
prost.workspace = true
thiserror.workspace = true
tonic.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,21 @@ message QueryRequest {
// unique identifier of the recording
RecordingId recording_id = 1;
// query to execute
Query query = 2;
Query query = 3;
}

message QueryResponse {
// single record batch (encoding TBD - TODO).
bytes record_batch = 1;
// TODO(zehiko) we need to expand this to become something like 'encoder options'
// as we will need to specify additional options like compression, including schema
// in payload, etc.
EncoderVersion encoder_version = 1;
// payload is raw bytes that the relevant codec can interpret
bytes payload = 2;
}


enum EncoderVersion {
V0 = 0;
}


Expand Down
253 changes: 253 additions & 0 deletions crates/store/re_remote_store_types/src/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
use arrow2::error::Error as ArrowError;
use arrow2::io::ipc::{read, write};
use re_dataframe::TransportChunk;

use crate::v0::EncoderVersion;

#[derive(Debug, thiserror::Error)]
pub enum CodecError {
#[error("Arrow serialization error: {0}")]
ArrowSerialization(ArrowError),

#[error("Failed to decode message header {0}")]
HeaderDecoding(std::io::Error),

#[error("Failed to encode message header {0}")]
HeaderEncoding(std::io::Error),

#[error("Missing record batch")]
MissingRecordBatch,

#[error("Unexpected stream state")]
UnexpectedStreamState,

#[error("Unknown message header")]
UnknownMessageHeader,
}

#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct MessageHader(pub u8);

impl MessageHader {
pub const NO_DATA: Self = Self(1);
pub const RECORD_BATCH: Self = Self(2);

pub const SIZE_BYTES: usize = 1;
}

impl MessageHader {
fn decode(read: &mut impl std::io::Read) -> Result<Self, CodecError> {
let mut buffer = [0_u8; Self::SIZE_BYTES];
read.read_exact(&mut buffer)
.map_err(CodecError::HeaderDecoding)?;

let header = u8::from_le(buffer[0]);

Ok(Self(header))
}

fn encode(&self, write: &mut impl std::io::Write) -> Result<(), CodecError> {
write
.write_all(&[self.0])
.map_err(CodecError::HeaderEncoding)?;

Ok(())
}
}

#[derive(Debug)]
pub enum TransportMessageV0 {
NoData,
RecordBatch(TransportChunk),
}

impl TransportMessageV0 {
fn to_bytes(&self) -> Result<Vec<u8>, CodecError> {
match self {
Self::NoData => {
let mut data: Vec<u8> = Vec::new();
MessageHader::NO_DATA.encode(&mut data)?;
Ok(data)
}
Self::RecordBatch(chunk) => {
let mut data: Vec<u8> = Vec::new();
MessageHader::RECORD_BATCH.encode(&mut data)?;

let options = write::WriteOptions { compression: None };
let mut sw = write::StreamWriter::new(&mut data, options);

sw.start(&chunk.schema, None)
.map_err(CodecError::ArrowSerialization)?;
sw.write(&chunk.data, None)
.map_err(CodecError::ArrowSerialization)?;
sw.finish().map_err(CodecError::ArrowSerialization)?;

Ok(data)
}
}
}

fn from_bytes(data: &[u8]) -> Result<Self, CodecError> {
let mut reader = std::io::Cursor::new(data);
let header = MessageHader::decode(&mut reader)?;

match header {
MessageHader::NO_DATA => Ok(Self::NoData),
MessageHader::RECORD_BATCH => {
let metadata = read::read_stream_metadata(&mut reader)
.map_err(CodecError::ArrowSerialization)?;
let mut stream = read::StreamReader::new(&mut reader, metadata, None);

let schema = stream.schema().clone();
// there should be at least one record batch in the stream
// TODO(zehiko) isn't there a "read one record batch from bytes" arrow2 function??
let stream_state = stream
.next()
.ok_or(CodecError::MissingRecordBatch)?
.map_err(CodecError::ArrowSerialization)?;

match stream_state {
read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState),
read::StreamState::Some(chunk) => {
let tc = TransportChunk {
schema: schema.clone(),
data: chunk,
};

Ok(Self::RecordBatch(tc))
}
}
}
_ => Err(CodecError::UnknownMessageHeader),
}
}
}

// TODO(zehiko) add support for separately encoding schema from the record batch to get rid of overhead
// of sending schema in each transport message for the same stream of batches. This will require codec
// to become stateful and keep track if schema was sent / received.
/// Encode a transport chunk into a byte stream.
pub fn encode(version: EncoderVersion, chunk: TransportChunk) -> Result<Vec<u8>, CodecError> {
match version {
EncoderVersion::V0 => TransportMessageV0::RecordBatch(chunk).to_bytes(),
}
}

/// Encode a `NoData` message into a byte stream. This can be used by the remote store
/// (i.e. data producer) to signal back to the client that there's no data available.
pub fn no_data(version: EncoderVersion) -> Result<Vec<u8>, CodecError> {
match version {
EncoderVersion::V0 => TransportMessageV0::NoData.to_bytes(),
}
}

/// Decode transport data from a byte stream - if there's a record batch present, return it, otherwise return `None`.
pub fn decode(version: EncoderVersion, data: &[u8]) -> Result<Option<TransportChunk>, CodecError> {
match version {
EncoderVersion::V0 => {
let msg = TransportMessageV0::from_bytes(data)?;
match msg {
TransportMessageV0::RecordBatch(chunk) => Ok(Some(chunk)),
TransportMessageV0::NoData => Ok(None),
}
}
}
}

#[cfg(test)]
mod tests {
use re_dataframe::external::re_chunk::{Chunk, RowId};
use re_log_types::{example_components::MyPoint, Timeline};

use crate::{
codec::{decode, encode, CodecError, TransportMessageV0},
v0::EncoderVersion,
};

fn get_test_chunk() -> Chunk {
let row_id1 = RowId::new();
let row_id2 = RowId::new();

let timepoint1 = [
(Timeline::log_time(), 100),
(Timeline::new_sequence("frame"), 1),
];
let timepoint2 = [
(Timeline::log_time(), 104),
(Timeline::new_sequence("frame"), 1),
];

let points1 = &[MyPoint::new(1.0, 1.0)];
let points2 = &[MyPoint::new(2.0, 2.0)];

Chunk::builder("mypoints".into())
.with_component_batches(row_id1, timepoint1, [points1 as _])
.with_component_batches(row_id2, timepoint2, [points2 as _])
.build()
.unwrap()
}

#[test]
fn test_message_v0_no_data() {
let msg = TransportMessageV0::NoData;
let data = msg.to_bytes().unwrap();
let decoded = TransportMessageV0::from_bytes(&data).unwrap();
assert!(matches!(decoded, TransportMessageV0::NoData));
}

#[test]
fn test_message_v0_record_batch() {
let expected_chunk = get_test_chunk();

let msg = TransportMessageV0::RecordBatch(expected_chunk.clone().to_transport().unwrap());
let data = msg.to_bytes().unwrap();
let decoded = TransportMessageV0::from_bytes(&data).unwrap();

#[allow(clippy::match_wildcard_for_single_variants)]
match decoded {
TransportMessageV0::RecordBatch(transport) => {
let decoded_chunk = Chunk::from_transport(&transport).unwrap();
assert_eq!(expected_chunk, decoded_chunk);
}
_ => panic!("unexpected message type"),
}
}

#[test]
fn test_invalid_batch_data() {
let data = vec![2, 3, 4]; // '1' is NO_DATA message header
let decoded = TransportMessageV0::from_bytes(&data);

assert!(matches!(
decoded.err().unwrap(),
CodecError::ArrowSerialization(_)
));
}

#[test]
fn test_unknown_header() {
let data = vec![3];
let decoded = TransportMessageV0::from_bytes(&data);
assert!(decoded.is_err());

assert!(matches!(
decoded.err().unwrap(),
CodecError::UnknownMessageHeader
));
}

#[test]
fn test_v0_codec() {
let expected_chunk = get_test_chunk();

let encoded = encode(
EncoderVersion::V0,
expected_chunk.clone().to_transport().unwrap(),
)
.unwrap();
let decoded = decode(EncoderVersion::V0, &encoded).unwrap().unwrap();
let decoded_chunk = Chunk::from_transport(&decoded).unwrap();

assert_eq!(expected_chunk, decoded_chunk);
}
}
3 changes: 3 additions & 0 deletions crates/store/re_remote_store_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
//! necessary conversion code (in the form of `From` and `TryFrom` traits) in this crate.
//!

/// Codec for serializing and deserializing query response (record batch) data
pub mod codec;

/// Generated types for the remote store gRPC service API v0.
pub mod v0 {
// Ignoring all warnings for the auto-generated code.
Expand Down
36 changes: 32 additions & 4 deletions crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,19 @@ pub struct QueryRequest {
#[prost(message, optional, tag = "1")]
pub recording_id: ::core::option::Option<RecordingId>,
/// query to execute
#[prost(message, optional, tag = "2")]
#[prost(message, optional, tag = "3")]
pub query: ::core::option::Option<Query>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueryResponse {
/// single record batch (encoding TBD - TODO).
#[prost(bytes = "vec", tag = "1")]
pub record_batch: ::prost::alloc::vec::Vec<u8>,
/// TODO(zehiko) we need to expand this to become something like 'encoder options'
/// as we will need to specify additional options like compression, including schema
/// in payload, etc.
#[prost(enumeration = "EncoderVersion", tag = "1")]
pub encoder_version: i32,
/// payload is raw bytes that the relevant codec can interpret
#[prost(bytes = "vec", tag = "2")]
pub payload: ::prost::alloc::vec::Vec<u8>,
}
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ListRecordingsRequest {}
Expand All @@ -298,6 +303,29 @@ pub struct RecordingInfo {
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum EncoderVersion {
V0 = 0,
}
impl EncoderVersion {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Self::V0 => "V0",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"V0" => Some(Self::V0),
_ => None,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum RecordingType {
Rrd = 0,
}
Expand Down

0 comments on commit 14a7dd4

Please sign in to comment.