# Copyright (C) 2024 qBraid## This file is part of the qBraid-SDK## The qBraid-SDK is free software released under the GNU General Public License v3# or later. You can redistribute and/or modify it under the terms of the GPL v3.# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.## THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3."""Module defining QiskitJob Class"""from__future__importannotationsfromtypingimportTYPE_CHECKING,Optionalfromqbraid_core._importimportLazyLoaderfromqiskit.primitives.containers.primitive_resultimportPrimitiveResultfromqiskit_ibm_runtimeimportRuntimeJobV2fromqiskit_ibm_runtime.exceptionsimportRuntimeInvalidStateErrorfromqbraid._loggingimportloggerfromqbraid.runtime.enumsimportJobStatusfromqbraid.runtime.exceptionsimportJobStateError,QbraidRuntimeErrorfromqbraid.runtime.jobimportQuantumJobfromqbraid.runtime.resultimportResultfromqbraid.runtime.result_dataimportGateModelResultDatafrom.result_builderimportQiskitGateModelResultBuilderifTYPE_CHECKING:importqiskit.primitivesimportqiskit.resultimportqiskit_ibm_runtimeimportqbraid.runtime.ibmqbraid_rt_ibm:qbraid.runtime.ibm=LazyLoader("qbraid_rt_ibm",globals(),"qbraid.runtime.ibm")IBM_JOB_STATUS_MAP={"INITIALIZING":JobStatus.INITIALIZING,"QUEUED":JobStatus.QUEUED,"VALIDATING":JobStatus.VALIDATING,"RUNNING":JobStatus.RUNNING,"CANCELLED":JobStatus.CANCELLED,"DONE":JobStatus.COMPLETED,"ERROR":JobStatus.FAILED,}
class QiskitJob(QuantumJob):
    """Class for interfacing with a Qiskit IBM ``RuntimeJob``."""

    def __init__(
        self,
        job_id: str,
        job: Optional[
            qiskit_ibm_runtime.RuntimeJob
            | qiskit_ibm_runtime.RuntimeJobV2
            | qiskit.primitives.PrimitiveJob
        ] = None,
        service: Optional[qiskit_ibm_runtime.QiskitRuntimeService] = None,
        **kwargs,
    ):
        """Create a ``QiskitJob`` instance.

        Args:
            job_id: Identifier of the job; forwarded to ``QuantumJob.__init__``.
            job: An already-constructed Qiskit job object to wrap. When omitted
                (or falsy), the job is looked up by ID via :meth:`_get_job`.
            service: Runtime service used for the lookup when ``job`` is not given.
            **kwargs: Additional keyword arguments forwarded to ``QuantumJob.__init__``.

        Raises:
            QbraidRuntimeError: If ``job`` is not given and the lookup fails.
        """
        super().__init__(job_id, **kwargs)
        # The base class must be initialized first: _get_job reads self.id and
        # self._device.
        self._job = job or self._get_job(service=service)

    def _get_job(
        self, service: Optional[qiskit_ibm_runtime.QiskitRuntimeService] = None
    ) -> qiskit_ibm_runtime.RuntimeJob | qiskit_ibm_runtime.RuntimeJobV2:
        """Return the qiskit_ibm_runtime.RuntimeJob associated with instance id attribute.

        Attempts to retrieve a job using a specified or default service.
        The service is resolved in this order: the ``service`` argument, the
        ``_service`` attribute of this job's device (if any), and finally a
        freshly constructed ``QiskitRuntimeProvider().runtime_service``.

        Args:
            service: Runtime service to query. If None, one is resolved as
                described above.

        Returns:
            The job object returned by ``service.job(self.id)``.

        Raises:
            QbraidRuntimeError: If there is an error initializing the service or
                retrieving the job.
        """
        if service is None:
            if self._device and getattr(self._device, "_service", None):
                service = self._device._service
            else:
                try:
                    service = qbraid_rt_ibm.QiskitRuntimeProvider().runtime_service
                except Exception as err:
                    raise QbraidRuntimeError("Failed to initialize the quantum service.") from err

        try:
            return service.job(self.id)
        except Exception as err:
            raise QbraidRuntimeError(f"Error retrieving job {self.id}") from err

    def status(self) -> JobStatus:
        """Returns status from Qiskit Job object.

        The underlying job may report its status either as a plain string or as
        an object exposing ``.name``; both forms are normalized to a name and
        translated through ``IBM_JOB_STATUS_MAP``. Unrecognized names map to
        ``JobStatus.UNKNOWN``. The result is also stored in the job's cached
        metadata under the ``"status"`` key.
        """
        job_status = self._job.status()
        job_status = job_status if isinstance(job_status, str) else job_status.name
        status = IBM_JOB_STATUS_MAP.get(job_status, JobStatus.UNKNOWN)
        self._cache_metadata["status"] = status
        return status

    def queue_position(self) -> Optional[int]:
        """Returns the position of the job in the server queue.

        Raises:
            NotImplementedError: If the wrapped job is a ``RuntimeJobV2``.
        """
        if isinstance(self._job, RuntimeJobV2):
            raise NotImplementedError("Queue position is not supported for RuntimeJobV2.")

        return self._job.queue_position(refresh=True)

    def result(self) -> Result:
        """Return the results of the job.

        Fetches the result from the wrapped job and repackages it as a qBraid
        ``Result`` holding ``GateModelResultData``. Two result flavours are
        handled: ``PrimitiveResult`` and any other result object exposing
        ``to_dict()``, ``success`` and ``backend_name``.
        """
        if not self.is_terminal_state():
            logger.info("Result will be available when job has reached final state.")

        runner_result: qiskit.result.Result | PrimitiveResult = self._job.result()

        if isinstance(runner_result, PrimitiveResult):
            # Device ID preference: backend reported by the job, then this
            # job's own device, then the literal fallback "sampler".
            backend = self._job.backend() if hasattr(self._job, "backend") else None
            device_id = (
                backend.name if backend else (self._device.id if self._device else "sampler")
            )
            job_id = self._job.job_id()
            success = self.status() == JobStatus.COMPLETED
            runner_data = runner_result.metadata
        else:
            runner_data = runner_result.to_dict()
            # job_id and success are popped so that they are not passed twice
            # to Result(...) below (once explicitly, once via **runner_data).
            job_id = runner_data.pop("job_id", self._job.job_id())
            success = runner_data.pop("success", runner_result.success)
            device_id = runner_result.backend_name

        builder = QiskitGateModelResultBuilder(runner_result)
        measurement_counts = builder.get_counts()
        measurements = builder.measurements()
        data = GateModelResultData(
            measurement_counts=measurement_counts, measurements=measurements
        )

        return Result(
            device_id=device_id,
            job_id=job_id,
            success=success,
            data=data,
            **runner_data,
        )

    def cancel(self):
        """Attempt to cancel the job.

        Returns:
            Whatever the wrapped job's ``cancel()`` returns.

        Raises:
            JobStateError: If the job has already reached a terminal state, or
                if the wrapped job raises ``RuntimeInvalidStateError`` while
                cancelling.
        """
        if self.is_terminal_state():
            # The guard fires for jobs that are already finished, so the
            # message must say "terminal", not "non-terminal".
            raise JobStateError("Cannot cancel quantum job in terminal state.")

        try:
            return self._job.cancel()
        except RuntimeInvalidStateError as err:
            raise JobStateError from err