|
18 | 18 | GUARD_DUTY_TEMPLATE = "guardduty/template.yaml" |
19 | 19 | GUARD_DUTY_SAM_TEMPLATE = "guardduty/packaged.yaml" |
20 | 20 |
|
| 21 | +CLOUDWATCH_TEMPLATE = "guardduty/cloudwatchevents.json" |
| 22 | + |
21 | 23 | # Update the below values with preferred bucket name and aws region. |
22 | 24 | BUCKET_NAME = "" |
23 | 25 | AWS_REGION = os.environ.get("AWS_DEFAULT_REGION", "us-east-1") |
@@ -379,7 +381,7 @@ def setUp(self): |
379 | 381 | self.collector_name = "Test GuardDuty Lambda" |
380 | 382 | self.source_name = "GuardDuty" |
381 | 383 | self.source_category = "Labs/test/guard/duty" |
382 | | - self.finding_types = ["Policy:S3/AccountBlockPublicAccessDisabled", "Policy:S3/BucketPublicAccessGranted"] |
| 384 | + self.finding_types = ["DefenseEvasion:IAMUser/AnomalousBehavior", "Backdoor:EC2/Spambot"] |
383 | 385 | self.delay = 7 |
384 | 386 |
|
385 | 387 | # Get GuardDuty details |
@@ -427,5 +429,60 @@ def test_guard_duty(self): |
427 | 429 | assert len(self.sumo_resource.verificationErrors) == 0 |
428 | 430 |
|
429 | 431 |
|
| 432 | +class TestCloudWatchEvents(unittest.TestCase): |
| 433 | + |
| 434 | + def setUp(self): |
| 435 | + # Parameters |
| 436 | + self.collector_name = "Test CloudWatch Events Lambda" |
| 437 | + self.source_name = "CloudWatch Events" |
| 438 | + self.source_category = "Labs/test/cloudwatch/events" |
| 439 | + self.finding_types = ["Recon:IAMUser/MaliciousIPCaller.Custom", "Discovery:S3/TorIPCaller"] |
| 440 | + self.delay = 7 |
| 441 | + |
| 442 | + # Get GuardDuty details |
| 443 | + self.guard_duty = boto3.client('guardduty', AWS_REGION) |
| 444 | + response = self.guard_duty.list_detectors() |
| 445 | + if "DetectorIds" in response: |
| 446 | + self.detector_id = response["DetectorIds"][0] |
| 447 | + |
| 448 | + # Get Sumo Logic Client |
| 449 | + self.sumo_resource = SumoLogicResource(self.source_category, self.finding_types, self.delay) |
| 450 | + # Create a collector and http source for testing |
| 451 | + self.collector = self.sumo_resource.create_collector(self.collector_name) |
| 452 | + self.collector_id = self.collector['collector']['id'] |
| 453 | + self.source = self.sumo_resource.create_source(self.collector_id, self.source_name) |
| 454 | + self.source_id = self.source['source']['id'] |
| 455 | + |
| 456 | + # Get CloudFormation client |
| 457 | + self.cf = CloudFormation("TestCloudWatchEvents", CLOUDWATCH_TEMPLATE) |
| 458 | + self.parameters = { |
| 459 | + "SumoEndpointUrl": self.source['source']['url'], |
| 460 | + } |
| 461 | + |
| 462 | + def tearDown(self): |
| 463 | + if self.cf.stack_exists(): |
| 464 | + self.cf.delete_stack() |
| 465 | + self.sumo_resource.delete_source(self.collector_id, self.source) |
| 466 | + self.sumo_resource.delete_collector(self.collector) |
| 467 | + |
| 468 | + def test_cloudwatch_event(self): |
| 469 | + self.cf.create_stack(self.parameters) |
| 470 | + print("Testing Stack Creation.") |
| 471 | + self.assertTrue(self.cf.stack_exists()) |
| 472 | + # Generate some specific sample findings |
| 473 | + print("Generating sample CloudWatch Events.") |
| 474 | + self.guard_duty.create_sample_findings(DetectorId=self.detector_id, FindingTypes=self.finding_types) |
| 475 | + print("Waiting for %s minutes for logs to appear in Sumo Logic." % self.delay) |
| 476 | + time.sleep(self.delay * 60) |
| 477 | + # Go to SumoLogic and check if you received the logs |
| 478 | + # Assert one of the log for JSON format to check correctness |
| 479 | + print("Validate Logs in Sumo Logic.") |
| 480 | + self.sumo_resource.assert_logs() |
| 481 | + |
| 482 | + if len(self.sumo_resource.verificationErrors) > 0: |
| 483 | + print("Assertions failures are:- %s." % '\n'.join(self.sumo_resource.verificationErrors)) |
| 484 | + assert len(self.sumo_resource.verificationErrors) == 0 |
| 485 | + |
| 486 | + |
430 | 487 | if __name__ == '__main__': |
431 | 488 | unittest.main() |
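Review bot: In the new `TestCloudWatchEvents.setUp`, the collector and HTTP source are created several statements before the `CloudFormation(...)` construction, but unittest does not run `tearDown` when `setUp` itself raises, so a failure in that later construction leaves an orphaned collector and a live HTTP source URL behind with nothing to delete them. Registering the release at the point of creation closes that gap: `self.addCleanup(self.sumo_resource.delete_collector, self.collector)` directly after `create_collector`, and `self.addCleanup(self.sumo_resource.delete_source, self.collector_id, self.source)` directly after `create_source` (cleanups run last-registered-first, so the source is removed before its collector), after which the two `delete_*` calls in `tearDown` become redundant. The detector lookup is also fragile: an empty `DetectorIds` list raises IndexError inside `setUp`, and a missing key leaves `self.detector_id` unset until `create_sample_findings` fails with an unrelated AttributeError; `detector_ids = response.get("DetectorIds") or []` followed by `self.assertTrue(detector_ids, "no GuardDuty detector in %s" % AWS_REGION)` fails fast with a clear message. The existing GuardDuty `setUp` visible in the earlier hunk appears to follow the same pattern, but its body lies mostly outside this diff.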