diff --git a/event_model/__init__.py b/event_model/__init__.py index 766ce657..5ce07d17 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1996,11 +1996,6 @@ def compose_resource( ) -# A dict of Tuple[str, StreamRange] where the string is the StreamDatum uuid - -_stream_datum_seq_nums: Dict[str, StreamRange] = {} - - @dataclass class ComposeStreamDatum: stream_resource: StreamResource @@ -2008,7 +2003,6 @@ class ComposeStreamDatum: def __call__( self, - data_keys: List[str], indices: StreamRange, seq_nums: Optional[StreamRange] = None, descriptor: Optional[EventDescriptor] = None, @@ -2024,7 +2018,6 @@ def __call__( doc = StreamDatum( stream_resource=resource_uid, uid=f"{resource_uid}/{next(self.counter)}", - data_keys=data_keys, seq_nums=seq_nums, indices=indices, descriptor=descriptor["uid"] if descriptor else "", @@ -2040,7 +2033,6 @@ def compose_stream_datum( *, stream_resource: StreamResource, counter: Iterator, - data_keys: List[str], seq_nums: StreamRange, indices: StreamRange, validate: bool = True, @@ -2053,7 +2045,6 @@ def compose_stream_datum( DeprecationWarning, ) return ComposeStreamDatum(stream_resource, counter)( - data_keys, seq_nums, indices, validate=validate, @@ -2084,6 +2075,7 @@ def __call__( spec: str, root: str, resource_path: str, + data_key: str, resource_kwargs: Dict[str, Any], path_semantics: Literal["posix", "windows"] = default_path_semantics, uid: Optional[str] = None, @@ -2094,6 +2086,7 @@ def __call__( doc = StreamResource( uid=uid, + data_key=data_key, spec=spec, root=root, resource_path=resource_path, @@ -2119,6 +2112,7 @@ def compose_stream_resource( spec: str, root: str, resource_path: str, + data_key: str, resource_kwargs: Dict[str, Any], counters: List = [], path_semantics: Literal["posix", "windows"] = default_path_semantics, @@ -2133,6 +2127,7 @@ def compose_stream_resource( spec, root, resource_path, + data_key, resource_kwargs, path_semantics=path_semantics, uid=uid, diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py index 58091c47..06f7f83a 100644 --- a/event_model/documents/stream_datum.py +++ b/event_model/documents/stream_datum.py @@ -1,5 +1,3 @@ -from typing import List - from typing_extensions import Annotated, TypedDict from .generate.type_wrapper import Field, add_extra_schema @@ -42,13 +40,6 @@ class StreamDatum(TypedDict): "handler so it can hand back data and timestamps." ), ] - data_keys: Annotated[ - List[str], - Field( - description="A list to show which data_keys of the " - "Descriptor are being streamed" - ), - ] seq_nums: Annotated[ StreamRange, Field( diff --git a/event_model/documents/stream_resource.py b/event_model/documents/stream_resource.py index cafcf710..2d456c35 100644 --- a/event_model/documents/stream_resource.py +++ b/event_model/documents/stream_resource.py @@ -20,6 +20,13 @@ class StreamResource(TypedDict): Field(description="Rules for joining paths"), ] ] + data_key: Annotated[ + str, + Field( + description="A string to show which data_key of the " + "Descriptor are being streamed" + ), + ] resource_kwargs: Annotated[ Dict[str, Any], Field( diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index d2cf4909..124bbd0b 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -26,14 +26,6 @@ } }, "properties": { - "data_keys": { - "title": "Data Keys", - "description": "A list to show which data_keys of the Descriptor are being streamed", - "type": "array", - "items": { - "type": "string" - } - }, "descriptor": { "title": "Descriptor", "description": "UID of the EventDescriptor to which this Datum belongs", @@ -69,7 +61,6 @@ } }, "required": [ - "data_keys", "descriptor", "indices", "seq_nums", diff --git a/event_model/schemas/stream_resource.json b/event_model/schemas/stream_resource.json index 2ab8719a..cb42b78f 100644 --- a/event_model/schemas/stream_resource.json +++ b/event_model/schemas/stream_resource.json @@ -3,6 +3,11 @@ "description": "Document to reference a collection (e.g. file or group of files) of externally-stored data streams", "type": "object", "properties": { + "data_key": { + "title": "Data Key", + "description": "A string to show which data_key of the Descriptor are being streamed", + "type": "string" + }, "path_semantics": { "title": "Path Semantics", "description": "Rules for joining paths", @@ -44,6 +49,7 @@ } }, "required": [ + "data_key", "resource_kwargs", "resource_path", "root", diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 059339ca..4f9714bb 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -113,13 +113,14 @@ def test_compose_stream_resource(tmp_path): bundle = compose_stream_resource( spec="TIFF_STREAM", root=str(tmp_path), + data_key="det1", resource_path="test_streams", resource_kwargs={}, ) resource_doc, compose_stream_datum = bundle assert bundle.stream_resource_doc is resource_doc assert bundle.compose_stream_data is compose_stream_datum - compose_stream_datum([], StreamRange(start=0, stop=0), StreamRange(start=0, stop=0)) + compose_stream_datum(StreamRange(start=0, stop=0), StreamRange(start=0, stop=0)) def test_round_trip_pagination(): @@ -392,13 +393,14 @@ def test_document_router_streams_smoke_test(tmp_path): dr("start", start) stream_resource_doc, compose_stream_datum = compose_stream_resource( spec="TIFF_STREAM", + data_key="det1", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, ) dr("stream_resource", stream_resource_doc) datum_doc = compose_stream_datum( - [], StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) + StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) ) dr("stream_datum", datum_doc) dr("stop", run_bundle.compose_stop()) diff --git a/event_model/tests/test_run_router.py b/event_model/tests/test_run_router.py index 54df7335..93149d99 100644 --- a/event_model/tests/test_run_router.py +++ b/event_model/tests/test_run_router.py @@ -218,13 +218,14 @@ def test_run_router_streams(tmp_path): docs.append(("start", start_doc)) stream_resource_doc, compose_stream_datum = compose_stream_resource( spec="TIFF_STREAM", + data_key="det1", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, ) docs.append(("stream_resource", stream_resource_doc)) datum_doc = compose_stream_datum( - [], StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) + StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) ) docs.append(("stream_datum", datum_doc)) docs.append(("stop", stop_doc))