1- ## Copyright (c) 2021, 2022 Oracle and/or its affiliates.
1+ ## Copyright (c) 2021, 2024 Oracle and/or its affiliates.
22## The Universal Permissive License (UPL), Version 1.0 as shown at https://oss.oracle.com/licenses/upl/
33
44require 'fluent/plugin/output'
77require 'yajl'
88require 'yajl/json_gem'
99
10+ # require 'tzinfo'
1011require 'logger'
1112require_relative '../dto/logEventsJson'
1213require_relative '../dto/logEvents'
1314require_relative '../metrics/prometheusMetrics'
1415require_relative '../metrics/metricsLabels'
16+ require_relative '../enums/source'
1517
1618# Import only specific OCI modules to improve load times and reduce the memory requirements.
1719require 'oci/auth/auth'
3638require 'oci/waiter'
3739require 'oci/retry/retry'
3840require 'oci/object_storage/object_storage'
39-
4041module OCI
4142 class << self
4243 attr_accessor :sdk_name
@@ -97,7 +98,8 @@ class OutOracleOCILogAnalytics < Output
9798 config_param :zip_file_location , :string , :default => nil
9899 desc 'The kubernetes_metadata_keys_mapping.'
99100 config_param :kubernetes_metadata_keys_mapping , :hash , :default => { "container_name" :"Container" , "namespace_name" :"Namespace" , "pod_name" :"Pod" , "container_image" :"Container Image Name" , "host" :"Node" }
100-
101+ desc 'opc-meta-properties'
102+ config_param :collection_source , :string , :default => Source ::FLUENTD
101103
102104 #****************************************************************
103105 desc 'The http proxy to be used.'
@@ -256,6 +258,14 @@ def initialize_loganalytics_client()
256258 else
257259 @@loganalytics_client = OCI ::LogAnalytics ::LogAnalyticsClient . new ( config : OCI ::Config . new , signer : instance_principals_signer )
258260 end
261+ when "WorkloadIdentity"
262+ workload_identity_signer = OCI ::Auth ::Signers ::oke_workload_resource_principal_signer
263+ if is_valid ( @endpoint )
264+ @@loganalytics_client = OCI ::LogAnalytics ::LogAnalyticsClient . new ( config : OCI ::Config . new , endpoint : @endpoint , signer : workload_identity_signer )
265+ @@logger . info { "loganalytics_client initialised with endpoint: #{ @endpoint } " }
266+ else
267+ @@loganalytics_client = OCI ::LogAnalytics ::LogAnalyticsClient . new ( config : OCI ::Config . new , signer : workload_identity_signer )
268+ end
259269 when "ConfigFile"
260270 my_config = OCI ::ConfigFileLoader . load_config ( config_file_location : @config_file_location , profile_name : @profile_name )
261271 if is_valid ( @endpoint )
@@ -628,6 +638,8 @@ def group_by_logGroupId(chunk)
628638 latency = 0
629639 records_per_tag = 0
630640
641+
642+
631643 tag_metrics_set = Hash . new
632644 logGroup_labels_set = Hash . new
633645
@@ -637,8 +649,8 @@ def group_by_logGroupId(chunk)
637649 tags_per_logGroupId = Hash . new
638650 tag_logSet_map = Hash . new
639651 tag_metadata_map = Hash . new
652+ timezoneValuesByTag = Hash . new
640653 incoming_records = 0
641-
642654 chunk . each do |time , record |
643655 incoming_records += 1
644656 metricsLabels = MetricsLabels . new
@@ -722,6 +734,8 @@ def group_by_logGroupId(chunk)
722734 end
723735 next
724736 end
737+
738+ # metricsLabels.timezone = record["oci_la_timezone"]
725739 metricsLabels . logGroupId = record [ "oci_la_log_group_id" ]
726740 metricsLabels . logSourceName = record [ "oci_la_log_source_name" ]
727741 if record [ "oci_la_log_set" ] != nil
@@ -770,6 +784,25 @@ def group_by_logGroupId(chunk)
770784 tags_per_logGroupId [ record [ "oci_la_log_group_id" ] ] = record [ "tag" ]
771785 end
772786 end
787+ # validating the timezone field
788+ if !timezoneValuesByTag . has_key? ( record [ "tag" ] )
789+ begin
790+ timezoneIdentifier = record [ "oci_la_timezone" ]
791+ unless is_valid ( timezoneIdentifier )
792+ record [ "oci_la_timezone" ] = nil
793+ else
794+ isTimezoneExist = timezone_exist? timezoneIdentifier
795+ unless isTimezoneExist
796+ @@logger . warn { "Invalid timezone '#{ timezoneIdentifier } ', using default UTC." }
797+ record [ "oci_la_timezone" ] = "UTC"
798+ end
799+
800+ end
801+ timezoneValuesByTag [ record [ "tag" ] ] = record [ "oci_la_timezone" ]
802+ end
803+ else
804+ record [ "oci_la_timezone" ] = timezoneValuesByTag [ record [ "tag" ] ]
805+ end
773806
774807 records << record
775808 ensure
@@ -916,6 +949,14 @@ def write(chunk)
916949 end
917950 end
918951 end
952+ def timezone_exist? ( tz )
953+ begin
954+ TZInfo ::Timezone . get ( tz )
955+ return true
956+ rescue TZInfo ::InvalidTimezoneIdentifier
957+ return false
958+ end
959+ end
919960
920961 # Each oci_la_log_set will correspond to a separate file in the zip
921962 # Only MAX_FILES_PER_ZIP files are allowed per zip.
@@ -958,6 +999,21 @@ def get_logSets_map_per_logGroupId(oci_la_log_group_id,records_per_logGroupId)
958999
9591000 # takes a fluentD chunk and converts it to an in-memory zipfile, populating metrics hash provided
9601001 # Any exception raised is passed into the metrics hash, to be re-thrown from write()
1002+ def getCollectionSource ( input )
1003+ collections_src = [ ]
1004+ if !is_valid input
1005+ collections_src . unshift ( "source:#{ Source ::FLUENTD } " )
1006+ else
1007+ if input == Source ::FLUENTD . to_s or input == Source ::KUBERNETES_SOLUTION . to_s
1008+ collections_src . unshift ( "source:#{ input } " )
1009+ else
1010+ # source not define ! using default source 'fluentd'
1011+ collections_src . unshift ( "source:#{ Source ::FLUENTD } " )
1012+ end
1013+ end
1014+ collections_src
1015+ end
1016+
9611017 def get_zipped_stream ( oci_la_log_group_id , oci_la_global_metadata , records_per_logSet_map )
9621018 begin
9631019 current , = Time . now
@@ -970,8 +1026,9 @@ def get_zipped_stream(oci_la_log_group_id,oci_la_global_metadata,records_per_log
9701026 record [ 'oci_la_metadata' ] ,
9711027 record [ 'oci_la_entity_id' ] ,
9721028 record [ 'oci_la_entity_type' ] ,
973- record [ 'oci_la_log_source_name' ] ,
974- record [ 'oci_la_log_path' ]
1029+ record [ 'oci_la_log_source_name' ] ,
1030+ record [ 'oci_la_log_path' ] ,
1031+ record [ 'oci_la_timezone' ]
9751032 ] } . map { |lrpe_key , records_per_lrpe |
9761033 number_of_records += records_per_lrpe . length
9771034 LogEvents . new ( lrpe_key , records_per_lrpe )
@@ -1021,9 +1078,10 @@ def save_zip_to_local(oci_la_log_group_id, zippedstream, current_s)
10211078 # upload zipped stream to oci
10221079 def upload_to_oci ( oci_la_log_group_id , number_of_records , zippedstream , metricsLabels_array )
10231080 begin
1081+ collection_src_prop = getCollectionSource @collection_source
10241082 error_reason = nil
10251083 error_code = nil
1026- opts = { payload_type : "ZIP" }
1084+ opts = { payload_type : "ZIP" , opc_meta_properties : collection_src_prop }
10271085
10281086 response = @@loganalytics_client . upload_log_events_file ( namespace_name = @namespace ,
10291087 logGroupId = oci_la_log_group_id ,
0 commit comments