@codeflash-ai codeflash-ai bot commented Nov 9, 2025

📄 20% (0.20x) speedup for JiraDataSource.get_issue_type_screen_schemes in backend/python/app/sources/external/jira/jira.py

⏱️ Runtime : 1.96 milliseconds → 1.63 milliseconds (best of 250 runs)

📝 Explanation and details

The optimized code achieves a 20% runtime improvement through three key micro-optimizations that reduce unnecessary overhead in data serialization:

1. Early Exit in Helper Functions

  • _safe_format_url() now checks if not params: before attempting format_map operations, avoiding expensive _SafeDict creation and exception handling when _path is empty (which it always is in this case)
  • _as_str_dict() adds early return if not d: to skip dict comprehension overhead for empty dictionaries

2. Optimized Header Handling
Changed dict(headers or {}) to headers.copy() if headers is not None else {}, eliminating the dict() constructor call and or evaluation when headers is None (the common case).

3. Direct Empty Dict Assignment
Replaced _as_str_dict(_path) with direct {} assignment for path_params, since _path is always empty. This eliminates a function call entirely.
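A minimal before/after sketch of that request-building step; variable names follow the PR's description rather than the verbatim source:

```python
def _as_str_dict(d):
    # Illustrative stand-in for the helper described above
    if not d:
        return {}
    return {str(k): str(v) for k, v in d.items()}

_path = {}  # this endpoint has no path placeholders, so _path is always empty

# Before: a function call whose result is known in advance
path_params_before = _as_str_dict(_path)

# After: a direct literal, eliminating the call overhead entirely
path_params_after = {}
```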

Performance Analysis from Profiling:

  • _safe_format_url() time dropped from 630μs to 176μs (72% reduction) by avoiding format_map operations
  • _as_str_dict() shows improved per-hit performance due to early exits for empty dictionaries
  • Overall function time reduced from 10.2ms to 8.8ms

Test Case Performance:
The optimizations are most effective for typical API calls with minimal parameters (like basic pagination), where empty dictionaries and None headers are common. Large-scale concurrent tests show consistent improvements across different load patterns.

While throughput remains the same at 118,250 ops/sec, the 20% runtime reduction means lower CPU utilization per operation, making this optimization valuable for high-frequency API integrations.

Correctness verification report:

| Test | Status |
| --- | --- |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 499 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 95.7% |
🌀 Generated Regression Tests and Runtime

```python
import asyncio
from typing import Any, Dict, Optional, Union

import pytest
from app.sources.external.jira.jira import JiraDataSource

# --- Minimal stubs for required classes and helpers ---

class HTTPRequest:
    def __init__(self, method, url, headers, path_params, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body

class HTTPResponse:
    def __init__(self, data):
        self.data = data

class DummyAsyncClient:
    """A dummy async client that records execute() calls and returns canned HTTPResponse."""
    def __init__(self):
        self.executed_requests = []
        self.base_url = "https://dummy.atlassian.net"
        self.raise_on_execute = False
        self.response_data = {"ok": True}

    async def execute(self, req):
        if self.raise_on_execute:
            raise RuntimeError("Simulated client error")
        self.executed_requests.append(req)
        return HTTPResponse(self.response_data)

    def get_base_url(self):
        return self.base_url

class DummyJiraClient:
    """Mimics the JiraClient wrapper."""
    def __init__(self, client):
        self.client = client

    def get_client(self):
        return self.client

# --- TESTS ---

# 1. BASIC TEST CASES

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_basic_no_params():
    """Test basic call with no parameters returns HTTPResponse and correct request."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    resp = await ds.get_issue_type_screen_schemes()
    req = dummy_client.executed_requests[-1]

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_basic_with_params():
    """Test with all parameters set."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    params = {
        "startAt": 10,
        "maxResults": 50,
        "id": [1, 2, 3],
        "queryString": "abc",
        "orderBy": "name",
        "expand": "all",
        "headers": {"X-Test": "yes"},
    }
    resp = await ds.get_issue_type_screen_schemes(**params)
    req = dummy_client.executed_requests[-1]

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_basic_id_empty_list():
    """Test with id as empty list (should serialize to empty string)."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    resp = await ds.get_issue_type_screen_schemes(id=[])
    req = dummy_client.executed_requests[-1]

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_basic_headers_none():
    """Test with headers=None does not raise and yields no headers."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    resp = await ds.get_issue_type_screen_schemes(headers=None)
    req = dummy_client.executed_requests[-1]

# 2. EDGE TEST CASES

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_concurrent():
    """Test concurrent calls with different parameters."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    # Launch several concurrent calls with different params
    tasks = [
        ds.get_issue_type_screen_schemes(startAt=i)
        for i in range(5)
    ]
    results = await asyncio.gather(*tasks)
    # Each request should have a different startAt param
    start_ats = [int(req.query_params.get("startAt", 0)) for req in dummy_client.executed_requests]

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_client_not_initialized():
    """Test ValueError is raised if client is None."""
    class NullJiraClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        JiraDataSource(NullJiraClient())

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_client_missing_get_base_url():
    """Test ValueError is raised if client lacks get_base_url."""
    class NoBaseUrlClient:
        pass
    class NoBaseUrlJiraClient:
        def get_client(self):
            return NoBaseUrlClient()
    with pytest.raises(ValueError, match="does not have get_base_url method"):
        JiraDataSource(NoBaseUrlJiraClient())

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_execute_raises():
    """Test that exceptions from client.execute are propagated."""
    dummy_client = DummyAsyncClient()
    dummy_client.raise_on_execute = True
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    with pytest.raises(RuntimeError, match="Simulated client error"):
        await ds.get_issue_type_screen_schemes()

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_query_param_types():
    """Test that bool and set/tuple values are serialized correctly."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    resp = await ds.get_issue_type_screen_schemes(
        id={7, 8},          # set
        queryString=True,   # bool
        orderBy=False,      # bool
    )
    req = dummy_client.executed_requests[-1]
    # id should be "7,8" (order not guaranteed for set)
    id_val = req.query_params.get("id")

# 3. LARGE SCALE TEST CASES

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_large_id_list():
    """Test with a large id list (up to 500 elements)."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    id_list = list(range(500))
    resp = await ds.get_issue_type_screen_schemes(id=id_list)
    req = dummy_client.executed_requests[-1]
    # id should be comma-separated string of all numbers
    id_val = req.query_params.get("id")

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_many_concurrent():
    """Test 50 concurrent calls (large scale async)."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    n = 50
    tasks = [
        ds.get_issue_type_screen_schemes(startAt=i, maxResults=i + 1)
        for i in range(n)
    ]
    results = await asyncio.gather(*tasks)
    # Check a few requests for correct query params
    for i in [0, 10, 25, 49]:
        req = dummy_client.executed_requests[i]

# 4. THROUGHPUT TEST CASES

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_throughput_small_load():
    """Throughput: 10 quick concurrent calls."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    n = 10
    tasks = [ds.get_issue_type_screen_schemes(startAt=i) for i in range(n)]
    results = await asyncio.gather(*tasks)
    # All requests should be unique by startAt
    start_ats = [int(req.query_params.get("startAt", 0)) for req in dummy_client.executed_requests]

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_throughput_medium_load():
    """Throughput: 100 concurrent calls, medium load."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    n = 100
    tasks = [ds.get_issue_type_screen_schemes(startAt=i) for i in range(n)]
    results = await asyncio.gather(*tasks)
    # Requests should all be present
    start_ats = sorted(int(req.query_params.get("startAt", 0)) for req in dummy_client.executed_requests)

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_throughput_sustained_pattern():
    """Throughput: sustained execution pattern with repeated calls."""
    dummy_client = DummyAsyncClient()
    ds = JiraDataSource(DummyJiraClient(dummy_client))
    n = 20
    for i in range(5):  # 5 rounds of 20 calls
        tasks = [ds.get_issue_type_screen_schemes(startAt=j, maxResults=j + 1) for j in range(n)]
        results = await asyncio.gather(*tasks)
    # Check that requests from last round are correct
    for i in range(80, 100):
        req = dummy_client.executed_requests[i]
        idx = i - 80
```
codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

```python
# ------------------------------------------------
import asyncio  # Used for async test utilities

import pytest  # Pytest for unit testing
from app.sources.external.jira.jira import JiraDataSource

# ---- Minimal stubs and helpers for the test environment ----

# Simulate HTTPRequest and HTTPResponse classes

class HTTPRequest:
    def __init__(self, method, url, headers, path_params, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body

class HTTPResponse:
    def __init__(self, data):
        # Simulate the actual response object
        self.data = data

# ---- Minimal stub for JiraClient and its underlying client ----

class DummyClient:
    def __init__(self, base_url="https://dummy.atlassian.net"):
        self._base_url = base_url
        self.executed_requests = []  # For test inspection

    def get_base_url(self):
        return self._base_url

    async def execute(self, req):
        # Simulate a successful HTTPResponse with echo of query params
        self.executed_requests.append(req)
        # For edge case testing, simulate error if query param triggers it
        if req.query_params.get("queryString") == "error":
            raise RuntimeError("Simulated error for queryString=error")
        # Echo back the request for inspection
        return HTTPResponse({
            "method": req.method,
            "url": req.url,
            "headers": req.headers,
            "path_params": req.path_params,
            "query_params": req.query_params,
            "body": req.body,
        })

class JiraClient:
    def __init__(self, client):
        self.client = client

    def get_client(self):
        return self.client

# ---- Unit Tests ----

@pytest.mark.asyncio
async def test_basic_no_params():
    """Test basic async/await with no parameters."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    resp = await ds.get_issue_type_screen_schemes()

@pytest.mark.asyncio
async def test_basic_with_all_params():
    """Test with all parameters set."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    resp = await ds.get_issue_type_screen_schemes(
        startAt=10,
        maxResults=50,
        id=[1, 2, 3],
        queryString="test",
        orderBy="name",
        expand="details",
        headers={"X-Test": "yes"},
    )
    # Check that all query params are present and serialized correctly
    qp = resp.data["query_params"]

@pytest.mark.asyncio
async def test_basic_id_empty_list():
    """Test with empty id list."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    resp = await ds.get_issue_type_screen_schemes(id=[])

@pytest.mark.asyncio
async def test_basic_id_singleton():
    """Test with id as single element list."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    resp = await ds.get_issue_type_screen_schemes(id=[42])

@pytest.mark.asyncio
async def test_basic_headers_none():
    """Test with headers=None."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    resp = await ds.get_issue_type_screen_schemes(headers=None)

# ---- Edge Cases ----

@pytest.mark.asyncio
async def test_edge_missing_client_raises():
    """Test ValueError when client is None."""
    class BadJiraClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        JiraDataSource(BadJiraClient())

@pytest.mark.asyncio
async def test_edge_missing_base_url_method_raises():
    """Test ValueError when client lacks get_base_url."""
    class NoBaseUrlClient:
        pass
    jc = JiraClient(NoBaseUrlClient())
    with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
        JiraDataSource(jc)

@pytest.mark.asyncio
async def test_edge_concurrent_execution():
    """Test concurrent execution of multiple requests."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    # Run 5 concurrent requests with different queryStrings
    tasks = [
        ds.get_issue_type_screen_schemes(queryString=f"q{i}")
        for i in range(5)
    ]
    results = await asyncio.gather(*tasks)
    # All should be HTTPResponse and have correct queryString
    for i, resp in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_edge_expand_none_and_empty():
    """Test expand=None and expand=''."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    # None: should not be present
    resp_none = await ds.get_issue_type_screen_schemes(expand=None)
    # Empty string: should be present as empty string
    resp_empty = await ds.get_issue_type_screen_schemes(expand="")

# ---- Large Scale ----

@pytest.mark.asyncio
async def test_large_scale_many_concurrent():
    """Test 50 concurrent requests with different ids."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    ids = [[i, i + 1, i + 2] for i in range(50)]
    tasks = [
        ds.get_issue_type_screen_schemes(id=id_list)
        for id_list in ids
    ]
    results = await asyncio.gather(*tasks)
    # Each response's id should match the input, serialized
    for i, resp in enumerate(results):
        expected = ",".join(str(x) for x in ids[i])

@pytest.mark.asyncio
async def test_large_scale_max_results_boundary():
    """Test boundary values for maxResults."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    # Test 0
    resp0 = await ds.get_issue_type_screen_schemes(maxResults=0)
    # Test 1000
    resp1k = await ds.get_issue_type_screen_schemes(maxResults=1000)

# ---- Throughput ----

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_throughput_small_load():
    """Throughput test: 10 sequential requests."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    for i in range(10):
        resp = await ds.get_issue_type_screen_schemes(queryString=f"throughput{i}")

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_throughput_medium_concurrent():
    """Throughput test: 25 concurrent requests."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    tasks = [
        ds.get_issue_type_screen_schemes(queryString=f"tp{i}")
        for i in range(25)
    ]
    results = await asyncio.gather(*tasks)
    for i, resp in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_get_issue_type_screen_schemes_throughput_large_concurrent():
    """Throughput test: 100 concurrent requests."""
    ds = JiraDataSource(JiraClient(DummyClient()))
    tasks = [
        ds.get_issue_type_screen_schemes(queryString=f"load{i}")
        for i in range(100)
    ]
    results = await asyncio.gather(*tasks)
    for i, resp in enumerate(results):
        pass
```
codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes, run `git checkout codeflash/optimize-JiraDataSource.get_issue_type_screen_schemes-mhs9gjrl` and push.


@codeflash-ai codeflash-ai bot requested a review from mashraf-222 November 9, 2025 22:04
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: Medium Optimization Quality according to Codeflash labels Nov 9, 2025