-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patheval_partition_pipeline.py
More file actions
199 lines (155 loc) · 7.15 KB
/
eval_partition_pipeline.py
File metadata and controls
199 lines (155 loc) · 7.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import itertools
import json
import os
from datetime import date, datetime
from queue import Queue
from typing import List
import numpy as np
from qiskit import execute
from qiskit.providers.aer import Aer, AerJob
import config.load_config as cfg
import ibmq_account
import logger
from evaluate.circuit_gen import circ_gen
from evaluate.util import dict_to_array, sv_to_probability
from execution_handler.execution_handler import ExecutionHandler
from partitioner.partition_result_processing import (ResultProcessing,
ResultWriter)
from partitioner.partitioner import Partitioner
from quantum_execution_job import QuantumExecutionJob
from resource_mapping.backend_chooser import Backend_Data
from resource_mapping.result_analyzer import ResultAnalyzer
def json_serial(obj):
    """JSON serializer for objects not serializable by default json code.

    datetimes/dates are rendered as ISO-8601 strings and complex numbers
    via ``str``; any other type raises ``TypeError`` so ``json.dump``
    surfaces the problem instead of silently dropping data.
    """
    # Small dispatch table: first matching type wins.
    for accepted_types, convert in (
        ((datetime, date), lambda value: value.isoformat()),
        ((complex,), str),
    ):
        if isinstance(obj, accepted_types):
            return convert(obj)
    raise TypeError("Type %s not serializable" % type(obj))
def get_all_permutations(input_list):
    """Return every permutation of ``input_list`` concatenated into one flat list.

    For an input of length k this yields a list of k * k! elements:
    each of the k! permutations contributes its k items in order.
    """
    flattened = []
    for permutation in itertools.permutations(input_list):
        flattened.extend(permutation)
    return flattened
def write_file(dir_path, backend, results, part_results, sv_res_prob: List[np.ndarray], n_qubits: int, circuits, circuit_type, permute, shots):
    """Dump the evaluation results for one backend to ``{dir_path}/{backend}.json``.

    Args:
        dir_path: Existing directory the JSON file is written into.
        backend: Qiskit backend object; its name/configuration/status/properties
            are embedded in the output when available.
        results: Per-circuit count dicts from un-partitioned execution.
        part_results: Per-circuit count dicts from partitioned execution.
        sv_res_prob: Ideal probability vectors from the statevector simulator,
            one array per circuit.
        n_qubits: Number of qubits; fixes the length of each probability array.
        circuits: The (unmeasured) circuits; serialized as QASM strings.
        circuit_type: Label of the circuit family (e.g. "adder"), stored as metadata.
        permute: Whether circuit permutations were evaluated, stored as metadata.
        shots: Shot count used for execution, stored as metadata.

    NOTE(review): relies on a module-level ``log`` logger being defined
    before this is called (it is created in the ``__main__`` section).
    """
    # Convert count dictionaries into fixed-length probability arrays.
    res_prob = [dict_to_array(r, n_qubits) for r in results]
    part_res_prob = [dict_to_array(r, n_qubits) for r in part_results]
    n_circuits = len(circuits)
    # One record per circuit: QASM source plus ideal / raw / partitioned results.
    data = [
        {
            "circuit": circuits[i].qasm(),
            "sv-result": sv_res_prob[i].tolist(),
            "result": res_prob[i].tolist(),
            "part-result": part_res_prob[i].tolist(),
        }
        for i in range(n_circuits)
    ]
    backend_dict = {"name": backend.name()}
    # Use identity comparison with None (PEP 8); these accessors can
    # legitimately return None (e.g. simulators without calibration data).
    if backend.configuration() is not None:
        backend_dict["config"] = backend.configuration().to_dict()
    if backend.status() is not None:
        backend_dict["status"] = backend.status().to_dict()
    if backend.properties() is not None:
        backend_dict["properties"] = backend.properties().to_dict()
    now_str = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    with open(f'{dir_path}/{backend.name()}.json', 'w') as f:
        # json_serial handles datetimes/complex values inside the backend dicts.
        json.dump({"date": now_str, "circuit_type": circuit_type, "n_circuits": n_circuits, "n_qubits": n_qubits,
                   "permute": permute, "shots": shots, "backend": backend_dict, "data": data}, f, indent=4, default=json_serial)
    log.info("Wrote results to file.")
if __name__ == "__main__":
"""
Configure the evaluation here:
"""
# backend_names = ['ibmq_qasm_simulator' , 'ibmq_athens', 'ibmq_santiago', 'ibmq_belem']
# backend_names = ['ibmq_qasm_simulator' , 'ibmq_athens', 'ibmq_santiago', 'ibmq_quito', 'ibmq_lima', 'ibmq_belem']
backend_names = ['ibmq_qasm_simulator']
shots = 8192
n_circuits = 1
n_qubits = 5
subcircuit_max_qubits = 3
circuit_type = "adder"
permute = False
"""
Configuration End
"""
config = cfg.load_or_create()
logger.set_log_level_from_config(config)
provider = ibmq_account.get_provider(config)
log = logger.get_logger("Evaluate")
now = datetime.now()
now_str = now.strftime('%Y-%m-%d-%H-%M-%S')
dir_path = f"part_data/{circuit_type}_{n_qubits}_{subcircuit_max_qubits}_{now_str}"
os.makedirs(dir_path)
log.info(f"Created directory {dir_path}")
circuits, n_circuits = circ_gen(circuit_type, n_qubits, n_circuits)
log.info(f"Generated {n_circuits} circuits")
print(circuits[0])
statevector_backend = Aer.get_backend('statevector_simulator')
sv_job: AerJob = execute(circuits, statevector_backend)
sv_res = sv_job.result()
sv_results = [sv_res.get_statevector(circ) for circ in circuits]
sv_res_prob = [sv_to_probability(sv) for sv in sv_results]
log.info("Executed the circuits with local statevector simulator")
if permute:
circuits = get_all_permutations(circuits)
sv_res_prob = get_all_permutations(sv_res_prob)
n_circuits = len(circuits)
log.info(
f"Generated all permutations. Now there are {n_circuits} circuits")
backend_data_list = []
backends = {}
for backend_name in backend_names:
backend = provider.get_backend(backend_name)
backend_data = Backend_Data(backend)
backend_data_list.append(backend_data)
backends[backend_name] = {
"backend": backend, "backend_data": backend_data}
input_pipeline = Queue()
input_exec = Queue()
output_exec = Queue()
part_results = Queue()
all_results_are_available = Queue()
output_pipline = Queue()
errors = Queue()
for backend_data in backend_data_list:
for circ in circuits:
input_pipeline.put(QuantumExecutionJob(circuit=circ.measure_all(inplace=False), shots=shots, backend_data=backend_data, config={
"partitioner": {"subcircuit_max_qubits": subcircuit_max_qubits}}))
input_exec.put(QuantumExecutionJob(circuit=circ.measure_all(
inplace=False), shots=shots, backend_data=backend_data))
partition_dict = {}
partitioner = Partitioner(input=input_pipeline, output=input_exec,
partition_dict=partition_dict, error_queue=errors, **config["partitioner"])
partitioner.start()
exec_handler = ExecutionHandler(
provider, input=input_exec, output=output_exec)
exec_handler.start()
result_analyzer = ResultAnalyzer(
input=output_exec, output=output_pipline, output_agg=None, output_part=part_results)
result_analyzer.start()
partition_result_writer = ResultWriter(
input=part_results, completed_jobs=all_results_are_available, partition_dict=partition_dict)
partition_result_writer.start()
partition_result_processor = ResultProcessing(
input=all_results_are_available, output=output_pipline, partition_dict=partition_dict)
partition_result_processor.start()
log.info("Started the partition pipeline")
results = {}
part_results = {}
n_results = 2*n_circuits*len(backend_names)
for backend_name in backend_names:
results[backend_name] = []
part_results[backend_name] = []
i = 0
while i < n_results:
job = output_pipline.get()
i += 1
r = job.result_prob
backend_name = job.backend_data.name
log.debug(
f"{i}: Got job {job.id},type {job.type}, from backend {backend_name}")
if len(results[backend_name]) < n_circuits:
results[backend_name].append(r)
else:
part_results[backend_name].append(r)
if len(results[backend_name]) == n_circuits and len(part_results[backend_name]) == 0:
log.info(
f"All results for not partitioned circuits are available for backend {backend_name}")
elif len(part_results[backend_name]) == n_circuits:
log.info(
f"All results for partitioned circuits are available for backend {backend_name}")
write_file(dir_path, backends[backend_name]["backend"], results.pop(backend_name), part_results.pop(
backend_name), sv_res_prob, n_qubits, circuits, circuit_type, permute, shots)