From 44918afd53ad713ac10767f2e84886cc498a0101 Mon Sep 17 00:00:00 2001 From: Marcin Wisniewski Date: Wed, 24 Apr 2019 12:35:06 +0100 Subject: [PATCH 1/3] Adds Statuser implementation --- proximo/internal/proximoc/client.go | 4 +++ proximo/proximo_sink.go | 2 +- proximo/status.go | 41 +++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 proximo/status.go diff --git a/proximo/internal/proximoc/client.go b/proximo/internal/proximoc/client.go index 30a31d2..b22332a 100644 --- a/proximo/internal/proximoc/client.go +++ b/proximo/internal/proximoc/client.go @@ -284,3 +284,7 @@ func makeId() string { } return base64.URLEncoding.EncodeToString(random) } + +func (p *ProducerConn) GrpcClient () *grpc.ClientConn { + return p.cc +} diff --git a/proximo/proximo_sink.go b/proximo/proximo_sink.go index 648273f..ec76d43 100644 --- a/proximo/proximo_sink.go +++ b/proximo/proximo_sink.go @@ -66,5 +66,5 @@ func (mq *messageSink) Close() error { // Status reports the status of the message sink func (mq *messageSink) Status() (*pubsub.Status, error) { - return nil, errors.New("status is not implemented") + return sinkStatus(mq.producer.GrpcClient()) } diff --git a/proximo/status.go b/proximo/status.go new file mode 100644 index 0000000..0c02349 --- /dev/null +++ b/proximo/status.go @@ -0,0 +1,41 @@ +package proximo + +import ( + "errors" + + "github.com/utilitywarehouse/go-pubsub" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" +) + +// ErrNotConnected is returned if a status is requested before the connection has been initialized +var ErrNotConnected = errors.New("proximo not connected") + +func sinkStatus(cc *grpc.ClientConn) (*pubsub.Status, error) { + if cc == nil { + return nil, ErrNotConnected + } + + var working bool + var problems []string + switch cc.GetState() { + case connectivity.Ready, + connectivity.Idle: + working = true + case connectivity.Shutdown, + connectivity.TransientFailure: + working = false + problems = append(problems, cc.GetState().String()) + case connectivity.Connecting: + working = true + problems = append(problems, cc.GetState().String()) + default: + working = false + problems = append(problems, "unknown connectivity state") + } + + return &pubsub.Status{ + Problems: problems, + Working: working, + }, nil +} From 62add89b209c50a30b8bc4ed8e09eb145dfbdc6c Mon Sep 17 00:00:00 2001 From: Marcin Wisniewski Date: Wed, 24 Apr 2019 12:37:04 +0100 Subject: [PATCH 2/3] Adds comment to exported method --- proximo/internal/proximoc/client.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/proximo/internal/proximoc/client.go b/proximo/internal/proximoc/client.go index b22332a..a507cde 100644 --- a/proximo/internal/proximoc/client.go +++ b/proximo/internal/proximoc/client.go @@ -108,7 +108,7 @@ func consumeContext(ctx context.Context, proximoAddress string, consumer string, case err := <-errs: return err case <-localCtx.Done(): - return nil //ctx.Err() + return nil // ctx.Err() } } @@ -197,7 +197,7 @@ func (p *ProducerConn) Close() error { func (p *ProducerConn) start() error { - // defer p.stream.CloseSend() + // defer p.stream.CloseSend() confirmations := make(chan *Confirmation, 16) // TODO: make buffer size configurable? @@ -285,6 +285,7 @@ func makeId() string { return base64.URLEncoding.EncodeToString(random) } -func (p *ProducerConn) GrpcClient () *grpc.ClientConn { +// GrpcClient returns grpc client connection +func (p *ProducerConn) GrpcClient() *grpc.ClientConn { return p.cc } From f179f9125aa6dfd556084c323785f75b9a996daf Mon Sep 17 00:00:00 2001 From: Marcin Wisniewski Date: Wed, 24 Apr 2019 14:17:43 +0100 Subject: [PATCH 3/3] pedantic cleanup --- proximo/internal/proximoc/client.go | 4 ++-- proximo/status.go | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/proximo/internal/proximoc/client.go b/proximo/internal/proximoc/client.go index a507cde..af0db61 100644 --- a/proximo/internal/proximoc/client.go +++ b/proximo/internal/proximoc/client.go @@ -285,7 +285,7 @@ func makeId() string { return base64.URLEncoding.EncodeToString(random) } -// GrpcClient returns grpc client connection -func (p *ProducerConn) GrpcClient() *grpc.ClientConn { +// GRPCClient returns grpc client connection +func (p *ProducerConn) GRPCClient() *grpc.ClientConn { return p.cc } diff --git a/proximo/status.go b/proximo/status.go index 0c02349..c556938 100644 --- a/proximo/status.go +++ b/proximo/status.go @@ -16,8 +16,10 @@ func sinkStatus(cc *grpc.ClientConn) (*pubsub.Status, error) { return nil, ErrNotConnected } - var working bool - var problems []string + var ( + working bool + problems []string + ) switch cc.GetState() { case connectivity.Ready, connectivity.Idle: