diff --git a/ann_benchmarks/algorithms/pase/Dockerfile b/ann_benchmarks/algorithms/pase/Dockerfile new file mode 100644 index 000000000..4b7fdd9d6 --- /dev/null +++ b/ann_benchmarks/algorithms/pase/Dockerfile @@ -0,0 +1,109 @@ +FROM ann-benchmarks + +# Set timezone to prevent interactive prompts +ENV TZ=Etc/UTC +RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone + +# Install PostgreSQL build dependencies +RUN DEBIAN_FRONTEND=noninteractive apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + libreadline-dev \ + zlib1g-dev \ + flex \ + bison \ + libxml2-dev \ + libxslt-dev \ + libssl-dev \ + libxml2-utils \ + xsltproc \ + ccache \ + wget \ + git \ + && rm -rf /var/lib/apt/lists/* + +# Download and build PostgreSQL 11.0 +WORKDIR /tmp +RUN wget https://ftp.postgresql.org/pub/source/v11.0/postgresql-11.0.tar.gz && \ + tar -zxvf postgresql-11.0.tar.gz && \ + cd postgresql-11.0 && \ + ./configure --prefix=/usr/local/pgsql CFLAGS="-O3" LDFLAGS="-fPIC -fopenmp" && \ + make -j$(nproc) && \ + make install + +# Clone VecDB and copy PASE files +RUN git clone https://github.com/YunanZzz/VecDB-ICDE24.git /vecdb && \ + mkdir -p /tmp/postgresql-11.0/contrib/pase && \ + cp -r /vecdb/postgresql-11.0/contrib/pase/* /tmp/postgresql-11.0/contrib/pase/ + +# Build and install PASE +WORKDIR /tmp/postgresql-11.0/contrib/pase +RUN PG_CONFIG=/usr/local/pgsql/bin/pg_config make USE_PGXS=1 && \ + make install + +# Create postgres user and set up directories with proper permissions +RUN useradd postgres && \ + mkdir -p /tmp /var/run/postgresql /usr/local/pgsql/data /usr/local/pgsql/logs && \ + chown -R postgres:postgres /tmp && \ + chown -R postgres:postgres /var/run/postgresql && \ + chown -R postgres:postgres /usr/local/pgsql && \ + chmod 0700 /usr/local/pgsql/data && \ + chmod 0700 /usr/local/pgsql/logs && \ + su postgres -c '/usr/local/pgsql/bin/initdb -D /usr/local/pgsql/data' + +# Configure PostgreSQL +USER postgres +RUN echo 
"unix_socket_directories = '/tmp'" >> /usr/local/pgsql/data/postgresql.conf && \ + echo "shared_buffers = 4GB" >> /usr/local/pgsql/data/postgresql.conf && \ + echo "listen_addresses = '*'" >> /usr/local/pgsql/data/postgresql.conf && \ + echo "local all all trust" > /usr/local/pgsql/data/pg_hba.conf && \ + echo "host all all 127.0.0.1/32 trust" >> /usr/local/pgsql/data/pg_hba.conf && \ + echo "host all all ::1/128 trust" >> /usr/local/pgsql/data/pg_hba.conf && \ + echo "host all all 0.0.0.0/0 trust" >> /usr/local/pgsql/data/pg_hba.conf && \ + /usr/local/pgsql/bin/pg_ctl start -D /usr/local/pgsql/data -l /usr/local/pgsql/logs/logfile -w && \ + /usr/local/pgsql/bin/createuser -s ann && \ + /usr/local/pgsql/bin/createdb -O ann ann && \ + /usr/local/pgsql/bin/pg_ctl stop -D /usr/local/pgsql/data + +USER root +# Expose PostgreSQL port + +EXPOSE 5432 + +# Create logging configuration +COPY <<-'EOF' /home/app/logging.conf +[loggers] +keys=root + +[handlers] +keys=consoleHandler + +[formatters] +keys=simpleFormatter + +[logger_root] +level=INFO +handlers=consoleHandler + +[handler_consoleHandler] +class=StreamHandler +level=INFO +formatter=simpleFormatter +args=(sys.stdout,) + +[formatter_simpleFormatter] +format=%(asctime)s - %(name)s - %(levelname)s - %(message)s +datefmt= +EOF + +# Add PostgreSQL binaries to PATH +ENV PATH=/usr/local/pgsql/bin:$PATH + +# Install Python PostgreSQL adapter and vector support +RUN pip install psycopg[binary] + +WORKDIR /home/app + +# Set permissions for app directory after copying +RUN mkdir -p /home/app/ann_benchmarks/algorithms/pase && \ + chown -R postgres:postgres /home/app && \ + chmod -R 0700 /home/app \ No newline at end of file diff --git a/ann_benchmarks/algorithms/pase/config.yml b/ann_benchmarks/algorithms/pase/config.yml new file mode 100644 index 000000000..152505ae8 --- /dev/null +++ b/ann_benchmarks/algorithms/pase/config.yml @@ -0,0 +1,22 @@ +base: + args: [] + docker_tag: ann-benchmarks-pase + module: 
ann_benchmarks.algorithms.pase.module + +float: + any: + - name: pase + docker_tag: ann-benchmarks-pase + module: ann_benchmarks.algorithms.pase + constructor: PASE + base_args: ['@metric'] + disabled: false + run_groups: + M-5: + arg_groups: [{ M: 5, efConstruction: 32 }] + args: { } + query_args: [[10, 20, 40, 80]] + M-8: + arg_groups: [{ M: 8, efConstruction: 64 }] + args: { } + query_args: [[ 10, 20, 40, 80]] \ No newline at end of file diff --git a/ann_benchmarks/algorithms/pase/module.py b/ann_benchmarks/algorithms/pase/module.py new file mode 100644 index 000000000..218a513a3 --- /dev/null +++ b/ann_benchmarks/algorithms/pase/module.py @@ -0,0 +1,279 @@ +""" +This module supports connecting to a PostgreSQL instance and performing vector +indexing and search using the PASE extension. The default behavior uses +the "ann" value of PostgreSQL user name, password, and database name, as well +as the "/tmp" Unix-socket directory as the host and the default port value of +the psycopg driver. + +If PostgreSQL is managed externally, e.g. in a cloud DBaaS environment, the +environment variable overrides listed below are available for setting PostgreSQL +connection parameters: + +ANN_BENCHMARKS_PG_USER +ANN_BENCHMARKS_PG_PASSWORD +ANN_BENCHMARKS_PG_DBNAME +ANN_BENCHMARKS_PG_HOST +ANN_BENCHMARKS_PG_PORT + +This module starts the PostgreSQL server automatically, unless it is already +running, by invoking the "pg_ctl" command as the "postgres" user. The +environment variable ANN_BENCHMARKS_PG_START_SERVICE could be set +to "false" (or e.g. "0" or "no") in order to disable this behavior. + +This module will also attempt to create the PASE extension inside the +target database, if it has not already been created. 
import os
import subprocess
import sys
import time
from typing import Any, Dict, List, Optional

import numpy as np
import psycopg
from psycopg.types import array

from ..base.module import BaseANN
from ...util import get_bool_env_var


# Paths of the PostgreSQL installation this module drives through pg_ctl.
_PG_CTL = "/usr/local/pgsql/bin/pg_ctl"
_PG_DATA_DIR = "/usr/local/pgsql/data"
_PG_LOG_FILE = "/usr/local/pgsql/logs/logfile"

# Unix-socket directory used as the connection "host" unless overridden.
_DEFAULT_PG_HOST = "/tmp"

# Name of the HNSW index built by fit() and measured by get_memory_usage().
_INDEX_NAME = "pase_hnsw_idx"

# Benchmark metric -> third argument of the pase(...) constructor.
# NOTE(review): presumably 0 = Euclidean and 1 = inner product in PASE;
# confirm against the PASE build in use.
_PASE_DISTANCE_TYPES: Dict[str, int] = {"euclidean": 0, "angular": 1}


def get_pg_param_env_var_name(pg_param_name: str) -> str:
    """
    Generate the environment variable name for a PostgreSQL connection parameter.

    Args:
        pg_param_name (str): The name of the PostgreSQL parameter.

    Returns:
        str: ``ANN_BENCHMARKS_PG_`` followed by the upper-cased parameter name.
    """
    return f'ANN_BENCHMARKS_PG_{pg_param_name.upper()}'


def get_pg_conn_param(
        pg_param_name: str,
        default_value: Optional[str] = None) -> Optional[str]:
    """
    Retrieve a PostgreSQL connection parameter from its environment variable.

    Args:
        pg_param_name (str): The name of the PostgreSQL parameter.
        default_value (Optional[str], optional): Value to use when the
            variable is unset, empty, or whitespace-only. Defaults to None.

    Returns:
        Optional[str]: The parameter value from the environment, or the default.
    """
    env_var_value = os.getenv(get_pg_param_env_var_name(pg_param_name))
    if env_var_value is None or not env_var_value.strip():
        return default_value
    return env_var_value


class PASE(BaseANN):
    """
    ANN-benchmark wrapper around the HNSW index of the PASE PostgreSQL
    extension.
    """

    def __init__(self, metric: str, method_param: Dict[str, Any]):
        """
        Store the index-build and search parameters.

        Args:
            metric (str): Distance metric to use ('angular' or 'euclidean').
            method_param (Dict[str, Any]): Must contain 'M' and
                'efConstruction'; may contain 'efSearch' (defaults to 40).
        """
        self._metric = metric
        self._m = method_param['M']
        self._ef_construction = method_param['efConstruction']
        self._ef_search = method_param.get('efSearch', 40)
        # Cursor on the benchmark connection; set by fit().
        self._cur: Optional[psycopg.Cursor] = None

    def _distance_type(self) -> int:
        """
        Map the configured metric to the pase(...) distance-type argument.

        Raises:
            RuntimeError: If the metric is neither 'angular' nor 'euclidean'.
        """
        try:
            return _PASE_DISTANCE_TYPES[self._metric]
        except KeyError:
            raise RuntimeError(f"unknown metric {self._metric}") from None

    @staticmethod
    def _connection_kwargs() -> Dict[str, Any]:
        """Build the psycopg.connect() keyword arguments from the environment."""
        kwargs: Dict[str, Any] = dict(autocommit=True)
        # The default value is "ann" for all of these parameters.
        for arg_name in ('user', 'password', 'dbname'):
            kwargs[arg_name] = get_pg_conn_param(arg_name, 'ann')

        # Connect through the /tmp Unix socket by default, but honour the
        # ANN_BENCHMARKS_PG_HOST override for externally managed servers.
        kwargs['host'] = get_pg_conn_param('host', _DEFAULT_PG_HOST)

        pg_port_str = get_pg_conn_param('port')
        if pg_port_str is not None:
            kwargs['port'] = int(pg_port_str)
        return kwargs

    @staticmethod
    def _start_postgres_if_needed() -> None:
        """
        Start the local PostgreSQL server as the postgres user unless
        ``pg_ctl status`` reports that it is already running.

        Raises:
            subprocess.CalledProcessError: If ``pg_ctl start`` fails.
        """
        status = subprocess.run(
            ["su", "postgres", "-c", f"{_PG_CTL} status -D {_PG_DATA_DIR}"],
            capture_output=True,
        )
        if status.returncode == 0:
            print("PostgreSQL server is already running")
            return

        print("Starting PostgreSQL server...")
        try:
            subprocess.run(
                ["su", "postgres", "-c",
                 f"{_PG_CTL} start -D {_PG_DATA_DIR} -l {_PG_LOG_FILE} -w"],
                check=True,
                stdout=sys.stdout,
                stderr=sys.stderr,
            )
        except subprocess.CalledProcessError as e:
            print(f"Error starting PostgreSQL: {e}")
            raise
        # Wait a bit for the server to be ready
        time.sleep(2)

    def _apply_session_settings(self) -> None:
        """
        Push the query-time settings to the server session.

        The settings are session-scoped, so they are sent once here rather
        than on every timed query() call. Does nothing before fit() has
        opened a cursor.
        """
        if self._cur is None:
            return
        self._cur.execute(f"SET hnsw.ef_search = {int(self._ef_search)}")
        # Force PostgreSQL to use the index by disabling sequential scans.
        self._cur.execute("SET enable_seqscan = off")

    def ensure_pase_extension_created(self, conn: psycopg.Connection) -> None:
        """
        Ensure that the PASE extension is created in the database.

        Args:
            conn (psycopg.Connection): PostgreSQL database connection.
        """
        with conn.cursor() as cur:
            cur.execute(
                "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pase')")
            pase_exists = cur.fetchone()[0]

            if pase_exists:
                print("PASE extension already exists")
                return

            print("PASE extension does not exist, creating")
            cur.execute("CREATE EXTENSION pase")
            conn.commit()
            print("Successfully created PASE extension")

    def fit(self, X):
        """
        Load the dataset into the ``items`` table and build the HNSW index.

        Args:
            X: 2-D dataset of vectors; ``X.shape[1]`` is used as the
                dimension and each row is inserted with its position as id.

        Raises:
            RuntimeError: If the configured metric is not supported.
            subprocess.CalledProcessError: If PostgreSQL cannot be started.
        """
        # Fail before doing any work instead of silently skipping the index.
        self._distance_type()

        connect_kwargs = self._connection_kwargs()

        should_start_service = get_bool_env_var(
            get_pg_param_env_var_name('start_service'),
            default_value=True)
        if should_start_service:
            self._start_postgres_if_needed()
        else:
            print(
                "Assuming that PostgreSQL service is managed externally. "
                "Not attempting to start the service.")

        conn = psycopg.connect(**connect_kwargs)
        self.ensure_pase_extension_created(conn)

        # Register the array adapters on this connection and create the cursor.
        array.register_default_adapters(conn.adapters)
        cur = conn.cursor()

        dim = int(X.shape[1])
        print("X shape:", X.shape[1], X.shape)

        cur.execute("DROP TABLE IF EXISTS items")
        cur.execute("CREATE TABLE items (id int, embedding float4[%d])" % dim)
        cur.execute("ALTER TABLE items ALTER COLUMN embedding SET STORAGE PLAIN")

        print("Copying data using COPY command...")
        total_rows = len(X)
        with cur.copy(
                "COPY items (id, embedding) FROM STDIN WITH (FORMAT binary)") as copy:
            copy.set_types(["int4", "float4[]"])
            for i, embedding in enumerate(X):
                # Rows are written as plain Python lists.
                row = embedding.tolist() if isinstance(embedding, np.ndarray) else embedding
                copy.write_row((i, row))
                if i > 0 and i % 10000 == 0:
                    print(f"Copied {i}/{total_rows} vectors...")

        print("Creating index...")
        # The index definition is the same for both supported metrics; the
        # metric is only passed at query time.
        cur.execute(
            "CREATE INDEX %s ON items USING pase_hnsw(embedding) "
            "WITH (dim = %d, base_nb_num = %d, ef_build = %d)"
            % (_INDEX_NAME, dim, self._m, self._ef_construction)
        )
        print("Indexing complete!")

        self._cur = cur
        self._apply_session_settings()

    def set_query_arguments(self, ef_search: int) -> None:
        """
        Set query-time search parameters.

        Args:
            ef_search (int): Value sent to the session as ``hnsw.ef_search``.
        """
        self._ef_search = ef_search
        self._apply_session_settings()

    def query(self, v: Any, n: int) -> List[int]:
        """
        Perform nearest neighbor search.

        Args:
            v (Any): Query vector (NumPy array or list of floats).
            n (int): Number of neighbors to retrieve.

        Returns:
            List[int]: List of neighbor IDs.

        Raises:
            RuntimeError: If the configured metric is not supported.
        """
        distance_type = self._distance_type()
        if isinstance(v, np.ndarray):
            v = v.tolist()

        # "<?>" is PASE's HNSW distance operator; without an operator between
        # the column and the pase(...) constructor the statement is not
        # valid SQL.
        sql = (
            "SELECT id FROM items ORDER BY embedding <?> "
            f"pase(ARRAY[%s]::float4[],0,{distance_type}) LIMIT %s"
        )
        self._cur.execute(sql, (v, n), binary=True, prepare=True)
        return [row_id for row_id, in self._cur.fetchall()]

    def get_memory_usage(self) -> float:
        """
        Get the on-disk size of the index.

        Returns:
            float: ``pg_relation_size`` of the index divided by 1024, or 0
            if fit() has not been called yet.
        """
        if self._cur is None:
            return 0
        self._cur.execute("SELECT pg_relation_size('%s')" % _INDEX_NAME)
        return self._cur.fetchone()[0] / 1024

    def __str__(self) -> str:
        """
        String representation of the PASE configuration.

        Returns:
            str: Formatted configuration string.
        """
        return f"PASE(m={self._m}, ef_construction={self._ef_construction}, ef_search={self._ef_search})"