Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ name = "pypi"
cryptography = "==37.0.4"
crcmod = "==1.7"
future = "==0.17.1"
protobuf = "==4.23.4"
protobuf = "==6.33.0"
psutil = "==5.9.4"
google-cloud-ndb = "==2.3.4"

[dev-packages]
Fabric = "==1.14.1"
grpcio-tools = "==1.62.2"
grpcio-tools = "==1.75.1"
gunicorn = "*"
isort = "*"
mypy-protobuf = "==3.4.0"
nodeenv = "==1.9.1"
parameterized = "==0.9.0"
paramiko = "==2.6.0"
pipenv = "==2022.8.5"
pipenv = "==2025.0.4"
pyfakefs = "==4.4.0"
pylint = "~=2.4"
pylint-protobuf = "==0.21.0"
Expand All @@ -30,3 +30,9 @@ selenium = "==3.141.0"
twine = "*"
WebTest = "==3.0.0"
yapf = "==0.22.0"
google-api-python-client = "==2.185.0"
aiohttp = "==3.13.1"
google-cloud-secret-manager = "==2.25.0"
python-dateutil = "==2.9.0.post0"
jira = "==3.10.5"
oauth2client = "==4.1.3"
2,029 changes: 1,508 additions & 521 deletions Pipfile.lock

Large diffs are not rendered by default.

101 changes: 101 additions & 0 deletions src/clusterfuzz/_internal/tests/core/grpc/server_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the gRPC server, now with expanded coverage."""

import unittest
from concurrent import futures
from unittest import mock
import grpc
import os

# Mock the problematic module before it's imported by the server.
# NOTE(review): injecting a MagicMock into sys.modules makes the chained
# imports below succeed without loading the real reproduce_logic module —
# presumably it has heavy side effects at import time; confirm against
# clusterfuzz.grpc.server.
mock_reproduce_logic = mock.MagicMock()
with mock.patch.dict('sys.modules', {'clusterfuzz.grpc.reproduce_logic': mock_reproduce_logic}):
    from clusterfuzz.grpc import reproduce_pb2
    from clusterfuzz.grpc import reproduce_pb2_grpc
    from clusterfuzz.grpc.auth import ApiKeyInterceptor
    from clusterfuzz.grpc.server import ReproduceServiceImpl

# API key the interceptor accepts in these tests (patched into VALID_API_KEYS).
_VALID_API_KEY = "test-key-1"

@mock.patch.dict('os.environ', {'VALID_API_KEYS': _VALID_API_KEY})
class GrpcServerTest(unittest.TestCase):
    """Expanded tests for the gRPC server.

    NOTE: unlike ``mock.patch.dict``, ``mock.patch`` applied as a *class*
    decorator wraps every ``test_*`` method and passes the created mock as an
    extra positional argument. The test methods here accept only ``self``, so
    class-level ``mock.patch('clusterfuzz.grpc.server.JobStore')`` /
    ``mock.patch('dbm.open')`` decorators would make every test fail with a
    TypeError before it ran. Those patches are therefore started in setUp().
    """

    def setUp(self):
        """Sets up the test server and client."""
        # Patch JobStore before constructing the service so that
        # ReproduceServiceImpl receives the mock instead of touching disk.
        self.job_store_patcher = mock.patch('clusterfuzz.grpc.server.JobStore')
        self.mock_job_store_class = self.job_store_patcher.start()
        self.addCleanup(self.job_store_patcher.stop)

        # Keep any stray dbm access away from the real filesystem.
        self.dbm_patcher = mock.patch('dbm.open', new_callable=mock.mock_open)
        self.dbm_patcher.start()
        self.addCleanup(self.dbm_patcher.stop)

        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=1),
            interceptors=[ApiKeyInterceptor()])

        self.service_impl = ReproduceServiceImpl()
        reproduce_pb2_grpc.add_ReproduceServiceServicer_to_server(
            self.service_impl, self._server)

        # Port 0 lets the OS pick a free port, avoiding collisions.
        self.port = self._server.add_insecure_port('[::]:0')
        self._server.start()
        self.channel = grpc.insecure_channel(f'localhost:{self.port}')
        self.stub = reproduce_pb2_grpc.ReproduceServiceStub(self.channel)

        # Get the instance of the mock from the service.
        self.mock_job_store = self.service_impl._job_store

    def tearDown(self):
        """Stops the server and closes the client channel."""
        self.channel.close()
        self._server.stop(0)

    def test_start_job_success(self):
        """Tests starting a job with a valid API key."""
        mock_job = reproduce_pb2.Job(
            job_id="new-job-id", status=reproduce_pb2.JOB_STATUS_QUEUED)
        self.mock_job_store.create_job.return_value = mock_job

        metadata = [('x-api-key', _VALID_API_KEY)]
        request = reproduce_pb2.StartReproductionJobRequest(
            testcase_ids=[123], image_tag="latest")

        with mock.patch.object(self.service_impl._job_runner,
                               'start_job') as mock_start_runner:
            response = self.stub.StartReproductionJob(request, metadata=metadata)
            self.assertEqual(response.job_id, "new-job-id")
            self.assertEqual(response.status, reproduce_pb2.JOB_STATUS_QUEUED)
            mock_start_runner.assert_called_once_with(mock_job)

    def test_get_job_results_success(self):
        """Tests getting results for a completed job."""
        job_id = "completed-job"
        metadata_dict = {
            'job': reproduce_pb2.Job(
                job_id=job_id, status=reproduce_pb2.JOB_STATUS_COMPLETED),
            'testcase_statuses': {
                '101': reproduce_pb2.TEST_CASE_STATUS_REPRODUCED,
                '102': reproduce_pb2.TEST_CASE_STATUS_FAILED_REPRODUCTION,
            }
        }
        self.mock_job_store.get_job_metadata.return_value = metadata_dict

        metadata = [('x-api-key', _VALID_API_KEY)]
        request = reproduce_pb2.GetJobResultsRequest(job_id=job_id)
        response = self.stub.GetJobResults(request, metadata=metadata)

        # One reproduced + one failed testcase -> a 50/50 split.
        self.assertAlmostEqual(response.success_percentage, 50.0)
        self.assertAlmostEqual(response.failure_percentage, 50.0)
        self.assertEqual(len(response.results), 2)

# Allow running this test file directly (outside the project test runner).
if __name__ == '__main__':
    unittest.main()
63 changes: 63 additions & 0 deletions src/clusterfuzz/grpc/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""gRPC authentication interceptor for API key validation."""

import os
import grpc
from typing import Callable, Any, Set

def _get_valid_api_keys() -> Set[str]:
"""
Retrieves valid API keys from the 'VALID_API_KEYS' environment variable.
Keys should be a comma-separated string.
"""
keys_str = os.environ.get('VALID_API_KEYS', '')
if not keys_str:
# For local development and testing, provide a default key.
# In production, the environment variable should always be set.
print("WARNING: VALID_API_KEYS environment variable not set. Using default developer key.")
return {"test-key-1"}
return set(key.strip() for key in keys_str.split(','))

class ApiKeyInterceptor(grpc.ServerInterceptor):
    """An interceptor to validate an API key from request metadata.

    Valid keys are loaded once at construction time from the environment
    (see _get_valid_api_keys); restarting the server picks up key changes.
    """

    def __init__(self):
        self._valid_keys = _get_valid_api_keys()

        # grpc.ServicerContext is an abstract interface and cannot be
        # instantiated directly (the original `grpc.ServicerContext()` call
        # raised a TypeError instead of rejecting the request). The supported
        # way for an interceptor to reject a call is to return a terminating
        # RPC handler whose behavior is to abort with UNAUTHENTICATED.
        def _abort_unauthenticated(request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED,
                          "Invalid or missing API key")

        self._abort_handler = grpc.unary_unary_rpc_method_handler(
            _abort_unauthenticated)

    def intercept_service(self, continuation: Callable,
                          handler_call_details: grpc.HandlerCallDetails) -> Any:
        """Intercepts a service call to perform authentication.

        Args:
            continuation: Invokes the next interceptor / the actual handler.
            handler_call_details: Carries the invocation metadata to inspect.

        Returns:
            The real RPC handler when the 'x-api-key' metadata entry is valid,
            otherwise a handler that aborts the call with UNAUTHENTICATED.
        """
        metadata = dict(handler_call_details.invocation_metadata)
        api_key = metadata.get('x-api-key')

        if self._is_valid_key(api_key):
            return continuation(handler_call_details)
        return self._abort_handler

    def _is_valid_key(self, api_key: str) -> bool:
        """Checks if the provided API key is valid (non-empty and known)."""
        if not api_key:
            print("Authentication failed: API key is missing.")
            return False

        if api_key not in self._valid_keys:
            # No f-prefix needed: deliberately avoid echoing the bad key.
            print("Authentication failed: Invalid API key.")
            return False

        return True
146 changes: 146 additions & 0 deletions src/clusterfuzz/grpc/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Core logic for managing and running reproduction jobs with persistence."""

import dbm
import json
import threading
import uuid
from concurrent import futures
from typing import Dict, Any

from google.protobuf import timestamp_pb2, json_format
from clusterfuzz.grpc import reproduce_pb2
from clusterfuzz.grpc.reproduce_logic import initialize_environment, reproduce_testcase_by_id

# Path of the DBM file used to persist job metadata across server restarts.
_DB_PATH = './jobs.db'
# Serializes all DBM reads/writes; JobRunner executes jobs on a thread pool,
# so concurrent access to the store is expected.
_db_lock = threading.Lock()

class JobStore:
    """Manages the state of reproduction jobs with DBM persistence.

    All access goes through the module-level _db_lock and a short-lived dbm
    handle, so instances hold no open file state and are cheap to create.
    """

    def _serialize(self, data: Dict[str, Any]) -> str:
        """Serializes job data to JSON without mutating the caller's dict.

        The 'job' entry is a reproduce_pb2.Job protobuf message, which is not
        JSON-serializable; it is converted to a plain dict on a shallow copy
        so the caller keeps its protobuf message intact (the original
        implementation overwrote the caller's 'job' entry as a side effect).
        """
        if 'job' in data:
            data = dict(data)  # shallow copy: only 'job' is replaced
            data['job'] = json_format.MessageToDict(data['job'])
        return json.dumps(data)

    def _deserialize(self, data_str: str) -> Dict[str, Any]:
        """Deserializes job data, restoring the 'job' protobuf message."""
        data = json.loads(data_str)
        if 'job' in data:
            data['job'] = json_format.ParseDict(data['job'], reproduce_pb2.Job())
        return data

    def create_job(self, testcase_ids, image_tag, time_limit) -> reproduce_pb2.Job:
        """Creates a new queued job and stores it in the DBM file.

        Args:
            testcase_ids: Iterable of testcase ids the job should reproduce.
            image_tag: Docker/image tag to run the reproduction under.
            time_limit: A protobuf Duration limiting the job's runtime.

        Returns:
            The newly created reproduce_pb2.Job in JOB_STATUS_QUEUED state.
        """
        job_id = str(uuid.uuid4())
        now = timestamp_pb2.Timestamp()
        now.GetCurrentTime()
        job = reproduce_pb2.Job(
            job_id=job_id,
            status=reproduce_pb2.JOB_STATUS_QUEUED,
            creation_time=now)

        # Track individual testcase statuses; keys are stringified ids so the
        # mapping round-trips through JSON unchanged.
        testcase_statuses = {
            str(tid): reproduce_pb2.TEST_CASE_STATUS_PENDING
            for tid in testcase_ids
        }

        job_metadata = {
            'job': job,
            'testcase_ids': testcase_ids,
            'image_tag': image_tag,
            'time_limit': time_limit.ToJsonString(),
            'testcase_statuses': testcase_statuses,
            'updates': [],
        }

        with _db_lock:
            # 'c' creates the DB file on first use.
            with dbm.open(_DB_PATH, 'c') as db:
                db[job_id] = self._serialize(job_metadata)

        return job

    def get_job_metadata(self, job_id: str) -> Dict[str, Any]:
        """Retrieves full job metadata from the DBM file.

        Returns:
            The deserialized metadata dict, or None if the job is unknown or
            no DB file exists yet (dbm.open with 'r' raises when the file is
            missing, e.g. on a lookup before any job was ever created).
        """
        with _db_lock:
            try:
                with dbm.open(_DB_PATH, 'r') as db:
                    if job_id not in db:
                        return None
                    return self._deserialize(db[job_id])
            except dbm.error:
                # No database file yet: behave as "job not found".
                return None

    def get_job(self, job_id: str):
        """Retrieves a job (reproduce_pb2.Job) by its ID, or None."""
        metadata = self.get_job_metadata(job_id)
        return metadata.get('job') if metadata else None

    def _update_metadata(self, job_id: str, metadata: Dict[str, Any]):
        """Writes updated metadata back to the DBM file."""
        with _db_lock:
            with dbm.open(_DB_PATH, 'w') as db:
                db[job_id] = self._serialize(metadata)

    def update_job_status(self, job_id: str, status: reproduce_pb2.JobStatus):
        """Updates the overall status of a job (no-op for unknown jobs)."""
        metadata = self.get_job_metadata(job_id)
        if metadata:
            metadata['job'].status = status
            # Stamp completion time when the job reaches a terminal state.
            if status in [reproduce_pb2.JOB_STATUS_COMPLETED, reproduce_pb2.JOB_STATUS_FAILED]:
                now = timestamp_pb2.Timestamp()
                now.GetCurrentTime()
                metadata['job'].completion_time = now
            self._update_metadata(job_id, metadata)

    def update_testcase_status(self, job_id: str, testcase_id: int, status: reproduce_pb2.TestCaseStatus):
        """Updates the status of a single testcase within a job (no-op for unknown jobs)."""
        metadata = self.get_job_metadata(job_id)
        if metadata:
            metadata['testcase_statuses'][str(testcase_id)] = status
            self._update_metadata(job_id, metadata)


class JobRunner:
    """Runs reproduction jobs asynchronously on a small thread pool."""

    def __init__(self):
        # Up to four jobs may execute concurrently.
        self._executor = futures.ThreadPoolExecutor(max_workers=4)
        self._job_store = JobStore()
        initialize_environment('./configs/local')

    def start_job(self, job: reproduce_pb2.Job):
        """Schedules *job* for background execution and returns immediately."""
        self._executor.submit(self._run_job, job.job_id)

    def _run_job(self, job_id: str):
        """Executes every testcase of the job, recording per-testcase status."""
        store = self._job_store
        print(f"Starting execution for job {job_id}")
        store.update_job_status(job_id, reproduce_pb2.JOB_STATUS_RUNNING)

        metadata = store.get_job_metadata(job_id)
        if not metadata:
            print(f"Job {job_id} not found.")
            return

        for testcase_id in metadata.get('testcase_ids', []):
            store.update_testcase_status(
                job_id, testcase_id, reproduce_pb2.TEST_CASE_STATUS_RUNNING)
            try:
                print(f"Reproducing testcase {testcase_id} for job {job_id}")
                reproduce_testcase_by_id(testcase_id, './configs/local')
                # Assuming success if no exception is raised. A real implementation
                # would return a status from the reproduce function.
                store.update_testcase_status(
                    job_id, testcase_id, reproduce_pb2.TEST_CASE_STATUS_REPRODUCED)
            except Exception as e:
                print(f"Error reproducing testcase {testcase_id}: {e}")
                store.update_testcase_status(
                    job_id, testcase_id,
                    reproduce_pb2.TEST_CASE_STATUS_FAILED_REPRODUCTION)

        print(f"Finished execution for job {job_id}")
        # The job is marked COMPLETED even if individual testcases failed;
        # per-testcase outcomes live in 'testcase_statuses'.
        store.update_job_status(job_id, reproduce_pb2.JOB_STATUS_COMPLETED)
Loading
Loading