diff --git a/translator/config/schema.json b/translator/config/schema.json index 5ac45126881..9b83cf2dda7 100644 --- a/translator/config/schema.json +++ b/translator/config/schema.json @@ -1390,6 +1390,13 @@ "type": "string", "minLength": 1, "maxLength": 259 + }, + "log_group_name": { + "description": "CloudWatch log group name for OTLP metrics", + "type": "string" + }, + "emf_processor": { + "$ref": "#/definitions/emfProcessorDefinition" } }, "additionalProperties": false diff --git a/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_config.json b/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_config.json index 2291a3c3c21..d655a51ea1d 100644 --- a/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_config.json +++ b/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_config.json @@ -8,6 +8,14 @@ "tls": { "cert_file": "/path/to/cert.pem", "key_file": "/path/to/key.pem" + }, + "log_group_name": "/aws/application/otlp", + "emf_processor": { + "metric_namespace": "MyApplication/OTLP", + "metric_unit": { + "request_duration": "Milliseconds", + "request_count": "Count" + } } } } diff --git a/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_config.yaml b/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_config.yaml index af5fabba845..48ef5a88ffa 100644 --- a/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_config.yaml +++ b/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_config.yaml @@ -11,12 +11,19 @@ exporters: external_id: "" imds_retries: 1 local_mode: false - log_group_name: /aws/cwagent + log_group_name: /aws/application/otlp log_retention: 0 log_stream_name: "" max_retries: 2 + metric_descriptors: + - metric_name: request_count + overwrite: false + unit: Count + - metric_name: request_duration + overwrite: false + unit: Milliseconds middleware: agenthealth/logs - namespace: CWAgent + namespace: MyApplication/OTLP no_verify_ssl: false num_workers: 8 output_destination: cloudwatch diff --git a/translator/tocwconfig/sampleConfig/otlp_prometheus_combined_config.conf b/translator/tocwconfig/sampleConfig/otlp_prometheus_combined_config.conf new file mode 100644 index 00000000000..364841e49fa --- /dev/null +++ b/translator/tocwconfig/sampleConfig/otlp_prometheus_combined_config.conf @@ -0,0 +1,23 @@ +[agent] + collection_jitter = "0s" + debug = false + flush_interval = "1s" + flush_jitter = "0s" + hostname = "" + interval = "60s" + logfile = "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log" + logtarget = "lumberjack" + metric_batch_size = 1000 + metric_buffer_limit = 10000 + omit_hostname = false + precision = "" + quiet = false + round_interval = false + +[[inputs.prometheus]] + prometheus_config_path = "/etc/prometheus/prometheus.yaml" + +[[outputs.cloudwatchlogs]] + force_flush_interval = "30s" + log_stream_name = "i-UNKNOWN" + region = "us-west-2" diff --git a/translator/tocwconfig/sampleConfig/otlp_prometheus_combined_config.json b/translator/tocwconfig/sampleConfig/otlp_prometheus_combined_config.json new file mode 100644 index 00000000000..9f994b91c08 --- /dev/null +++ b/translator/tocwconfig/sampleConfig/otlp_prometheus_combined_config.json @@ -0,0 +1,44 @@ +{ + "logs": { + "force_flush_interval": 30, + "metrics_collected": { + "prometheus": { + "log_group_name": "/aws/prometheus/metrics", + "prometheus_config_path": "/etc/prometheus/prometheus.yaml", + "emf_processor": { + "metric_namespace": "PrometheusApp", + "metric_unit": { + "http_requests_total": "Count", + "http_request_duration_seconds": "Seconds" + }, + "metric_declaration": [ + { + "source_labels": ["job"], + "label_matcher": "^kubernetes-pod-jmx$", + "dimensions": [ + ["ClusterName", "Namespace"] + ], + "metric_selectors": [ + "^jvm_threads_current$", + "^jvm_memory_bytes_used$" + ] + } + ] + } + }, + "otlp": { + "grpc_endpoint": "0.0.0.0:4317", + "http_endpoint": "0.0.0.0:4318", + "log_group_name": "/aws/otlp/metrics", + "emf_processor": { + "metric_namespace": "OTLPApp", + "metric_unit": { + "request_duration": "Milliseconds", + "request_count": "Count", + "error_rate": "Percent" + } + } + } + } + } +} diff --git a/translator/tocwconfig/sampleConfig/otlp_prometheus_combined_config.yaml b/translator/tocwconfig/sampleConfig/otlp_prometheus_combined_config.yaml new file mode 100644 index 00000000000..9d15496d005 --- /dev/null +++ b/translator/tocwconfig/sampleConfig/otlp_prometheus_combined_config.yaml @@ -0,0 +1,195 @@ +exporters: + awsemf: + add_entity: true + certificate_file_path: "" + detailed_metrics: false + dimension_rollup_option: NoDimensionRollup + disable_metric_extraction: false + eks_fargate_container_insights_enabled: false + endpoint: "" + enhanced_container_insights: false + external_id: "" + imds_retries: 1 + local_mode: false + log_group_name: /aws/otlp/metrics + log_retention: 0 + log_stream_name: "" + max_retries: 2 + metric_descriptors: + - metric_name: request_count + overwrite: false + unit: Count + - metric_name: error_rate + overwrite: false + unit: Percent + - metric_name: request_duration + overwrite: false + unit: Milliseconds + middleware: agenthealth/logs + namespace: OTLPApp + no_verify_ssl: false + num_workers: 8 + output_destination: cloudwatch + profile: "" + proxy_address: "" + region: us-west-2 + request_timeout_seconds: 30 + resource_arn: "" + resource_to_telemetry_conversion: + enabled: true + retain_initial_value_of_delta_metric: false + role_arn: "" + version: "0" + awsemf/prometheus: + add_entity: false + certificate_file_path: "" + detailed_metrics: false + dimension_rollup_option: NoDimensionRollup + disable_metric_extraction: false + eks_fargate_container_insights_enabled: false + endpoint: "" + enhanced_container_insights: false + external_id: "" + imds_retries: 1 + local_mode: false + log_group_name: /aws/prometheus/metrics + log_retention: 0 + log_stream_name: '{JobName}' + max_retries: 2 + metric_declarations: + - dimensions: + - - ClusterName + - Namespace + label_matchers: + - label_names: + - job + regex: ^kubernetes-pod-jmx$ + separator: ; + metric_name_selectors: + - ^jvm_threads_current$ + - ^jvm_memory_bytes_used$ + metric_descriptors: + - metric_name: http_request_duration_seconds + overwrite: false + unit: Seconds + - metric_name: http_requests_total + overwrite: false + unit: Count + middleware: agenthealth/logs + namespace: PrometheusApp + no_verify_ssl: false + num_workers: 8 + output_destination: cloudwatch + profile: "" + proxy_address: "" + region: us-west-2 + request_timeout_seconds: 30 + resource_arn: "" + resource_to_telemetry_conversion: + enabled: true + retain_initial_value_of_delta_metric: false + role_arn: "" + version: "0" +extensions: + agenthealth/logs: + is_usage_data_enabled: true + stats: + operations: + - PutLogEvents + usage_flags: + mode: EC2 + region_type: ACJ + agenthealth/statuscode: + is_status_code_enabled: true + is_usage_data_enabled: true + stats: + usage_flags: + mode: EC2 + region_type: ACJ + entitystore: + mode: ec2 + region: us-west-2 +processors: + awsentity/service/otlp: + entity_type: Service + platform: ec2 + batch/hostOtlpMetrics/cloudwatchlogs: + metadata_cardinality_limit: 1000 + send_batch_max_size: 0 + send_batch_size: 8192 + timeout: 30s + batch/prometheus/cloudwatchlogs: + metadata_cardinality_limit: 1000 + send_batch_max_size: 0 + send_batch_size: 8192 + timeout: 30s + cumulativetodelta/hostOtlpMetrics/cloudwatchlogs: + exclude: + match_type: "" + include: + match_type: "" + initial_value: 2 + max_staleness: 0s +receivers: + otlp/grpc_0_0_0_0_4317: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + keepalive: + enforcement_policy: {} + server_parameters: {} + read_buffer_size: 524288 + transport: tcp + otlp/http_0_0_0_0_4318: + protocols: + http: + cors: {} + endpoint: 0.0.0.0:4318 + idle_timeout: 0s + logs_url_path: /v1/logs + metrics_url_path: /v1/metrics + read_header_timeout: 0s + traces_url_path: /v1/traces + write_timeout: 0s + telegraf_prometheus: + collection_interval: 1m0s + initial_delay: 1s + timeout: 0s +service: + extensions: + - agenthealth/logs + - agenthealth/statuscode + - entitystore + pipelines: + metrics/hostOtlpMetrics/cloudwatchlogs: + exporters: + - awsemf + processors: + - cumulativetodelta/hostOtlpMetrics/cloudwatchlogs + - awsentity/service/otlp + - batch/hostOtlpMetrics/cloudwatchlogs + receivers: + - otlp/grpc_0_0_0_0_4317 + - otlp/http_0_0_0_0_4318 + metrics/prometheus/cloudwatchlogs: + exporters: + - awsemf/prometheus + processors: + - batch/prometheus/cloudwatchlogs + receivers: + - telegraf_prometheus + telemetry: + logs: + encoding: console + level: info + output_paths: + - /opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log + sampling: + enabled: true + initial: 2 + thereafter: 500 + tick: 10s + metrics: + level: None + traces: + level: None diff --git a/translator/tocwconfig/tocwconfig_test.go b/translator/tocwconfig/tocwconfig_test.go index ac5cef7c731..2613514556c 100644 --- a/translator/tocwconfig/tocwconfig_test.go +++ b/translator/tocwconfig/tocwconfig_test.go @@ -348,6 +348,11 @@ func TestSharedOtlp(t *testing.T) { checkTranslation(t, "shared_otlp_config", "linux", nil, "") } +func TestOtlpPrometheusCombinedConfig(t *testing.T) { + resetContext(t) + context.CurrentContext().SetMode(config.ModeEC2) + checkTranslation(t, "otlp_prometheus_combined_config", "linux", nil, "") +} func TestProcstatMemorySwapConfig(t *testing.T) { resetContext(t) context.CurrentContext().SetRunInContainer(false) diff --git a/translator/translate/logs/metrics_collected/otlp/otlp.go b/translator/translate/logs/metrics_collected/otlp/otlp.go new file mode 100644 index 00000000000..e753e6f2f52 --- /dev/null +++ b/translator/translate/logs/metrics_collected/otlp/otlp.go @@ -0,0 +1,45 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package otlp + +import ( + "github.com/aws/amazon-cloudwatch-agent/translator" + parent "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/metrics_collected" +) + +type Rule translator.Rule + +type Otlp struct { +} + +const SectionKey = "otlp" + +func GetCurPath() string { + curPath := parent.GetCurPath() + SectionKey + "/" + return curPath +} + +func (o *Otlp) ApplyRule(input interface{}) (string, interface{}) { + im := input.(map[string]interface{}) + result := map[string]map[string]interface{}{} + inputs := map[string]interface{}{} + processors := map[string]interface{}{} + + // Check if this plugin exists in the input instance + if _, ok := im[SectionKey]; !ok { + return "", "" + } + // OTLP configuration is handled by the OTEL pipeline translator + // This rule just validates the configuration exists + result["inputs"] = inputs + result["processors"] = processors + return SectionKey, result +} + +func init() { + o := new(Otlp) + parent.RegisterLinuxRule(SectionKey, o) + parent.RegisterDarwinRule(SectionKey, o) + parent.RegisterWindowsRule(SectionKey, o) +} diff --git a/translator/translate/otel/exporter/awsemf/otlp.go b/translator/translate/otel/exporter/awsemf/otlp.go new file mode 100644 index 00000000000..aa60f0fc8a6 --- /dev/null +++ b/translator/translate/otel/exporter/awsemf/otlp.go @@ -0,0 +1,52 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package awsemf + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" + "go.opentelemetry.io/collector/confmap" + + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" +) + +var ( + otlpBasePathKey = common.OTLPLogsKey + otlpEMFProcessorPathKey = common.ConfigKey(otlpBasePathKey, common.EMFProcessorKey) +) + +// setOTLPFields configures the EMF exporter for OTLP metrics +func setOTLPFields(conf *confmap.Conf, cfg *awsemfexporter.Config) error { + // Set log group name if provided + setOTLPLogGroup(conf, cfg) + + // Set EMF processor fields if provided + if conf.IsSet(otlpEMFProcessorPathKey) { + if err := setOTLPNamespace(conf, cfg); err != nil { + return err + } + if err := setOTLPMetricDescriptors(conf, cfg); err != nil { + return err + } + } + + return nil +} + +// setOTLPLogGroup sets the log group name for OTLP metrics +// If not set, use the default from awsemf_default_generic.yaml (/aws/cwagent) +func setOTLPLogGroup(conf *confmap.Conf, cfg *awsemfexporter.Config) { + if logGroupName, ok := common.GetString(conf, common.ConfigKey(otlpBasePathKey, common.LogGroupName)); ok { + cfg.LogGroupName = logGroupName + } +} + +// setOTLPNamespace sets the metric namespace for OTLP metrics +func setOTLPNamespace(conf *confmap.Conf, cfg *awsemfexporter.Config) error { + return setNamespaceWithDefault(conf, common.ConfigKey(otlpEMFProcessorPathKey, metricNamespace), "", cfg) +} + +// setOTLPMetricDescriptors sets the metric units for OTLP metrics +func setOTLPMetricDescriptors(conf *confmap.Conf, cfg *awsemfexporter.Config) error { + return setMetricDescriptors(conf, common.ConfigKey(otlpEMFProcessorPathKey, metricUnit), cfg) +} diff --git a/translator/translate/otel/exporter/awsemf/otlp_test.go b/translator/translate/otel/exporter/awsemf/otlp_test.go new file mode 100644 index 00000000000..3b2f84a2cc8 --- /dev/null +++ b/translator/translate/otel/exporter/awsemf/otlp_test.go @@ -0,0 +1,249 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package awsemf + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" +) + +func TestSetOTLPLogGroup(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + expectedLogGrp string + }{ + { + name: "CustomLogGroup", + config: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{ + "log_group_name": "/aws/application/custom", + }, + }, + }, + }, + expectedLogGrp: "/aws/application/custom", + }, + { + name: "NoLogGroupUsesDefault", + config: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{}, + }, + }, + }, + expectedLogGrp: "/aws/cwagent", // Default from awsemf_default_generic.yaml + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.config) + cfg := &awsemfexporter.Config{ + LogGroupName: "/aws/cwagent", // Default + } + + setOTLPLogGroup(conf, cfg) + assert.Equal(t, tt.expectedLogGrp, cfg.LogGroupName) + }) + } +} + +func TestSetOTLPNamespace(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + expectedNamespace string + }{ + { + name: "CustomNamespace", + config: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{ + "emf_processor": map[string]interface{}{ + "metric_namespace": "MyApp/Custom", + }, + }, + }, + }, + }, + expectedNamespace: "MyApp/Custom", + }, + { + name: "NoNamespaceUsesDefault", + config: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{}, + }, + }, + }, + expectedNamespace: "CWAgent", // Default from awsemf_default_generic.yaml + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.config) + cfg := &awsemfexporter.Config{ + Namespace: "CWAgent", // Default + } + + err := setOTLPNamespace(conf, cfg) + require.NoError(t, err) + assert.Equal(t, tt.expectedNamespace, cfg.Namespace) + }) + } +} + +func TestSetOTLPMetricDescriptors(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + expectedMetricDescriptors int + }{ + { + name: "WithMetricUnits", + config: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{ + "emf_processor": map[string]interface{}{ + "metric_unit": map[string]interface{}{ + "request_duration": "Milliseconds", + "request_count": "Count", + }, + }, + }, + }, + }, + }, + expectedMetricDescriptors: 2, + }, + { + name: "NoMetricUnits", + config: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{}, + }, + }, + }, + expectedMetricDescriptors: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.config) + cfg := &awsemfexporter.Config{} + + err := setOTLPMetricDescriptors(conf, cfg) + require.NoError(t, err) + assert.Len(t, cfg.MetricDescriptors, tt.expectedMetricDescriptors) + }) + } +} + +func TestSetOTLPFields(t *testing.T) { + config := map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{ + "log_group_name": "/aws/application/otlp", + "emf_processor": map[string]interface{}{ + "metric_namespace": "MyApplication/OTLP", + "metric_unit": map[string]interface{}{ + "request_duration": "Milliseconds", + "request_count": "Count", + }, + }, + }, + }, + }, + } + + conf := confmap.NewFromStringMap(config) + cfg := &awsemfexporter.Config{ + LogGroupName: "/aws/cwagent", + Namespace: "CWAgent", + } + + err := setOTLPFields(conf, cfg) + require.NoError(t, err) + assert.Equal(t, "/aws/application/otlp", cfg.LogGroupName) + assert.Equal(t, "MyApplication/OTLP", cfg.Namespace) + assert.Len(t, cfg.MetricDescriptors, 2) +} + +func TestIsOTLP(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + pipelineName string + expected bool + }{ + { + name: "OTLPConfigured", + config: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{ + "grpc_endpoint": "0.0.0.0:4317", + }, + }, + }, + }, + expected: true, + }, + { + name: "OTLPNotConfigured", + config: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{}, + }, + }, + expected: false, + }, + { + name: "EmptyConfig", + config: map[string]interface{}{}, + expected: false, + }, + { + name: "OTLPConfiguredButWrongPipeline", + config: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{ + "grpc_endpoint": "0.0.0.0:4317", + }, + }, + }, + }, + pipelineName: "containerinsights", + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.config) + pipelineName := "" + if tt.pipelineName != "" { + pipelineName = tt.pipelineName + } + result := isOTLP(conf, pipelineName) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/translator/translate/otel/exporter/awsemf/prometheus.go b/translator/translate/otel/exporter/awsemf/prometheus.go index 20a4a324e40..ae81349da11 100644 --- a/translator/translate/otel/exporter/awsemf/prometheus.go +++ b/translator/translate/otel/exporter/awsemf/prometheus.go @@ -16,8 +16,6 @@ import ( ) const ( - metricUnit = "metric_unit" - metricNamespace = "metric_namespace" metricDeclartion = "metric_declaration" ecsDefaultCloudWatchNamespace = "ECS/ContainerInsights/Prometheus" k8sDefaultCloudWatchNamespace = "ContainerInsights/Prometheus" @@ -51,48 +49,21 @@ func setPrometheusLogGroup(conf *confmap.Conf, cfg *awsemfexporter.Config) error return nil } func setPrometheusNamespace(conf *confmap.Conf, cfg *awsemfexporter.Config) error { - if namespace, ok := common.GetString(conf, common.ConfigKey(emfProcessorBasePathKey, metricNamespace)); ok { - cfg.Namespace = namespace - return nil - } - + var defaultNamespace string if context.CurrentContext().RunInContainer() { if ecsutil.GetECSUtilSingleton().IsECS() { - cfg.Namespace = ecsDefaultCloudWatchNamespace + defaultNamespace = ecsDefaultCloudWatchNamespace } else { - cfg.Namespace = k8sDefaultCloudWatchNamespace + defaultNamespace = k8sDefaultCloudWatchNamespace } } else { - cfg.Namespace = ec2DefaultCloudWatchNamespace + defaultNamespace = ec2DefaultCloudWatchNamespace } - - return nil - + return setNamespaceWithDefault(conf, common.ConfigKey(emfProcessorBasePathKey, metricNamespace), defaultNamespace, cfg) } func setPrometheusMetricDescriptors(conf *confmap.Conf, cfg *awsemfexporter.Config) error { - metricUnitKey := common.ConfigKey(emfProcessorBasePathKey, metricUnit) - if !conf.IsSet(metricUnitKey) { - return nil - } - - mus := conf.Get(metricUnitKey) - metricUnits := mus.(map[string]interface{}) - var metricDescriptors []map[string]string - for mName, unit := range metricUnits { - metricDescriptors = append(metricDescriptors, map[string]string{ - "metric_name": mName, - "unit": unit.(string), - }) - } - c := confmap.NewFromStringMap(map[string]interface{}{ - "metric_descriptors": metricDescriptors, - }) - cfg.MetricDescriptors = []awsemfexporter.MetricDescriptor{} - if err := c.Unmarshal(&cfg); err != nil { - return fmt.Errorf("unable to unmarshal metric_descriptors: %w", err) - } - return nil + return setMetricDescriptors(conf, common.ConfigKey(emfProcessorBasePathKey, metricUnit), cfg) } func setPrometheusMetricDeclarations(conf *confmap.Conf, cfg *awsemfexporter.Config) error { diff --git a/translator/translate/otel/exporter/awsemf/translator.go b/translator/translate/otel/exporter/awsemf/translator.go index 5dea4285415..ae690004ee1 100644 --- a/translator/translate/otel/exporter/awsemf/translator.go +++ b/translator/translate/otel/exporter/awsemf/translator.go @@ -100,7 +100,7 @@ func (t *translator) Translate(c *confmap.Conf) (component.Config, error) { defaultConfig = defaultKubernetesConfig } - if isOTLP(c) { + if isOTLP(c, t.name) { cfg.AddEntity = true } @@ -154,6 +154,10 @@ func (t *translator) Translate(c *confmap.Conf) (component.Config, error) { if err := setPrometheusFields(c, cfg); err != nil { return nil, err } + } else if isOTLP(c, t.name) { + if err := setOTLPFields(c, cfg); err != nil { + return nil, err + } } else if isKubernetes(c, t.name) { if err := setKubernetesFields(c, cfg); err != nil { return nil, err @@ -186,8 +190,8 @@ func isPrometheus(conf *confmap.Conf, pipelineName string) bool { return conf.IsSet(prometheusBasePathKey) && pipelineName == common.PipelineNamePrometheus } -func isOTLP(conf *confmap.Conf) bool { - return conf.IsSet(common.OTLPLogsKey) +func isOTLP(conf *confmap.Conf, pipelineName string) bool { + return conf.IsSet(common.OTLPLogsKey) && pipelineName == "" } func setAppSignalsFields(_ *confmap.Conf, _ *awsemfexporter.Config) error { diff --git a/translator/translate/otel/exporter/awsemf/translator_test.go b/translator/translate/otel/exporter/awsemf/translator_test.go index 2594c9b4387..1576b89f950 100644 --- a/translator/translate/otel/exporter/awsemf/translator_test.go +++ b/translator/translate/otel/exporter/awsemf/translator_test.go @@ -1626,3 +1626,67 @@ func TestTranslateAppSignals(t *testing.T) { }) } } + +func TestTranslatorOTLPWithCustomConfig(t *testing.T) { + agent.Global_Config.Region = "us-west-2" + agent.Global_Config.Role_arn = "" + context.CurrentContext().SetMode(config.ModeEC2) + + inputConfig := map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{ + "grpc_endpoint": "0.0.0.0:4317", + "log_group_name": "/aws/application/otlp", + "emf_processor": map[string]interface{}{ + "metric_namespace": "MyApplication/OTLP", + "metric_unit": map[string]interface{}{ + "request_duration": "Milliseconds", + "request_count": "Count", + }, + }, + }, + }, + }, + } + + conf := confmap.NewFromStringMap(inputConfig) + translator := NewTranslator() + cfg, err := translator.Translate(conf) + require.NoError(t, err) + require.NotNil(t, cfg) + + emfCfg := cfg.(*awsemfexporter.Config) + assert.Equal(t, "/aws/application/otlp", emfCfg.LogGroupName) + assert.Equal(t, "MyApplication/OTLP", emfCfg.Namespace) + assert.Len(t, emfCfg.MetricDescriptors, 2) + assert.True(t, emfCfg.AddEntity) +} + +func TestTranslatorOTLPWithDefaults(t *testing.T) { + agent.Global_Config.Region = "us-west-2" + agent.Global_Config.Role_arn = "" + context.CurrentContext().SetMode(config.ModeEC2) + + inputConfig := map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "otlp": map[string]interface{}{ + "grpc_endpoint": "0.0.0.0:4317", + }, + }, + }, + } + + conf := confmap.NewFromStringMap(inputConfig) + translator := NewTranslator() + cfg, err := translator.Translate(conf) + require.NoError(t, err) + require.NotNil(t, cfg) + + emfCfg := cfg.(*awsemfexporter.Config) + // Should use defaults from awsemf_default_generic.yaml + assert.Equal(t, "/aws/cwagent", emfCfg.LogGroupName) + assert.Equal(t, "CWAgent", emfCfg.Namespace) + assert.True(t, emfCfg.AddEntity) +} diff --git a/translator/translate/otel/exporter/awsemf/utils.go b/translator/translate/otel/exporter/awsemf/utils.go new file mode 100644 index 00000000000..5c3a5c8683b --- /dev/null +++ b/translator/translate/otel/exporter/awsemf/utils.go @@ -0,0 +1,58 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package awsemf + +import ( + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" + "go.opentelemetry.io/collector/confmap" + + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" +) + +const ( + metricUnit = "metric_unit" + metricNamespace = "metric_namespace" +) + +// setMetricDescriptors is a shared function to set metric descriptors from metric_unit configuration +func setMetricDescriptors(conf *confmap.Conf, metricUnitKey string, cfg *awsemfexporter.Config) error { + if !conf.IsSet(metricUnitKey) { + return nil + } + + mus := conf.Get(metricUnitKey) + metricUnits := mus.(map[string]interface{}) + var metricDescriptors []map[string]string + for mName, unit := range metricUnits { + metricDescriptors = append(metricDescriptors, map[string]string{ + "metric_name": mName, + "unit": unit.(string), + }) + } + c := confmap.NewFromStringMap(map[string]interface{}{ + "metric_descriptors": metricDescriptors, + }) + cfg.MetricDescriptors = []awsemfexporter.MetricDescriptor{} + if err := c.Unmarshal(&cfg); err != nil { + return fmt.Errorf("unable to unmarshal metric_descriptors: %w", err) + } + return nil +} + +// setNamespaceWithDefault is a shared function to set namespace from config or use a default +func setNamespaceWithDefault(conf *confmap.Conf, namespaceKey string, defaultNamespace string, cfg *awsemfexporter.Config) error { + if namespace, ok := common.GetString(conf, namespaceKey); ok { + cfg.Namespace = namespace + return nil + } + + if defaultNamespace != "" { + cfg.Namespace = defaultNamespace + } + // If defaultNamespace is empty, the namespace from awsemf_default_generic.yaml will be used + + return nil +} diff --git a/translator/translate/otel/exporter/awsemf/utils_test.go b/translator/translate/otel/exporter/awsemf/utils_test.go new file mode 100644 index 00000000000..15f002e434d --- /dev/null +++ b/translator/translate/otel/exporter/awsemf/utils_test.go @@ -0,0 +1,100 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package awsemf + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" +) + +func TestSetMetricDescriptors(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + metricUnitKey string + expectedMetricDescriptors int + }{ + { + name: "WithMetricUnits", + config: map[string]interface{}{ + "emf_processor": map[string]interface{}{ + "metric_unit": map[string]interface{}{ + "request_duration": "Milliseconds", + "request_count": "Count", + }, + }, + }, + metricUnitKey: "emf_processor::metric_unit", + expectedMetricDescriptors: 2, + }, + { + name: "NoMetricUnits", + config: map[string]interface{}{}, + metricUnitKey: "emf_processor::metric_unit", + expectedMetricDescriptors: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.config) + cfg := &awsemfexporter.Config{} + + err := setMetricDescriptors(conf, tt.metricUnitKey, cfg) + require.NoError(t, err) + assert.Len(t, cfg.MetricDescriptors, tt.expectedMetricDescriptors) + }) + } +} + +func TestSetNamespaceWithDefault(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + namespaceKey string + defaultNamespace string + expectedNamespace string + }{ + { + name: "CustomNamespace", + config: map[string]interface{}{ + "emf_processor": map[string]interface{}{ + "metric_namespace": "MyApp/Custom", + }, + }, + namespaceKey: "emf_processor::metric_namespace", + defaultNamespace: "DefaultNamespace", + expectedNamespace: "MyApp/Custom", + }, + { + name: "UseDefault", + config: map[string]interface{}{}, + namespaceKey: "emf_processor::metric_namespace", + defaultNamespace: "DefaultNamespace", + expectedNamespace: "DefaultNamespace", + }, + { + name: "NoDefaultNoCustom", + config: map[string]interface{}{}, + namespaceKey: "emf_processor::metric_namespace", + defaultNamespace: "", + expectedNamespace: "", // Will use the default from awsemf_default_generic.yaml + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.config) + cfg := &awsemfexporter.Config{} + + err := setNamespaceWithDefault(conf, tt.namespaceKey, tt.defaultNamespace, cfg) + require.NoError(t, err) + assert.Equal(t, tt.expectedNamespace, cfg.Namespace) + }) + } +}