Skip to content

Commit 481329c

Browse files
authored
Shadow pipelines (#1948)
1 parent df2c91d commit 481329c

File tree

11 files changed

+90
-13
lines changed

11 files changed

+90
-13
lines changed

cli/cmd/lib_traffic_splitters.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,14 @@ func trafficSplitTable(trafficSplitter schema.APIResponse, env cliconfig.Environ
7373

7474
apiRes := apisRes[0]
7575
lastUpdated := time.Unix(apiRes.Spec.LastUpdated, 0)
76+
77+
apiName := apiRes.Spec.Name
78+
if api.Shadow {
79+
apiName += " (shadow)"
80+
}
7681
rows = append(rows, []interface{}{
7782
env.Name,
78-
apiRes.Spec.Name,
83+
apiName,
7984
api.Weight,
8085
apiRes.Status.Message(),
8186
apiRes.Status.Requested,
@@ -108,7 +113,11 @@ func trafficSplitterListTable(trafficSplitter []schema.APIResponse, envNames []s
108113
lastUpdated := time.Unix(splitAPI.Spec.LastUpdated, 0)
109114
var apis []string
110115
for _, api := range splitAPI.Spec.APIs {
111-
apis = append(apis, api.Name+":"+s.Int32(api.Weight))
116+
apiName := api.Name
117+
if api.Shadow {
118+
apiName += " (shadow)"
119+
}
120+
apis = append(apis, apiName+":"+s.Int32(api.Weight))
112121
}
113122
apisStr := s.TruncateEllipses(strings.Join(apis, " "), 50)
114123
rows = append(rows, []interface{}{

docs/workloads/realtime/traffic-splitter/configuration.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@
77
endpoint: <string> # the endpoint for the Traffic Splitter (default: <name>)
88
apis: # list of Realtime APIs to target
99
- name: <string> # name of a Realtime API that is already running or is included in the same configuration file (required)
10-
weight: <int> # percentage of traffic to route to the Realtime API (all weights must sum to 100) (required)
10+
weight: <int> # percentage of traffic to route to the Realtime API (all non-shadow weights must sum to 100) (required)
11+
shadow: <bool> # duplicate incoming traffic and send fire-and-forget to this api (only one shadow per traffic splitter) (default: false)
1112
```

pkg/lib/k8s/virtual_service.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,34 @@ type Destination struct {
5050
ServiceName string
5151
Weight int32
5252
Port uint32
53+
Shadow bool
5354
}
5455

5556
func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualService {
5657
destinations := []*istionetworking.HTTPRouteDestination{}
58+
var mirror *istionetworking.Destination
59+
var mirrorWeight *istionetworking.Percent
60+
5761
for _, destination := range spec.Destinations {
58-
destinations = append(destinations, &istionetworking.HTTPRouteDestination{
59-
Destination: &istionetworking.Destination{
62+
if destination.Shadow {
63+
mirror = &istionetworking.Destination{
6064
Host: destination.ServiceName,
6165
Port: &istionetworking.PortSelector{
6266
Number: destination.Port,
6367
},
64-
},
65-
Weight: destination.Weight,
66-
})
68+
}
69+
mirrorWeight = &istionetworking.Percent{Value: float64(destination.Weight)}
70+
} else {
71+
destinations = append(destinations, &istionetworking.HTTPRouteDestination{
72+
Destination: &istionetworking.Destination{
73+
Host: destination.ServiceName,
74+
Port: &istionetworking.PortSelector{
75+
Number: destination.Port,
76+
},
77+
},
78+
Weight: destination.Weight,
79+
})
80+
}
6781
}
6882

6983
var httpRoutes []*istionetworking.HTTPRoute
@@ -79,7 +93,9 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ
7993
},
8094
},
8195
},
82-
Route: destinations,
96+
Route: destinations,
97+
Mirror: mirror,
98+
MirrorPercentage: mirrorWeight,
8399
})
84100

85101
if spec.Rewrite != nil {
@@ -98,7 +114,9 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ
98114
},
99115
},
100116
},
101-
Route: destinations,
117+
Route: destinations,
118+
Mirror: mirror,
119+
MirrorPercentage: mirrorWeight,
102120
}
103121

104122
prefixMatch := &istionetworking.HTTPRoute{
@@ -111,7 +129,9 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ
111129
},
112130
},
113131
},
114-
Route: destinations,
132+
Route: destinations,
133+
Mirror: mirror,
134+
MirrorPercentage: mirrorWeight,
115135
}
116136

117137
if spec.Rewrite != nil {

pkg/operator/resources/trafficsplitter/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ func getTrafficSplitterDestinations(trafficSplitter *spec.API) []k8s.Destination
112112
ServiceName: operator.K8sName(api.Name),
113113
Weight: api.Weight,
114114
Port: uint32(_defaultPortInt32),
115+
Shadow: api.Shadow,
115116
}
116117
}
117118
return destinations

pkg/types/spec/errors.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ const (
9191
ErrConcurrencyMismatchServerSideBatchingPython = "spec.concurrency_mismatch_server_side_batching_python"
9292
ErrIncorrectTrafficSplitterWeight = "spec.incorrect_traffic_splitter_weight"
9393
ErrTrafficSplitterAPIsNotUnique = "spec.traffic_splitter_apis_not_unique"
94+
ErrOneShadowPerTrafficSplitter = "spec.one_shadow_per_traffic_splitter"
9495
ErrUnexpectedDockerSecretData = "spec.unexpected_docker_secret_data"
9596
)
9697

@@ -593,7 +594,7 @@ func ErrorConcurrencyMismatchServerSideBatchingPython(maxBatchsize int32, thread
593594
func ErrorIncorrectTrafficSplitterWeightTotal(totalWeight int32) error {
594595
return errors.WithStack(&errors.Error{
595596
Kind: ErrIncorrectTrafficSplitterWeight,
596-
Message: fmt.Sprintf("expected weights to sum to 100 but found %d", totalWeight),
597+
Message: fmt.Sprintf("expected weights of all non-shadow apis to sum to 100 but found %d", totalWeight),
597598
})
598599
}
599600

@@ -604,6 +605,13 @@ func ErrorTrafficSplitterAPIsNotUnique(names []string) error {
604605
})
605606
}
606607

608+
func ErrorOneShadowPerTrafficSplitter() error {
609+
return errors.WithStack(&errors.Error{
610+
Kind: ErrOneShadowPerTrafficSplitter,
611+
Message: fmt.Sprintf("multiple shadow apis detected; only one api is allowed to be marked as a shadow"),
612+
})
613+
}
614+
607615
var _pwRegex = regexp.MustCompile(`"password":"[^"]+"`)
608616
var _authRegex = regexp.MustCompile(`"auth":"[^"]+"`)
609617

pkg/types/spec/utils.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,9 @@ func getModelVersionsFromPaths(paths []string, prefix string) []string {
512512
func verifyTotalWeight(apis []*userconfig.TrafficSplit) error {
513513
totalWeight := int32(0)
514514
for _, api := range apis {
515-
totalWeight += api.Weight
515+
if !api.Shadow {
516+
totalWeight += api.Weight
517+
}
516518
}
517519
if totalWeight == 100 {
518520
return nil

pkg/types/spec/validations.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ func multiAPIsValidation() *cr.StructFieldValidation {
131131
LessThanOrEqualTo: pointer.Int32(100),
132132
},
133133
},
134+
{
135+
StructField: "Shadow",
136+
BoolValidation: &cr.BoolValidation{},
137+
},
134138
},
135139
},
136140
},
@@ -846,6 +850,16 @@ func ValidateTrafficSplitter(api *userconfig.API) error {
846850
return err
847851
}
848852

853+
hasShadow := false
854+
for _, api := range api.APIs {
855+
if api.Shadow {
856+
if hasShadow {
857+
return ErrorOneShadowPerTrafficSplitter()
858+
}
859+
hasShadow = true
860+
}
861+
}
862+
849863
return nil
850864
}
851865

pkg/types/userconfig/api.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type MultiModels struct {
8686
type TrafficSplit struct {
8787
Name string `json:"name" yaml:"name"`
8888
Weight int32 `json:"weight" yaml:"weight"`
89+
Shadow bool `json:"shadow" yaml:"shadow"`
8990
}
9091

9192
type ModelResource struct {
@@ -386,6 +387,7 @@ func (trafficSplit *TrafficSplit) UserStr() string {
386387
var sb strings.Builder
387388
sb.WriteString(fmt.Sprintf("%s: %s\n", NameKey, trafficSplit.Name))
388389
sb.WriteString(fmt.Sprintf("%s: %s\n", WeightKey, s.Int32(trafficSplit.Weight)))
390+
sb.WriteString(fmt.Sprintf("%s: %s\n", ShadowKey, s.Bool(trafficSplit.Shadow)))
389391
return sb.String()
390392
}
391393

pkg/types/userconfig/config_key.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
// TrafficSplitter
3131
APIsKey = "apis"
3232
WeightKey = "weight"
33+
ShadowKey = "shadow"
3334

3435
// Predictor
3536
TypeKey = "type"

test/apis/traffic-splitter/cortex.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,19 @@
1414
models:
1515
path: s3://cortex-examples/onnx/iris-classifier/
1616

17+
- name: request-recorder
18+
kind: RealtimeAPI
19+
predictor:
20+
type: python
21+
path: request_recorder.py
22+
1723
- name: iris-classifier
1824
kind: TrafficSplitter
1925
apis:
2026
- name: iris-classifier-onnx
2127
weight: 30
2228
- name: iris-classifier-pytorch
2329
weight: 70
30+
- name: request-recorder
31+
shadow: true
32+
weight: 100

0 commit comments

Comments
 (0)