Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions apps/worker/services/tests/test_lock_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import threading
import time
from unittest.mock import MagicMock

Expand Down Expand Up @@ -286,6 +287,116 @@ def test_locked_warning_logged_on_lock_error(self, mock_redis, caplog):
assert warning_logs[0].__dict__["retry_num"] == 2
assert "countdown" in warning_logs[0].__dict__

def test_locked_blocking_timeout_none_causes_indefinite_blocking(self, mock_redis):
    """
    Test that blocking_timeout=None causes indefinite blocking, preventing retry logic.

    This test detects the bug where blocking_timeout=None causes Redis to block
    indefinitely, preventing LockError from being raised and disabling retry logic.

    The simulated "blocked" lock call parks on a threading.Event instead of a
    fixed sleep, so the outcome does not depend on a sleep outlasting the waits
    below, and the worker thread is always released and joined before the test
    returns (no thread keeps using the mocks after the test has finished).
    """
    # Simulate a lock that's already held - Redis would block indefinitely
    # when blocking_timeout=None
    blocking_lock = MagicMock()
    blocking_lock.__enter__ = MagicMock()
    blocking_lock.__exit__ = MagicMock(return_value=None)

    # Track what blocking_timeout was passed
    blocking_timeouts = []
    lock_called = threading.Event()
    # Only set during cleanup; until then the fake lock call stays parked.
    release_lock = threading.Event()

    def mock_lock(*args, **kwargs):
        blocking_timeouts.append(kwargs.get("blocking_timeout"))
        lock_called.set()
        if kwargs.get("blocking_timeout") is None:
            # With blocking_timeout=None, Redis blocks forever and never raises
            # LockError. Park here until the test releases us; the timeout is
            # only a safety cap so the thread cannot outlive a broken cleanup.
            release_lock.wait(timeout=5.0)
            return blocking_lock
        else:
            # With blocking_timeout set, Redis raises LockError immediately
            raise LockError()

    mock_redis.lock = mock_lock

    manager = LockManager(repoid=123, commitid="abc123", blocking_timeout=None)

    # Use threading to detect if the code hangs
    exception_raised = []
    completed = threading.Event()

    def run_locked():
        try:
            with manager.locked(LockType.UPLOAD):
                pass
        except LockRetry:
            exception_raised.append("LockRetry")
        except Exception as e:
            exception_raised.append(f"Other: {type(e).__name__}")
        finally:
            # Signal completion on every path, success or failure.
            completed.set()

    thread = threading.Thread(target=run_locked)
    thread.daemon = True
    thread.start()

    try:
        # Wait for lock to be called; fail loudly if it never is.
        assert lock_called.wait(timeout=1.0), (
            "redis lock() was never called by LockManager.locked()"
        )

        # Wait a short time to see if it completes (it shouldn't with blocking_timeout=None)
        completed.wait(timeout=0.3)

        # Verify that blocking_timeout=None was passed to Redis
        assert None in blocking_timeouts, (
            "blocking_timeout=None should have been passed to Redis"
        )

        # With the bug, the thread hangs because blocking_timeout=None causes indefinite blocking
        # The thread should still be alive (BUG!) - this documents the bug behavior
        # This test passes when it correctly identifies the bug: indefinite blocking prevents retry logic
        assert thread.is_alive(), (
            "Expected thread to hang with blocking_timeout=None (bug behavior), "
            "but thread completed. This test documents that blocking_timeout=None "
            "causes indefinite blocking, preventing LockError from being raised."
        )

        # Verify that LockRetry was NOT raised (because LockError never happens)
        assert "LockRetry" not in exception_raised, (
            "LockRetry should NOT be raised with blocking_timeout=None because "
            "LockError is never raised (Redis blocks indefinitely). "
            "This is the bug: retry logic is disabled."
        )
    finally:
        # Unblock the fake lock call and let the worker finish before returning.
        release_lock.set()
        thread.join(timeout=1.0)

def test_locked_blocking_timeout_enables_retry_logic(self, mock_redis):
    """
    Check that a concrete blocking_timeout lets lock contention surface as LockRetry.

    The mocked Redis lock raises LockError; the test expects
    LockManager.locked() to turn that into a LockRetry carrying a positive
    integer countdown, and expects the configured blocking_timeout to be
    forwarded to Redis.
    """
    # Every attempt to take the lock fails with LockError.
    mock_redis.lock.side_effect = LockError()

    lock_manager = LockManager(repoid=123, commitid="abc123", blocking_timeout=5)

    # Entering the context must raise LockRetry right away rather than hang.
    with pytest.raises(LockRetry) as raised, lock_manager.locked(
        LockType.UPLOAD, retry_num=0
    ):
        pass

    # The retry exception carries a usable countdown.
    countdown = raised.value.countdown
    assert countdown > 0
    assert isinstance(countdown, int)

    # blocking_timeout=5 must have reached the Redis lock call unchanged.
    mock_redis.lock.assert_called_once_with(
        "upload_lock_123_abc123", timeout=300, blocking_timeout=5
    )


class TestLockRetry:
def test_lock_retry_init(self):
Expand Down
131 changes: 116 additions & 15 deletions apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from unittest.mock import ANY

import pytest
from celery.exceptions import Retry
from redis.exceptions import LockError

from database.enums import ReportType
Expand Down Expand Up @@ -144,6 +143,7 @@ def test_bundle_analysis_processor_task_error(

task = BundleAnalysisProcessorTask()
retry = mocker.patch.object(task, "retry")
task.request.retries = 0

result = task.run_impl(
dbsession,
Expand Down Expand Up @@ -339,23 +339,123 @@ def test_bundle_analysis_processor_task_locked(
dbsession.flush()

task = BundleAnalysisProcessorTask()
retry = mocker.patch.object(task, "retry")
# The task uses self.retry() directly when LockRetry is raised
retry_mock = mocker.patch.object(task, "retry", side_effect=Retry())

result = task.run_impl(
dbsession,
[{"previous": "result"}],
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml={},
params={
"upload_id": upload.id_,
"commit": commit.commitid,
# When LockError is raised, LockRetry is raised, which calls self.retry()
# self.retry() raises Retry exception
with pytest.raises(Retry):
task.run_impl(
dbsession,
[{"previous": "result"}],
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml={},
params={
"upload_id": upload.id_,
"commit": commit.commitid,
},
)

assert upload.state == "started"
# Verify retry was called with countdown
retry_mock.assert_called_once()
call_args = retry_mock.call_args
assert "countdown" in call_args.kwargs, (
"retry should have been called with countdown parameter"
)


def test_bundle_analysis_processor_task_uses_default_blocking_timeout(
    mocker,
    dbsession,
    mock_storage,
    mock_redis,
):
    """
    Test that BundleAnalysisProcessorTask uses default blocking_timeout (not None).

    This test verifies that the task doesn't use blocking_timeout=None, which would
    cause indefinite blocking and disable retry logic. The task should use the
    default blocking_timeout to enable proper retry behavior.

    The mocked Redis lock records the blocking_timeout it receives and raises
    LockError; the test then checks that the lock was actually requested, that
    None was never passed, and that the task retried with a countdown.
    """
    storage_path = (
        "v1/repos/testing/ed1bdd67-8fd2-4cdb-ac9e-39b99e4a3892/bundle_report.sqlite"
    )
    mock_storage.write_file(get_bucket_name(), storage_path, "test-content")

    mocker.patch.object(
        BundleAnalysisProcessorTask,
        "app",
        tasks={
            bundle_analysis_save_measurements_task_name: mocker.MagicMock(),
        },
    )

    commit = CommitFactory.create()
    dbsession.add(commit)
    dbsession.flush()

    commit_report = CommitReport(commit_id=commit.id_)
    dbsession.add(commit_report)
    dbsession.flush()

    upload = UploadFactory.create(
        state="started",
        storage_path=storage_path,
        report=commit_report,
    )
    dbsession.add(upload)
    dbsession.flush()

    # Track what blocking_timeout was passed to Redis
    blocking_timeouts = []

    # Mock Redis to raise LockError (simulating lock contention)
    # This should happen immediately with a proper blocking_timeout
    def mock_lock(*args, **kwargs):
        blocking_timeouts.append(kwargs.get("blocking_timeout"))
        raise LockError()

    mock_redis.lock = mock_lock

    task = BundleAnalysisProcessorTask()
    # The task uses self.retry() directly, not safe_retry()
    retry_mock = mocker.patch.object(task, "retry", side_effect=Retry())

    # When LockError is raised, LockRetry is raised, which calls self.retry()
    # self.retry() raises Retry exception
    with pytest.raises(Retry):
        task.run_impl(
            dbsession,
            [{"previous": "result"}],
            repoid=commit.repoid,
            commitid=commit.commitid,
            commit_yaml={},
            params={
                "upload_id": upload.id_,
                "commit": commit.commitid,
            },
        )

    # Guard against a vacuous pass: `None not in []` is always true, so first
    # make sure the lock was requested at all.
    assert blocking_timeouts, (
        "redis lock() was never called, so blocking_timeout was not exercised"
    )

    # Verify that blocking_timeout was NOT None
    # The default is expected to be DEFAULT_BLOCKING_TIMEOUT_SECONDS (5)
    assert None not in blocking_timeouts, (
        "blocking_timeout=None was used! This causes indefinite blocking "
        "and disables retry logic. Use default blocking_timeout instead."
    )

    # Verify that retry logic was triggered
    assert retry_mock.called, (
        "self.retry() should have been called when lock contention occurs"
    )

    # Verify retry was called with countdown. Only the keyword form counts:
    # a positional argument to retry() would not be the countdown.
    retry_mock.assert_called_once()
    call_args = retry_mock.call_args
    assert "countdown" in call_args.kwargs, (
        "retry should have been called with countdown parameter"
    )


def test_bundle_analysis_process_upload_rate_limit_error(
Expand Down Expand Up @@ -390,6 +490,7 @@ def test_bundle_analysis_process_upload_rate_limit_error(

task = BundleAnalysisProcessorTask()
retry = mocker.patch.object(task, "retry")
task.request.retries = 0

ingest = mocker.patch("shared.bundle_analysis.BundleAnalysisReport.ingest")
ingest.side_effect = PutRequestRateLimitError()
Expand Down