diff --git a/proximo/internal/proximoc/client.go b/proximo/internal/proximoc/client.go index 30a31d2..af0db61 100644 --- a/proximo/internal/proximoc/client.go +++ b/proximo/internal/proximoc/client.go @@ -108,7 +108,7 @@ func consumeContext(ctx context.Context, proximoAddress string, consumer string, case err := <-errs: return err case <-localCtx.Done(): - return nil //ctx.Err() + return nil // ctx.Err() } } @@ -197,7 +197,7 @@ func (p *ProducerConn) Close() error { func (p *ProducerConn) start() error { - // defer p.stream.CloseSend() + // defer p.stream.CloseSend() confirmations := make(chan *Confirmation, 16) // TODO: make buffer size configurable? @@ -284,3 +284,8 @@ func makeId() string { } return base64.URLEncoding.EncodeToString(random) } + +// GRPCClient returns grpc client connection +func (p *ProducerConn) GRPCClient() *grpc.ClientConn { + return p.cc +} diff --git a/proximo/proximo_sink.go b/proximo/proximo_sink.go index 648273f..ec76d43 100644 --- a/proximo/proximo_sink.go +++ b/proximo/proximo_sink.go @@ -66,5 +66,5 @@ func (mq *messageSink) Close() error { // Status reports the status of the message sink func (mq *messageSink) Status() (*pubsub.Status, error) { - return nil, errors.New("status is not implemented") + return sinkStatus(mq.producer.GrpcClient()) } diff --git a/proximo/status.go b/proximo/status.go new file mode 100644 index 0000000..c556938 --- /dev/null +++ b/proximo/status.go @@ -0,0 +1,43 @@ +package proximo + +import ( + "errors" + + "github.com/utilitywarehouse/go-pubsub" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" +) + +// ErrNotConnected is returned if a status is requested before the connection has been initialized +var ErrNotConnected = errors.New("proximo not connected") + +func sinkStatus(cc *grpc.ClientConn) (*pubsub.Status, error) { + if cc == nil { + return nil, ErrNotConnected + } + + var ( + working bool + problems []string + ) + switch cc.GetState() { + case connectivity.Ready, + connectivity.Idle: + working = true + case connectivity.Shutdown, + connectivity.TransientFailure: + working = false + problems = append(problems, cc.GetState().String()) + case connectivity.Connecting: + working = true + problems = append(problems, cc.GetState().String()) + default: + working = false + problems = append(problems, "unknown connectivity state") + } + + return &pubsub.Status{ + Problems: problems, + Working: working, + }, nil +}