diff --git a/recirq/dfl/__init__.py b/recirq/dfl/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/recirq/dfl/dfl_1d.py b/recirq/dfl/dfl_1d.py new file mode 100644 index 00000000..559f3ae6 --- /dev/null +++ b/recirq/dfl/dfl_1d.py @@ -0,0 +1,321 @@ +# Copyright 2025 Google +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Generates the Trotterized circuits for 1D Disorder-Free Localization (DFL) experiments. +""" + +from collections.abc import Sequence +from typing import List + +import cirq +import matplotlib.pyplot as plt +import numpy as np +import numpy.typing as npt +from tqdm import tqdm + +#import Enums +from recirq.dfl.dfl_enums import TwoQubitGate, InitialState, Basis + +def trotter_circuit( + grid: Sequence[cirq.GridQubit], + n_cycles: int, + dt: float, + h: float, + mu: float, + two_qubit_gate: TwoQubitGate = TwoQubitGate.CZ, +) -> cirq.Circuit: + """Constructs a Trotter circuit for 1D Disorder-Free Localization (DFL) simulation. + + Args: + grid: The 1D sequence of qubits used in the experiment. + n_cycles: The number of Trotter steps/cycles to include. + dt: The time step size for the Trotterization. + h: The gauge field strength coefficient. + mu: The matter field strength coefficient. + two_qubit_gate: The type of two-qubit gate to use in the layers. + Use an Enum member from TwoQubitGate (CZ or CPHASE). + + Returns: + The complete cirq.Circuit for the Trotter evolution. + + Raises: + ValueError: If an invalid `two_qubit_gate` option is provided. + """ + if two_qubit_gate.value == TwoQubitGate.CZ.value: + return _layer_floquet_cz(grid, dt, h, mu) * n_cycles + + elif two_qubit_gate.value == TwoQubitGate.CPHASE.value: + if n_cycles == 0: + return cirq.Circuit() + if n_cycles == 1: + return _layer_floquet_cphase_first( + grid, dt, h, mu + ) + _layer_floquet_cphase_last_missing_piece(grid, dt, h, mu) + else: + return ( + _layer_floquet_cphase_first(grid, dt, h, mu) + + (n_cycles - 1) * _layer_floquet_cphase_middle(grid, dt, h, mu) + + _layer_floquet_cphase_last_missing_piece(grid, dt, h, mu) + ) + else: + raise ValueError("Two-qubit gate can only be cz or cphase") + + +def _layer_floquet_cz( + grid: Sequence[cirq.GridQubit], dt: float, h: float, mu: float +) -> cirq.Circuit: + moment_rz = [] + moment_rx = [] + moment_h = [] + for i in range(len(grid)): + q = grid[i] + if i % 2 == 1: + moment_rz.append(cirq.rz(dt).on(q)) + moment_h.append(cirq.H(q)) + moment_rx.append(cirq.rx(2 * h * dt).on(q)) + else: + moment_rx.append(cirq.rx(2 * mu * dt).on(q)) + return ( + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + _change_basis(grid) + + cirq.Circuit.from_moments(cirq.Moment(moment_rx)) + + _change_basis(grid) + + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + ) + + +def _layer_floquet_cphase_middle( + grid: Sequence[cirq.GridQubit], dt: float, h: float, mu: float +) -> cirq.Circuit: + n = len(grid) + moment_0 = [] + moment_1 = [] + moment_h = [] + moment_rz = [] + moment_rx = [] + + for i in range(n // 2): + q0 = grid[(2 * i)] + q = grid[2 * i + 1] + q1 = grid[(2 * i + 2) % n] + moment_0.append(cirq.CZ(q0, q)) # left to right + moment_1.append(cirq.cphase(-4 * dt).on(q1, q)) # right to left + + moment_h.append(cirq.H(q)) + + moment_rz.append(cirq.rz(2 * dt).on(q)) + moment_rz.append(cirq.rz(2 * dt).on(q1)) + + for i in range(n): + q = grid[i] + if i % 2 == 1: + moment_rx.append(cirq.rx(2 * h * dt).on(q)) + else: + moment_rx.append(cirq.rx(2 * mu * dt).on(q)) + + moment_rz_new = [] + qubits_covered = [] + for gate in moment_rz: + count = moment_rz.count(gate) + qubit = gate.qubits[0] + if qubit not in qubits_covered: + moment_rz_new.append(cirq.rz(2 * count * dt).on(qubit)) + qubits_covered.append(qubit) + + return cirq.Circuit.from_moments( + cirq.Moment(moment_h), + cirq.Moment(moment_0), + cirq.Moment(moment_h), + cirq.Moment(moment_rz_new), + cirq.Moment(moment_1), + cirq.Moment(moment_h), + cirq.Moment(moment_0), + cirq.Moment(moment_h), + cirq.Moment(moment_rx), + ) + + +def _layer_floquet_cphase_first( + grid: Sequence[cirq.GridQubit], dt: float, h: float, mu: float +) -> cirq.Circuit: + moment_rz = [] + moment_rx = [] + + for i in range(len(grid)): + q = grid[i] + if i % 2 == 1: + moment_rz.append(cirq.rz(dt).on(q)) + moment_rx.append(cirq.rx(2 * h * dt).on(q)) + else: + moment_rx.append(cirq.rx(2 * mu * dt).on(q)) + + return ( + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + _change_basis(grid) + + cirq.Circuit.from_moments(cirq.Moment(moment_rx)) + ) + + +def _layer_floquet_cphase_last_missing_piece( + grid: Sequence[cirq.GridQubit], dt: float, h: float, mu: float +) -> cirq.Circuit: + moment_rz = [] + for i in range(len(grid)): + q = grid[i] + if i % 2 == 1: + moment_rz.append(cirq.rz(dt).on(q)) + + return _change_basis(grid) + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + +def _layer_hadamard(grid: Sequence[cirq.GridQubit], which_qubits="all") -> cirq.Circuit: + moment = [] + for i in range(len(grid)): + q = grid[i] + + if i % 2 == 1 and (which_qubits == "gauge" or which_qubits == "all"): + moment.append(cirq.H(q)) + + elif i % 2 == 0 and (which_qubits == "matter" or which_qubits == "all"): + moment.append(cirq.H(q)) + return cirq.Circuit.from_moments(cirq.Moment(moment)) + + +def _layer_measure(grid: Sequence[cirq.GridQubit]) -> cirq.Circuit: + moment = [] + for i in range(len(grid) // 2): + moment.append(cirq.measure(grid[2 * i], key="m")) + return cirq.Circuit.from_moments(cirq.Moment(moment)) + + +def _change_basis(grid: Sequence[cirq.GridQubit]) -> cirq.Circuit: + """change the basis so that the matter sites encode the gauge charge.""" + n = len(grid) + moment_1 = [] + moment_2 = [] + moment_h = [] + for i in range(0, n // 2): + q1 = grid[(2 * i)] + q2 = grid[2 * i + 1] + q3 = grid[(2 * i + 2) % n] + moment_1.append(cirq.CZ(q1, q2)) + moment_2.append(cirq.CZ(q3, q2)) + moment_h.append(cirq.H(q2)) + return cirq.Circuit.from_moments( + cirq.Moment(moment_h), + cirq.Moment(moment_1), + cirq.Moment(moment_2), + cirq.Moment(moment_h), + ) + + +def _energy_bump_initial_state( + grid, h: float, matter_config: InitialState, excited_qubits: Sequence[cirq.GridQubit] +) -> cirq.Circuit: + """Circuit for energy bump initial state.""" + + theta = np.arctan(h) + moment = [] + for i in range(len(grid)): + q = grid[i] + if i % 2 == 1: + if q in excited_qubits: + moment.append(cirq.ry(np.pi + theta).on(q)) + else: + moment.append(cirq.ry(theta).on(q)) + + else: + if matter_config.value == InitialState.SINGLE_SECTOR.value: + moment.append(cirq.H(q)) + return cirq.Circuit.from_moments(moment) + + +def get_1d_dfl_experiment_circuits( + grid: Sequence[cirq.GridQubit], + initial_state: InitialState, + n_cycles: Sequence[int] | npt.NDArray, + excited_qubits: Sequence[cirq.GridQubit], + dt: float, + h: float, + mu: float, + n_instances: int = 10, + two_qubit_gate: TwoQubitGate = TwoQubitGate.CZ, + basis: Basis = Basis.DUAL, +) -> List[cirq.Circuit]: + """Generates the circuits needed for the 1D DFL experiment. + + Args: + grid: The 1D sequence of qubits used in the experiment. + initial_state: The initial state preparation. Use an Enum member from + InitialState (SINGLE_SECTOR or SUPERPOSITION). + n_cycles: The number of Trotter steps (cycles) to simulate. + excited_qubits: Qubits to be excited in the initial state. + dt: The time step size for the Trotterization. + h: The gauge field strength coefficient. + mu: The matter field strength coefficient. + n_instances: The number of instances to generate. + two_qubit_gate: The type of two-qubit gate to use in the Trotter step. + Use an Enum member from TwoQubitGate (CZ or CPHASE). + basis: The basis for the final circuit structure. Use an Enum member from + Basis (LGT or DUAL). + + Returns: + A list of all generated cirq.Circuit objects. + + Raises: + ValueError: If an invalid option for `initial_state` or + `basis` is given. + """ + if initial_state.value == InitialState.SINGLE_SECTOR.value: + initial_circuit = _energy_bump_initial_state( + grid, h, InitialState.SINGLE_SECTOR, excited_qubits + ) + elif initial_state.value == InitialState.SUPERPOSITION.value: + initial_circuit = _energy_bump_initial_state( + grid, h, InitialState.SUPERPOSITION, excited_qubits + ) + else: + raise ValueError("Invalid initial state") + circuits = [] + for n_cycle in tqdm(n_cycles): + print(int(np.max([0, n_cycle - 1]))) + circ = initial_circuit + trotter_circuit( + grid, n_cycle, dt, h, mu, two_qubit_gate + ) + if basis.value == Basis.LGT.value: + circ += _change_basis(grid) + elif basis.value == Basis.DUAL.value: + pass + else: + raise ValueError("Invalid option for basis") + for _ in range(n_instances): + + if basis.value == Basis.LGT.value: + circ_z = circ + cirq.measure([q for q in grid], key="m") + elif basis.value == Basis.DUAL.value: + circ_z = ( + circ + + _layer_hadamard(grid, "matter") + + cirq.measure([q for q in grid], key="m") + ) + else: + raise ValueError("Invalid option for basis") + circ_x = ( + circ + + _layer_hadamard(grid, "all") + + cirq.measure([q for q in grid], key="m") + ) + circuits.append(circ_z) + circuits.append(circ_x) + return circuits diff --git a/recirq/dfl/dfl_2d_second_order_trotter.py b/recirq/dfl/dfl_2d_second_order_trotter.py new file mode 100644 index 00000000..39f9632e --- /dev/null +++ b/recirq/dfl/dfl_2d_second_order_trotter.py @@ -0,0 +1,782 @@ +# Copyright 2025 Google +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Core class and methods for simulating Disorder-Free Localization (DFL) on a 2D +Lattice Gauge Theory (LGT) model using second-order trotterization. +""" + +from typing import cast, List, Sequence, Set, Tuple + +import cirq +import numpy as np +import numpy.typing as npt +from tqdm import tqdm + +#import Enums +from recirq.dfl.dfl_enums import TwoQubitGate, InitialState, Basis + + +class LatticeGaugeTheoryDFL2D: + """A class for simulating Disorder-free localization (DFL) on a 2-dimensional Lattice Gauge + Theory (LGT) with Second order trotter dynamics. + + Attributes: + qubit_grid: A list of cirq.GridQubit objects representing the 2D grid. + origin: Top right matter qubit + dt: The time step for the simulation. + h: The gauge field strength. + mu: The matter field strength. + matter_qubits: A sorted list of cirq.GridQubit objects representing matter qubits. + gauge_qubits: A sorted list of cirq.GridQubit objects representing gauge qubits. + all_qubits: A sorted list of all qubits (matter + gauge). + """ + + def __init__( + self, + qubit_grid: Sequence[cirq.GridQubit], + origin: cirq.GridQubit, + dt: float, + h: float, + mu: float, + ): + """Initializes the LatticeGaugeTheoryDFL2D class. + + Args: + qubit_grid: A list of cirq.GridQubit objects representing the 2D grid. + origin: The starting cirq.GridQubit object for grid generation. + dt: The time step for the simulation. + h: The gauge field strength. + mu: The matter field strength. + """ + self.qubit_grid = qubit_grid + self.origin = origin + self.dt = dt + self.h = h + self.mu = mu + self.matter_qubits, self.gauge_qubits = self.create_lgt_grid() + self.all_qubits = sorted(self.matter_qubits + self.gauge_qubits) + + def get_y_neighbors(self, qubit: cirq.GridQubit) -> Sequence[cirq.GridQubit]: + """Returns the vertical (row-axis) neighbors of a qubit. + If a matter qubit is provided, this returns the neighboring gauge qubits, + and vice-versa. + + Args: + qubit: The central qubit. + + Returns: + A sequence of neighboring qubits of the opposite type that are + within the defined grid. + """ + if qubit in self.gauge_qubits: + return [ + q + for q in [ + cirq.GridQubit(qubit.row + 1, qubit.col), + cirq.GridQubit(qubit.row - 1, qubit.col), + ] + if q in self.matter_qubits + ] + else: + return [ + q + for q in [ + cirq.GridQubit(qubit.row + 1, qubit.col), + cirq.GridQubit(qubit.row - 1, qubit.col), + ] + if q in self.gauge_qubits + ] + + def get_x_neighbors(self, qubit: cirq.GridQubit) -> Sequence[cirq.GridQubit]: + """Returns the horizontal (column-axis) neighbors of a qubit. + If a matter qubit is provided, this returns the neighboring gauge qubits, + and vice-versa. + + Args: + qubit: The central qubit. + + Returns: + A sequence of neighboring qubits of the opposite type that are + within the defined grid. + """ + if qubit in self.gauge_qubits: + return [ + q + for q in [ + cirq.GridQubit(qubit.row, qubit.col + 1), + cirq.GridQubit(qubit.row, qubit.col - 1), + ] + if q in self.matter_qubits + ] + else: + return [ + q + for q in [ + cirq.GridQubit(qubit.row, qubit.col + 1), + cirq.GridQubit(qubit.row, qubit.col - 1), + ] + if q in self.gauge_qubits + ] + + def create_lgt_grid( + self, + ) -> Tuple[Tuple[cirq.GridQubit, ...], Tuple[cirq.GridQubit, ...]]: + """Creates a lattice gauge theory (LGT) grid with simplified logic. + + Args: + qubit_grid: A list of cirq.GridQubit objects. + origin: The starting cirq.GridQubit object. + + Returns: + A tuple containing two lists: (matter_qubits, gauge_qubits). + """ + + qubit_set = self.qubit_grid + matter_qubits = [self.origin] + gauge_qubits = [] + queue = [self.origin] + + while queue: + current_matter = queue.pop(0) + neighbors = current_matter.neighbors() + + for neighbor in neighbors: + if neighbor in qubit_set and neighbor not in gauge_qubits: + gauge_qubits.append(neighbor) + + # Find matter neighbors for gauge qubits + if neighbor.row == current_matter.row: # Horizontal gauge + horizontal_neighbor = cirq.GridQubit( + neighbor.row, + neighbor.col + (neighbor.col - current_matter.col), + ) + if ( + horizontal_neighbor in self.qubit_grid + and horizontal_neighbor not in matter_qubits + ): + matter_qubits.append(horizontal_neighbor) + queue.append(horizontal_neighbor) + else: # Vertical gauge + vertical_neighbor = cirq.GridQubit( + neighbor.row + (neighbor.row - current_matter.row), + neighbor.col, + ) + if ( + vertical_neighbor in self.qubit_grid + and vertical_neighbor not in matter_qubits + ): + matter_qubits.append(vertical_neighbor) + queue.append(vertical_neighbor) + + return tuple(sorted(matter_qubits)), tuple(sorted(gauge_qubits)) # type: ignore + + def draw_lgt_grid(self): + """Draw the LGT grid.""" + d = {q: 1 for q in self.gauge_qubits} + d.update({q: 0 for q in self.matter_qubits}) + cirq.Heatmap(d).plot() + + def _layer_matter_gauge_x(self) -> cirq.Circuit: + """Single qubit rotations i.e., the mu and h terms.""" + moment = [] + for q in self.matter_qubits: + moment.append(cirq.rx(2 * self.mu * self.dt).on(q)) + for q in self.gauge_qubits: + moment.append(cirq.rx(2 * self.h * self.dt).on(q)) + return cirq.Circuit.from_moments(cirq.Moment(moment)) + + def layer_hadamard(self, which_qubits="all") -> cirq.Circuit: + """Creates a circuit layer containing Hadamard gates on the specified qubits. + + Args: + which_qubits: Specifies which qubits get the Hadamard gates. + Valid values are "all", "gauge", or "matter". + + Returns: + A cirq.Circuit containing a single Moment of Hadamard operations. + """ + moment = [] + if which_qubits == "gauge": + for q in self.gauge_qubits: + moment.append(cirq.H(q)) + elif which_qubits == "matter": + for q in self.matter_qubits: + moment.append(cirq.H(q)) + elif which_qubits == "all": + for q in self.all_qubits: + moment.append(cirq.H(q)) + return cirq.Circuit.from_moments(cirq.Moment(moment)) + + def layer_measure(self) -> cirq.Circuit: + """Creates a circuit layer containing measurement operations on all qubits. + + Returns: + A cirq.Circuit containing a single Moment of measurement operations + with the key "m". + """ + moment = [] + for q in self.all_qubits: + moment.append(cirq.measure(q, key="m")) + return cirq.Circuit.from_moments(cirq.Moment(moment)) + + def trotter_circuit(self, n_cycles, two_qubit_gate: TwoQubitGate = TwoQubitGate.CZ) -> cirq.Circuit: + """Constructs the second-order Trotter circuit for the DFL Hamiltonian. + + Args: + n_cycles: The number of Trotter steps/cycles to include in the circuit. + two_qubit_gate: The type of two-qubit gate to use in the layers. + Use an Enum member from TwoQubitGate (CZ or CPHASE). + + Returns: + The complete cirq.Circuit for the Trotter evolution. + """ + if two_qubit_gate.value == TwoQubitGate.CZ.value: + return self._layer_floquet_cz_simultaneous() * n_cycles + + elif two_qubit_gate.value == TwoQubitGate.CPHASE.value: + if n_cycles == 0: + return cirq.Circuit() + if n_cycles == 1: + return ( + self._layer_floquet_cphase_simultaneous_first() + + self._layer_floquet_cphase_simultaneous_last_missing_piece() + ) + else: + return ( + self._layer_floquet_cphase_simultaneous_first() + + (n_cycles - 1) * self._layer_floquet_cphase_simultaneous_middle() + + self._layer_floquet_cphase_simultaneous_last_missing_piece() + ) + + def layer_floquet( + self, two_qubit_gate: TwoQubitGate = TwoQubitGate.CZ, layer="middle" + ) -> cirq.Circuit: + + """Constructs a layer of the Trotter circuit. + + Args: + two_qubit_gate: The type of two-qubit gate to use. + Use an Enum member from TwoQubitGate (CZ or CPHASE). + layer: Specifies which piece of the sequence to return. + Valid values are "middle", "last", or "first". + + Returns: + The cirq.Circuit representing the requested Trotter layer. + + Raises: + ValueError: If an invalid option for `two_qubit_gate` or `layer` is given. + """ + + if two_qubit_gate.value == TwoQubitGate.CZ.value: + return self._layer_floquet_cz_simultaneous() + + elif two_qubit_gate.value == TwoQubitGate.CPHASE.value: + if layer == "middle": + return self._layer_floquet_cphase_simultaneous_middle() + elif layer == "last": + return self._layer_floquet_cphase_simultaneous_last_missing_piece() + elif layer == "first": + return self._layer_floquet_cphase_simultaneous_first() + else: + raise ValueError("Invalid layer option") + else: + raise ValueError("Invalid two_qubit_gate option") + + def _energy_bump_initial_state( + self, + matter_config: InitialState, + excited_qubits: Sequence[cirq.GridQubit], + ) -> cirq.Circuit: + """Circuit for energy bump initial state.""" + + theta = np.arctan(self.h) + moment = [] + for q in self.gauge_qubits: + if q in excited_qubits: + moment.append(cirq.ry(np.pi + theta).on(q)) + else: + moment.append(cirq.ry(theta).on(q)) + + for q in self.matter_qubits: + if matter_config.value == InitialState.SINGLE_SECTOR.value: + moment.append(cirq.H(q)) + + return cirq.Circuit.from_moments(moment) + + def _layer_floquet_cz_simultaneous(self) -> cirq.Circuit: + """Circuit for a trotter step of the DFL Hamiltonian + Second order Trotter U(t) = e^(-itA/2) e^(-itB) e^(-iA/2). + We take A = zZz term and B = Rx terms, so the trotter layer + should look like UB Rz(t/2) UB Rx(t) UB Rz(t/2) UB. + However, UB from previous layer cancels out with UB in the + following layer so the trotter layer now looks like + ....Rz(t/2) UB Rx(t) UB Rz(t/2).... + """ + + moment_rz = [] + moment_rx = [] + moment_h = [] + for q in self.gauge_qubits: + moment_rz.append(cirq.rz(self.dt).on(q)) + moment_h.append(cirq.H(q)) + moment_rx.append(cirq.rx(2 * self.h * self.dt).on(q)) + + for q in self.matter_qubits: + moment_rx.append(cirq.rx(2 * self.mu * self.dt).on(q)) + + return ( + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + self._change_basis() + + cirq.Circuit.from_moments(cirq.Moment(moment_rx)) + + self._change_basis() + + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + ) + + def _layer_floquet_cphase_simultaneous_middle(self) -> cirq.Circuit: + """Circuit for a trotter step of the DFL Hamiltonian in terms of CPhase + Second order Trotter U(t) = e^(-itA/2) e^(-itB) e^(-iA/2). + After cancelling U_B's the circuit for the trotter layer looks like + Rz(t/2) UB Rx(t) UB Rz(t/2). However we can condense UB Rz(t) UB of two + consecutive layers in terms of Cphase gates bringing 8 CZ gates down to + 4 CZ and 2 CPhase gates. Let's take a repeating layer in the middle of the form + ...UB Rz(t/2) Rz(t/2) UB Rx(t)..... = ...UB Rz(t) UB Rx(t)... + Thus, the first layer should look like Rz(t/2)UB Rx(t)..... + and the last layer is missing ....UB Rz(T/2) + """ + + moment_0x = [] + moment_1x = [] + + moment_0y = [] + moment_1y = [] + + moment_h = [] + + moment_rz1 = [] + moment_rz2 = [] + + moment_rx = [] + + for q in self.gauge_qubits: + nbrs = self.get_x_neighbors(q) + if len(nbrs) == 2: + q0, q1 = nbrs[0], nbrs[1] + moment_0x.append(cirq.CZ(q0, q)) # left to right + moment_1x.append(cirq.cphase(-4 * self.dt).on(q1, q)) # right to left + + moment_h.append(cirq.H(q)) + + moment_rz1.append(cirq.rz(2 * self.dt).on(q)) + moment_rz1.append(cirq.rz(2 * self.dt).on(q1)) + + for q in self.gauge_qubits: + nbrs = self.get_y_neighbors(q) + if len(nbrs) == 2: + q0, q1 = nbrs[0], nbrs[1] + moment_0y.append(cirq.CZ(q0, q)) # top to bottom + moment_1y.append(cirq.cphase(-4 * self.dt).on(q1, q)) # bottom to top + + moment_h.append(cirq.H(q)) + + moment_rz2.append(cirq.rz(2 * self.dt).on(q)) + moment_rz2.append(cirq.rz(2 * self.dt).on(q1)) + + for q in self.gauge_qubits: + moment_rx.append(cirq.rx(2 * self.h * self.dt).on(q)) + + for q in self.matter_qubits: + moment_rx.append(cirq.rx(2 * self.mu * self.dt).on(q)) + + return cirq.Circuit.from_moments( + cirq.Moment(moment_h), + cirq.Moment(moment_0x), + cirq.Moment(moment_0y), + cirq.Moment(moment_h), + cirq.Moment(moment_1x), + cirq.Moment(moment_rz1), + cirq.Moment(moment_1y), + cirq.Moment(moment_rz2), + cirq.Moment(moment_h), + cirq.Moment(moment_0x), + cirq.Moment(moment_0y), + cirq.Moment(moment_h), + cirq.Moment(moment_rx), + ) + + def _layer_floquet_cphase_simultaneous_first(self) -> cirq.Circuit: + """Circuit for a trotter step of the DFL Hamiltonian in terms of CPhase. + After cancelling U_B's the circuit for the middle trotter layer looks like + ...Rz(t/2) UB Rx(t) UB Rz(t/2).... However we can condense UB Rz(t) UB of two + consecutive layers in terms of Cphase gates bringing 8 CZ gates down to + 4 CZ and 2 CPhase gates. A layer in the middle then takes the form + ...UB Rz(t/2) Rz(t/2) UB Rx(t)..... = ...UB Rz(t) UB Rx(t)... + + However,the first floquet layer for the cphase implementation looks like + Rz(t/2)UB Rx(t).... Here, we will use the CZ implementation of UB. + """ + + moment_rz = [] + moment_rx = [] + + for q in self.gauge_qubits: + moment_rz.append(cirq.rz(self.dt).on(q)) + moment_rx.append(cirq.rx(2 * self.h * self.dt).on(q)) + + for q in self.matter_qubits: + moment_rx.append(cirq.rx(2 * self.mu * self.dt).on(q)) + + return ( + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + self._change_basis() + + cirq.Circuit.from_moments(cirq.Moment(moment_rx)) + ) + + def _layer_floquet_cphase_simultaneous_last_missing_piece(self) -> cirq.Circuit: + """Circuit for a trotter step of the DFL Hamiltonian in terms of CPhase. + After cancelling U_B's the circuit for the trotter layer looks like + Rz(t/2) UB Rx(t) UB Rz(t/2). However we can condense UB Rz(t) UB of two + consecutive layers in terms of Cphase gates bringing 8 CZ gates down to + 4 CZ and 2 CPhase gates. A layer in the middle then takes the form + ...UB Rz(t/2) Rz(t/2) UB Rx(t)..... = ...UB Rz(t) UB Rx(t)... + + However,the last floquet layer for the cphase implementation looks like + ......UB Rz(t/2) Here, we will use the CZ implementation of UB. + """ + moment_rz = [] + for q in self.gauge_qubits: + moment_rz.append(cirq.rz(self.dt).on(q)) + + return self._change_basis() + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + def _change_basis(self) -> cirq.Circuit: + """Transform the LGT basis to the dual basis.""" + moment_0x = [] + moment_1x = [] + + moment_0y = [] + moment_1y = [] + + moment_h = [] + + for q in self.gauge_qubits: + nbrs = self.get_x_neighbors(q) + if len(nbrs) == 2: + q0, q1 = nbrs[0], nbrs[1] + moment_0x.append(cirq.CZ(q0, q)) + moment_1x.append(cirq.CZ(q1, q)) + moment_h.append(cirq.H(q)) + + for q in self.gauge_qubits: + nbrs = self.get_y_neighbors(q) + if len(nbrs) == 2: + q0, q1 = nbrs[0], nbrs[1] + moment_0y.append(cirq.CZ(q0, q)) + moment_1y.append(cirq.CZ(q1, q)) + moment_h.append(cirq.H(q)) + + return cirq.Circuit.from_moments( + cirq.Moment(moment_h), + cirq.Moment(moment_0x), + cirq.Moment(moment_1x), + cirq.Moment(moment_0y), + cirq.Moment(moment_1y), + cirq.Moment(moment_h), + ) + + def _interaction_indices(self) -> Sequence[Tuple[int, int, int]]: + indices = [] + for q in self.gauge_qubits: + nbrs = self.get_x_neighbors(q) + if nbrs: + q0, q1 = nbrs[0], nbrs[1] + i0, i, i1 = ( + self.all_qubits.index(q0), + self.all_qubits.index(q), + self.all_qubits.index(q1), + ) + indices.append((i0, i, i1)) + + nbrs = self.get_y_neighbors(q) + if nbrs: + q0, q1 = nbrs[0], nbrs[1] + i0, i, i1 = ( + self.all_qubits.index(q0), + self.all_qubits.index(q), + self.all_qubits.index(q1), + ) + indices.append((i0, i, i1)) + return indices + + def _matter_indices(self) -> Sequence[int]: + indices = [] + for i, q in enumerate(self.all_qubits): + if q in self.matter_qubits: + indices.append(i) + return indices + + def _gauge_indices(self) -> Sequence[int]: + indices = [] + for i, q in enumerate(self.all_qubits): + if q in self.gauge_qubits: + indices.append(i) + return indices + + def _compute_observables_one_instance_dual_basis( + self, bits_z: npt.NDArray[np.int8], bits_x: npt.NDArray[np.int8] + ) -> Sequence[np.ndarray]: + """Computes all expectation values for a single measurement instance. + + This function calculates the mean and variance for the gauge X, matter X, + interaction, and total energy terms based on the measured bitstrings in + both the Z and X bases for a single instance of the experiment. + + Args: + bits_z: The measurement outcomes (bitstrings) in the Z basis. + bits_x: The measurement outcomes (bitstrings) in the X basis. + + Returns: + A sequence of NumPy arrays, where each array contains the calculated + expectation value (mean) and variance for one of the four observables. + """ + bits_x_rescaled = 1 - 2 * bits_x + bits_z_rescaled = 1 - 2 * bits_z + + gauge_inds = [self.all_qubits.index(q) for q in self.gauge_qubits] + + matter_inds = [self.all_qubits.index(q) for q in self.matter_qubits] + gauge_inds = [self.all_qubits.index(q) for q in self.gauge_qubits] + + exp_gauge_x = np.mean(bits_x_rescaled[:, gauge_inds], axis=0) + var_gauge_x = np.var(bits_x_rescaled[:, gauge_inds], axis=0) + + exp_interaction = np.mean(bits_z_rescaled[:, gauge_inds], axis=0) + var_interaction = np.mean(bits_z_rescaled[:, gauge_inds], axis=0) + + matter_x_terms = [] + # in the dual basis the matter x takes the form + # \sigma^x_j -> \sigma^x_j \prod_{k \in N(j)} X_{jk} + + for q in self.matter_qubits: + prod_x = bits_x_rescaled[:, self.all_qubits.index(q)] + q_nbrs = [q_n for q_n in q.neighbors() if q_n in self.all_qubits] + for q_n in q_nbrs: + prod_x *= bits_x_rescaled[ + :, self.all_qubits.index(cast(cirq.GridQubit, q_n)) + ] + matter_x_terms.append(prod_x) + + exp_matter_x = np.mean(matter_x_terms, axis=1) + var_matter_x = np.var(matter_x_terms, axis=1) + + exp_energy = [] + var_energy = [] + for _, idx in enumerate(self._interaction_indices()): + q0 = self.all_qubits[idx[0]] + q1 = self.all_qubits[idx[2]] + + w0 = 1 / len([n for n in q0.neighbors() if n in self.gauge_qubits]) + w1 = 1 / len([n for n in q1.neighbors() if n in self.gauge_qubits]) + + mat_1_index = self.matter_qubits.index(self.all_qubits[idx[0]]) + gauge_index = self.gauge_qubits.index(self.all_qubits[idx[1]]) + mat_2_index = self.matter_qubits.index(self.all_qubits[idx[2]]) + + em = ( + exp_interaction[gauge_index] + + self.h * exp_gauge_x[gauge_index] + + self.mu + * (w0 * exp_matter_x[mat_1_index] + w1 * exp_matter_x[mat_2_index]) + ) + ev = ( + var_interaction[gauge_index] + + self.h * var_gauge_x[gauge_index] + + self.mu + * (w0 * var_matter_x[mat_1_index] + w1 * var_matter_x[mat_2_index]) + ) + exp_energy.append(em) + var_energy.append(ev) + + return [ + np.stack([exp_matter_x, var_matter_x], axis=-1), + np.stack([exp_gauge_x, var_gauge_x], axis=-1), + np.stack([exp_interaction, var_interaction], axis=-1), + np.stack([exp_energy, var_energy], axis=-1), + ] + + def compute_observables_dual_basis( + self, + bits_z: Sequence[npt.NDArray[np.int8]], + bits_x: Sequence[npt.NDArray[np.int8]], + ) -> Sequence[np.ndarray]: + """This calculates the mean and variance over multiple instances + of measurement outcomes for the matter X, gauge X, interaction + and energy terms. + + Args: + bits_z: Measurement outcomes in the Z basis. + bits_x: Measurement outcomes in the X basis. + + Returns: + A sequence of NumPy arrays, where each array contains the mean and + variance for the respective observable. + """ + num_instances = len(bits_z) + if num_instances == 1: + return self._compute_observables_one_instance_dual_basis( + bits_z[0], bits_x[0] + ) + else: + # compute mean and variance of the mean over randomized instances + matter_x = [] + gauge_x = [] + interaction = [] + energy = [] + for ins in range(num_instances): + mx, gx, intr, e = self._compute_observables_one_instance_dual_basis( + bits_z[ins], bits_x[ins] + ) + matter_x.append(mx[:, 0]) + gauge_x.append(gx[:, 0]) + interaction.append(intr[:, 0]) + energy.append(e[:, 0]) + + return [ + np.stack( + [ + np.mean(np.array(matter_x), axis=0), + np.var(np.array(matter_x), axis=0) / num_instances, + ], + axis=-1, + ), + np.stack( + [ + np.mean(np.array(gauge_x), axis=0), + np.var(np.array(gauge_x), axis=0) / num_instances, + ], + axis=-1, + ), + np.stack( + [ + np.mean(np.array(interaction), axis=0), + np.var(np.array(interaction), axis=0) / num_instances, + ], + axis=-1, + ), + np.stack( + [ + np.mean(np.array(energy), axis=0), + np.var(np.array(energy), axis=0) / num_instances, + ], + axis=-1, + ), + ] + + def _postselect_on_charge_one_instance_dual_basis( + self, bits: npt.NDArray[np.int8] + ) -> npt.NDArray[np.int8]: + q_measured = [] + for q in self.matter_qubits: + q_measured.append(1 - 2 * bits[self.all_qubits.index(q)]) + q_measured_array = np.array(q_measured) + q_measured_array = np.transpose(q_measured_array) + charges = np.repeat(1, q_measured_array.shape[0]) + selected = np.all(q_measured_array == charges, axis=0) + return bits[selected] + + def postselect_on_charge_dual_basis(self, bits: npt.NDArray[np.int8]) -> np.ndarray: + """Performs charge post-selection on measurement outcomes. + + Args: + bits: The raw measurement outcomes. + + Returns: + A NumPy array containing only the post-selected measurement instances. + """ + num_instances = bits.shape[0] + bits_ps = [] + for ins in range(num_instances): + bits_ps_i = self._postselect_on_charge_one_instance_dual_basis(bits[ins]) + if len(bits_ps_i) > 0: + bits_ps.append(bits_ps_i) + return np.array(bits_ps) + + def get_2d_dfl_experiment_circuits( + self, + initial_state: InitialState, + n_cycles: Sequence[int] | npt.NDArray, + excited_qubits: Sequence[cirq.GridQubit], + n_instances: int = 10, + two_qubit_gate: TwoQubitGate = TwoQubitGate.CZ, + basis: Basis = Basis.DUAL, + ) -> List[cirq.Circuit]: + """Generates the set of circuits needed for the 2D DFL experiment. + + Args: + initial_state: The initial state preparation.Use an Enum member from + InitialState (SINGLE_SECTOR or SUPERPOSITION). + n_cycles: The number of Trotter steps (cycles) to simulate. + excited_qubits: Qubits to be excited in the initial state. + n_instances: The number of instances to generate + two_qubit_gate: The type of two-qubit gate to use in the Trotter step. + Use an Enum member from TwoQubitGate (CZ or CPHASE). + basis: The basis for the final circuit structure. Use an Enum member from + Basis (LGT or DUAL). + + + Returns: + A list of all generated cirq.Circuit objects. + + Raises: + ValueError: If an invalid option for `initial_state` + or `basis` is given. + """ + if initial_state.value == InitialState.SINGLE_SECTOR.value: + initial_circuit = self._energy_bump_initial_state( + InitialState.SINGLE_SECTOR, excited_qubits + ) + elif initial_state.value == InitialState.SUPERPOSITION.value: + initial_circuit = self._energy_bump_initial_state( + InitialState.SUPERPOSITION, excited_qubits + ) + else: + raise ValueError("Invalid initial state") + circuits = [] + for n_cycle in tqdm(n_cycles): + print(int(np.max([0, n_cycle - 1]))) + circ = initial_circuit + self.trotter_circuit(n_cycle, two_qubit_gate) + if basis.value == Basis.LGT.value: + circ += self._change_basis() + elif basis.value == Basis.DUAL.value: + pass + else: + raise ValueError("Invalid option for basis") + for _ in range(n_instances): + if basis.value == Basis.LGT.value: + circ_z = circ + cirq.measure([q for q in self.all_qubits], key="m") + elif basis.value == Basis.DUAL.value: + circ_z = ( + circ + + self.layer_hadamard("matter") + + cirq.measure([q for q in self.all_qubits], key="m") + ) + else: + raise ValueError("Invalid option for basis") + circ_x = ( + circ + + self.layer_hadamard("all") + + cirq.measure([q for q in self.all_qubits], key="m") + ) + circuits.append(circ_z) + circuits.append(circ_x) + return circuits diff --git a/recirq/dfl/dfl_enums.py b/recirq/dfl/dfl_enums.py new file mode 100644 index 00000000..ee2804ef --- /dev/null +++ b/recirq/dfl/dfl_enums.py @@ -0,0 +1,48 @@ +# Copyright 2025 Google +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Enums defining the core gates, states, and bases for +the Disorder-Free Localization (DFL) experiment. +""" + +import enum + +class TwoQubitGate(enum.Enum): + """Available two-qubit gate types for the Trotter simulation. + + CZ refers to the two-qubit 'cz' gate + CPHASE refers to the 'cphase' operation + """ + CZ = 'cz' + CPHASE = 'cphase' + + +class InitialState(enum.Enum): + """Available initial quantum states for the DFL experiment. + + SINGLE_SECTOR refers to the gauge invariant 'single_sector' state + SUPERPOSITION refers to the 'superposition' state of disorder configurations. + """ + SINGLE_SECTOR = 'single_sector' + SUPERPOSITION = 'superposition' + + +class Basis(enum.Enum): + """Available bases for the DFL experiment. + + LGT refers to the local lattice gauge theory ('lgt') basis. + DUAL refers to the 'dual' basis. + """ + LGT = 'lgt' + DUAL = 'dual' diff --git a/recirq/dfl/dfl_experiment.py b/recirq/dfl/dfl_experiment.py new file mode 100644 index 00000000..2d0a8ace --- /dev/null +++ b/recirq/dfl/dfl_experiment.py @@ -0,0 +1,1655 @@ +# Copyright 2025 Google +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Core experiment classes for the DFL project. + +This module defines the DFLExperiment base class and DFLExperiment2D derived +class, which handle the overall simulation workflow and data management for +the 1D and 2D Disorder-Free Localization (DFL) experiments. + +The methods are based on the paper: https://arxiv.org/abs/2410.06557 +""" + +import os +import pickle +from copy import deepcopy +from functools import partial +from typing import cast, Sequence + +import cirq +import cirq.transformers.dynamical_decoupling as dd +import numpy as np +from tqdm import tqdm + +import concurrent.futures +from recirq.dfl import dfl_1d as dfl +from recirq.dfl import dfl_2d_second_order_trotter as dfl_2d + +#import Enums +from recirq.dfl.dfl_enums import TwoQubitGate, InitialState, Basis + + + + +def _apply_gauge_compiling(seed: int, circuit: cirq.Circuit) -> cirq.Circuit: + return cirq.merge_single_qubit_moments_to_phxz( + cirq.transformers.gauge_compiling.CPhaseGaugeTransformerMM()( + circuit, rng_or_seed=seed + ) + ) + + +def _distance(q1: cirq.GridQubit, q2: cirq.GridQubit) -> int: + """Return the Manhattan distance between two qubits. + + Args: + q1: The first qubit. + q2: The second qubit. + + Returns: + The distance between the qubits. + """ + return abs(q1.row - q2.row) + abs(q1.col - q2.col) + + +class DFLExperiment: + """A class for performing the 1D DFL experiment (Fig 1 of the paper). + The paper is available at: https://arxiv.org/abs/2410.06557 + + Attrs: + qubits: The qubits to use for the experiment. + sampler: The cirq sampler to use. + j: The coefficient on the hopping term. + h: The coefficient on the gauge X term. + mu: The coefficient on the matter sigma_x term. + tau: The Trotter step size. + pbc: Whether to use periodic boundary conditions. + rng: The pseudorandom number generator to use for readout benchmarking. + cycles: The number of cycles for which to run the experiment. + num_gc_instances: The number of gauge compiling instances to use. + include_zero_trotter: Whether to include circuits with the Trotter step set to 0. + """ + + def __init__( + self, + qubits: list[cirq.GridQubit], + sampler: cirq.Sampler, + save_directory: str, + j: float = 1.0, + h: float = 1.3, + mu: float = 1.5, + tau: float = 0.25, + pbc: bool = True, + rng: np.random.Generator = np.random.default_rng(), + cycles: np.ndarray = np.arange(31), + num_gc_instances: int = 40, + use_cphase: bool = False, + include_zero_trotter: bool = True, + ): + self.qubits = qubits + self.sampler = sampler + self.j = j + self.h = h + self.mu = mu + self.tau = tau + assert len(qubits) % 2 == 0 + self.pbc = pbc + self.rng = rng + self.cycles = cycles + self.all_circuits: list[cirq.Circuit] = [] + self.e0 = np.array([]) + self.e1 = np.array([]) + self.num_gc_instances = num_gc_instances + self.executor = concurrent.futures.ProcessPoolExecutor() + self.use_cphase = use_cphase + self.readout_ideal_bitstrs = np.array([]) + self.save_directory = save_directory + self.include_zero_trotter = include_zero_trotter + self.matter_sites = np.arange(0, len(qubits), 2) + self.gauge_sites = np.arange(1, len(qubits), 2) + + def save_to_file(self, filename: str): + """Saves the experiment parameters and measurement + results to a file. + + Args: + filename: The path and filename to which the data + dictionary will be pickled. + """ + d_to_save = { + "measurements": self.measurements, + "readout_ideal_bitstrs": self.readout_ideal_bitstrs, + "j": self.j, + "h": self.h, + "mu": self.mu, + "tau": self.tau, + "pbc": self.pbc, + "cycles": self.cycles, + "num_gc_instances": self.num_gc_instances, + "use_cphase": self.use_cphase, + "e0": self.e0, + "e1": self.e1, + "qubits": self.qubits, + "save_directory": self.save_directory, + "include_zero_trotter": self.include_zero_trotter, + } + + pickle.dump(d_to_save, open(filename, "wb")) + + def load_from_file(self, filename: str): + """Loads experiment parameters and measurement results from a saved file. + + Args: + filename: The path and filename from which to load the data + dictionary. + """ + d = pickle.load(open(filename, "rb")) + self.measurements = d["measurements"] + self.readout_ideal_bitstrs = d["readout_ideal_bitstrs"] + self.j = d["j"] + self.h = d["h"] + self.mu = d["mu"] + self.tau = d["tau"] + self.pbc = d["pbc"] + self.cycles = d["cycles"] + self.num_gc_instances = d["num_gc_instances"] + self.use_cphase = d["use_cphase"] + self.e0 = d["e0"] + self.e1 = d["e1"] + self.qubits = d["qubits"] + self.save_directory = d.get("save_directory", "temp") + self.include_zero_trotter = d.get("include_zero_trotter", True) + + def generate_circuit_instances( + self, + initial_state: str, + ncycles: int, + basis: str, + gauge_compile: bool = True, + dynamical_decouple: bool = True, + dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), + zero_trotter: bool = False, + ) -> list[cirq.Circuit]: + """Generates the circuit instances for a given initial state, number of cycles, + and measurement basis. + + Args: + initial_state: The initial state. Can be "gauge_invariant", "single_sector", + or "superposition". + ncycles: The number of Trotter steps. + basis: The measurement basis. Must be "z" or "x". + gauge_compile: Whether to apply gauge compiling. + dynamical_decouple: Whether to apply dynamical decoupling. + dd_sequence: The dynamical decoupling sequence to use. + zero_trotter: Whether to set the Trotter step size to 0. + + Returns: + A list of the generated circuit instances. + """ + if initial_state == "gauge_invariant": + initial_state = "single_sector" + + assert initial_state in ["single_sector", "superposition"] + + if initial_state == "single_sector": + initial_state_enum = InitialState.SINGLE_SECTOR + else: + initial_state_enum = InitialState.SUPERPOSITION + + circuit = dfl.get_1d_dfl_experiment_circuits( + self.qubits, + initial_state_enum, + [ncycles], + [self.qubits[1]], # put the excitation on the first gauge site + 0.0 if zero_trotter else self.tau, + self.h, + self.mu, + 1, + TwoQubitGate.CPHASE if self.use_cphase else TwoQubitGate.CZ, + Basis.DUAL, + )[["z", "x"].index(basis)] + + if gauge_compile: + circuits = list( + self.executor.map( + partial(_apply_gauge_compiling, circuit=circuit), + range(self.num_gc_instances), + ) + ) + else: + circuits = [circuit] + + if dynamical_decouple: + circuits_dd = self.executor.map( + partial(dd.add_dynamical_decoupling, schema=dd_sequence), circuits + ) + circuits = list(circuits_dd) + + return circuits + + def create_readout_benchmark_circuits( + self, num_random_bitstrings: int = 30 + ) -> tuple[np.ndarray, list[cirq.Circuit]]: + """Creates circuits for readout error benchmarking. + Args: + num_random_bitstrings: The number of random bitstrings to use. + Returns: + A tuple containing: + 1. The ideal bitstrings + 2. A list of the corresponding readout benchmark circuits. + """ + n = len(self.qubits) + x_or_I = lambda bit: cirq.X if bit == 1 else cirq.I + bitstrs = self.rng.integers(0, 2, size=(num_random_bitstrings, n)) + random_circuits = [] + random_circuits = [ + cirq.Circuit( + [ + x_or_I(bit)(qubit) + for bit, qubit in zip(bitstr, self.qubits[: len(bitstr)]) + ] + + [cirq.M(*self.qubits[: len(bitstr)], key="m")] + ) + for bitstr in bitstrs + ] + return bitstrs, random_circuits + + def identify_ideal_readout_bitstrings_from_circuits(self): + """Identifies the ideal expected bitstrings for the readout calibration circuits. + """ + # first identify the number of readout bitstrings: + for num_bitstrs, circuit in enumerate(self.all_circuits[::-1]): + if len(circuit) > 2: + break + + bit_fn = lambda gate: 1 if gate == cirq.X else 0 + self.readout_ideal_bitstrs = np.array( + [ + [bit_fn(circuit[0][qubit].gate) for qubit in self.qubits] + for circuit in self.all_circuits[-num_bitstrs:] + ] + ) + + def generate_all_circuits( + self, + gauge_compile: bool = True, + dynamical_decouple: bool = True, + dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), + num_random_bitstrings: int = 30, + ): + """Generates and stores all circuits needed for the DFL experiment. + The results are stored in `self.all_circuits`. + + Args: + gauge_compile: Whether to apply gauge compiling instances. + dynamical_decouple: Whether to apply dynamical decoupling to the circuits. + dd_sequence: The dynamical decoupling sequence to use. + num_random_bitstrings: The number of random bitstrings to use for + readout error benchmarking. + """ + if not gauge_compile: + self.num_gc_instances = 1 + num_gc_instances = self.num_gc_instances + + zero_trotter_options = [False, True] if self.include_zero_trotter else [False] + + all_circuits = [] + with tqdm( + total=2 + * 2 + * len(zero_trotter_options) + * len(self.cycles) + * num_gc_instances + ) as pbar: + for initial_state in ["gauge_invariant", "superposition"]: + for basis in ["z", "x"]: + for zero_trotter in zero_trotter_options: + for ncycles in self.cycles: + all_circuits += self.generate_circuit_instances( + initial_state, + ncycles, + basis, + gauge_compile, + dynamical_decouple, + dd_sequence, + zero_trotter, + ) + pbar.update(num_gc_instances) + + bitstrs, readout_circuits = self.create_readout_benchmark_circuits( + num_random_bitstrings + ) + all_circuits += readout_circuits + + expected_number = ( + len(self.cycles) * 2 * 2 * len(zero_trotter_options) * num_gc_instances + + num_random_bitstrings + ) + assert len(all_circuits) == expected_number + + self.all_circuits = all_circuits + self.readout_ideal_bitstrs = bitstrs + + def load_circuits_from_file( + self, filename: str, old_qubits_list: list[cirq.Qid] | None = None + ): + """Load the circuits from a file. + + Args: + filename: The filename from which to load the circuits. + old_qubits_list: The previous ordered list of qubits if + mapping to different qubits. + """ + circuits = pickle.load(open(filename, "rb")) + if old_qubits_list is not None and old_qubits_list != self.qubits: + self.all_circuits = [ + circuit.transform_qubits(dict(zip(old_qubits_list, self.qubits))) + for circuit in tqdm(circuits, total=len(circuits)) + ] + else: + self.all_circuits = circuits + self.identify_ideal_readout_bitstrings_from_circuits() + + def shuffle_circuits_and_save( + self, batch_size: int = 500, delete_circuit_list: bool = True + ): + """Shuffle all of the circuits and save them to files. + + Args: + batch_size: The maximum number of circuits per file. + delete_circuit_list: Whether to delete the circuit list from this object after saving, saves memory. + """ + if not os.path.isdir(self.save_directory + "/shuffled_circuits"): + os.mkdir(self.save_directory + "/shuffled_circuits") + + # shuffle the circuits: + circuits = self.all_circuits + indices = np.arange(len(circuits)) + self.rng.shuffle(indices) + self.run_order = indices + inv_map = np.array([list(indices).index(_) for _ in range(len(circuits))]) + circuits_shuffled = [circuits[_] for _ in indices] + # save the shuffled circuits + for start_idx in tqdm(range(0, len(circuits), batch_size)): + circuits_i = circuits_shuffled[start_idx: (start_idx + batch_size)] + pickle.dump( + circuits_i, + open( + self.save_directory + f"/shuffled_circuits/{start_idx}.pickle", "wb" + ), + ) + + params = { + "indices": indices, + "inv_map": inv_map, + "readout_ideal_bitstrs": self.readout_ideal_bitstrs, + "batch_size": batch_size, + } + pickle.dump( + params, open(self.save_directory + "/shuffled_circuits/params.pickle", "wb") + ) + + if delete_circuit_list: + del self.all_circuits + + def run_experiment( + self, + repetitions_post_selection: int | list[int] = 2000, + repetitions_non_post_selection: int | list[int] = 2000, + batch_size: int = 500, + readout_repetitions: int = 1000, + gauge_compile: bool = True, + dynamical_decouple: bool = True, + dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), + num_random_bitstrings: int = 30, + ): + """Run the experiment. First, shuffles and saves the circuits and then runs from the saved + circuits. + + Calls generate_all_circuits, then shuffle_circuits_and_save, then run_experiment_from_saved_circuits, then load_shuffled_measurements. + + Note: These default values are used for the 1D experiment. For the 2D experiment, we use + ``` + repetitions_post_selection = [ + 1000, + 1000, + 1000, + 1000, + 1000, + 1000, + 1000, + 1000, + 1136, + 1317, + 1604, + 2267, + 4139, + 5655, + 6579, + 7688, + 11550, + 17332, + 25493, + 37463, + 46275, + 63527, + 69691, + 111862, + 137194, + 227824, + 348209, + 346286, + 406434, + 500000, + 682846, + ], + repetitions_non_post_selection = 1000 + ``` + + Args: + repetitions_post_selection: How many repetitions to use for the gauge invariant + initial state at each cycle number. Can be a single integer (for uniform repetitions) + or a list of integers (to specify repetitions per cycle). + repetitions_non_post_selection: How many repetitions to use for the superposition + initial state. Can be a single integer (for uniform repetitions) or a list of + integers (to specify repetitions per cycle). + batch_size: The maximum number of circuits per file and per run_batch call. + gauge_compile: Whether to add gauge compiling. + dynamical_decouple: Whether to add dynamical decoupling. + dd_sequence: The dynamical decoupling sequence to use. + num_random_bitstrings: The number of bitstrings to use for readout mitigation. + readout_repetitions: The number of repetitions to use for readout benchmarking. + """ + self.generate_all_circuits( + gauge_compile=gauge_compile, + dynamical_decouple=dynamical_decouple, + dd_sequence=dd_sequence, + num_random_bitstrings=num_random_bitstrings, + ) + self.shuffle_circuits_and_save(batch_size=batch_size, delete_circuit_list=True) + self.run_experiment_from_saved_circuits( + repetitions_post_selection=repetitions_post_selection, + repetitions_non_post_selection=repetitions_non_post_selection, + initial_start_index=0, + readout_repetitions=readout_repetitions, + ) + self.load_shuffled_measurements() + self.extract_readout_error_rates() + + def run_experiment_from_saved_circuits( + self, + repetitions_post_selection: int | list[int] = 2000, + repetitions_non_post_selection: int | list[int] = 2000, + initial_start_index: int = 0, + old_qubits: list[cirq.Qid] | None = None, + new_qubits: list[cirq.Qid] | None = None, + readout_repetitions: int = 1000, + ): + """Run the experiment from circuits that are saved ahead of time. Saves the results to + files. + + To use this, the circuits should have been saved with shuffle_circuits_and_save. + + The location of the saved files is `self.save_directory + "/shuffled_results"`. + + Note: These default values are used for the 1D experiment. For the 2D experiment, we use + the values specified in the docstring of method "run_experiment". + + Args: + repetitions_post_selection: How many repetitions to use for the gauge invariant + initial state at each cycle number. Can be a single integer (for uniform repetitions) + or a list of integers (to specify repetitions per cycle). + repetitions_non_post_selection: How many repetitions to use for the superposition + initial state. Can be a single integer (for uniform repetitions) or a list of + integers (to specify repetitions per cycle). + initial_start_index: The circuit number to start at (inteneded for resuming datataking if it crashed before). + old_qubits: The order of qubits for which the circuits were originally generated. + new_qubits: The order of qubits to use now. + readout_repetitions: The number of repetitions to use for readout benchmarking. + """ + + # convert to lists: + repetitions_post_selection_list = list( + np.zeros(len(self.cycles), dtype=int) + repetitions_post_selection + ) + repetitions_non_post_selection_list = list( + np.zeros(len(self.cycles), dtype=int) + repetitions_non_post_selection + ) + + # load the parameters for the shuffled circuits: + params = pickle.load( + open(self.save_directory + "/shuffled_circuits/params.pickle", "rb") + ) + indices = params["indices"] + inv_map = params["inv_map"] + self.readout_ideal_bitstrs = params["readout_ideal_bitstrs"] + batch_size = params["batch_size"] + self.run_order = indices + num_random_bitstrings = len(self.readout_ideal_bitstrs) + zero_trotter_options = [0, 1] if self.include_zero_trotter else [0] + tot_num_circuits = ( + len(self.cycles) * 2 * 2 * len(zero_trotter_options) * self.num_gc_instances + + num_random_bitstrings + ) + + repetitions = [readout_repetitions] * tot_num_circuits + + # fill in other repetition numbers + for init_state_number, reps_list in [ + (0, repetitions_post_selection_list), + (1, repetitions_non_post_selection_list), + ]: + for basis_number in [0, 1]: + for zero_trotter_number in zero_trotter_options: + for cycle_number in self.cycles: + start_index = ( + init_state_number + * 2 + * len(zero_trotter_options) + * len(self.cycles) + * self.num_gc_instances + + basis_number + * len(zero_trotter_options) + * len(self.cycles) + * self.num_gc_instances + + zero_trotter_number + * len(self.cycles) + * self.num_gc_instances + + cycle_number * self.num_gc_instances + ) + end_index = start_index + self.num_gc_instances + repetitions[start_index:end_index] = [ + reps_list[cycle_number] + ] * self.num_gc_instances + + # now apply the shuffle + repetitions = [repetitions[index] for index in indices] + + # run the experiment and save shuffled measurements to files (avoids memory issues) + for start_idx in tqdm(range(initial_start_index, tot_num_circuits, batch_size)): + circuits_i = pickle.load( + open( + self.save_directory + f"/shuffled_circuits/{start_idx}.pickle", "rb" + ) + ) + if old_qubits is not None and new_qubits is not None: + print("Transforming qubits") + circuits_i = [ + circuit.transform_qubits(dict(zip(old_qubits, new_qubits))) + for circuit in circuits_i + ] + print( + f"Shots this batch: {sum(repetitions[start_idx: (start_idx + batch_size)])}" + ) + result = self.sampler.run_batch( + circuits_i, + repetitions=repetitions[start_idx: (start_idx + batch_size)], + ) + results_i = [res[0].measurements["m"].astype(bool) for res in result] + if not os.path.isdir(self.save_directory + f"/shuffled_results"): + os.mkdir(self.save_directory + f"/shuffled_results") + pickle.dump( + results_i, + open( + self.save_directory + f"/shuffled_results/{start_idx}.pickle", "wb" + ), + ) + + def load_shuffled_measurements(self): + """Loads the measurement results from files. + + First, you should have run `run_experiment_from_saved_circuits` or `run_experiment`. + """ + params = pickle.load( + open(self.save_directory + "/shuffled_circuits/params.pickle", "rb") + ) + indices = params["indices"] + inv_map = params["inv_map"] + self.readout_ideal_bitstrs = params["readout_ideal_bitstrs"] + batch_size = params["batch_size"] + num_random_bitstrings = len(self.readout_ideal_bitstrs) + zero_trotter_options = [False, True] if self.include_zero_trotter else [False] + tot_num_circuits = ( + len(self.cycles) * 2 * 2 * len(zero_trotter_options) * self.num_gc_instances + + num_random_bitstrings + ) + measurements_shuffled = [] + for start_idx in tqdm(range(0, tot_num_circuits, batch_size)): + measurements_shuffled += pickle.load( + open( + self.save_directory + f"/shuffled_results/{start_idx}.pickle", "rb" + ) + ) + + self.measurements = [measurements_shuffled[i] for i in inv_map] + + def extract_readout_error_rates(self) -> None: + """Calculates the readout error rates e0 and e1 from the readout + benchmarking measurements. The results are stored in the `self.e0` + (1 -> 0 error rate) and `self.e1` (0 -> 1 error rate) attributes. + + Returns: + None. + """ + ideal_bitstrs = self.readout_ideal_bitstrs + num_bitstrs = len(ideal_bitstrs) + readout_measurements = np.array(self.measurements[-num_bitstrs:]) + repetitions = len(readout_measurements[0]) + num_prep_0 = np.sum(1 - ideal_bitstrs, axis=0) + num_prep_1 = np.sum(ideal_bitstrs, axis=0) + self.e0 = np.einsum("ik,ijk->k", (1 - ideal_bitstrs), readout_measurements) / ( + num_prep_0 * repetitions + ) + self.e1 = np.einsum("ik,ijk->k", ideal_bitstrs, (1 - readout_measurements)) / ( + num_prep_1 * repetitions + ) + + return None + + def extract_measurements( + self, + basis_number: int, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + post_select: bool = False, + ) -> np.ndarray: + """Extract the portion of the measurement results corresponding to the requested data. + + Args: + basis_number: 0 for z-basis and 1 for x-basis. + initial_state: Either "superposition", "single_sector", or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + + Returns: + The measurement results. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) + if initial_state == "single_sector": + initial_state = "gauge_invariant" + init_state_number = ["gauge_invariant", "superposition"].index(initial_state) + zero_trotter_options = [False, True] if self.include_zero_trotter else [False] + zero_trotter_number = zero_trotter_options.index(zero_trotter) + + start_index = ( + init_state_number + * 2 + * len(zero_trotter_options) + * len(self.cycles) + * self.num_gc_instances + + basis_number + * len(zero_trotter_options) + * len(self.cycles) + * self.num_gc_instances + + zero_trotter_number * len(self.cycles) * self.num_gc_instances + + cycle_number * self.num_gc_instances + ) + + measurements = np.array( + self.measurements[start_index: (start_index + self.num_gc_instances)] + ) + repetitions = len(measurements[0]) + measurements = measurements.reshape( + repetitions * self.num_gc_instances, len(self.qubits) + ) + if post_select: + mask = np.all(measurements[:, self.matter_sites] == False, axis=1) + measurements = measurements[mask, :] + return measurements + + def extract_zzz_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the interaction term (ZZZ in the LGT basis but implemented + here as Z_gauge in the dual basis). + + Args: + initial_state: Either "superposition", "single_sector", or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 0, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + if post_select and tolerated_distance_to_error < np.inf: + raise NotImplementedError + else: + p1 = np.mean(measurements[:, self.gauge_sites], axis=0) + dp1 = np.sqrt(p1 * (1 - p1) / len(measurements)) + if not readout_mitigate: + x_dx = np.array([1 - 2 * p1, 2 * dp1]) # value, uncertainty + else: + x_raw = 1 - 2 * p1 + e0 = self.e0[self.gauge_sites] + e1 = self.e1[self.gauge_sites] + x_mitigated = (x_raw + e0 - e1) / ( + 1.0 - e0 - e1 + ) # see Eq. F3 of https://arxiv.org/pdf/2106.01264 + x_dx = np.array([x_mitigated, 2 * dp1 / (1.0 - e0 - e1)]) + return x_dx + + def extract_interaction( + self, + initial_state: str, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ): + """Get the expectation value of the interaction term (ZZZ in the LGT basis but implemented + here as Z_gauge in the dual basis) at all cycles. + + Args: + initial_state: Either "superposition", "single_sector", or "gauge_invariant" + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [cycle_number, value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if initial_state == "superposition" and post_select: + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) + + return np.array( + [ + self.extract_zzz_single_cycle( + initial_state, + cycle, + zero_trotter, + readout_mitigate, + post_select, + tolerated_distance_to_error, + ) + for cycle in tqdm(self.cycles, total=len(self.cycles)) + ] + ) + + def extract_gauge_x_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the h term, which is X_gauge in both the LGT and dual bases. + + Args: + initial_state: Either "superposition", "single_sector", or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 1, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + if post_select and tolerated_distance_to_error < np.inf: + raise NotImplementedError + else: + p1 = np.mean(measurements[:, self.gauge_sites], axis=0) + dp1 = np.sqrt(p1 * (1 - p1) / len(measurements)) + if not readout_mitigate: + x_dx = np.array([1 - 2 * p1, 2 * dp1]) # value, uncertainty + else: + x_raw = 1 - 2 * p1 + e0 = self.e0[self.gauge_sites] + e1 = self.e1[self.gauge_sites] + x_mitigated = (x_raw + e0 - e1) / ( + 1.0 - e0 - e1 + ) # see Eq. F3 of https://arxiv.org/pdf/2106.01264 + x_dx = np.array([x_mitigated, 2 * dp1 / (1.0 - e0 - e1)]) + return x_dx + + def extract_gauge_x( + self, + initial_state: str, + readout_mitigate: bool = True, + post_select: bool = False, + zero_trotter: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the h term, which is X_gauge in both the LGT and dual bases + at all cycle numbers. + + Args: + initial_state: Either "superposition", "single_sector", or "gauge_invariant" + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [cycle_number, value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if initial_state == "superposition" and post_select: + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) + + return np.array( + [ + self.extract_gauge_x_single_cycle( + initial_state, + cycle, + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + tolerated_distance_to_error=tolerated_distance_to_error, + ) + for cycle in self.cycles + ] + ) + + def extract_matter_x_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the mu term. + + This is X_matter in the LGT basis and a product of Xs on a matter site and the neighboring gauge sites in the dual basis. + + Args: + initial_state: Either "superposition", "single_sector", or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 1, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + x = [] + dx = [] + e0 = deepcopy(self.e0) + e1 = deepcopy(self.e1) + if post_select and readout_mitigate: + for idx in self.matter_sites: + e0[idx] = 0.0 + e1[idx] = 0.0 + for q_idx in self.matter_sites: + qubits_i = [ + self.qubits[q_idx - 1], + self.qubits[q_idx], + self.qubits[(q_idx + 1) % len(self.qubits)], + ] + indices = np.array( + [self.qubits.index(cast(cirq.GridQubit, qi)) for qi in qubits_i] + ) + if post_select and tolerated_distance_to_error < np.inf: + raise NotImplementedError + else: + measurements_i = measurements[:, indices] + if readout_mitigate: + single_qubit_cmats = [] + qubits_to_mitigate = [] + + for i in indices: + e0_sq = e0[i] + e1_sq = e1[i] + single_qubit_cmats.append( + np.array([[1 - e0_sq, e1_sq], [e0_sq, 1 - e1_sq]]) + ) + qubits_to_mitigate.append(self.qubits[i]) + + tcm = cirq.experiments.TensoredConfusionMatrices( + single_qubit_cmats, + [[q] for q in qubits_to_mitigate], + repetitions=measurements_i.shape[0], + timestamp=0.0, + ) + + x_i, dx_i = tcm.readout_mitigation_pauli_uncorrelated( + qubits_to_mitigate, measurements_i + ) + else: + repetitions = measurements_i.shape[0] + p1 = np.mean(np.sum(measurements_i, axis=1) % 2) + dp1 = np.sqrt(p1 * (1 - p1) / (repetitions * self.num_gc_instances)) + x_i = 1 - 2 * p1 + dx_i = 2 * dp1 + x.append(x_i) + dx.append(dx_i) + return np.array([x, dx]) + + def extract_matter_x( + self, + initial_state: str, + readout_mitigate: bool = True, + post_select: bool = False, + zero_trotter: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the mu term at all cycles. + + This is X_matter in the LGT basis and a product of Xs on a matter site and the neighboring gauge sites in the dual basis. + + Args: + initial_state: Either "superposition", "single_sector", or "gauge_invariant" + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [cycle_number, value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if initial_state == "superposition" and post_select: + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) + + return np.array( + [ + self.extract_matter_x_single_cycle( + initial_state, + cycle, + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + tolerated_distance_to_error=tolerated_distance_to_error, + ) + for cycle in self.cycles + ] + ) + + +class DFLExperiment2D(DFLExperiment): + """A class for performing the 2D DFL experiment (Fig 3 of the paper). + + Attrs: + sampler: The cirq sampler to use. + qubits: The grid of qubits. + origin_qubit: One of the matter qubits. + h: The coefficient on the gauge X term. + mu: The coefficient on the matter sigma_x term. + tau: The Trotter step size. + rng: The pseudorandom number generator to use for readout benchmarking. + cycles: The number of cycles for which to run the experiment. + num_gc_instances: The number of gauge compiling instances to use. + excited_qubits: Which qubits to excite. + use_cphase: Whether to use cphase gates. + include_zero_trotter: Whether to include circuits with the Trotter step set to 0. + """ + + def __init__( + self, + sampler: cirq.Sampler, + qubits: list[cirq.GridQubit], + origin_qubit: cirq.GridQubit, + save_directory: str, + h: float = 1.5, + mu: float = 3.5, + tau: float = 0.35, + rng: np.random.Generator = np.random.default_rng(), + cycles: np.ndarray = np.arange(31), + num_gc_instances: int = 40, + excited_qubits: list[cirq.GridQubit] = [cirq.GridQubit(0, 7)], + use_cphase: bool = True, + include_zero_trotter: bool = True, + ): + self.sampler = sampler + self.h = h + self.mu = mu + self.tau = tau + self.rng = rng + self.cycles = cycles + self.all_circuits: list[cirq.Circuit] = [] + self.e0 = np.array([]) + self.e1 = np.array([]) + self.num_gc_instances = num_gc_instances + self.executor = concurrent.futures.ProcessPoolExecutor() + self.readout_ideal_bitstrs = np.array([]) + self.save_directory = save_directory + self.lgtdfl = dfl_2d.LatticeGaugeTheoryDFL2D(qubits, origin_qubit, tau, h, mu) + self.lgtdfl_zero_trotter = dfl_2d.LatticeGaugeTheoryDFL2D(qubits, origin_qubit, 0.0, h, mu) + self.qubits = self.lgtdfl.all_qubits + self.excited_qubits = excited_qubits + self.use_cphase = use_cphase + self.origin_qubit = origin_qubit + self.include_zero_trotter = include_zero_trotter + self.matter_sites = np.array(self.lgtdfl._matter_indices()) + self.gauge_sites = np.array(self.lgtdfl._gauge_indices()) + + def save_to_file(self, filename: str): + """Save a dictionary containing parameters and measurements. + + Args: + filename: The filename to save to. + """ + + d_to_save = { + "measurements": self.measurements, + "readout_ideal_bitstrs": self.readout_ideal_bitstrs, + "h": self.h, + "mu": self.mu, + "tau": self.tau, + "cycles": self.cycles, + "num_gc_instances": self.num_gc_instances, + "e0": self.e0, + "e1": self.e1, + "qubits": self.qubits, + "save_directory": self.save_directory, + "excited_qubits": self.excited_qubits, + "use_cphase": self.use_cphase, + "origin_qubit": self.origin_qubit, + "include_zero_trotter": self.include_zero_trotter, + } + + pickle.dump(d_to_save, open(filename, "wb")) + + def load_from_file(self, filename: str): + """Load parameters and measurements from a dictionary. + + Args: + filename: The filename to load from. + """ + + d = pickle.load(open(filename, "rb")) + self.measurements = d["measurements"] + self.readout_ideal_bitstrs = d["readout_ideal_bitstrs"] + self.h = d["h"] + self.mu = d["mu"] + self.tau = d["tau"] + self.cycles = d["cycles"] + self.num_gc_instances = d["num_gc_instances"] + self.e0 = d["e0"] + self.e1 = d["e1"] + self.qubits = d["qubits"] + self.save_directory = d.get("save_directory", "temp") + self.excited_qubits = d["excited_qubits"] + self.use_cphase = d["use_cphase"] + self.origin_qubit = d["origin_qubit"] + self.include_zero_trotter = d.get("include_zero_trotter", True) + + def generate_circuit_instances( + self, + initial_state: str, + ncycles: int, + basis: str, + gauge_compile: bool = True, + dynamical_decouple: bool = True, + dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), + zero_trotter: bool = False, + qubits_to_fix_disorder: list[cirq.Qid] = [], + disorder_pattern: list[bool] = [], + ) -> list[cirq.Circuit]: + """Generate the circuit instances for a given initial state, number of cycles, and + measurement basis. + + Args: + initial_state: Which initial state, either "gauge_invariant" or "superposition". + ncycles: The number of Trotter steps (can be 0). + basis: The basis in which to measure. Either "x" or "z". + gauge_compile: Whether to apply gauge compiling. + dynamical_decouple: Whether to apply dynamical decoupling. + dd_sequence: The dynamical decoupling sequence to use. + zero_trotter: Whether to set the trotter step size to 0 (used to calibrate error mitigation). + qubits_to_fix_disorder: Qubits on which to fix a disorder pattern for the superposition initial state. + disorder_pattern: The disorder pattern to fix. + + Returns: + A list of the circuit instances. + + Raises: + ValueError: If initial_state, ncycles, or basis is not valid. + """ + + for q in qubits_to_fix_disorder: + assert ( + q in self.lgtdfl.matter_qubits + ), "qubits_to_fix_disorder must all be matter qubits" + + if initial_state not in ["gauge_invariant", "single_sector", "superposition"]: + raise ValueError( + "initial_state should be 'gauge_invariant' or 'superposition' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if ncycles < 0: + raise ValueError("ncycles must be nonnegative") + if basis not in ["z", "x"]: + raise ValueError("basis must be 'z' or 'x'") + + basis_index = ["z", "x"].index(basis) + if initial_state == "gauge_invariant": + initial_state = "single_sector" + + if zero_trotter: + lgtdfl = self.lgtdfl_zero_trotter + else: + lgtdfl = self.lgtdfl + + if initial_state == "single_sector": + initial_state_enum = InitialState.SINGLE_SECTOR + else: + initial_state_enum = InitialState.SUPERPOSITION + circuit = lgtdfl.get_2d_dfl_experiment_circuits( + initial_state_enum, + [ncycles], + excited_qubits=self.excited_qubits, + n_instances=1, + two_qubit_gate= + TwoQubitGate.CPHASE if self.use_cphase else TwoQubitGate.CZ, + )[basis_index] + new_moment_0 = cirq.Moment( + list(circuit[0].operations) + + [ + cirq.Ry(rads=(1 - 2 * disorder) * np.pi / 2)(q) + for q, disorder in zip(qubits_to_fix_disorder, disorder_pattern) + ] + ) + circuit = cirq.Circuit(new_moment_0) + circuit[1:] + + if gauge_compile: + circuits = list( + self.executor.map( + partial(_apply_gauge_compiling, circuit=circuit), + range(self.num_gc_instances), + ) + ) + else: + circuits = [circuit] + + if dynamical_decouple: + circuits_dd = list( + self.executor.map( + partial(cirq.add_dynamical_decoupling, schema=dd_sequence), circuits + ) + ) + circuits = circuits_dd + + return circuits + + def post_select_measurements_on_distance( + self, + measurements: np.ndarray, + tolerated_distance_to_error: int | float, + operator_qubits: list[cirq.GridQubit], + ) -> np.ndarray: + """Return the subset of measurements where the errors occur at least a given distance from + the operator qubits. + + Args: + measurements: The non-post-selected measurements. + tolerated_distance_to_error: Allow errors greater than this distance from operator_qubits. + operator_qubits: The qubits on which we are measuring an operator. + + Returns: + The post-selected measurements. + """ + matter_qubits = self.lgtdfl.matter_qubits + matter_indices = np.array(self.lgtdfl._matter_indices()) + distances = np.array( + [ + min(_distance(q_operator, qubit) for q_operator in operator_qubits) + for qubit in matter_qubits + ] + ) + errors = measurements[:, matter_indices] + distance_to_error = np.min(np.where(errors, distances, np.inf), axis=1) + return measurements[distance_to_error >= tolerated_distance_to_error] + + def extract_zzz_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the interaction term (ZZZ in the LGT basis but implemented + here as Z_gauge in the dual basis). + + Args: + initial_state: Either "superposition", "single_sector", or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 0, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + if post_select and tolerated_distance_to_error < np.inf: + p1 = np.zeros(len(self.lgtdfl.gauge_qubits)) + dp1 = np.zeros(len(self.lgtdfl.gauge_qubits)) + for idx, (op_qubit, op_index) in enumerate( + zip(self.lgtdfl.gauge_qubits, self.lgtdfl._gauge_indices()) + ): + measurements_q = self.post_select_measurements_on_distance( + measurements, tolerated_distance_to_error, [op_qubit] + )[:, op_index] + print( + f"Interaction term, cycle {cycle_number}, qubit {op_qubit}, {len(measurements_q)} counts of {len(measurements)}" + ) + if os.path.isfile("dfl_surviving_counts.pickle"): + log_dict = pickle.load(open("dfl_surviving_counts.pickle", "rb")) + else: + log_dict = { + "gauge_x": [{} for _ in range(31)], + "matter_x": [{} for _ in range(31)], + "interaction": [{} for _ in range(31)], + "gauge_x_zero_trotter": [{} for _ in range(31)], + "matter_x_zero_trotter": [{} for _ in range(31)], + "interaction_zero_trotter": [{} for _ in range(31)], + "total_measurements": [0 for _ in range(31)], + } + key = "interaction" + if zero_trotter: + key += "_zero_trotter" + log_dict[key][cycle_number][op_qubit] = len(measurements_q) + log_dict["total_measurements"][cycle_number] = len(measurements) + pickle.dump(log_dict, open("dfl_surviving_counts.pickle", "wb")) + + if len(measurements_q) == 0: + p1[idx] = np.nan + dp1[idx] = np.nan + else: + p1[idx] = np.mean(measurements_q) + dp1[idx] = np.sqrt(p1[idx] * (1 - p1[idx]) / len(measurements_q)) + else: + p1 = np.mean(measurements[:, self.lgtdfl._gauge_indices()], axis=0) + dp1 = np.sqrt(p1 * (1 - p1) / len(measurements)) + if not readout_mitigate: + x_dx = np.array([1 - 2 * p1, 2 * dp1]) # value, uncertainty + else: + x_raw = 1 - 2 * p1 + e0 = self.e0[self.lgtdfl._gauge_indices()] + e1 = self.e1[self.lgtdfl._gauge_indices()] + x_mitigated = (x_raw + e0 - e1) / ( + 1.0 - e0 - e1 + ) # see Eq. F3 of https://arxiv.org/pdf/2106.01264 + x_dx = np.array([x_mitigated, 2 * dp1 / (1.0 - e0 - e1)]) + return x_dx + + def extract_gauge_x_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the h term, which is X_gauge in both the LGT and dual bases. + + Args: + initial_state: Either "superposition", "single_sector", or "gauge_invariant". + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 1, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + if post_select and tolerated_distance_to_error < np.inf: + p1 = np.zeros(len(self.lgtdfl.gauge_qubits)) + dp1 = np.zeros(len(self.lgtdfl.gauge_qubits)) + for idx, (op_qubit, op_index) in enumerate( + zip(self.lgtdfl.gauge_qubits, self.lgtdfl._gauge_indices()) + ): + measurements_q = self.post_select_measurements_on_distance( + measurements, tolerated_distance_to_error, [op_qubit] + )[:, op_index] + print( + f"Gauge X term, cycle {cycle_number}, qubit {op_qubit}, {len(measurements_q)} counts of {len(measurements)}" + ) + + if os.path.isfile("dfl_surviving_counts.pickle"): + log_dict = pickle.load(open("dfl_surviving_counts.pickle", "rb")) + else: + log_dict = { + "gauge_x": [{} for _ in range(31)], + "matter_x": [{} for _ in range(31)], + "interaction": [{} for _ in range(31)], + "gauge_x_zero_trotter": [{} for _ in range(31)], + "matter_x_zero_trotter": [{} for _ in range(31)], + "interaction_zero_trotter": [{} for _ in range(31)], + "total_measurements": [0 for _ in range(31)], + } + key = "gauge_x" + if zero_trotter: + key += "_zero_trotter" + log_dict[key][cycle_number][op_qubit] = len(measurements_q) + log_dict["total_measurements"][cycle_number] = len(measurements) + pickle.dump(log_dict, open("dfl_surviving_counts.pickle", "wb")) + + if len(measurements_q) == 0: + p1[idx] = np.nan + dp1[idx] = np.nan + else: + p1[idx] = np.mean(measurements_q) + dp1[idx] = np.sqrt(p1[idx] * (1 - p1[idx]) / len(measurements_q)) + else: + p1 = np.mean(measurements[:, self.lgtdfl._gauge_indices()], axis=0) + dp1 = np.sqrt(p1 * (1 - p1) / len(measurements)) + if not readout_mitigate: + x_dx = np.array([1 - 2 * p1, 2 * dp1]) # value, uncertainty + else: + x_raw = 1 - 2 * p1 + e0 = self.e0[self.lgtdfl._gauge_indices()] + e1 = self.e1[self.lgtdfl._gauge_indices()] + x_mitigated = (x_raw + e0 - e1) / ( + 1.0 - e0 - e1 + ) # see Eq. F3 of https://arxiv.org/pdf/2106.01264 + x_dx = np.array([x_mitigated, 2 * dp1 / (1.0 - e0 - e1)]) + return x_dx + + def extract_matter_x_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the mu term. + + This is X_matter in the LGT basis and a product of Xs on a matter site and the neighboring gauge sites in the dual basis. + + Args: + initial_state: Either "superposition", "single_sector", or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 1, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + x = [] + dx = [] + e0 = deepcopy(self.e0) + e1 = deepcopy(self.e1) + if post_select and readout_mitigate: + for idx in self.lgtdfl._matter_indices(): + e0[idx] = 0.0 + e1[idx] = 0.0 + for q in self.lgtdfl.matter_qubits: + qubits_i = [ + q_n for q_n in q.neighbors() if q_n in self.lgtdfl.all_qubits + ] + [q] + indices = np.array( + [ + self.lgtdfl.all_qubits.index(cast(cirq.GridQubit, qi)) + for qi in qubits_i + ] + ) + if post_select and tolerated_distance_to_error < np.inf: + measurements_i = self.post_select_measurements_on_distance( + measurements, + tolerated_distance_to_error, + cast(list[cirq.GridQubit], qubits_i), + )[:, indices] + print( + f"Matter X term, cycle {cycle_number}, qubit {q}, {len(measurements_i)} counts of {len(measurements)}" + ) + + if os.path.isfile("dfl_surviving_counts.pickle"): + log_dict = pickle.load(open("dfl_surviving_counts.pickle", "rb")) + else: + log_dict = { + "gauge_x": [{} for _ in range(31)], + "matter_x": [{} for _ in range(31)], + "interaction": [{} for _ in range(31)], + "gauge_x_zero_trotter": [{} for _ in range(31)], + "matter_x_zero_trotter": [{} for _ in range(31)], + "interaction_zero_trotter": [{} for _ in range(31)], + "total_measurements": [0 for _ in range(31)], + } + key = "matter_x" + if zero_trotter: + key += "_zero_trotter" + log_dict[key][cycle_number][q] = len(measurements_i) + log_dict["total_measurements"][cycle_number] = len(measurements) + pickle.dump(log_dict, open("dfl_surviving_counts.pickle", "wb")) + + else: + measurements_i = measurements[:, indices] + if readout_mitigate: + single_qubit_cmats = [] + qubits_to_mitigate = [] + + for i in indices: + e0_sq = e0[i] + e1_sq = e1[i] + single_qubit_cmats.append( + np.array([[1 - e0_sq, e1_sq], [e0_sq, 1 - e1_sq]]) + ) + qubits_to_mitigate.append(self.qubits[i]) + + tcm = cirq.experiments.TensoredConfusionMatrices( + single_qubit_cmats, + [[q] for q in qubits_to_mitigate], + repetitions=measurements_i.shape[0], + timestamp=0.0, + ) + + x_i, dx_i = tcm.readout_mitigation_pauli_uncorrelated( + qubits_to_mitigate, measurements_i + ) + else: + repetitions = measurements_i.shape[0] + p1 = np.mean(np.sum(measurements_i, axis=1) % 2) + dp1 = np.sqrt(p1 * (1 - p1) / (repetitions * self.num_gc_instances)) + x_i = 1 - 2 * p1 + dx_i = 2 * dp1 + x.append(x_i) + dx.append(dx_i) + return np.array([x, dx]) diff --git a/recirq/dfl/dfl_experiment_test_manual.py b/recirq/dfl/dfl_experiment_test_manual.py new file mode 100644 index 00000000..8d67fa11 --- /dev/null +++ b/recirq/dfl/dfl_experiment_test_manual.py @@ -0,0 +1,201 @@ +# Copyright 2025 Google +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import cirq +import numpy as np + +from recirq.dfl import dfl_experiment as dfl + +def _create_test_dir(directory: str = "test_circuits"): + """Create the test directory if it doesn't exist.""" + if not os.path.isdir(directory): + os.mkdir(directory) + + +def test_dfl_experiment_3x3(): + """Performs a comprehensive integration test of the 2D DFL experiment setup. + + Note: This test is designed to be run manually as it is time-consuming. + """ + _create_test_dir() + + qubits = cirq.GridQubit.rect(3, 3, 0, 0) + sampler = cirq.Simulator() + for num_gc_instances in [1, 2]: + for use_cphase in [False, True]: + dfl_expt = dfl.DFLExperiment2D( + sampler, + qubits, + origin_qubit=cirq.q(0, 0), + save_directory="test_circuits", + num_gc_instances=num_gc_instances, + excited_qubits=[cirq.q(0, 1)], + cycles=np.array([0, 1]), + use_cphase=use_cphase, + ) + for gauge_compile in [False, True]: + for dynamical_decouple in [False, True]: + dfl_expt.run_experiment( + gauge_compile=gauge_compile, + dynamical_decouple=dynamical_decouple, + repetitions_post_selection=np.ones( + len(dfl_expt.cycles), dtype=int + ) + * 10_000, + ) + + # check readout error rates + assert np.all(dfl_expt.e0 == np.zeros(len(dfl_expt.qubits))) + assert np.all(dfl_expt.e1 == np.zeros(len(dfl_expt.qubits))) + + # check gauge_x + signs = np.ones(len(dfl_expt.lgtdfl._gauge_indices())) + signs[0] = -1 + gauge_expected = dfl_expt.h / np.sqrt(dfl_expt.h ** 2 + 1) * signs + + for readout_mitigate in [False, True]: + for initial_state in ["gauge_invariant", "superposition"]: + for zero_trotter in [False, True]: + for post_select in [False, True]: + if post_select and initial_state == "superposition": + continue + else: + gauge_x = dfl_expt.extract_gauge_x( + initial_state, + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + ) + assert np.all( + np.isclose( + gauge_x[0, 0], + gauge_expected, + atol=5 * gauge_x[0, 1], + ) + ) + if zero_trotter: + assert np.all( + np.isclose( + gauge_x[0, 0], + gauge_x[1, 0], + atol=5 + * np.sqrt( + gauge_x[0, 1] ** 2 + + gauge_x[1, 1] ** 2 + ), + ) + ) + + # check matter_x + matter_expected = [] + for q_matter in dfl_expt.lgtdfl.matter_qubits: + neighbor_indices = np.array( + [ + list(dfl_expt.lgtdfl.gauge_qubits).index(q) + for q in q_matter.neighbors() + if q in dfl_expt.qubits + ] + ) + matter_expected.append( + np.prod(gauge_expected[neighbor_indices]) + ) + + for readout_mitigate in [False, True]: + for post_select in [False, True]: + for zero_trotter in [False, True]: + matter_x = dfl_expt.extract_matter_x( + "gauge_invariant", + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + ) + assert np.all( + np.isclose( + matter_x[0, 0], + matter_expected, + atol=5 * matter_x[0, 1], + ) + ) + if zero_trotter: + assert np.all( + np.isclose( + matter_x[0, 0], + matter_x[1, 0], + atol=5 + * np.sqrt( + matter_x[0, 1] ** 2 + + matter_x[1, 1] ** 2 + ), + ) + ) + + for readout_mitigate in [False, True]: + for zero_trotter in [False, True]: + matter_x = dfl_expt.extract_matter_x( + "superposition", + readout_mitigate=readout_mitigate, + post_select=False, + zero_trotter=zero_trotter, + ) + assert np.all( + np.isclose(matter_x[0, 0], 0.0, atol=5 * matter_x[0, 1]) + ) + if zero_trotter: + assert np.all( + np.isclose( + matter_x[0, 0], + matter_x[1, 0], + atol=5 + * np.sqrt( + matter_x[0, 1] ** 2 + matter_x[1, 1] ** 2 + ), + ) + ) + + # check interaction + interaction_expected = 1 / np.sqrt(dfl_expt.h ** 2 + 1) * signs + for readout_mitigate in [False, True]: + for initial_state in ["gauge_invariant", "superposition"]: + for zero_trotter in [False, True]: + for post_select in [False, True]: + if post_select and initial_state == "superposition": + continue + else: + interaction = dfl_expt.extract_interaction( + initial_state, + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + ) + assert np.all( + np.isclose( + interaction[0, 0], + interaction_expected, + atol=5 * interaction[0, 1], + ) + ) + if zero_trotter: + assert np.all( + np.isclose( + interaction[0, 0], + interaction[1, 0], + atol=5 + * np.sqrt( + interaction[0, 1] ** 2 + + interaction[1, 1] ** 2 + ), + ) + )