@codeflash-ai codeflash-ai bot commented Nov 9, 2025

📄 5% (0.05x) speedup for JiraDataSource.create_issue_type_screen_scheme in backend/python/app/sources/external/jira/jira.py

⏱️ Runtime : 2.29 milliseconds → 2.17 milliseconds (best of 250 runs)

📝 Explanation and details

The optimization achieves a **5% runtime improvement** through strategic memory allocation reduction in the HTTP request construction path. Here's what was optimized:

**Key Optimizations:**

1. **Module-level constant reuse**: Replaced repeated dict allocations with pre-allocated constants:
   - `_EMPTY_DICT` for the always-empty `_path` and `_query` parameters
   - `_DEFAULT_HEADERS` for the standard Content-Type header
   - New dicts are created only when custom headers are actually provided (4 out of 556 calls in profiling)

2. **Conditional header construction**: The original code always created a new dict with `dict(headers or {})` and called `setdefault()`. The optimized version only allocates when headers are provided, otherwise reuses the constant.

3. **Safer URL formatting**: Changed from `.format(**request.path_params)` to `.format_map(request.path_params)` in HTTPClient for more robust parameter substitution. A sketch of all three changes follows this list.
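To make the allocation pattern concrete, here is a minimal sketch of all three changes. The constant names `_EMPTY_DICT` and `_DEFAULT_HEADERS` come from the PR description; the `_build_headers` helper and the example endpoint template are hypothetical stand-ins for the actual request-construction code in `jira.py`.

```python
from typing import Optional

# Module-level constants, allocated once at import time (names from the PR;
# the surrounding helper is illustrative, not the repo's exact code).
_EMPTY_DICT: dict = {}  # reused for the always-empty _path and _query params
_DEFAULT_HEADERS = {"Content-Type": "application/json"}

def _build_headers(headers: Optional[dict]) -> dict:
    # Fast path (552/556 profiled calls): no custom headers, so reuse the
    # shared constant instead of allocating dict(headers or {}) per call.
    if not headers:
        return _DEFAULT_HEADERS
    # Rare path: copy so the caller's dict is never mutated, then fill in
    # the default Content-Type only if the caller did not set one.
    merged = dict(headers)
    merged.setdefault("Content-Type", "application/json")
    return merged

# URL substitution: format_map() consumes the mapping directly, skipping the
# intermediate kwargs expansion that .format(**path_params) performs.
path_params = {"schemeId": "10001"}  # hypothetical path parameter
url = "/rest/api/3/issuetypescreenscheme/{schemeId}".format_map(path_params)
```

Returning the shared constant is safe only while every consumer treats the headers dict as read-only; mutating it in place would leak state across requests, which is why the custom-header path still copies.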

**Performance Impact:**

The line profiler shows the optimization reduced overhead in dictionary operations, which is particularly beneficial for the common case where no custom headers are provided (552/556 calls). The `_as_str_dict` function time decreased from 1.81ms to 1.72ms, and overall request construction became more efficient.

**Test Case Performance:**

The optimization is most effective for:
- High-volume concurrent requests (throughput tests with 50-200 requests)
- Repeated calls with default parameters (no custom headers)
- Basic usage scenarios that don't require custom headers

While throughput held steady at 139k ops/sec because this path is I/O bound, the 5% runtime reduction means less CPU overhead per request, which is valuable in high-frequency API scenarios where this Jira integration might be called repeatedly.

**Correctness verification report:**

| Test | Status |
| --- | --- |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 584 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 93.8% |
🌀 Generated Regression Tests and Runtime

import asyncio # used to run async functions

import pytest # used for our unit tests
from app.sources.external.jira.jira import JiraDataSource

# --- Minimal stubs for dependencies to allow testing JiraDataSource.create_issue_type_screen_scheme ---

class HTTPResponse:
    """Stub for HTTPResponse, simulates a real HTTP response."""
    def __init__(self, data):
        self.data = data

    def json(self):
        return self.data

class HTTPRequest:
    """Stub for HTTPRequest, simulates an HTTP request object."""
    def __init__(self, method, url, headers, path_params, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body

# --- Stubs for JiraClient and its get_client() and get_base_url() ---

class DummyClient:
    """Stub for the underlying HTTP client used by JiraDataSource."""
    def __init__(self, base_url="http://jira.example.com"):
        self._base_url = base_url
        self.executed_requests = []

def get_base_url(self):
    """Return the base URL."""
    return self._base_url

async def execute(self, request):
    """Simulate executing an HTTP request asynchronously."""
    self.executed_requests.append(request)
    # Return a dummy HTTPResponse containing the request for inspection
    return HTTPResponse({
        "method": request.method,
        "url": request.url,
        "headers": request.headers,
        "path_params": request.path_params,
        "query_params": request.query_params,
        "body": request.body
    })

class JiraClient:
    """Stub JiraClient that returns a DummyClient."""
    def __init__(self, client):
        self.client = client

def get_client(self):
    return self.client


# --- Unit tests for JiraDataSource.create_issue_type_screen_scheme ---

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_basic():
    """Basic test: normal usage with required fields."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "10001", "screenSchemeId": "20001"}]
    name = "My Screen Scheme"
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name)
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_with_description_and_headers():
    """Test with optional description and custom headers."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "10002", "screenSchemeId": "20002"}]
    name = "Another Scheme"
    description = "This is a test scheme"
    headers = {"X-Custom-Header": "foobar"}
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name, description, headers)
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_empty_issueTypeMappings():
    """Edge case: empty issueTypeMappings list."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = []
    name = "EmptyMappings"
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name)
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_none_client_raises():
    """Edge case: JiraDataSource with None client should raise ValueError."""
    class BadJiraClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        JiraDataSource(BadJiraClient())

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_client_missing_get_base_url():
    """Edge case: client missing get_base_url method should raise ValueError."""
    class BadClient:
        pass
    class BadJiraClient:
        def get_client(self):
            return BadClient()
    with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
        JiraDataSource(BadJiraClient())

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_concurrent():
    """Edge case: concurrent calls with different data."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    # Prepare several coroutines with different names and mappings
    coros = [
        ds.create_issue_type_screen_scheme(
            [{"issueTypeId": str(i), "screenSchemeId": str(i + 100)}],
            f"Scheme {i}",
            description=f"Desc {i}"
        )
        for i in range(5)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_headers_override_content_type():
    """Edge case: headers explicitly override Content-Type."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "101", "screenSchemeId": "201"}]
    name = "OverrideContentType"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name, headers=headers)
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_large_issueTypeMappings():
    """Large scale: test with a large number of issueTypeMappings."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [
        {"issueTypeId": str(i), "screenSchemeId": str(i + 100)}
        for i in range(200)
    ]
    name = "LargeScheme"
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name)
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_concurrent_large_scale():
    """Large scale: concurrent execution with many coroutines."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_screen_scheme(
            [{"issueTypeId": str(i), "screenSchemeId": str(i + 100)}],
            f"Scheme {i}"
        )
        for i in range(50)
    ]
    results = await asyncio.gather(*coros)
    # Assert all responses are correct
    for i, resp in enumerate(results):
        data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_small_load():
    """Throughput test: small load, 10 requests."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_screen_scheme(
            [{"issueTypeId": str(i), "screenSchemeId": str(i + 10)}],
            f"Scheme {i}"
        )
        for i in range(10)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_medium_load():
    """Throughput test: medium load, 100 requests."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_screen_scheme(
            [{"issueTypeId": str(i), "screenSchemeId": str(i + 100)}],
            f"Scheme {i}"
        )
        for i in range(100)
    ]
    results = await asyncio.gather(*coros)
    # Spot check a few results
    for i in [0, 50, 99]:
        data = results[i].json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_high_volume():
    """Throughput test: high volume, 200 requests."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_screen_scheme(
            [{"issueTypeId": str(i), "screenSchemeId": str(i + 200)}],
            f"Scheme {i}"
        )
        for i in range(200)
    ]
    results = await asyncio.gather(*coros)
    # Spot check a few results
    for i in [0, 100, 199]:
        data = results[i].json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_async_await_behavior():
    """Basic async/await behavior: ensure coroutine returns only when awaited."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "123", "screenSchemeId": "456"}]
    name = "AwaitTest"
    codeflash_output = ds.create_issue_type_screen_scheme(issueTypeMappings, name)
    coro = codeflash_output
    resp = await coro
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_invalid_headers_type():
    """Edge case: headers is None (should fall back to the default headers)."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "999", "screenSchemeId": "888"}]
    name = "BadHeadersType"
    # Pass headers=None (should fall back to the default headers)
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name, headers=None)
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_special_characters():
    """Edge case: name and description with special characters."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "spc", "screenSchemeId": "spc"}]
    name = "特殊字符!@#"
    description = "描述 with emoji 🚀"
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name, description)
    data = resp.json()

`codeflash_output` is used to check that the output of the original code is the same as that of the optimized code.
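For intuition, the check amounts to running the same call against both implementations and comparing the payloads. A minimal sketch, assuming two independently constructed data sources (this is not Codeflash's actual harness):

```python
# Hypothetical equivalence check: the original and optimized implementations
# must produce identical JSON payloads for the same inputs.
async def check_equivalence(ds_original, ds_optimized, mappings, name):
    resp_a = await ds_original.create_issue_type_screen_scheme(mappings, name)
    resp_b = await ds_optimized.create_issue_type_screen_scheme(mappings, name)
    assert resp_a.json() == resp_b.json()
```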

#------------------------------------------------
import asyncio # used to run async functions
from typing import Any, Dict, List, Optional

import pytest # used for our unit tests
from app.sources.external.jira.jira import JiraDataSource

# Minimal HTTPResponse and HTTPRequest for testing

class HTTPResponse:
    def __init__(self, data: Any, status_code: int = 200):
        self.data = data
        self.status_code = status_code

    def json(self):
        return self.data

class HTTPRequest:
    def __init__(self, method, url, headers, path_params, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body

# Mock Jira REST client

class MockJiraRESTClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.executed_requests = []

def get_base_url(self):
    return self.base_url

async def execute(self, request: HTTPRequest):
    # Record the request for inspection
    self.executed_requests.append(request)
    # Simulate a response containing the request body
    return HTTPResponse({
        "received": request.body,
        "headers": request.headers,
        "url": request.url,
        "method": request.method
    }, status_code=201)

class JiraClient:
    def __init__(self, client):
        self.client = client

def get_client(self):
    return self.client


# ---------------------- TESTS BEGIN HERE ----------------------

# 1. BASIC TEST CASES

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_basic():
    """Test basic creation with required fields only."""
    client = MockJiraRESTClient("https://example.atlassian.net")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "10001", "screenSchemeId": "20001"}]
    name = "Test Scheme"
    resp = await ds.create_issue_type_screen_scheme(mappings, name)

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_with_description_and_headers():
    """Test creation with description and custom headers."""
    client = MockJiraRESTClient("https://example.net")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "2", "screenSchemeId": "3"}]
    name = "Scheme with Desc"
    description = "A test scheme with description"
    headers = {"X-Custom-Header": "foobar"}
    resp = await ds.create_issue_type_screen_scheme(mappings, name, description, headers)

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_empty_mappings():
    """Test with empty issueTypeMappings list."""
    client = MockJiraRESTClient("https://jira.com")
    ds = JiraDataSource(JiraClient(client))
    mappings = []
    name = "EmptyMappings"
    resp = await ds.create_issue_type_screen_scheme(mappings, name)

# 2. EDGE TEST CASES

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_invalid_client_raises():
    """Test ValueError is raised if client is not initialized."""
    class DummyJiraClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        JiraDataSource(DummyJiraClient())

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_missing_get_base_url():
    """Test ValueError is raised if client lacks get_base_url method."""
    class DummyClient:
        pass
    class DummyJiraClient:
        def get_client(self):
            return DummyClient()
    with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
        JiraDataSource(DummyJiraClient())

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_concurrent_execution():
    """Test concurrent execution of multiple requests."""
    client = MockJiraRESTClient("https://concurrent.test")
    ds = JiraDataSource(JiraClient(client))
    mappings1 = [{"issueTypeId": "1", "screenSchemeId": "1"}]
    mappings2 = [{"issueTypeId": "2", "screenSchemeId": "2"}]
    name1 = "Scheme1"
    name2 = "Scheme2"
    # Run two requests concurrently
    resp1, resp2 = await asyncio.gather(
        ds.create_issue_type_screen_scheme(mappings1, name1),
        ds.create_issue_type_screen_scheme(mappings2, name2, "desc2")
    )

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_headers_override():
    """Test that user-provided Content-Type header is not overwritten."""
    client = MockJiraRESTClient("https://header.override")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "x", "screenSchemeId": "y"}]
    name = "HeaderOverride"
    headers = {"Content-Type": "application/xml"}
    resp = await ds.create_issue_type_screen_scheme(mappings, name, headers=headers)

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_long_name_and_description():
    """Test with long name and description strings."""
    client = MockJiraRESTClient("https://long.strings")
    ds = JiraDataSource(JiraClient(client))
    long_name = "N" * 255
    long_desc = "D" * 1024
    mappings = [{"issueTypeId": "a", "screenSchemeId": "b"}]
    resp = await ds.create_issue_type_screen_scheme(mappings, long_name, long_desc)

# 3. LARGE SCALE TEST CASES

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_many_mappings():
    """Test with a large number of mappings (scalability)."""
    client = MockJiraRESTClient("https://large.mappings")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": str(i), "screenSchemeId": str(i)} for i in range(200)]
    name = "LargeMappings"
    resp = await ds.create_issue_type_screen_scheme(mappings, name)

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_concurrent_high_volume():
    """Test concurrent execution with many parallel requests."""
    client = MockJiraRESTClient("https://parallel.test")
    ds = JiraDataSource(JiraClient(client))
    names = [f"Scheme_{i}" for i in range(20)]
    mappings = [{"issueTypeId": str(i), "screenSchemeId": str(i)} for i in range(3)]
    # Launch 20 concurrent requests
    responses = await asyncio.gather(*[
        ds.create_issue_type_screen_scheme(mappings, name)
        for name in names
    ])
    for i, resp in enumerate(responses):
        pass

# 4. THROUGHPUT TEST CASES

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_small_load():
    """Throughput: Test small batch of requests completes successfully."""
    client = MockJiraRESTClient("https://throughput.small")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "1", "screenSchemeId": "1"}]
    # 5 concurrent requests
    results = await asyncio.gather(*[
        ds.create_issue_type_screen_scheme(mappings, f"SmallLoad_{i}")
        for i in range(5)
    ])

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_medium_load():
    """Throughput: Test medium batch of requests completes successfully."""
    client = MockJiraRESTClient("https://throughput.medium")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "m", "screenSchemeId": "m"}]
    # 50 concurrent requests
    results = await asyncio.gather(*[
        ds.create_issue_type_screen_scheme(mappings, f"MediumLoad_{i}")
        for i in range(50)
    ])
    # Check that all names are unique in the responses
    names = set(r.data["received"]["name"] for r in results)

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_large_load():
    """Throughput: Test large batch of requests completes successfully."""
    client = MockJiraRESTClient("https://throughput.large")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "l", "screenSchemeId": "l"}]
    # 100 concurrent requests (keep < 1000 for performance)
    results = await asyncio.gather(*[
        ds.create_issue_type_screen_scheme(mappings, f"LargeLoad_{i}")
        for i in range(100)
    ])
    # Check that all responses are HTTPResponse and have correct mapping
    for r in results:
        pass

`codeflash_output` is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes, `git checkout codeflash/optimize-JiraDataSource.create_issue_type_screen_scheme-mhs9wi1k` and push.


@codeflash-ai codeflash-ai bot requested a review from mashraf-222 November 9, 2025 22:16
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash (Optimization PR opened by Codeflash AI) and 🎯 Quality: High (Optimization Quality according to Codeflash) labels Nov 9, 2025