Skip to content

Commit f545450

Browse files
authored
Add an option to store payload to files. (#124)
This change introduces an option to store the CreateTimeSeriesRequest protobuf messages in wire format to files under a directory specified, instead of sending protobuf messages to Stackdriver Monitoring API.
1 parent fc1c9e2 commit f545450

File tree

3 files changed

+208
-32
lines changed

3 files changed

+208
-32
lines changed

cmd/stackdriver-prometheus-sidecar/main.go

Lines changed: 78 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -180,24 +180,25 @@ func main() {
180180
}
181181

182182
cfg := struct {
183-
configFilename string
184-
projectIdResource string
185-
kubernetesLabels kubernetesConfig
186-
genericLabels genericConfig
187-
stackdriverAddress *url.URL
188-
metricsPrefix string
189-
useGkeResource bool
190-
walDirectory string
191-
prometheusURL *url.URL
192-
listenAddress string
193-
filters []string
194-
filtersets []string
195-
aggregations retrieval.CounterAggregatorConfig
196-
metricRenames map[string]string
197-
staticMetadata []scrape.MetricMetadata
198-
useRestrictedIps bool
199-
manualResolver *manual.Resolver
200-
monitoringBackends []string
183+
configFilename string
184+
projectIdResource string
185+
kubernetesLabels kubernetesConfig
186+
genericLabels genericConfig
187+
stackdriverAddress *url.URL
188+
metricsPrefix string
189+
useGkeResource bool
190+
storeInFilesDirectory string
191+
walDirectory string
192+
prometheusURL *url.URL
193+
listenAddress string
194+
filters []string
195+
filtersets []string
196+
aggregations retrieval.CounterAggregatorConfig
197+
metricRenames map[string]string
198+
staticMetadata []scrape.MetricMetadata
199+
useRestrictedIps bool
200+
manualResolver *manual.Resolver
201+
monitoringBackends []string
201202

202203
logLevel promlog.AllowedLevel
203204
}{}
@@ -239,6 +240,9 @@ func main() {
239240
"Whether to use the legacy gke_container MonitoredResource type instead of k8s_container").
240241
Default("false").BoolVar(&cfg.useGkeResource)
241242

243+
a.Flag("stackdriver.store-in-files-directory", "If specified, store the CreateTimeSeriesRequest protobuf messages to files under this directory, instead of sending protobuf messages to Stackdriver Monitoring API.").
244+
StringVar(&cfg.storeInFilesDirectory)
245+
242246
a.Flag("prometheus.wal-directory", "Directory from where to read the Prometheus TSDB WAL.").
243247
Default("data/wal").StringVar(&cfg.walDirectory)
244248

@@ -393,16 +397,34 @@ func main() {
393397
// works well.
394398
config.DefaultQueueConfig.Capacity = 3 * stackdriver.MaxTimeseriesesPerRequest
395399

396-
queueManager, err := stackdriver.NewQueueManager(
397-
log.With(logger, "component", "queue_manager"),
398-
config.DefaultQueueConfig,
399-
&clientFactory{
400+
var scf stackdriver.StorageClientFactory
401+
402+
if len(cfg.storeInFilesDirectory) > 0 {
403+
err := os.MkdirAll(cfg.storeInFilesDirectory, 0700)
404+
if err != nil {
405+
level.Error(logger).Log(
406+
"msg", "Failure creating directory.",
407+
"err", err)
408+
os.Exit(1)
409+
}
410+
scf = &fileClientFactory{
411+
dir: cfg.storeInFilesDirectory,
412+
logger: log.With(logger, "component", "storage"),
413+
}
414+
} else {
415+
scf = &stackdriverClientFactory{
400416
logger: log.With(logger, "component", "storage"),
401417
projectIdResource: cfg.projectIdResource,
402418
url: cfg.stackdriverAddress,
403419
timeout: 10 * time.Second,
404420
manualResolver: cfg.manualResolver,
405-
},
421+
}
422+
}
423+
424+
queueManager, err := stackdriver.NewQueueManager(
425+
log.With(logger, "component", "queue_manager"),
426+
config.DefaultQueueConfig,
427+
scf,
406428
tailer,
407429
)
408430
if err != nil {
@@ -561,26 +583,50 @@ func main() {
561583
level.Info(logger).Log("msg", "See you next time!")
562584
}
563585

564-
type clientFactory struct {
586+
type stackdriverClientFactory struct {
565587
logger log.Logger
566588
projectIdResource string
567589
url *url.URL
568590
timeout time.Duration
569591
manualResolver *manual.Resolver
570592
}
571593

572-
func (f *clientFactory) New() stackdriver.StorageClient {
594+
func (s *stackdriverClientFactory) New() stackdriver.StorageClient {
573595
return stackdriver.NewClient(&stackdriver.ClientConfig{
574-
Logger: f.logger,
575-
ProjectId: f.projectIdResource,
576-
URL: f.url,
577-
Timeout: f.timeout,
578-
Resolver: f.manualResolver,
596+
Logger: s.logger,
597+
ProjectId: s.projectIdResource,
598+
URL: s.url,
599+
Timeout: s.timeout,
600+
Resolver: s.manualResolver,
579601
})
580602
}
581603

582-
func (f *clientFactory) Name() string {
583-
return f.url.String()
604+
func (s *stackdriverClientFactory) Name() string {
605+
return s.url.String()
606+
}
607+
608+
// fileClientFactory generates StorageClient which writes to a newly
609+
// created file under dir. It requires dir an existing valid directory.
610+
type fileClientFactory struct {
611+
dir string
612+
logger log.Logger
613+
}
614+
615+
// New creates an instance of stackdriver.StorageClient. Each instance
616+
// writes to a different file under dir. The returned instance is not
617+
// thread-safe.
618+
func (fcf *fileClientFactory) New() stackdriver.StorageClient {
619+
f, err := ioutil.TempFile(fcf.dir, "*.txt")
620+
if err != nil {
621+
level.Error(fcf.logger).Log(
622+
"msg", "failure creating files.",
623+
"err", err)
624+
}
625+
return stackdriver.NewCreateTimeSeriesRequestWriterCloser(f, fcf.logger)
626+
}
627+
628+
func (f *fileClientFactory) Name() string {
629+
return "fileClientFactory"
584630
}
585631

586632
func waitForPrometheus(ctx context.Context, logger log.Logger, promURL *url.URL) {

stackdriver/writer.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
Copyright 2019 Google Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package stackdriver
18+
19+
import (
20+
"io"
21+
22+
"github.com/go-kit/kit/log"
23+
"github.com/go-kit/kit/log/level"
24+
"github.com/golang/protobuf/proto"
25+
monitoring "google.golang.org/genproto/googleapis/monitoring/v3"
26+
)
27+
28+
// CreateTimeSeriesRequestWriterCloser allows writing protobuf message
29+
// monitoring.CreateTimeSeriesRequest as wire format into the writerCloser.
30+
type CreateTimeSeriesRequestWriterCloser struct {
31+
logger log.Logger
32+
writeCloser io.WriteCloser
33+
}
34+
35+
func NewCreateTimeSeriesRequestWriterCloser(writeCloser io.WriteCloser, logger log.Logger) *CreateTimeSeriesRequestWriterCloser {
36+
if logger == nil {
37+
logger = log.NewNopLogger()
38+
}
39+
return &CreateTimeSeriesRequestWriterCloser{
40+
writeCloser: writeCloser,
41+
logger: logger,
42+
}
43+
}
44+
45+
// Store writes protobuf message monitoring.CreateTimeSeriesRequest as wire
46+
// format into the writeCloser.
47+
func (c *CreateTimeSeriesRequestWriterCloser) Store(req *monitoring.CreateTimeSeriesRequest) error {
48+
data, err := proto.Marshal(req)
49+
if err != nil {
50+
level.Warn(c.logger).Log(
51+
"msg", "failure marshaling CreateTimeSeriesRequest.",
52+
"err", err)
53+
return err
54+
}
55+
_, err = c.writeCloser.Write(data)
56+
if err != nil {
57+
level.Warn(c.logger).Log(
58+
"msg", "failure writing data to file.",
59+
"err", err)
60+
return err
61+
}
62+
return nil
63+
}
64+
65+
func (c *CreateTimeSeriesRequestWriterCloser) Close() error {
66+
return c.writeCloser.Close()
67+
}

stackdriver/writer_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
Copyright 2019 Google Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package stackdriver
18+
19+
import (
20+
"bytes"
21+
"reflect"
22+
"testing"
23+
24+
"github.com/go-kit/kit/log"
25+
"github.com/golang/protobuf/proto"
26+
monitoring "google.golang.org/genproto/googleapis/monitoring/v3"
27+
)
28+
29+
type myWriterCloser struct {
30+
Buffer bytes.Buffer
31+
}
32+
33+
func (m *myWriterCloser) Write(p []byte) (int, error) {
34+
return m.Buffer.Write(p)
35+
}
36+
37+
func (m *myWriterCloser) Close() error {
38+
m.Buffer.Reset()
39+
return nil
40+
}
41+
42+
func TestRequest(t *testing.T) {
43+
var m myWriterCloser
44+
c := NewCreateTimeSeriesRequestWriterCloser(&m, log.NewNopLogger())
45+
defer c.Close()
46+
req := &monitoring.CreateTimeSeriesRequest{
47+
TimeSeries: []*monitoring.TimeSeries{
48+
&monitoring.TimeSeries{},
49+
},
50+
}
51+
if err := c.Store(req); err != nil {
52+
t.Fatal(err)
53+
}
54+
55+
storedReq := &monitoring.CreateTimeSeriesRequest{}
56+
err := proto.Unmarshal(m.Buffer.Bytes(), storedReq)
57+
if err != nil {
58+
t.Fatal(err)
59+
}
60+
if !reflect.DeepEqual(req, storedReq) {
61+
t.Fatalf("Expect requests as %v, but stored as: %v", req, storedReq)
62+
}
63+
}

0 commit comments

Comments
 (0)