diff --git a/src/braket/aws/aws_quantum_task_batch.py b/src/braket/aws/aws_quantum_task_batch.py index 88f226798..d9010c92d 100644 --- a/src/braket/aws/aws_quantum_task_batch.py +++ b/src/braket/aws/aws_quantum_task_batch.py @@ -16,7 +16,7 @@ import time from concurrent.futures.thread import ThreadPoolExecutor from itertools import repeat -from typing import TYPE_CHECKING, Any, Union +from typing import TYPE_CHECKING, Any, Optional from braket.ahs.analog_hamiltonian_simulation import AnalogHamiltonianSimulation from braket.annealing import Problem @@ -71,14 +71,14 @@ def __init__( max_workers: int = MAX_CONNECTIONS_DEFAULT, poll_timeout_seconds: float = AwsQuantumTask.DEFAULT_RESULTS_POLL_TIMEOUT, poll_interval_seconds: float = AwsQuantumTask.DEFAULT_RESULTS_POLL_INTERVAL, - inputs: Optional[dict[str, float] | list[dict[str, float]]] = None, # noqa: UP007 + inputs: Optional[dict[str, float] | list[dict[str, float]]] = None, # noqa: UP007 gate_definitions: ( - Optional[ # noqa: UP007 + Optional[ # noqa: UP007 dict[tuple[Gate, QubitSet], PulseSequence] | list[dict[tuple[Gate, QubitSet], PulseSequence]] ] ) = None, - reservation_arn: Optional[str] = None, # noqa: UP007 + reservation_arn: Optional[str] = None, # noqa: UP007 *aws_quantum_task_args: Any, **aws_quantum_task_kwargs: Any, ): @@ -164,8 +164,8 @@ def _tasks_inputs_gatedefs( | list[ Circuit | Problem | OpenQasmProgram | BlackbirdProgram | AnalogHamiltonianSimulation ], - inputs: Optional[dict[str, float] | list[dict[str, float]]] = None, # noqa: UP007 - gate_definitions: Optional[ # noqa: UP007 + inputs: Optional[dict[str, float] | list[dict[str, float]]] = None, # noqa: UP007 + gate_definitions: Optional[ # noqa: UP007 dict[tuple[Gate, QubitSet], PulseSequence] | list[dict[tuple[Gate, QubitSet], PulseSequence]] ] = None, @@ -243,12 +243,12 @@ def _execute( poll_interval_seconds: float = AwsQuantumTask.DEFAULT_RESULTS_POLL_INTERVAL, inputs: Optional[dict[str, float] | list[dict[str, float]]] = None, # noqa: UP007 gate_definitions: ( - Optional[ # noqa: UP007 + Optional[ # noqa: UP007 dict[tuple[Gate, QubitSet], PulseSequence] | list[dict[tuple[Gate, QubitSet], PulseSequence]] ] ) = None, - reservation_arn: Optional[str] = None, # noqa: UP007 + reservation_arn: Optional[str] = None, # noqa: UP007 *args, **kwargs, ) -> list[AwsQuantumTask]: @@ -304,9 +304,9 @@ def _create_task( s3_destination_folder: AwsSession.S3DestinationFolder, shots: int, poll_interval_seconds: float = AwsQuantumTask.DEFAULT_RESULTS_POLL_INTERVAL, - inputs: Optional[dict[str, float]] = None, # noqa: UP007 - gate_definitions: Optional[dict[tuple[Gate, QubitSet], PulseSequence]] = None, # noqa: UP007 - reservation_arn: Optional[str] = None, # noqa: UP007 + inputs: Optional[dict[str, float]] = None, # noqa: UP007 + gate_definitions: Optional[dict[tuple[Gate, QubitSet], PulseSequence]] = None, # noqa: UP007 + reservation_arn: str | None = None, *args, **kwargs, ) -> AwsQuantumTask: @@ -359,10 +359,10 @@ def results( even when results have already been cached. Default: `True`. Returns: - list[GateModelQuantumTaskResult | AnnealingQuantumTaskResult | PhotonicModelQuantumTaskResult | AnalogHamiltonianSimulationQuantumTaskResult]: The # noqa: E501 + list[GateModelQuantumTaskResult | AnnealingQuantumTaskResult | PhotonicModelQuantumTaskResult | AnalogHamiltonianSimulationQuantumTaskResult]: The results of all of the quantum tasks in the batch. `FAILED`, `CANCELLED`, or timed out quantum tasks will have a result of None - """ + """ # noqa: E501 if not self._results or not use_cached_value: self._results = AwsQuantumTaskBatch._retrieve_results(self._tasks, self._max_workers) self._unsuccessful = {