⚡️ Speed up method JiraDataSource.create_issue_type_screen_scheme by 5%
#553
+57
−26
📄 5% (0.05x) speedup for JiraDataSource.create_issue_type_screen_scheme in backend/python/app/sources/external/jira/jira.py

⏱️ Runtime: 2.29 milliseconds → 2.17 milliseconds (best of 250 runs)

📝 Explanation and details
The optimization achieves a 5% runtime improvement through strategic memory allocation reduction in the HTTP request construction path. Here's what was optimized:
Key Optimizations:
- Module-level constant reuse: replaced repeated dict allocations with pre-allocated constants: `_EMPTY_DICT` for the always-empty `_path` and `_query` parameters, and `_DEFAULT_HEADERS` for the standard Content-Type header.
- Conditional header construction: the original code always created a new dict with `dict(headers or {})` and called `setdefault()`. The optimized version only allocates when headers are provided, and otherwise reuses the constant (see the sketch after the Performance Impact paragraph below).
- Safer URL formatting: changed from `.format(**request.path_params)` to `.format_map(request.path_params)` in HTTPClient for more robust parameter substitution (see the sketch at the end of this explanation).

Performance Impact:
The line profiler shows the optimization reduced overhead in dictionary operations, which is particularly beneficial for the common case where no custom headers are provided (552 of 556 profiled calls). The `_as_str_dict` function time decreased from 1.81 ms to 1.72 ms, and overall request construction became more efficient.
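As a minimal sketch of the conditional header construction (not the exact code in jira.py: `_build_headers` is a hypothetical helper name, while `_EMPTY_DICT` and `_DEFAULT_HEADERS` are the constants named above):

_EMPTY_DICT: dict = {}  # shared, never mutated; reused for _path/_query
_DEFAULT_HEADERS = {"Content-Type": "application/json"}

def _build_headers(headers=None):
    if headers:
        # Caller supplied headers: copy them and fill in the default
        # Content-Type only if the caller did not set one.
        merged = dict(headers)
        merged.setdefault("Content-Type", "application/json")
        return merged
    # Common case: no custom headers, so the shared constant is
    # returned and no new dict is allocated per call.
    return _DEFAULT_HEADERS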
Test Case Performance:

The optimization is most effective for the common case above, where no custom headers are supplied and the shared default-header constant can be reused.
While throughput remained constant at 139k ops/sec because the workload is I/O-bound, the 5% runtime reduction means less CPU overhead per request, which is valuable in high-frequency API scenarios where this Jira integration is called repeatedly.
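To illustrate the URL-formatting change, here is a small self-contained sketch; the template and parameter names are hypothetical, not taken from the module. `str.format_map` consumes a mapping directly instead of unpacking it into keyword arguments, and it also tolerates mapping types that resolve missing keys:

template = "/rest/api/3/issuetypescreenscheme/{schemeId}"
params = {"schemeId": "10001"}

# Both calls produce the same URL for a plain dict:
assert template.format(**params) == template.format_map(params)

# format_map also works with mappings that handle missing keys,
# which .format(**...) cannot do:
class LenientParams(dict):
    def __missing__(self, key):
        return "{" + key + "}"  # leave unknown placeholders intact

print("/a/{x}/{y}".format_map(LenientParams(x="1")))  # -> /a/1/{y}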
✅ Correctness verification report:
🌀 Generated Regression Tests and Runtime
import asyncio # used to run async functions
import pytest # used for our unit tests
from app.sources.external.jira.jira import JiraDataSource
# --- Minimal stubs for dependencies to allow testing JiraDataSource.create_issue_type_screen_scheme ---
class HTTPResponse:
    """Stub for HTTPResponse, simulates a real HTTP response."""
    def __init__(self, data):
        self.data = data

    def json(self):
        # The tests call resp.json(); return the stored payload.
        return self.data

class HTTPRequest:
    """Stub for HTTPRequest, simulates an HTTP request object."""
    def __init__(self, method, url, headers, path_params, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body
# --- Stubs for JiraClient and its get_client() and get_base_url() ---
class DummyClient:
    """Stub for the underlying HTTP client used by JiraDataSource."""
    def __init__(self, base_url="http://jira.example.com"):
        self._base_url = base_url
        self.executed_requests = []

    def get_base_url(self):
        return self._base_url

    async def execute(self, request, **kwargs):
        # Record the request and echo its body so tests can inspect it.
        self.executed_requests.append(request)
        return HTTPResponse({"received": request.body})

class JiraClient:
    """Stub JiraClient that returns a DummyClient."""
    def __init__(self, client):
        self.client = client

    def get_client(self):
        return self.client
# --- Unit tests for JiraDataSource.create_issue_type_screen_scheme ---
@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_basic():
    """Basic test: normal usage with required fields."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "10001", "screenSchemeId": "20001"}]
    name = "My Screen Scheme"
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name)
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_with_description_and_headers():
    """Test with optional description and custom headers."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "10002", "screenSchemeId": "20002"}]
    name = "Another Scheme"
    description = "This is a test scheme"
    headers = {"X-Custom-Header": "foobar"}
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name, description, headers)
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_empty_issueTypeMappings():
    """Edge case: empty issueTypeMappings list."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = []
    name = "EmptyMappings"
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name)
    data = resp.json()
@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_none_client_raises():
    """Edge case: JiraDataSource with None client should raise ValueError."""
    class BadJiraClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        JiraDataSource(BadJiraClient())

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_client_missing_get_base_url():
    """Edge case: client missing get_base_url method should raise ValueError."""
    class BadClient:
        pass
    class BadJiraClient:
        def get_client(self):
            return BadClient()
    with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
        JiraDataSource(BadJiraClient())
@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_concurrent():
    """Edge case: concurrent calls with different data."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    # Prepare several coroutines with different names and mappings
    coros = [
        ds.create_issue_type_screen_scheme(
            [{"issueTypeId": str(i), "screenSchemeId": str(i + 100)}],
            f"Scheme {i}",
            description=f"Desc {i}"
        )
        for i in range(5)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_headers_override_content_type():
    """Edge case: headers explicitly override Content-Type."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "101", "screenSchemeId": "201"}]
    name = "OverrideContentType"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name, headers=headers)
    data = resp.json()
@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_large_issueTypeMappings():
    """Large scale: test with a large number of issueTypeMappings."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [
        {"issueTypeId": str(i), "screenSchemeId": str(i + 100)}
        for i in range(200)
    ]
    name = "LargeScheme"
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name)
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_concurrent_large_scale():
    """Large scale: concurrent execution with many coroutines."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_screen_scheme(
            [{"issueTypeId": str(i), "screenSchemeId": str(i + 100)}],
            f"Scheme {i}"
        )
        for i in range(50)
    ]
    results = await asyncio.gather(*coros)
    # Assert all responses are correct
    for i, resp in enumerate(results):
        data = resp.json()
@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_small_load():
    """Throughput test: small load, 10 requests."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_screen_scheme(
            [{"issueTypeId": str(i), "screenSchemeId": str(i + 10)}],
            f"Scheme {i}"
        )
        for i in range(10)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_medium_load():
    """Throughput test: medium load, 100 requests."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_screen_scheme(
            [{"issueTypeId": str(i), "screenSchemeId": str(i + 100)}],
            f"Scheme {i}"
        )
        for i in range(100)
    ]
    results = await asyncio.gather(*coros)
    # Spot check a few results
    for i in [0, 50, 99]:
        data = results[i].json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_high_volume():
    """Throughput test: high volume, 200 requests."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    coros = [
        ds.create_issue_type_screen_scheme(
            [{"issueTypeId": str(i), "screenSchemeId": str(i + 200)}],
            f"Scheme {i}"
        )
        for i in range(200)
    ]
    results = await asyncio.gather(*coros)
    # Spot check a few results
    for i in [0, 100, 199]:
        data = results[i].json()
@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_async_await_behavior():
    """Basic async/await behavior: ensure coroutine returns only when awaited."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "123", "screenSchemeId": "456"}]
    name = "AwaitTest"
    codeflash_output = ds.create_issue_type_screen_scheme(issueTypeMappings, name); coro = codeflash_output
    resp = await coro
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_invalid_headers_type():
    """Edge case: headers=None (should fall back to the default headers)."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "999", "screenSchemeId": "888"}]
    name = "BadHeadersType"
    # headers=None exercises the fallback to the default Content-Type header
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name, headers=None)
    data = resp.json()

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_special_characters():
    """Edge case: name and description with special characters."""
    client = JiraClient(DummyClient())
    ds = JiraDataSource(client)
    issueTypeMappings = [{"issueTypeId": "spc", "screenSchemeId": "spc"}]
    name = "特殊字符!@#"
    description = "描述 with emoji 🚀"
    resp = await ds.create_issue_type_screen_scheme(issueTypeMappings, name, description)
    data = resp.json()
codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
import asyncio # used to run async functions
from typing import Any, Dict, List, Optional
import pytest # used for our unit tests
from app.sources.external.jira.jira import JiraDataSource
# Minimal HTTPResponse and HTTPRequest for testing
class HTTPResponse:
    def __init__(self, data: Any, status_code: int = 200):
        self.data = data
        self.status_code = status_code

class HTTPRequest:
    def __init__(self, method, url, headers, path_params, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body
# Mock Jira REST client
class MockJiraRESTClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.executed_requests = []

    def get_base_url(self) -> str:
        return self.base_url

    async def execute(self, request, **kwargs):
        # Echo the request body back so tests can assert on what was sent.
        self.executed_requests.append(request)
        return HTTPResponse({"received": request.body})

class JiraClient:
    def __init__(self, client):
        self.client = client

    def get_client(self):
        return self.client
# ---------------------- TESTS BEGIN HERE ----------------------

# 1. BASIC TEST CASES
@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_basic():
    """Test basic creation with required fields only."""
    client = MockJiraRESTClient("https://example.atlassian.net")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "10001", "screenSchemeId": "20001"}]
    name = "Test Scheme"
    resp = await ds.create_issue_type_screen_scheme(mappings, name)

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_with_description_and_headers():
    """Test creation with description and custom headers."""
    client = MockJiraRESTClient("https://example.net")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "2", "screenSchemeId": "3"}]
    name = "Scheme with Desc"
    description = "A test scheme with description"
    headers = {"X-Custom-Header": "foobar"}
    resp = await ds.create_issue_type_screen_scheme(mappings, name, description, headers)

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_empty_mappings():
    """Test with empty issueTypeMappings list."""
    client = MockJiraRESTClient("https://jira.com")
    ds = JiraDataSource(JiraClient(client))
    mappings = []
    name = "EmptyMappings"
    resp = await ds.create_issue_type_screen_scheme(mappings, name)
# 2. EDGE TEST CASES
@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_invalid_client_raises():
    """Test ValueError is raised if client is not initialized."""
    class DummyJiraClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        JiraDataSource(DummyJiraClient())

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_missing_get_base_url():
    """Test ValueError is raised if client lacks get_base_url method."""
    class DummyClient:
        pass
    class DummyJiraClient:
        def get_client(self):
            return DummyClient()
    with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
        JiraDataSource(DummyJiraClient())

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_concurrent_execution():
    """Test concurrent execution of multiple requests."""
    client = MockJiraRESTClient("https://concurrent.test")
    ds = JiraDataSource(JiraClient(client))
    mappings1 = [{"issueTypeId": "1", "screenSchemeId": "1"}]
    mappings2 = [{"issueTypeId": "2", "screenSchemeId": "2"}]
    name1 = "Scheme1"
    name2 = "Scheme2"
    # Run two requests concurrently
    resp1, resp2 = await asyncio.gather(
        ds.create_issue_type_screen_scheme(mappings1, name1),
        ds.create_issue_type_screen_scheme(mappings2, name2, "desc2")
    )

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_headers_override():
    """Test that user-provided Content-Type header is not overwritten."""
    client = MockJiraRESTClient("https://header.override")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "x", "screenSchemeId": "y"}]
    name = "HeaderOverride"
    headers = {"Content-Type": "application/xml"}
    resp = await ds.create_issue_type_screen_scheme(mappings, name, headers=headers)

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_long_name_and_description():
    """Test with long name and description strings."""
    client = MockJiraRESTClient("https://long.strings")
    ds = JiraDataSource(JiraClient(client))
    long_name = "N" * 255
    long_desc = "D" * 1024
    mappings = [{"issueTypeId": "a", "screenSchemeId": "b"}]
    resp = await ds.create_issue_type_screen_scheme(mappings, long_name, long_desc)
# 3. LARGE SCALE TEST CASES
@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_many_mappings():
    """Test with a large number of mappings (scalability)."""
    client = MockJiraRESTClient("https://large.mappings")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": str(i), "screenSchemeId": str(i)} for i in range(200)]
    name = "LargeMappings"
    resp = await ds.create_issue_type_screen_scheme(mappings, name)

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_concurrent_high_volume():
    """Test concurrent execution with many parallel requests."""
    client = MockJiraRESTClient("https://parallel.test")
    ds = JiraDataSource(JiraClient(client))
    names = [f"Scheme_{i}" for i in range(20)]
    mappings = [{"issueTypeId": str(i), "screenSchemeId": str(i)} for i in range(3)]
    # Launch 20 concurrent requests
    responses = await asyncio.gather(*[
        ds.create_issue_type_screen_scheme(mappings, name)
        for name in names
    ])
    for i, resp in enumerate(responses):
        pass
# 4. THROUGHPUT TEST CASES
@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_small_load():
    """Throughput: Test small batch of requests completes successfully."""
    client = MockJiraRESTClient("https://throughput.small")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "1", "screenSchemeId": "1"}]
    # 5 concurrent requests
    results = await asyncio.gather(*[
        ds.create_issue_type_screen_scheme(mappings, f"SmallLoad_{i}")
        for i in range(5)
    ])

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_medium_load():
    """Throughput: Test medium batch of requests completes successfully."""
    client = MockJiraRESTClient("https://throughput.medium")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "m", "screenSchemeId": "m"}]
    # 50 concurrent requests
    results = await asyncio.gather(*[
        ds.create_issue_type_screen_scheme(mappings, f"MediumLoad_{i}")
        for i in range(50)
    ])
    # Check that all names are unique in the responses
    names = set(r.data["received"]["name"] for r in results)

@pytest.mark.asyncio
async def test_create_issue_type_screen_scheme_throughput_large_load():
    """Throughput: Test large batch of requests completes successfully."""
    client = MockJiraRESTClient("https://throughput.large")
    ds = JiraDataSource(JiraClient(client))
    mappings = [{"issueTypeId": "l", "screenSchemeId": "l"}]
    # 100 concurrent requests (keep < 1000 for performance)
    results = await asyncio.gather(*[
        ds.create_issue_type_screen_scheme(mappings, f"LargeLoad_{i}")
        for i in range(100)
    ])
    # Check that all responses are HTTPResponse and have correct mapping
    for r in results:
        pass
codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
To edit these changes, run `git checkout codeflash/optimize-JiraDataSource.create_issue_type_screen_scheme-mhs9wi1k` and push.