
Conversation

@codeflash-ai codeflash-ai bot commented Nov 9, 2025

📄 7% (0.07x) speedup for JiraDataSource.create_issue_type_scheme in backend/python/app/sources/external/jira/jira.py

⏱️ Runtime: 1.32 milliseconds → 1.23 milliseconds (best of 250 runs)

📝 Explanation and details

The optimized code achieves a **7% runtime improvement** through a targeted memory allocation optimization in the `create_issue_type_scheme` method.

**Key Optimization:**
The original code created three empty dictionaries (`_path`, `_query`, `_body`) and then passed them through expensive `_as_str_dict()` conversions even when they were empty. The optimized version eliminates two of these allocations:

- **Removed unnecessary dict allocations:** `_path: Dict[str, Any] = {}` and `_query: Dict[str, Any] = {}` were eliminated since they're always empty for this endpoint
- **Direct empty dict literals:** Instead of calling `_as_str_dict(_path)` and `_as_str_dict(_query)`, the optimized code passes `{}` directly to the `HTTPRequest` constructor
- **Simplified URL construction:** `_safe_format_url(rel_path, {})` uses an empty dict literal instead of the `_path` variable
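The shape of the change can be sketched as follows. This is a simplified reconstruction, not the actual method body: `_as_str_dict` here is a stand-in for the real helper in `jira.py`, and the two builder functions are hypothetical, isolating just the parameter-construction step that was optimized.

```python
from typing import Any, Dict, Tuple

def _as_str_dict(d: Dict[str, Any]) -> Dict[str, str]:
    # Stand-in for the real helper: stringify every key and value
    return {str(k): str(v) for k, v in d.items()}

def build_params_before() -> Tuple[Dict[str, str], Dict[str, str]]:
    # Original pattern: allocate dicts that are always empty for this
    # endpoint, then pay for _as_str_dict() calls that do no useful work
    _path: Dict[str, Any] = {}
    _query: Dict[str, Any] = {}
    return _as_str_dict(_path), _as_str_dict(_query)

def build_params_after() -> Tuple[Dict[str, str], Dict[str, str]]:
    # Optimized pattern: pass empty dict literals straight through,
    # skipping both the named allocations and the conversion calls
    return {}, {}

assert build_params_before() == build_params_after() == ({}, {})
```

Both versions hand identical (empty) `path_params` and `query_params` to the request; only the work done to produce them differs.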

**Performance Impact:**
The line profiler shows the optimization reduced `_as_str_dict` work from 855 hits (0.948 ms) to 285 hits (0.625 ms) - a **34% reduction** in time spent in this expensive string-conversion function. The `path_params` and `query_params` processing overhead dropped significantly, as seen in the `HTTPRequest` construction time.
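To see why converting an empty dict is not free, a quick micro-benchmark can compare the stand-in conversion against a bare literal. The helper below is again a hypothetical stand-in for the real `_as_str_dict`; exact timings will vary by machine.

```python
import timeit

def _as_str_dict(d):
    # Stand-in for the real string-conversion helper
    return {str(k): str(v) for k, v in d.items()}

# Even on an empty dict, the conversion pays for a function call plus
# a dict-comprehension setup; the bare literal pays only for allocation.
converted = timeit.timeit(lambda: _as_str_dict({}), number=100_000)
literal = timeit.timeit(lambda: {}, number=100_000)
print(f"convert empty dict: {converted:.4f}s, bare literal: {literal:.4f}s")
```

Per call the difference is tiny, which is consistent with the modest 7% per-call gain reported above; it only matters because the endpoint wrapper runs it on every request.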

**Workload Benefits:**
This optimization is most effective for:

- **High-frequency API calls** where the 7% per-call improvement compounds significantly
- **Batch operations** creating multiple issue type schemes, as demonstrated in the throughput tests
- **Memory-constrained environments** where reducing allocations helps overall system performance

The optimization maintains identical functionality and error handling while reducing memory pressure and CPU overhead from unnecessary dictionary operations.

Correctness verification report:

| Test | Status |
|------|--------|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 314 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 94.4% |
🌀 Generated Regression Tests and Runtime

```python
import asyncio  # used to run async functions

import pytest  # used for our unit tests

from app.sources.external.jira.jira import JiraDataSource

# --- Minimal stubs for dependencies ---

class HTTPResponse:
    """Stub for HTTPResponse, simulates a real HTTP response."""
    def __init__(self, data):
        self.data = data

    def json(self):
        return self.data


class HTTPRequest:
    """Stub for HTTPRequest, holds request data for inspection."""
    def __init__(self, method, url, headers, path_params, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body


class DummyAsyncClient:
    """Dummy async client to simulate HTTPClient's execute method."""
    def __init__(self, base_url):
        self.base_url = base_url

    def get_base_url(self):
        return self.base_url

    async def execute(self, req):
        # Simulate a successful response containing the request data for verification
        return HTTPResponse({
            "method": req.method,
            "url": req.url,
            "headers": req.headers,
            "path_params": req.path_params,
            "query_params": req.query_params,
            "body": req.body
        })


class JiraClient:
    """Stub JiraClient to wrap DummyAsyncClient."""
    def __init__(self, client):
        self.client = client

    def get_client(self):
        return self.client


# ---- Unit tests ----

def make_datasource(base_url="https://jira.example.com"):
    """Helper to create a JiraDataSource with a dummy async client."""
    client = DummyAsyncClient(base_url)
    return JiraDataSource(JiraClient(client))


# ---- 1. Basic Test Cases ----

@pytest.mark.asyncio
async def test_create_issue_type_scheme_basic_minimal():
    """Test with minimal required arguments."""
    ds = make_datasource()
    issueTypeIds = ["10001"]
    name = "Scheme A"
    resp = await ds.create_issue_type_scheme(issueTypeIds, name)


@pytest.mark.asyncio
async def test_create_issue_type_scheme_basic_all_fields():
    """Test with all fields provided."""
    ds = make_datasource()
    issueTypeIds = ["10001", "10002"]
    name = "Scheme B"
    defaultIssueTypeId = "10001"
    description = "A test scheme"
    headers = {"X-Test": "testvalue"}
    resp = await ds.create_issue_type_scheme(
        issueTypeIds, name, defaultIssueTypeId, description, headers
    )


@pytest.mark.asyncio
async def test_create_issue_type_scheme_basic_async_await():
    """Test that the function returns a coroutine and must be awaited."""
    ds = make_datasource()
    codeflash_output = ds.create_issue_type_scheme(["10001"], "Scheme C"); coro = codeflash_output
    resp = await coro


# ---- 2. Edge Test Cases ----

@pytest.mark.asyncio
async def test_create_issue_type_scheme_edge_empty_issueTypeIds():
    """Test with an empty issueTypeIds list (edge case)."""
    ds = make_datasource()
    resp = await ds.create_issue_type_scheme([], "Empty Scheme")


@pytest.mark.asyncio
async def test_create_issue_type_scheme_edge_empty_name():
    """Test with empty string for name (edge case)."""
    ds = make_datasource()
    resp = await ds.create_issue_type_scheme(["10001"], "")


@pytest.mark.asyncio
async def test_create_issue_type_scheme_edge_none_headers():
    """Test with headers=None, should default Content-Type."""
    ds = make_datasource()
    resp = await ds.create_issue_type_scheme(["10001"], "Scheme D", headers=None)


@pytest.mark.asyncio
async def test_create_issue_type_scheme_edge_custom_content_type_header():
    """Test with custom Content-Type header, should not override."""
    ds = make_datasource()
    headers = {"Content-Type": "application/xml"}
    resp = await ds.create_issue_type_scheme(["10001"], "Scheme E", headers=headers)


@pytest.mark.asyncio
async def test_create_issue_type_scheme_edge_concurrent_execution():
    """Test concurrent execution of multiple requests."""
    ds = make_datasource()
    tasks = [
        ds.create_issue_type_scheme([str(i)], f"Scheme {i}")
        for i in range(5)
    ]
    results = await asyncio.gather(*tasks)
    for i, resp in enumerate(results):
        pass


@pytest.mark.asyncio
async def test_create_issue_type_scheme_edge_missing_client_raises():
    """Test ValueError is raised if client is None."""
    class BadClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        JiraDataSource(BadClient())


@pytest.mark.asyncio
async def test_create_issue_type_scheme_edge_missing_get_base_url_raises():
    """Test ValueError is raised if client lacks get_base_url."""
    class BadClientObj:
        pass
    class BadClient:
        def get_client(self):
            return BadClientObj()
    with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
        JiraDataSource(BadClient())


# ---- 3. Large Scale Test Cases ----

@pytest.mark.asyncio
async def test_create_issue_type_scheme_large_many_issue_types():
    """Test with a large number of issueTypeIds."""
    ds = make_datasource()
    issueTypeIds = [str(i) for i in range(100)]
    name = "Large Scheme"
    resp = await ds.create_issue_type_scheme(issueTypeIds, name)


@pytest.mark.asyncio
async def test_create_issue_type_scheme_large_concurrent_requests():
    """Test many concurrent requests (scalability)."""
    ds = make_datasource()
    N = 50  # Reasonable concurrency for unit test
    tasks = [
        ds.create_issue_type_scheme([str(i)], f"Scheme {i}")
        for i in range(N)
    ]
    results = await asyncio.gather(*tasks)
    for i, resp in enumerate(results):
        pass


# ---- 4. Throughput Test Cases ----

@pytest.mark.asyncio
async def test_create_issue_type_scheme_throughput_small_load():
    """Throughput test: small load of requests."""
    ds = make_datasource()
    tasks = [
        ds.create_issue_type_scheme([str(i)], f"Scheme {i}")
        for i in range(5)
    ]
    results = await asyncio.gather(*tasks)
    for resp in results:
        pass


@pytest.mark.asyncio
async def test_create_issue_type_scheme_throughput_medium_load():
    """Throughput test: medium load of requests."""
    ds = make_datasource()
    tasks = [
        ds.create_issue_type_scheme([str(i)], f"Scheme {i}")
        for i in range(20)
    ]
    results = await asyncio.gather(*tasks)
    for resp in results:
        pass


@pytest.mark.asyncio
async def test_create_issue_type_scheme_throughput_large_load():
    """Throughput test: large load of requests."""
    ds = make_datasource()
    N = 100  # Large but safe for unit test
    tasks = [
        ds.create_issue_type_scheme([str(i)], f"Scheme {i}")
        for i in range(N)
    ]
    results = await asyncio.gather(*tasks)
    for resp in results:
        pass
    # Spot check a few
```

codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

```python
# ------------------------------------------------
import asyncio  # used to run async functions

import pytest  # used for our unit tests

from app.sources.external.jira.jira import JiraDataSource

# ---- Minimal stubs for required classes ----

class HTTPResponse:
    """Minimal stub for HTTPResponse used in tests."""
    def __init__(self, status_code=200, json_data=None, text_data="", headers=None):
        self.status_code = status_code
        self._json_data = json_data or {}
        self.text = text_data
        self.headers = headers or {}

    def json(self):
        return self._json_data


class HTTPRequest:
    """Minimal stub for HTTPRequest used in tests."""
    def __init__(self, method, url, headers, path_params, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body


# ---- Mock JiraClient and HTTP client ----

class MockHTTPClient:
    """Mock async HTTP client for testing."""
    def __init__(self, base_url='http://mocked-jira', execute_result=None, raise_on_execute=None):
        self._base_url = base_url
        self._execute_result = execute_result
        self._raise_on_execute = raise_on_execute
        self.last_request = None  # Capture the last HTTPRequest for assertions

    def get_base_url(self):
        return self._base_url

    async def execute(self, req, **kwargs):
        self.last_request = req
        if self._raise_on_execute:
            raise self._raise_on_execute
        # Return a mock HTTPResponse
        return self._execute_result or HTTPResponse(
            status_code=201,
            json_data={"id": "10001", "name": req.body.get("name"), "issueTypeIds": req.body.get("issueTypeIds")},
            text_data="Created"
        )


class MockJiraClient:
    """Mock JiraClient for testing."""
    def __init__(self, client=None):
        self.client = client or MockHTTPClient()

    def get_client(self):
        return self.client


# ---- UNIT TESTS ----

# 1. BASIC TEST CASES

@pytest.mark.asyncio
async def test_create_issue_type_scheme_basic_success():
    """Test basic successful creation of issue type scheme."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    issueTypeIds = ['1001', '1002']
    name = "Test Scheme"
    resp = await ds.create_issue_type_scheme(issueTypeIds, name)


@pytest.mark.asyncio
async def test_create_issue_type_scheme_with_optional_fields():
    """Test creation with all optional fields provided."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    issueTypeIds = ['1003']
    name = "Scheme with Description"
    defaultIssueTypeId = "1003"
    description = "A test description"
    resp = await ds.create_issue_type_scheme(
        issueTypeIds,
        name,
        defaultIssueTypeId=defaultIssueTypeId,
        description=description,
        headers={"X-Test-Header": "yes"}
    )
    # Check the last request body contains the optional fields
    last_req = client.get_client().last_request


@pytest.mark.asyncio
async def test_create_issue_type_scheme_empty_issueTypeIds():
    """Test creation with an empty issueTypeIds list (edge: required field empty)."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    issueTypeIds = []
    name = "Empty Issue Types"
    resp = await ds.create_issue_type_scheme(issueTypeIds, name)


@pytest.mark.asyncio
async def test_create_issue_type_scheme_custom_headers_merge():
    """Test that custom headers are merged and Content-Type is set."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    resp = await ds.create_issue_type_scheme(
        ["1001"], "Header Test", headers={"Authorization": "Bearer testtoken"}
    )
    last_req = client.get_client().last_request


# 2. EDGE TEST CASES

@pytest.mark.asyncio
async def test_create_issue_type_scheme_none_client_raises():
    """Test that ValueError is raised if JiraDataSource is initialized with None client."""
    class BrokenClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        JiraDataSource(BrokenClient())


@pytest.mark.asyncio
async def test_create_issue_type_scheme_client_missing_get_base_url():
    """Test that ValueError is raised if client lacks get_base_url method."""
    class NoBaseUrlClient:
        def get_client(self):
            class Dummy: pass
            return Dummy()
    with pytest.raises(ValueError, match="get_base_url"):
        JiraDataSource(NoBaseUrlClient())


@pytest.mark.asyncio
async def test_create_issue_type_scheme_execute_raises_exception():
    """Test that exceptions from the HTTP client execute method are propagated."""
    exc = RuntimeError("network error")
    client = MockJiraClient(client=MockHTTPClient(raise_on_execute=exc))
    ds = JiraDataSource(client)
    with pytest.raises(RuntimeError, match="network error"):
        await ds.create_issue_type_scheme(["1001"], "Should Fail")


@pytest.mark.asyncio
async def test_create_issue_type_scheme_concurrent_calls():
    """Test concurrent execution of multiple create_issue_type_scheme calls."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    # Run several calls concurrently with different names
    names = [f"Scheme {i}" for i in range(5)]
    coros = [
        ds.create_issue_type_scheme(["1000", str(i)], name)
        for i, name in enumerate(names)
    ]
    results = await asyncio.gather(*coros)
    # All responses should be HTTPResponse and reflect the correct name
    for i, resp in enumerate(results):
        pass


@pytest.mark.asyncio
async def test_create_issue_type_scheme_non_ascii_fields():
    """Test that non-ASCII characters in name and description are handled."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    name = "Тестовая схема"  # Russian for "Test scheme"
    description = "描述"  # Chinese for "description"
    resp = await ds.create_issue_type_scheme(["1001"], name, description=description)


# 3. LARGE SCALE TEST CASES

@pytest.mark.asyncio
async def test_create_issue_type_scheme_large_issueTypeIds_list():
    """Test with a large list of issueTypeIds."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    issueTypeIds = [str(i) for i in range(200)]  # Large but <1000
    name = "Large List Scheme"
    resp = await ds.create_issue_type_scheme(issueTypeIds, name)


@pytest.mark.asyncio
async def test_create_issue_type_scheme_many_concurrent_large_payloads():
    """Test concurrent execution with large payloads."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    # 10 concurrent requests with different large issueTypeIds
    coros = [
        ds.create_issue_type_scheme([str(i) for i in range(100)], f"Scheme {i}")
        for i in range(10)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        pass


# 4. THROUGHPUT TEST CASES

@pytest.mark.asyncio
async def test_create_issue_type_scheme_throughput_small_load():
    """Throughput: Run a small number of concurrent requests and check all succeed."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_scheme(["1001"], f"Throughput {i}")
        for i in range(5)
    ]
    results = await asyncio.gather(*coros)


@pytest.mark.asyncio
async def test_create_issue_type_scheme_throughput_medium_load():
    """Throughput: Run a medium number of concurrent requests."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_scheme(["1001", "1002"], f"Throughput {i}")
        for i in range(20)
    ]
    results = await asyncio.gather(*coros)
    # Check that each name is unique and correct
    for i, resp in enumerate(results):
        pass


@pytest.mark.asyncio
async def test_create_issue_type_scheme_throughput_high_volume():
    """Throughput: Run a high volume of concurrent requests (but <100 for speed)."""
    client = MockJiraClient()
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_scheme(["1001", "1002", "1003"], f"HighVolume {i}")
        for i in range(50)
    ]
    results = await asyncio.gather(*coros)
    # Ensure all names are unique
    names = set(resp.json()["name"] for resp in results)
```
codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes, `git checkout codeflash/optimize-JiraDataSource.create_issue_type_scheme-mhs5gq6s` and push.


@codeflash-ai codeflash-ai bot requested a review from mashraf-222 November 9, 2025 20:12
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Nov 9, 2025