|
26 | 26 | import static org.hamcrest.Matchers.is; |
27 | 27 | import static org.hamcrest.Matchers.lessThanOrEqualTo; |
28 | 28 | import static org.junit.Assert.assertEquals; |
| 29 | +import static org.mockito.Matchers.any; |
29 | 30 | import static org.mockito.Matchers.eq; |
30 | 31 | import static org.mockito.Mockito.mock; |
31 | 32 | import static org.mockito.Mockito.when; |
|
42 | 43 | import com.google.api.services.dataflow.model.MetricUpdate; |
43 | 44 | import com.google.cloud.dataflow.sdk.PipelineResult.State; |
44 | 45 | import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms; |
| 46 | +import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; |
45 | 47 | import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper; |
46 | 48 | import com.google.cloud.dataflow.sdk.transforms.Aggregator; |
47 | 49 | import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; |
@@ -91,6 +93,9 @@ public class DataflowPipelineJobTest { |
91 | 93 | @Rule |
92 | 94 | public ExpectedException thrown = ExpectedException.none(); |
93 | 95 |
|
| 96 | + @Rule |
| 97 | + public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowPipelineJob.class); |
| 98 | + |
94 | 99 | @Before |
95 | 100 | public void setup() { |
96 | 101 | MockitoAnnotations.initMocks(this); |
@@ -193,6 +198,34 @@ public void testWaitToFinishCancelled() throws Exception { |
193 | 198 | assertEquals(State.CANCELLED, mockWaitToFinishInState(State.CANCELLED)); |
194 | 199 | } |
195 | 200 |
|
| 201 | + /** |
| 202 | + * Test that {@link DataflowPipelineJob#cancel} doesn't throw if the Dataflow service returns |
| 203 | + * non-terminal state even though the cancel API call failed, which can happen in practice. |
| 204 | + * |
| 205 | + * <p>TODO: delete this code if the API calls become consistent. |
| 206 | + */ |
| 207 | + @Test |
| 208 | + public void testCancelTerminatedJobWithStaleState() throws IOException { |
| 209 | + Dataflow.Projects.Jobs.Get statusRequest = |
| 210 | + mock(Dataflow.Projects.Jobs.Get.class); |
| 211 | + |
| 212 | + Job statusResponse = new Job(); |
| 213 | + statusResponse.setCurrentState("JOB_STATE_RUNNING"); |
| 214 | + when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(statusRequest); |
| 215 | + when(statusRequest.execute()).thenReturn(statusResponse); |
| 216 | + |
| 217 | + Dataflow.Projects.Jobs.Update update = mock( |
| 218 | + Dataflow.Projects.Jobs.Update.class); |
| 219 | + when(mockJobs.update(eq(PROJECT_ID), eq(JOB_ID), any(Job.class))) |
| 220 | + .thenReturn(update); |
| 221 | + when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS")); |
| 222 | + |
| 223 | + DataflowPipelineJob job = new DataflowPipelineJob( |
| 224 | + PROJECT_ID, JOB_ID, mockWorkflowClient, null); |
| 225 | + job.cancel(); |
| 226 | + expectedLogs.verifyWarn("Cancel failed because job " + JOB_ID + " is already terminated."); |
| 227 | + } |
| 228 | + |
196 | 229 | /** |
197 | 230 | * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED} |
198 | 231 | * state is terminal. |
|
0 commit comments