diff --git a/CLAUDE.md b/CLAUDE.md index de6af3af..48631183 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -42,7 +42,7 @@ mintlify install - **Namespaces**: Environment isolation for graphs (dev, staging, prod) - **Feature Flags & Feature Subgraphs**: Toggle-able subgraph replacements for incremental rollout - **Schema Contracts**: Filtered graph versions using @tag directives -- **EDFS**: Event-Driven Federated Subscriptions for real-time data +- **Cosmo Streams / EDFS**: Event-Driven Federated Subscriptions for real-time data ## Documentation Structure diff --git a/docs/docs.json b/docs/docs.json index ce020d58..2bfdec0c 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -216,21 +216,30 @@ ] }, { - "group": "Event-Driven Federated Subscriptions (EDFS)", - "icon": "calendar-users", + "group": "Cosmo Streams (EDFS)", + "icon": "wave", "pages": [ - "router/event-driven-federated-subscriptions-edfs", + "router/cosmo-streams", { "group": "NATS", "icon": "puzzle-piece", "pages": [ - "router/event-driven-federated-subscriptions-edfs/nats", - "router/event-driven-federated-subscriptions-edfs/nats/stream-and-consumer-configuration" + "router/cosmo-streams/nats", + "router/cosmo-streams/nats/stream-and-consumer-configuration" ] }, - "router/event-driven-federated-subscriptions-edfs/kafka", - "router/event-driven-federated-subscriptions-edfs/redis", - "router/event-driven-federated-subscriptions-edfs/custom-modules" + "router/cosmo-streams/kafka", + "router/cosmo-streams/redis", + { + "group": "Custom Modules", + "icon": "cubes", + "pages": [ + "router/cosmo-streams/custom-modules", + "router/cosmo-streams/custom-modules/subscription-on-start", + "router/cosmo-streams/custom-modules/on-receive-event", + "router/cosmo-streams/custom-modules/on-publish-event" + ] + } ] }, "router/compliance-and-data-management", @@ -738,5 +747,11 @@ "gtm": { "tagId": "GTM-5GPL7DQH" } - } + }, + "redirects": [ + { + "source": "/router/event-driven-federated-subscriptions-edfs/:path*", + 
"destination": "/router/cosmo-streams/:path*" + } + ] } diff --git a/docs/federation/directives/openfed__subscriptionfilter.mdx b/docs/federation/directives/openfed__subscriptionfilter.mdx index 3088a849..80a6c9b6 100644 --- a/docs/federation/directives/openfed__subscriptionfilter.mdx +++ b/docs/federation/directives/openfed__subscriptionfilter.mdx @@ -26,7 +26,7 @@ input openfed__SubscriptionFilterCondition { ## Overview -The `@openfed__subscriptionFilter` directive declares that a field definition can be filtered by filter conditions. The directive can only be applied to [EDFS](/federation/event-driven-federated-subscriptions) subscriptions. +The `@openfed__subscriptionFilter` directive declares that a field definition can be filtered by filter conditions. The directive can only be applied to [EDG](/federation/event-driven-federated-subscriptions) subscriptions. ## Arguments diff --git a/docs/federation/event-driven-federated-subscriptions.mdx b/docs/federation/event-driven-federated-subscriptions.mdx index 8fb2a622..5c43f3f5 100644 --- a/docs/federation/event-driven-federated-subscriptions.mdx +++ b/docs/federation/event-driven-federated-subscriptions.mdx @@ -7,6 +7,6 @@ icon: circle-info Defining an Event-Driven Graph with Event-Driven Federated Subscriptions. -An Event-Driven Graph (EDG) is best thought to be an abstract subgraph that facilitates Event-Driven Federated Subscriptions (EDFS). If a subgraph uses or defines any event driven directives, it will be interpreted to be an Event-Driven Graph. +An Event-Driven Graph (EDG) is best thought to be an abstract subgraph that facilitates [Cosmo Streams](/router/cosmo-streams). If a subgraph uses or defines any event driven directives, it will be interpreted to be an Event-Driven Graph. 
diff --git a/docs/router/configuration.mdx b/docs/router/configuration.mdx index 89ac6bcb..0571dcf0 100644 --- a/docs/router/configuration.mdx +++ b/docs/router/configuration.mdx @@ -611,7 +611,7 @@ This option may change or be removed in future versions as the OpenTelemetry SDK | METRICS_OTLP_EXCLUDE_METRIC_LABELS | exclude_metric_labels | | The metric labels to exclude from the OTEL metrics. Accepts a list of Go regular expressions. Use https://regex101.com/ to test your regular expressions. | [] | | METRICS_OTLP_CONNECTION_STATS | connection_stats | | Enable connection metrics. | false | | METRICS_OTLP_CIRCUIT_BREAKER | circuit_breaker | | Ensure that circuit breaker metrics are enabled for OTEL. | false | -| METRICS_OTLP_STREAM | streams | | Enable EDFS stream metrics. | false | +| METRICS_OTLP_STREAM | streams | | Enable Cosmo Streams metrics. | false | ### Attributes @@ -662,7 +662,7 @@ telemetry: | PROMETHEUS_EXCLUDE_METRIC_LABELS | exclude_metric_labels | | | | | PROMETHEUS_EXCLUDE_SCOPE_INFO | exclude_scope_info | | Exclude scope info from Prometheus metrics. | false | | PROMETHEUS_CIRCUIT_BREAKER | circuit_breaker | | Enable the circuit breaker metrics for prometheus metric collection. | false | -| PROMETHEUS_OTLP_STREAM | streams | | Enable EDFS stream metrics. | false | +| PROMETHEUS_OTLP_STREAM | streams | | Enable Cosmo Streams metrics. 
| false | ### Example YAML config: diff --git a/docs/router/event-driven-federated-subscriptions-edfs.mdx b/docs/router/cosmo-streams.mdx similarity index 87% rename from docs/router/event-driven-federated-subscriptions-edfs.mdx rename to docs/router/cosmo-streams.mdx index 46a0f7ae..ee75f1b4 100644 --- a/docs/router/event-driven-federated-subscriptions-edfs.mdx +++ b/docs/router/cosmo-streams.mdx @@ -1,6 +1,6 @@ --- -title: "Event-Driven Federated Subscriptions (EDFS)" -description: "EDFS combines the power of GraphQL Federation and Event-Driven Architecture (Kafka, NATS, Redis) to update a user GraphQL Subscription after an event occurs in your system." +title: "Cosmo Streams (EDFS)" +description: "Cosmo Streams (formerly known as EDFS) combines the power of GraphQL Federation and Event-Driven Architecture to update a user GraphQL Subscription after an event occurs in your system." icon: "circle-info" sidebarTitle: "Overview" --- @@ -9,7 +9,7 @@ sidebarTitle: "Overview" ![](/images/router/EDFS.png) -Event Driven Federated Subscriptions (EDFS) solves 3 major problems when it comes to GraphQL Federation and Subscriptions by directly connecting the Router to an event source like Kafka and NATS and making it a part of the Graph. +Cosmo Streams solves 3 major problems when it comes to GraphQL Federation and Subscriptions by directly connecting the Router to an event source like Kafka and NATS and making it a part of the Graph. ## Intro @@ -38,18 +38,18 @@ Furthermore, classic Subscriptions with Federation are quite expensive when it c ## Specification -Enter Event-Driven Federated Subscriptions, a simple way to scale Federated Subscriptions in a resource-efficient manner. +Enter Cosmo Streams, a simple way to scale Federated Subscriptions in a resource-efficient manner. -EDFS supports three event providers: +Cosmo Streams supports three event providers: - + - + - + @@ -58,7 +58,7 @@ Each provider consists of at least Publish and Subscribe capabilities. 
For NATS, Our goal is to integrate with various technologies rather than agree on a single unified interface. This approach allows us to leverage the strengths of each technology. This philosophy is reflected in how we structure the directives, naming parameters, exposing features as they would appear in their respective ecosystems. -Here is an overview about all EDFS directives: +Here is an overview about all Cosmo Streams directives: ```js # Nats and JetStream integration @@ -120,7 +120,7 @@ An Event-Driven Subgraph does not need to be implemented. It is simply a Subgrap ## Prerequisites -To use EDFS, you need to have an Event Source running and connected to the Router. Currently, we support NATS, Kafka, and Redis. For simplicity, NATS is used to explain the examples. +To use Cosmo Streams, you need to have an Event Source running and connected to the Router. Currently, we support NATS, Kafka, and Redis. For simplicity, NATS is used to explain the examples. To get started, run a NATS instance and add the following configuration to your `config.yaml` Router Configuration: @@ -315,7 +315,7 @@ Here's an **invalid** message as the `__typename` field is missing: -It's important to send the `__typename` field because this allows EDFS to also work for Union and Interface types. +It's important to send the `__typename` field because this allows Cosmo Streams to also work for Union and Interface types. It's worth noting that the Router will not send any responses before you publish a message on the topic. If you need the most recent result, first make a Query, and then subscribe to the Topic. The Router will send the first response only after a message is published on the rendered topic. @@ -437,11 +437,11 @@ The Cosmo Router deduplicates Subscriptions internally to save resources. If mul ### Statelessness of Subgraphs -With EDFS, the Router connects directly to the Event Source but doesn't require any stateful connections, e.g. WebSocket, to the Subgraphs. 
This makes the Subgraphs much simpler to reason about and easier to deploy. Serverless deployment options usually have limitations on request length. With an Event Broker in the middle, Subgraphs can be stateless without having to give up on Subscriptions. +With Cosmo Streams, the Router connects directly to the Event Source but doesn't require any stateful connections, e.g. WebSocket, to the Subgraphs. This makes the Subgraphs much simpler to reason about and easier to deploy. Serverless deployment options usually have limitations on request length. With an Event Broker in the middle, Subgraphs can be stateless without having to give up on Subscriptions. ### Efficiency, CPU & Memory Consumption (Epoll/Kqueue) -EDFS is built on top of Event-Driven principles, which means that the implementation is non-blocking, as CPU efficient as possible, and has a very low memory footprint. +Cosmo Streams is built on top of Event-Driven principles, which means that the implementation is non-blocking, as CPU efficient as possible, and has a very low memory footprint. We're using Epoll and Kqueue on Systems that support it (Linux, Darwin, etc.) to be as efficient as possible. @@ -451,10 +451,10 @@ The Router supports multi-core out of the box and is capable of scaling up to a ### Publish Events from any System, not just Subgraphs -It's worth noting that publishing Entity update Events is not limited to just Subgraphs. EDFS is designed to fully decouple the API Consumer from the implementation of the Event-Driven Architecture. +It's worth noting that publishing Entity update Events is not limited to just Subgraphs. Cosmo Streams is designed to fully decouple the API Consumer from the implementation of the Event-Driven Architecture. -A client can create a Job via a Mutation and Subscribe to the Job state via EDFS. Next, the Mutation can kick off a long-running process that will be handled by one or many systems in the background. At each step, e.g. 
when an increment of work is done, each subsystem can publish a message to indicate that the state of an Entity has changed. +A client can create a Job via a Mutation and Subscribe to the Job state via Cosmo Streams. Next, the Mutation can kick off a long-running process that will be handled by one or many systems in the background. At each step, e.g. when an increment of work is done, each subsystem can publish a message to indicate that the state of an Entity has changed. Once the message is published by one of the sub-systems, the Router can Query all Subgraphs to resolve the current state of the Job. -With EDFS, each Subgraph can add fields to an Entity that it's responsible for and publish events to the Message Broker when a long-running Operation updates the overall state of an Entity. \ No newline at end of file +With Cosmo Streams, each Subgraph can add fields to an Entity that it's responsible for and publish events to the Message Broker when a long-running Operation updates the overall state of an Entity. \ No newline at end of file diff --git a/docs/router/cosmo-streams/custom-modules.mdx b/docs/router/cosmo-streams/custom-modules.mdx new file mode 100644 index 00000000..fc070b26 --- /dev/null +++ b/docs/router/cosmo-streams/custom-modules.mdx @@ -0,0 +1,17 @@ +--- +title: "Custom Modules" +sidebarTitle: Overview +description: "Customize Streams behavior with powerful handlers for subscription lifecycle, event processing, and data transformation." +icon: "circle-info" +--- + +Cosmo Router provides powerful handlers to hook into the event processing of Cosmo Streams. +These handlers allow you to implement custom logic for subscription lifecycle management, event processing and data transformation. 
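All three hooks follow the same contract: the router hands your module a batch of stream events plus a request-scoped context, and your handler returns the (possibly filtered or transformed) batch. As a minimal sketch of that shape — using simplified stand-in types rather than the real `core` and `datasource` interfaces, which are documented on the linked pages — a handler that drops events with empty payloads looks like this:

```go
package main

import "fmt"

// StreamEvent is a simplified, illustrative stand-in for the router's
// stream event type; the real type lives in the datasource package.
type StreamEvent struct {
	Data []byte
}

// filterEvents mirrors the common hook contract: take a batch of events,
// return the batch to forward. Here we drop events with empty payloads.
func filterEvents(events []StreamEvent) ([]StreamEvent, error) {
	kept := make([]StreamEvent, 0, len(events))
	for _, e := range events {
		if len(e.Data) > 0 {
			kept = append(kept, e)
		}
	}
	return kept, nil
}

func main() {
	batch := []StreamEvent{
		{Data: []byte(`{"__typename":"Employee","id":1}`)},
		{Data: nil}, // will be dropped by the filter
	}
	kept, err := filterEvents(batch)
	fmt.Println(len(kept), err) // 1 <nil>
}
```

Each real handler adds a richer context (logger, authentication, event configuration), but this filter-or-transform pattern is what all three hook pages build on.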
+ +## Available Hooks + +The Cosmo Streams system provides three main hook interfaces that you can implement with [Custom Modules](/router/custom-modules): + +- [`SubscriptionOnStartHandler`](/router/cosmo-streams/custom-modules/subscription-on-start): Called when a client subscribes +- [`OnReceiveEventHandler`](/router/cosmo-streams/custom-modules/on-receive-event): Called when events are received from a message broker +- [`OnPublishEventHandler`](/router/cosmo-streams/custom-modules/on-publish-event): Called when events are going to be sent to a message broker diff --git a/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx b/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx new file mode 100644 index 00000000..17cc90ec --- /dev/null +++ b/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx @@ -0,0 +1,297 @@ +--- +title: "OnPublishEvents Handler" +description: "A Cosmo Streams Custom Module, which lets you customize events before they are sent to message brokers" +icon: "arrow-right" +--- + +The `OnPublishEvents` handler is a custom module hook that allows you to intercept and process events before they are sent to message brokers through GraphQL mutations. This handler is called whenever a batch of events is about to be published to a provider, giving you the opportunity to filter, transform, enrich, or validate events before they are sent to the message broker. + +This handler is particularly useful for: +- **Event validation**: Ensure events meet specific criteria before publishing +- **Data transformation**: Modify event payloads to match broker expectations +- **Event enrichment**: Add additional metadata or context to events +- **Authentication and authorization**: Filter events based on user permissions +- **Monitoring and analytics**: Log or track outgoing events for observability + + +This handler is executed only when a GraphQL mutation triggers event publishing. 
Unlike `OnReceiveEvents`, this handler processes outgoing events to message brokers, not incoming events from subscriptions. + + +## Handler Interface + +In order to use the `OnPublishEvents` handler you need to create a [Custom Module](../../custom-modules) which implements the `StreamPublishEventHandler` interface. + +```go +type StreamPublishEventHandler interface { + // OnPublishEvents is called each time a batch of events is going to be sent to a provider. + // The events argument contains all events from a batch. + // Use events.All() to iterate through them and event.Clone() to create mutable copies, when needed. + // Returning an error will result in a GraphQL error being returned to the client; this can be customized by + // returning a StreamHookError. + OnPublishEvents(ctx StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) +} + +type StreamPublishEventHandlerContext interface { + // Request is the original request received by the router. + Request() *http.Request + // Logger is the logger for the request + Logger() *zap.Logger + // Operation is the GraphQL operation + Operation() OperationContext + // Authentication is the authentication for the request + Authentication() authentication.Authentication + // PublishEventConfiguration returns the publish event configuration + PublishEventConfiguration() datasource.PublishEventConfiguration + // NewEvent creates a new event that can be used in the subscription. + NewEvent(data []byte) datasource.MutableStreamEvent +} +``` + +## Error Handling + +As mentioned in the [Publish Overview Section](/router/cosmo-streams#publish), the return type of a Cosmo Streams mutation +must use the type `PublishEventResult`. This type declares a boolean `success` field. +Implementations of `OnPublishEvents` handlers return two values: `events` and `error`. When `error` is not `nil`, the client's response +will have the `success` field set to `false`. 
The error will also be logged on the router's console output. +When events are returned, these will always be sent to the provider, even if you return an error. +This can be useful in case you partially processed data but hit an error along the way. +In case you don't want to send any events to the provider, you can return `datasource.NewStreamEvents(nil)`. +See code examples below for a demonstration. + + +When the `OnPublishEvents` handler returns an error, the router takes the following actions: + +1. The client will receive a response where the `success` field is `false` +2. Returned events are sent to the message broker, if any are provided +3. The error is logged by the router with details about the mutation, provider, and field name + + + +Returning events alongside an error from `OnPublishEvents` will send these events to the provider. +In case you don't want to send any, you need to return an empty list of events. +Refer to the examples below to see how this can be done. + + +Here is an example of proper error handling: + +```go +func (m *MyEventHandler) OnPublishEvents( + ctx core.StreamPublishEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + // For validation failures, don't send events to the provider + // and return a success=false to the client. + for _, event := range events.All() { + if !isValidEvent(event.GetData()) { + return datasource.NewStreamEvents(nil), errors.New("invalid event data - publication rejected") + } + } + + // In case of partial processing of data, + // you can send all events part of the successful processing to the provider + // and can still return an error. 
+ if failureAfterPartialProcessing { + return partialEvents, errors.New("error during data processing") + } + + return events, nil +} +``` + +## Usage Example + +### Complete Custom Module with Event Bypass + +The following example contains a complete Custom Module implementation, including handler registration, with a handler that will simply pass events through unchanged. This demonstrates how to register your `OnPublishEvents` Custom Module. + +```go +package module + +import ( + "github.com/wundergraph/cosmo/router/core" + "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" + "go.uber.org/zap" +) + +func init() { + // Register your module with the router + core.RegisterModule(&EventPublishModule{}) +} + +const ModuleID = "eventPublishModule" + +// EventPublishModule demonstrates a complete custom module implementation +// that implements StreamPublishEventHandler but simply passes events through unchanged +type EventPublishModule struct {} + +// OnPublishEvents passes all events through unchanged +func (m *EventPublishModule) OnPublishEvents( + ctx core.StreamPublishEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + logger := ctx.Logger() + logger.Debug("Processing events - passing them through unchanged", + zap.Int("event_count", events.Len()), + ) + + // Simply return the events unchanged + return events, nil +} + +// Module returns the module information for registration +func (m *EventPublishModule) Module() core.ModuleInfo { + return core.ModuleInfo{ + ID: ModuleID, + New: func() core.Module { + return &EventPublishModule{} + }, + } +} + +// Interface guards to ensure we implement the required interfaces +var ( + _ core.StreamPublishEventHandler = (*EventPublishModule)(nil) +) +``` + +### Restrict Handler to run on certain mutations and providers + +Most of the time you want your hook to only deal with certain mutations. +The `OnPublishEvents` Handler is run for every mutation configured for Cosmo Streams. +You can access the name of the mutation you care about and return early if it's not the right one. + +```go +func (m *SelectivePublishHandler) OnPublishEvents( + ctx core.StreamPublishEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + pubConfig := ctx.PublishEventConfiguration() + + // Bypass handler if it's not the right mutation + if pubConfig.RootFieldName() != "updateEmployee" { + return events, nil + } + + // And / or you can decide to bypass in case it's not the right provider + // you want to deal with here. + if pubConfig.ProviderID() != "my-kafka" { + return events, nil + } + + // Your specific event processing logic here, producing processedEvents + // ... + + return datasource.NewStreamEvents(processedEvents), nil +} +``` + +### Prevent unauthorized users from sending Cosmo Streams mutation events to providers + +You can use `ctx.Authentication()` to validate that only authorized users can publish events to specific message brokers. +This is useful for securing mutation operations that trigger event publishing. 
+ +```go +func (m *AuthEventHandler) OnPublishEvents( + ctx core.StreamPublishEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + logger := ctx.Logger() + auth := ctx.Authentication() + + // Require authentication for publishing events + if auth == nil { + logger.Warn("Unauthenticated user attempted to publish events") + return datasource.NewStreamEvents(nil), errors.New("authentication required to publish events") + } + + // Check JWT claims for required permissions + claims := auth.Claims() + if claims == nil { + return datasource.NewStreamEvents(nil), errors.New("invalid authentication token") + } + + // Check for required role to publish events + roleClaimValue, hasRole := claims["role"] + if !hasRole { + return datasource.NewStreamEvents(nil), errors.New("missing role claim - publication denied") + } + + userRole, ok := roleClaimValue.(string) + if !ok || (userRole != "admin" && userRole != "publisher") { + logger.Warn("User without publish permissions attempted to publish events", + zap.Any("role", roleClaimValue), + ) + return datasource.NewStreamEvents(nil), errors.New("insufficient permissions to publish events") + } + + // User is authorized - allow event publishing + logger.Debug("Authorized user publishing events", + zap.String("role", userRole), + zap.Int("event_count", events.Len()), + ) + return events, nil +} +``` + +### Attach headers to Kafka events + +You can attach headers to Kafka events before sending them to brokers. 
+ +```go +func (m *EventPublishModule) OnPublishEvents( + ctx core.StreamPublishEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + // Bypass handler in case it's not about Kafka events + if ctx.PublishEventConfiguration().ProviderType() != datasource.ProviderTypeKafka { + return events, nil + } + + eventsWithHeaders := make([]datasource.StreamEvent, 0, events.Len()) + for _, evt := range events.All() { + // In order to set headers we need to clone the event first to make it mutable + clonedEvent := evt.Clone() + kafkaEvent, ok := clonedEvent.(*kafka.MutableEvent) + if !ok { + rootFieldName := ctx.PublishEventConfiguration().RootFieldName() + ctx.Logger(). + With(zap.String("root_field_name", rootFieldName)). + Warn("got non-kafka event in kafka based handler, this should not happen") + // Skip this event instead of dereferencing a nil pointer below + continue + } + + kafkaEvent.Headers["event_source"] = []byte("graphql_mutation") + eventsWithHeaders = append(eventsWithHeaders, kafkaEvent) + } + + return datasource.NewStreamEvents(eventsWithHeaders), nil +} +``` diff --git a/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx b/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx new file mode 100644 index 00000000..17fad2ec --- /dev/null +++ b/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx @@ -0,0 +1,316 @@ +--- +title: "OnReceiveEvents Handler" +description: "A Cosmo Streams Custom Module, which lets you customize events received from a message broker before being passed to subscribers" +icon: "arrow-left" +--- + +The `OnReceiveEvents` handler is a custom module hook that allows you to intercept and process events received from +supported message brokers before they are delivered to GraphQL subscription clients. +This handler is called whenever a batch of events is received from a message broker, giving you the opportunity to filter, transform, enrich, +or validate events before they reach your subscribers. 
+ +This handler is particularly useful for: +- **Event filtering**: Remove unwanted events based on custom logic +- **Data transformation**: Modify event payloads to match client expectations +- **Event enrichment**: Add additional data to events from external sources +- **Authentication and authorization**: Filter events based on user permissions +- **Monitoring and analytics**: Log or track events for observability + + +The handler runs once for each active subscription, so it's recommended to avoid resource-heavy computations or +blocking operations to maintain performance. + + + +If there is no active subscription this handler is not executed, even if new messages arrive at the broker. +This is because the Router will not listen for messages on the broker topic/queue until at least one +client subscribes to a particular subscription. + + +## Handler Interface + +In order to use the `OnReceiveEvents` handler you need to create a [Custom Module](../../custom-modules) which implements +the `StreamReceiveEventHandler` interface. + +```go +type StreamReceiveEventHandler interface { + // OnReceiveEvents is called whenever a batch of events is received from a provider, + // before delivering them to clients. + // The hook will be called once for each active subscription, therefore it is advised to + // avoid resource heavy computation or blocking tasks whenever possible. + // The events argument contains all events from a batch and is shared between + // all active subscribers of these events. + // Use events.All() to iterate through them and event.Clone() to create mutable copies, when needed. + // Returning an error will result in the subscription being closed and the error being logged. 
+ OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) +} + +type StreamReceiveEventHandlerContext interface { + // Request is the initial client request that started the subscription + Request() *http.Request + // Logger is the logger for the request + Logger() *zap.Logger + // Operation is the GraphQL operation + Operation() OperationContext + // Authentication is the authentication for the request + Authentication() authentication.Authentication + // SubscriptionEventConfiguration returns the subscription event configuration + SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration + // NewEvent creates a new event that can be used in the subscription. + NewEvent(data []byte) datasource.MutableStreamEvent +} +``` + +## Asynchronous Execution + +The `OnReceiveEvents` handler is executed **asynchronously** for each active subscription when events are received from the message broker. +To control resource usage and prevent overwhelming your system, you can configure the maximum number of concurrent handlers using +the `max_concurrent_event_receive_handlers` configuration option. + +```yaml +events: + subscription_hooks: + max_concurrent_event_receive_handlers: 100 # Default: 100 +``` + +This setting controls how many `OnReceiveEvents` handler routines can run simultaneously across all active subscriptions. +This helps prevent resource exhaustion while maintaining good performance and low latency for event processing. + +## Error Handling + +When the `OnReceiveEvents` handler returns an error, the router takes the following actions: + +1. **Send Returned Events**: Alongside the error you can return events if you wish to send them to the client before the connection closes +2. **Subscription Closure**: The affected subscription is immediately closed for the client that encountered the error +3. 
**Error Logging**: The error is logged by the router with details about the subscription, provider, and field name +4. **Error Deduplication**: If multiple subscriptions experience the same error for the same events, the router deduplicates the error messages in the logs to prevent spam +5. **No Error Propagation**: The error is **not** sent directly to the GraphQL client - the subscription simply closes + + +Returning an error from `OnReceiveEvents` will close the subscription for that specific client. Use this only when you want to terminate the subscription due to unrecoverable conditions. For filtering events, return an empty event list instead of an error. + + + +The error gets logged by the router but it won't be sent to the client. +From the client's point of view, the subscription closes server-side without a reason. We are working on a solution for this. + + +**Example of proper error handling:** + +```go +func (m *MyEventHandler) OnReceiveEvents( + ctx core.StreamReceiveEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + // For recoverable issues, filter events instead of returning errors + if someCondition { + return datasource.NewStreamEvents(nil), nil // Empty events, no error + } + + // Only return errors for unrecoverable conditions + if criticalSystemFailure { + return datasource.NewStreamEvents(nil), errors.New("critical system failure - closing subscription") + } + + return events, nil +} +``` + +## Usage Example + +### Complete Custom Module with Event Bypass + +The following example contains a complete Custom Module implementation, including handler registration, +with a handler that simply passes events through unchanged. This is not useful on its own but demonstrates +how to register your `OnReceiveEvents` Custom Module. 
+ +```go +package module + +import ( + "github.com/wundergraph/cosmo/router/core" + "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" + "go.uber.org/zap" +) + +func init() { + // Register your module with the router + core.RegisterModule(&EventBypassModule{}) +} + +const ModuleID = "eventBypassModule" + +// EventBypassModule demonstrates a complete custom module implementation +// that implements StreamReceiveEventHandler but simply passes events through unchanged +type EventBypassModule struct {} + +// OnReceiveEvents passes all events through unchanged +func (m *EventBypassModule) OnReceiveEvents( + ctx core.StreamReceiveEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + logger := ctx.Logger() + logger.Debug("Processing events - bypassing unchanged", + zap.Int("event_count", events.Len()), + ) + + // Simply return the events unchanged + return events, nil +} + +// Module returns the module information for registration +func (m *EventBypassModule) Module() core.ModuleInfo { + return core.ModuleInfo{ + ID: ModuleID, + New: func() core.Module { + return &EventBypassModule{} + }, + } +} + +// Interface guards to ensure we implement the required interfaces +var ( + _ core.StreamReceiveEventHandler = (*EventBypassModule)(nil) +) +``` + +### Restrict Handler to run on certain subscriptions and providers + +Most of the time you want your hook to only deal with a certain subscription. +The `OnReceiveEvents` Handler is run for every subscription configured for Cosmo Streams. +You can access the name of the subscription you care about and return early if it's not the right one. 
+ +```go +func (m *SelectiveEventHandler) OnReceiveEvents( + ctx core.StreamReceiveEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + subConfig := ctx.SubscriptionEventConfiguration() + + // Bypass handler if it's not the right subscription + if subConfig.RootFieldName() != "employeeUpdated" { + return events, nil + } + + // And / or you can decide to process events only from a specific provider configured in the Router + if subConfig.ProviderID() != "my-nats" { + return events, nil + } + + // Your specific event processing logic here + processedEvents := events.All() // placeholder: replace with your transformed events + + return datasource.NewStreamEvents(processedEvents), nil +} +``` + +### Filter out events based on the client's authentication token claims + +You can use `ctx.Authentication()` to access authentication data, such as tokens, if available. +Based on that, you can filter out events if the token is missing the proper claim. + +```go +func (m *EventFilterModule) OnReceiveEvents( + ctx core.StreamReceiveEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + logger := ctx.Logger() + auth := ctx.Authentication() + + // If no authentication, block all events + if auth == nil { + return datasource.NewStreamEvents(nil), + errors.New("No authentication present, closing subscription") + } + + // Check JWT claims + claims := auth.Claims() + if claims == nil { + return datasource.NewStreamEvents(nil), + errors.New("No claims present, closing subscription") + } + + // Check for admin role claim + roleClaimValue, hasRole := claims["role"] + if !hasRole { + logger.Debug("No role claim, blocking all events") + return datasource.NewStreamEvents(nil), nil + } + + userRole, ok := roleClaimValue.(string) + if !ok || userRole != "admin" { + logger.Debug("User is not admin, blocking all events", + zap.Any("role", roleClaimValue), + ) + return datasource.NewStreamEvents(nil), nil + } + + // User is admin - pass all events through + logger.Debug("Admin
user authorized, passing all events", + zap.Int("event_count", events.Len()), + ) + return events, nil +} +``` + +### Filter out events based on message metadata + +Certain providers enrich their messages with metadata accessible to the Router. +Kafka and NATS, for example, have the option to add headers to messages. +Here's an example that filters out all messages coming from a Kafka broker where a header indicates +it's not meant for GraphQL subscriptions. + +```go +import ( + "github.com/wundergraph/cosmo/router/pkg/pubsub/kafka" +) + +func (m *HeaderFilterModule) OnReceiveEvents( + ctx core.StreamReceiveEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + logger := ctx.Logger() + subConfig := ctx.SubscriptionEventConfiguration() + + // Only process events from Kafka providers. + // Pass through unchanged for non-Kafka providers. + if subConfig.ProviderType() != datasource.ProviderTypeKafka { + return events, nil + } + + // Optionally validate specific provider ID or subscription field + logger.Debug("Processing Kafka events for subscription", + zap.String("provider_id", subConfig.ProviderID()), + zap.String("field_name", subConfig.RootFieldName()), + zap.Int("event_count", events.Len()), + ) + + filteredEvents := make([]datasource.StreamEvent, 0, events.Len()) + + for _, event := range events.All() { + // Check if this is a Kafka event with headers + if kafkaEvent, ok := event.(*kafka.Event); ok { + headers := kafkaEvent.GetHeaders() + + // Filter out events with "internal" header set to "true" + if internalHeader, exists := headers["internal"]; exists { + if string(internalHeader) == "true" { + logger.Debug("Filtering out internal event") + continue + } + } + } + + // Include this event in the results + filteredEvents = append(filteredEvents, event) + } + + logger.Debug("Filtered events by headers", + zap.Int("original_count", events.Len()), + zap.Int("filtered_count", len(filteredEvents)), + ) + + return datasource.NewStreamEvents(filteredEvents), nil +} +``` \ No newline at end of file diff --git a/docs/router/cosmo-streams/custom-modules/subscription-on-start.mdx b/docs/router/cosmo-streams/custom-modules/subscription-on-start.mdx new file mode 100644 index 00000000..b5a7c0b9 --- /dev/null +++ b/docs/router/cosmo-streams/custom-modules/subscription-on-start.mdx @@ -0,0 +1,219 @@ +--- +title: "SubscriptionOnStart Handler" +description: "A Cosmo Streams Custom Module, which lets you customize subscription initialization behavior" +icon: "circle-play" +--- + +The `SubscriptionOnStart` handler is a custom module hook that allows you to intercept and customize the initialization of GraphQL subscriptions. +This handler is called once when a subscription starts, giving you the opportunity to validate permissions, send initial events, or perform setup logic. + +This handler is particularly useful for: +- **Subscription authentication**: Validate JWT tokens or user permissions before allowing subscriptions +- **Initial event delivery**: Send welcome messages or current state to new subscribers +- **Subscription logging**: Track subscription attempts and user behavior +- **Connection validation**: Ensure clients meet specific criteria before subscribing +- **Rate limiting**: Control subscription attempts per user or client +- **State initialization**: Initialize state used by other handlers such as `OnReceiveEvents` or `OnPublishEvents` of the same module + +## Handler Interface + +To use the `SubscriptionOnStart` handler, you need to create a [Custom Module](../../custom-modules) that implements the `SubscriptionOnStartHandler` interface. + +```go +type SubscriptionOnStartHandler interface { + // SubscriptionOnStart is called once at subscription start + // The error is propagated to the client.
+ SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error +} + +type SubscriptionOnStartHandlerContext interface { + // Request is the original request received by the router. + Request() *http.Request + // Logger is the logger for the request + Logger() *zap.Logger + // Operation is the GraphQL operation + Operation() OperationContext + // Authentication is the authentication for the request + Authentication() authentication.Authentication + // SubscriptionEventConfiguration is the subscription event configuration (will return nil for engine subscription) + SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration + // WriteEvent writes an event to the stream of the current subscription + // It returns true if the event was written to the stream, false if the event was dropped + WriteEvent(event datasource.StreamEvent) bool + // NewEvent creates a new event that can be used in the subscription. + NewEvent(data []byte) datasource.MutableStreamEvent +} +``` + +## Error Handling + +When you return an error from the `SubscriptionOnStart` handler, the router responds to the client with an error event and closes the subscription. +You can choose to return a generic error or a custom error response with more details for the client. + +```go +return errors.New("my handler error") +``` + +This will result in an internal server error response to the client. + +```json +{ + "errors": [ + { + "message": "Internal server error" + } + ] +} +``` + +Alternatively, you can return a custom error response with more details for the client. + +```go +return core.NewHttpGraphqlError( + "my graphql error", + http.StatusText(http.StatusForbidden), + http.StatusForbidden, +) +``` + +This will result in an error response with more details for the client. + +```json +{ + "errors": [ + { + "message": "my graphql error", + "extensions": { + "statusCode": 403, + "code": "Forbidden" + } + } + ] +} +``` + + +Errors are not logged automatically by the router.
If you need the error to be logged, you can use `ctx.Logger()` to log the error yourself. + + +## Usage Example + +### Complete Custom Module with Subscription Logging + +The following example demonstrates how to register a passive `SubscriptionOnStart` handler that logs subscription attempts but allows all subscriptions to proceed normally. + +```go +package module + +import ( + "github.com/wundergraph/cosmo/router/core" + "go.uber.org/zap" +) + +func init() { + // Register your module with the router + core.RegisterModule(&SubscriptionStartModule{}) +} + +const ModuleID = "subscriptionStartModule" + +// SubscriptionStartModule demonstrates a passive subscription start handler +type SubscriptionStartModule struct{} + +// SubscriptionOnStart logs subscription attempts and allows them to proceed +func (m *SubscriptionStartModule) SubscriptionOnStart( + ctx core.SubscriptionOnStartHandlerContext, +) error { + logger := ctx.Logger() + config := ctx.SubscriptionEventConfiguration() + if config == nil { + // Engine subscriptions have no event configuration + return nil + } + + // Log subscription details + logger.Info("Subscription started", + zap.String("field_name", config.RootFieldName()), + zap.String("provider_id", config.ProviderID()), + zap.String("provider_type", string(config.ProviderType())), + ) + + // Allow subscription to proceed + return nil +} + +// Module returns the module information for registration +func (m *SubscriptionStartModule) Module() core.ModuleInfo { + return core.ModuleInfo{ + ID: ModuleID, + New: func() core.Module { + return &SubscriptionStartModule{} + }, + } +} + +// Interface guards to ensure we implement the required interfaces +var ( + _ core.SubscriptionOnStartHandler = (*SubscriptionStartModule)(nil) +) +``` + +### Return initial events + +You can use `ctx.WriteEvent()` to send initial or welcome events to subscribers immediately when they connect. +This is useful for providing current state or welcome messages.
+ +```go +func (m *SubscriptionStartModule) SubscriptionOnStart( + ctx core.SubscriptionOnStartHandlerContext, +) error { + // Bypass the handler on other subscriptions + if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdated" { + return nil + } + + // Create an initial event with minimal required fields + // The router will resolve all other fields requested by the subscriber + initialEventData := `{ "__typename": "Employee", "id": 1 }` + initialEvent := ctx.NewEvent([]byte(initialEventData)) + + success := ctx.WriteEvent(initialEvent) + if !success { + ctx.Logger().Warn("Failed to send initial event to subscriber") + } + + return nil +} +``` + +The payload data used to create a new event must follow a specific format: +it must be a valid JSON object that contains the `__typename` field to identify the entity type we want to return +for this subscription. The other field in this case is `id`, the entity key of the `Employee` type as defined in the schema. +The router uses this information to resolve all fields requested by the subscriber and generate a complete response. + +### Prevent subscriptions on missing token claims + +This example validates JWT tokens and blocks subscriptions for users without the required "role" claim, demonstrating proper authentication enforcement.
+ +```go +func (m *SubscriptionStartModule) SubscriptionOnStart(ctx core.SubscriptionOnStartHandlerContext) error { + // Only check "employeeUpdated" subscription + if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdated" { + return nil + } + + auth := ctx.Authentication() + if auth == nil { + return core.NewHttpGraphqlError("unauthorized", http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + } + + // Check for specific "admin" role + roleValue, hasRole := auth.Claims()["role"] + if !hasRole { + return core.NewHttpGraphqlError("missing role claim", http.StatusText(http.StatusForbidden), http.StatusForbidden) + } + + role, ok := roleValue.(string) + if !ok || role != "admin" { + return core.NewHttpGraphqlError("admin role required", http.StatusText(http.StatusForbidden), http.StatusForbidden) + } + + return nil +} +``` diff --git a/docs/router/event-driven-federated-subscriptions-edfs/kafka.mdx b/docs/router/cosmo-streams/kafka.mdx similarity index 89% rename from docs/router/event-driven-federated-subscriptions-edfs/kafka.mdx rename to docs/router/cosmo-streams/kafka.mdx index f24ecc40..cca125d2 100644 --- a/docs/router/event-driven-federated-subscriptions-edfs/kafka.mdx +++ b/docs/router/cosmo-streams/kafka.mdx @@ -1,10 +1,10 @@ --- title: "Kafka" icon: "sitemap" -descripton: "Kafka event provider support for EDFS" +description: "Kafka event provider support for Cosmo Streams" --- - + @@ -18,10 +18,10 @@ descripton: "Kafka event provider support for EDFS" ## Full schema example -Here is a comprehensive example of how to use Kafka with EDFS. This guide covers publish, subscribe, and the filter directive. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the EDFS schema contract and must not be modified. +Here is a comprehensive example of how to use Kafka with Cosmo Streams. This guide covers publish, subscribe, and the filter directive.
All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the Cosmo Streams schema contract and must not be modified. ```js -# EDFS +# Cosmo Streams directive @edfs__kafkaPublish(topic: String!, providerId: String! = "default") on FIELD_DEFINITION directive @edfs__kafkaSubscribe(topics: [String!]!, providerId: String! = "default") on FIELD_DEFINITION diff --git a/docs/router/event-driven-federated-subscriptions-edfs/nats.mdx b/docs/router/cosmo-streams/nats.mdx similarity index 88% rename from docs/router/event-driven-federated-subscriptions-edfs/nats.mdx rename to docs/router/cosmo-streams/nats.mdx index 56f72782..a4b579b6 100644 --- a/docs/router/event-driven-federated-subscriptions-edfs/nats.mdx +++ b/docs/router/cosmo-streams/nats.mdx @@ -2,11 +2,11 @@ title: "NATS" sidebarTitle: Overview icon: circle-info -description: "NATS event provider support for EDFS" +description: "NATS event provider support for Cosmo Streams" --- - + @@ -20,10 +20,10 @@ description: "NATS event provider support for EDFS" ## Full schema example -Here is a comprehensive example of how to use NATS with EDFS. This guide covers request, publish, subscribe directive. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the EDFS schema contract and must not be modified. +Here is a comprehensive example of how to use NATS with Cosmo Streams. This guide covers the request, publish, and subscribe directives. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the Cosmo Streams schema contract and must not be modified. ```js -# EDFS +# Cosmo Streams directive @edfs__natsRequest(subject: String!, providerId: String! = "default") on FIELD_DEFINITION directive @edfs__natsPublish(subject: String!, providerId: String!
= "default") on FIELD_DEFINITION diff --git a/docs/router/event-driven-federated-subscriptions-edfs/nats/stream-and-consumer-configuration.mdx b/docs/router/cosmo-streams/nats/stream-and-consumer-configuration.mdx similarity index 100% rename from docs/router/event-driven-federated-subscriptions-edfs/nats/stream-and-consumer-configuration.mdx rename to docs/router/cosmo-streams/nats/stream-and-consumer-configuration.mdx diff --git a/docs/router/event-driven-federated-subscriptions-edfs/redis.mdx b/docs/router/cosmo-streams/redis.mdx similarity index 89% rename from docs/router/event-driven-federated-subscriptions-edfs/redis.mdx rename to docs/router/cosmo-streams/redis.mdx index 1be6d7a1..e5141864 100644 --- a/docs/router/event-driven-federated-subscriptions-edfs/redis.mdx +++ b/docs/router/cosmo-streams/redis.mdx @@ -1,10 +1,10 @@ --- title: "Redis" icon: "sitemap" -descripton: "Redis event provider support for EDFS" +description: "Redis event provider support for Cosmo Streams" --- - + ![](/images/router/event-driven-federated-subscriptions-edfs/redis1.png) @@ -18,10 +18,10 @@ descripton: "Redis event provider support for EDFS" ## Full schema example -Here is a comprehensive example of how to use Redis with EDFS. This guide covers publish, subscribe, and the filter directive. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the EDFS schema contract and must not be modified. +Here is a comprehensive example of how to use Redis with Cosmo Streams. This guide covers publish, subscribe, and the filter directive. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the Cosmo Streams schema contract and must not be modified. ```js -# EDFS +# Cosmo Streams directive @edfs__redisPublish(channel: String!, providerId: String! = "default") on FIELD_DEFINITION directive @edfs__redisSubscribe(channels: [String!]!, providerId: String!
= "default") on FIELD_DEFINITION diff --git a/docs/router/event-driven-federated-subscriptions-edfs/custom-modules.mdx b/docs/router/event-driven-federated-subscriptions-edfs/custom-modules.mdx deleted file mode 100644 index 176aa69e..00000000 --- a/docs/router/event-driven-federated-subscriptions-edfs/custom-modules.mdx +++ /dev/null @@ -1,393 +0,0 @@ ---- -title: "Custom Modules" -description: "Customize Streams behavior with powerful hooks for subscription lifecycle, event processing, and data transformation." -icon: "cubes" ---- - -Cosmo Router provides powerful hooks for customizing Custom Streams (a.k.a. Event-Driven Federated Subscriptions, or Cosmo Streams) behavior. These hooks allow you to implement custom logic for subscription lifecycle management, event processing, and data transformation. - -## Available Hooks - -The Cosmo Streams system provides three main hook interfaces that you can implement in your custom modules: - -- `SubscriptionOnStartHandler`: Called once at subscription start -- `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery -- `StreamPublishEventHandler`: Called each time a batch of events is going to be sent to the provider - -## Hook Interfaces - -### SubscriptionOnStartHandler - -This hook is called once when a subscription starts, allowing you to implement custom logic such as authorization checks or initial message sending. 
- -```go -type SubscriptionOnStartHandler interface { - // OnSubscriptionOnStart is called once at subscription start - // Returning an error will result in a GraphQL error being returned to the client - SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error -} -``` - -**Use cases:** -- Authorization checks at subscription start -- Sending initial messages to clients -- Validating subscription parameters - -### StreamReceiveEventHandler - -This hook is triggered for each client/subscription when a batch of events is received from the provider, before delivering them to the client. - -```go -type StreamReceiveEventHandler interface { - // OnReceiveEvents is called each time a batch of events is received from the provider before delivering them to the client - // So for a single batch of events received from the provider, this hook will be called one time for each active subscription. - // It is important to optimize the logic inside this hook to avoid performance issues. - // Returning an error will result in a GraphQL error being returned to the client - OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events []StreamEvent) ([]StreamEvent, error) -} -``` - -**Use cases:** -- Event filtering based on client permissions -- Data transformation and mapping -- Event validation and sanitization - - -The `StreamReceiveEventHandler` is called for each active subscription when events are received, so optimize your logic to avoid performance issues. Even small inefficiencies can lead to significant delays when many subscriptions are active. - - -### StreamPublishEventHandler - -This hook is called each time a batch of events is going to be sent to the provider. 
- -```go -type StreamPublishEventHandler interface { - // OnPublishEvents is called each time a batch of events is going to be sent to the provider - // Returning an error will result in an error being returned and the client will see the mutation failing - OnPublishEvents(ctx StreamPublishEventHandlerContext, events []StreamEvent) ([]StreamEvent, error) -} -``` - -**Use cases:** -- Data transformation before publishing -- Event validation -- Adding metadata to events - -## Context Interfaces - -Each hook provides a rich context interface that gives you access to request information, authentication, and configuration: - -### SubscriptionOnStartHandlerContext - -```go -type SubscriptionOnStartHandlerContext interface { - // Request is the original request received by the router. - Request() *http.Request - // Logger is the logger for the request - Logger() *zap.Logger - // Operation is the GraphQL operation - Operation() OperationContext - // Authentication is the authentication for the request - Authentication() authentication.Authentication - // SubscriptionEventConfiguration is the subscription event configuration - SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration - // WriteEvent writes an event to the stream of the current subscription - // It returns true if the event was written to the stream, false if the event was dropped - WriteEvent(event datasource.StreamEvent) bool -} -``` - -### StreamReceiveEventHandlerContext - -```go -type StreamReceiveEventHandlerContext interface { - // Request is the initial client request that started the subscription - Request() *http.Request - // Logger is the logger for the request - Logger() *zap.Logger - // Operation is the GraphQL operation - Operation() OperationContext - // Authentication is the authentication for the request - Authentication() authentication.Authentication - // SubscriptionEventConfiguration is the subscription event configuration - SubscriptionEventConfiguration() 
SubscriptionEventConfiguration -} -``` - -### StreamPublishEventHandlerContext - -```go -type StreamPublishEventHandlerContext interface { - // Request is the original request received by the router. - Request() *http.Request - // Logger is the logger for the request - Logger() *zap.Logger - // Operation is the GraphQL operation - Operation() OperationContext - // Authentication is the authentication for the request - Authentication() authentication.Authentication - // PublishEventConfiguration is the publish event configuration - PublishEventConfiguration() PublishEventConfiguration -} -``` - -## Core Types - -### StreamEvent Interface - -The `StreamEvent` interface allows the hooks system to be provider-agnostic: - -```go -type StreamEvent interface { - GetData() []byte -} -``` - -Each provider (NATS, Kafka, Redis) will have its own event type with custom fields, but they all implement this common interface. - -### OperationContext - -The `OperationContext` provides access to GraphQL operation information: - -```go -type OperationContext interface { - Name() string - // the variables are currently not available, so we need to expose them here - Variables() *astjson.Value -} -``` - -### Configuration Interfaces - -#### SubscriptionEventConfiguration - -```go -type SubscriptionEventConfiguration interface { - ProviderID() string - ProviderType() string - // the root field name of the subscription in the schema - RootFieldName() string -} -``` - -#### PublishEventConfiguration - -```go -type PublishEventConfiguration interface { - ProviderID() string - ProviderType() string - // the root field name of the mutation in the schema - RootFieldName() string -} -``` - -## Example: Authorization and Event Filtering - -Here's a complete example that demonstrates how to implement authorization checks and event filtering: - -```go -package mymodule - -import ( - "encoding/json" - "slices" - "github.com/wundergraph/cosmo/router/core" - 
"github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" - "github.com/wundergraph/cosmo/router/pkg/pubsub/nats" -) - -func init() { - // Register your module here and it will be loaded at router start - core.RegisterModule(&MyModule{}) -} - -type MyModule struct {} - -// Implement SubscriptionOnStartHandler for authorization -func (m *MyModule) SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error { - // Check if the provider is NATS - if ctx.SubscriptionEventConfiguration().ProviderType() != datasource.ProviderTypeNats { - return nil - } - - // Check if the provider ID matches - if ctx.SubscriptionEventConfiguration().ProviderID() != "my-nats" { - return nil - } - - // Check if the subscription is the expected one - if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdates" { - return nil - } - - // Check if the client is authenticated - if ctx.Authentication() == nil { - return core.NewHttpGraphqlError("client is not authenticated", http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) - } - - // Check if the client has the required permissions - clientAllowedEntitiesIds, found := ctx.Authentication().Claims()["readEmployee"] - if !found { - return core.NewHttpGraphqlError( - "client is not allowed to read employees", - http.StatusText(http.StatusForbidden), - http.StatusForbidden - ) - } - - return nil -} - -// Implement StreamReceiveEventHandler for event filtering and transformation -func (m *MyModule) OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events []core.StreamEvent) ([]core.StreamEvent, error) { - // Check if the provider is NATS - if ctx.SubscriptionEventConfiguration().ProviderType() != datasource.ProviderTypeNats { - return events, nil - } - - // Check if the provider ID matches - if ctx.SubscriptionEventConfiguration().ProviderID() != "my-nats" { - return events, nil - } - - // Check if the subscription is the expected one - if ctx.SubscriptionEventConfiguration().RootFieldName() != 
"employeeUpdates" { - return events, nil - } - - newEvents := make([]core.StreamEvent, 0, len(events)) - - // Check if the client is authenticated - if ctx.Authentication() == nil { - // If the client is not authenticated, return no events - return newEvents, nil - } - - // Get client's allowed entity IDs - clientAllowedEntitiesIds, found := ctx.Authentication().Claims()["allowedEntitiesIds"] - if !found { - return newEvents, fmt.Errorf("client is not allowed to subscribe to the stream") - } - - for _, evt := range events { - natsEvent, ok := evt.(*nats.NatsEvent) - if !ok { - newEvents = append(newEvents, evt) - continue - } - - // Decode the event data coming from the provider - var dataReceived struct { - EmployeeId string `json:"EmployeeId"` - OtherField string `json:"OtherField"` - } - err := json.Unmarshal(natsEvent.Data, &dataReceived) - if err != nil { - return events, fmt.Errorf("error unmarshalling data: %w", err) - } - - // Filter events based on client's permissions - if !slices.Contains(clientAllowedEntitiesIds, dataReceived.EmployeeId) { - continue - } - - // Transform the data to match the expected GraphQL schema - var dataToSend struct { - Id string `json:"id"` - TypeName string `json:"__typename"` - } - dataToSend.Id = dataReceived.EmployeeId - dataToSend.TypeName = "Employee" - - // Marshal the transformed data - dataToSendMarshalled, err := json.Marshal(dataToSend) - if err != nil { - return events, fmt.Errorf("error marshalling data: %w", err) - } - - // Create the new event - newEvent := &nats.NatsEvent{ - Data: dataToSendMarshalled, - Metadata: natsEvent.Metadata, - } - newEvents = append(newEvents, newEvent) - } - return newEvents, nil -} - -func (m *MyModule) Module() core.ModuleInfo { - return core.ModuleInfo{ - ID: "myModule", - Priority: 1, - New: func() core.Module { - return &MyModule{} - }, - } -} - -// Interface guards -var ( - _ core.SubscriptionOnStartHandler = (*MyModule)(nil) - _ core.StreamReceiveEventHandler = (*MyModule)(nil) 
-) -``` - -## Example: Event Publishing with Transformation - -Here's an example of how to transform events before publishing: - -```go -// Implement StreamPublishEventHandler for event transformation -func (m *MyModule) OnPublishEvents(ctx StreamPublishEventHandlerContext, events []StreamEvent) ([]StreamEvent, error) { - // Check if the provider is NATS - if ctx.PublishEventConfiguration().ProviderType() != datasource.ProviderTypeNats { - return events, nil - } - - // Check if the provider ID matches - if ctx.PublishEventConfiguration().ProviderID() != "my-nats" { - return events, nil - } - - // Check if the mutation is the expected one - if ctx.PublishEventConfiguration().RootFieldName() != "updateEmployee" { - return events, nil - } - - transformedEvents := make([]StreamEvent, 0, len(events)) - - for _, evt := range events { - natsEvent, ok := evt.(*nats.NatsEvent) - if !ok { - transformedEvents = append(transformedEvents, evt) - continue - } - - // Decode the original event data - var originalData map[string]interface{} - err := json.Unmarshal(natsEvent.Data, &originalData) - if err != nil { - return events, fmt.Errorf("error unmarshalling data: %w", err) - } - - // Add metadata or transform the data - originalData["timestamp"] = time.Now().Unix() - originalData["source"] = "cosmo-router" - - // Marshal the transformed data - transformedData, err := json.Marshal(originalData) - if err != nil { - return events, fmt.Errorf("error marshalling transformed data: %w", err) - } - - // Create the transformed event - transformedEvent := &nats.NatsEvent{ - Data: transformedData, - Metadata: natsEvent.Metadata, - } - transformedEvents = append(transformedEvents, transformedEvent) - } - - return transformedEvents, nil -} -``` diff --git a/docs/router/metrics-and-monitoring.mdx b/docs/router/metrics-and-monitoring.mdx index 3d8c3e5e..367cb43e 100644 --- a/docs/router/metrics-and-monitoring.mdx +++ b/docs/router/metrics-and-monitoring.mdx @@ -214,8 +214,8 @@ telemetry: * 
`router.engine.messages.sent`: The number of total messages for subscriptions sent over from the subgraph to the router. -### EDFS Streams Metrics -We expose metrics for EDFS streams, these statistics are collected at the level when the message is sent to the messaging backend or directly received from the messaging backend. +### Cosmo Streams Metrics +We expose metrics for [Cosmo Streams](/router/cosmo-streams). These statistics are collected at the point where a message is sent to the messaging backend or received directly from it. ```yaml config.yaml telemetry: @@ -234,7 +234,7 @@ telemetry: The following attributes are attached to both metrics: * `wg.stream.operation.name`: -This contains the operation type used to send a message to the message backend. This is useful to differentiate when an edfs adapter has multiple ways of sending messages, like in the case of "nats", with `publish`, and `request`. +This contains the operation type used to send a message to the messaging backend. This is useful for differentiating operations when a Cosmo Streams adapter has multiple ways of sending messages, as in the case of NATS with `publish` and `request`. The following values are possible, based on the messaging backend - nats: `publish`, `request`, `receive` @@ -244,7 +244,7 @@ The following values are possible, based on the messaging backend - redis: `publish`, `receive` * `wg.provider.type`: -One of the supported edfs provider types, which includes `kafka`, `nats`, `redis` +One of the supported Cosmo Streams provider types, which include `kafka`, `nats`, `redis` * `wg.destination.name`: The name of the destination of the messaging backend (topic, queue, etc) @@ -438,8 +438,8 @@ Here you can see a few example queries to query useful information about your cl ``` - - EDFS stream metrics contain only two metrics. To make sense of your data you need to filter by the attributes. The following examples give you a basic idea of how to use these two metrics.
+ + Cosmo Streams exposes only two metrics. To make sense of your data, you need to filter by the attributes. The following examples give you a basic idea of how to use these two metrics. #### Get failed publishes for a message broker Let's say we want to see any failed publishes to our kafka broker. We can use the following query, diff --git a/docs/router/metrics-and-monitoring/prometheus-metric-reference.mdx b/docs/router/metrics-and-monitoring/prometheus-metric-reference.mdx index 76424243..9eb4f429 100644 --- a/docs/router/metrics-and-monitoring/prometheus-metric-reference.mdx +++ b/docs/router/metrics-and-monitoring/prometheus-metric-reference.mdx @@ -122,8 +122,8 @@ telemetry: * [`router_engine_messages_sent_total`](#router-engine-messages-sent-total): The number of total messages for subscriptions sent over from the subgraph to the router. -### EDFS Streams Metrics -We expose metrics for EDFS streams, these statistics are collected at the level when the message is sent to the messaging backend or directly received from the messaging backend. +### Cosmo Streams Metrics +We expose metrics for [Cosmo Streams](/router/cosmo-streams). These statistics are collected at the point where a message is sent to the messaging backend or received directly from it. ```yaml config.yaml telemetry: @@ -140,7 +140,7 @@ telemetry: The following attributes are attached to both metrics: * `wg_stream_operation_name`: -This contains the operation type used to send a message to the message backend. This is useful to differentiate when an edfs adapter has multiple ways of sending messages, like in the case of "nats", with `publish`, and `request`. +This contains the operation type used to send a message to the messaging backend. This is useful for differentiating operations when a Cosmo Streams adapter has multiple ways of sending messages, as in the case of NATS with `publish` and `request`.
The following values are possible, based on the messaging backend - nats: `publish`, `request`, `receive` @@ -150,7 +150,7 @@ The following values are possible, based on the messaging backend - redis: `publish`, `receive` * `wg_provider_type`: -One of the supported edfs provider types, which includes `kafka`, `nats`, `redis` +One of the supported Cosmo Streams provider types, which include `kafka`, `nats`, `redis` * `wg_destination_name`: The name of the destination of the messaging backend (topic, queue, etc)