
Commit 8b50a2c

Allow users to configure shared memory for IPC (#1756)
1 parent: 6160591 · commit: 8b50a2c

9 files changed (+98, -8 lines)
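This commit adds an optional `shm_size` field to the predictor configuration. When set, the operator mounts a memory-backed `emptyDir` volume named `dshm` at `/dev/shm` in the API container for the Python, TensorFlow, and ONNX predictors, so multiple predictor processes can exchange data through shared memory. `ONNXPredictorContainers` now returns the volumes it builds alongside the containers, spec validation rejects a `shm_size` larger than `compute.mem`, and the new field is surfaced in the docs, `UserStr` output, and telemetry.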

docs/workloads/batch/configuration.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -14,6 +14,7 @@
     image: <string> # docker image to use for the Predictor (default: quay.io/cortexlabs/python-predictor-cpu:master or quay.io/cortexlabs/python-predictor-gpu:master based on compute)
     env: <string: string> # dictionary of environment variables
     log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: <string> # the endpoint for the API (default: <api_name>)
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
@@ -50,6 +51,7 @@
     tensorflow_serving_image: <string> # docker image to use for the TensorFlow Serving container (default: quay.io/cortexlabs/tensorflow-serving-gpu:master or quay.io/cortexlabs/tensorflow-serving-cpu:master based on compute)
     env: <string: string> # dictionary of environment variables
     log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: <string> # the endpoint for the API (default: <api_name>)
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
@@ -80,6 +82,7 @@
     image: <string> # docker image to use for the Predictor (default: quay.io/cortexlabs/onnx-predictor-gpu:master or quay.io/cortexlabs/onnx-predictor-cpu:master based on compute)
     env: <string: string> # dictionary of environment variables
     log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: <string> # the endpoint for the API (default: <api_name>)
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
```

docs/workloads/realtime/configuration.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -28,6 +28,7 @@
     image: <string> # docker image to use for the Predictor (default: quay.io/cortexlabs/python-predictor-cpu:master or quay.io/cortexlabs/python-predictor-gpu:master based on compute)
     env: <string: string> # dictionary of environment variables
     log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: <string> # the endpoint for the API (default: <api_name>)
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
@@ -85,6 +86,7 @@
     tensorflow_serving_image: <string> # docker image to use for the TensorFlow Serving container (default: quay.io/cortexlabs/tensorflow-serving-gpu:master or quay.io/cortexlabs/tensorflow-serving-cpu:master based on compute)
     env: <string: string> # dictionary of environment variables
     log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: <string> # the endpoint for the API (default: <api_name>)
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
@@ -136,6 +138,7 @@
     image: <string> # docker image to use for the Predictor (default: quay.io/cortexlabs/onnx-predictor-gpu:master or quay.io/cortexlabs/onnx-predictor-cpu:master based on compute)
     env: <string: string> # dictionary of environment variables
     log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
+    shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
   networking:
     endpoint: <string> # the endpoint for the API (default: <api_name>)
     api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
```
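As the docs above note, `shm_size` takes a Kubernetes-style quantity string such as `64Mi` or `1Gi` (the same format as `compute.mem`). A minimal sketch of what such strings resolve to, using the upstream `k8s.io/apimachinery` resource package directly rather than Cortex's `pkg/lib/k8s` wrapper that the operator actually uses:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	for _, s := range []string{"64Mi", "1Gi", "not-a-size"} {
		q, err := resource.ParseQuantity(s)
		if err != nil {
			// Strings that are not valid quantities are rejected when the API spec is validated.
			fmt.Printf("%s: %v\n", s, err)
			continue
		}
		// Value() reports the size in bytes; this is the cap applied to /dev/shm.
		fmt.Printf("%s = %d bytes\n", s, q.Value())
	}
}
```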

pkg/operator/operator/k8s.go

Lines changed: 53 additions & 4 deletions
```diff
@@ -140,7 +140,6 @@ func PythonPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume)
 			apiPodResourceList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
 			apiPodResourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
 		}
-
 	} else {
 		volumes = append(volumes, kcore.Volume{
 			Name: "neuron-sock",
@@ -173,6 +172,22 @@
 		containers = append(containers, neuronContainer)
 	}
 
+	if api.Predictor.ShmSize != nil {
+		volumes = append(volumes, kcore.Volume{
+			Name: "dshm",
+			VolumeSource: kcore.VolumeSource{
+				EmptyDir: &kcore.EmptyDirVolumeSource{
+					Medium:    kcore.StorageMediumMemory,
+					SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
+				},
+			},
+		})
+		apiPodVolumeMounts = append(apiPodVolumeMounts, kcore.VolumeMount{
+			Name:      "dshm",
+			MountPath: "/dev/shm",
+		})
+	}
+
 	containers = append(containers, kcore.Container{
 		Name:            APIContainerName,
 		Image:           api.Predictor.Image,
@@ -262,6 +277,22 @@ func TensorFlowPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume)
 		containers = append(containers, neuronContainer)
 	}
 
+	if api.Predictor.ShmSize != nil {
+		volumes = append(volumes, kcore.Volume{
+			Name: "dshm",
+			VolumeSource: kcore.VolumeSource{
+				EmptyDir: &kcore.EmptyDirVolumeSource{
+					Medium:    kcore.StorageMediumMemory,
+					SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
+				},
+			},
+		})
+		volumeMounts = append(volumeMounts, kcore.VolumeMount{
+			Name:      "dshm",
+			MountPath: "/dev/shm",
+		})
+	}
+
 	containers = append(containers, kcore.Container{
 		Name:            APIContainerName,
 		Image:           api.Predictor.Image,
@@ -294,9 +325,11 @@ func TensorFlowPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume)
 	return containers, volumes
 }
 
-func ONNXPredictorContainers(api *spec.API) []kcore.Container {
+func ONNXPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume) {
 	resourceList := kcore.ResourceList{}
 	resourceLimitsList := kcore.ResourceList{}
+	apiPodVolumeMounts := defaultVolumeMounts()
+	volumes := DefaultVolumes()
 	containers := []kcore.Container{}
 
 	if api.Compute.CPU != nil {
@@ -316,13 +349,29 @@ func ONNXPredictorContainers(api *spec.API) []kcore.Container {
 		resourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
 	}
 
+	if api.Predictor.ShmSize != nil {
+		volumes = append(volumes, kcore.Volume{
+			Name: "dshm",
+			VolumeSource: kcore.VolumeSource{
+				EmptyDir: &kcore.EmptyDirVolumeSource{
+					Medium:    kcore.StorageMediumMemory,
+					SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
+				},
+			},
+		})
+		apiPodVolumeMounts = append(apiPodVolumeMounts, kcore.VolumeMount{
+			Name:      "dshm",
+			MountPath: "/dev/shm",
+		})
+	}
+
 	containers = append(containers, kcore.Container{
 		Name:            APIContainerName,
 		Image:           api.Predictor.Image,
 		ImagePullPolicy: kcore.PullAlways,
 		Env:             getEnvVars(api, APIContainerName),
 		EnvFrom:         baseEnvVars(),
-		VolumeMounts:    defaultVolumeMounts(),
+		VolumeMounts:    apiPodVolumeMounts,
 		ReadinessProbe:  FileExistsProbe(_apiReadinessFile),
 		LivenessProbe:   _apiLivenessProbe,
 		Lifecycle:       nginxGracefulStopper(api.Kind),
@@ -338,7 +387,7 @@ func ONNXPredictorContainers(api *spec.API) []kcore.Container {
 		},
 	})
 
-	return containers
+	return containers, volumes
 }
 
 func getEnvVars(api *spec.API, container string) []kcore.EnvVar {
```
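The `dshm` volume/mount pair above is repeated in each of the three container builders. A minimal sketch of the same pattern factored into a standalone helper (hypothetical; the commit inlines the block rather than extracting it), using the upstream Kubernetes core types:

```go
package main

import (
	"fmt"

	kcore "k8s.io/api/core/v1"
	kresource "k8s.io/apimachinery/pkg/api/resource"
)

// shmVolume is a hypothetical helper: it returns a memory-backed emptyDir
// capped at shmSize plus the mount that places it at /dev/shm, mirroring the
// block the commit adds to the Python, TensorFlow, and ONNX container builders.
func shmVolume(shmSize kresource.Quantity) (kcore.Volume, kcore.VolumeMount) {
	volume := kcore.Volume{
		Name: "dshm",
		VolumeSource: kcore.VolumeSource{
			EmptyDir: &kcore.EmptyDirVolumeSource{
				Medium:    kcore.StorageMediumMemory,
				SizeLimit: &shmSize,
			},
		},
	}
	mount := kcore.VolumeMount{
		Name:      "dshm",
		MountPath: "/dev/shm",
	}
	return volume, mount
}

func main() {
	vol, mnt := shmVolume(kresource.MustParse("64Mi"))
	fmt.Println(vol.Name, vol.VolumeSource.EmptyDir.SizeLimit.String(), mnt.MountPath)
}
```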

pkg/operator/resources/batchapi/k8s_specs.go

Lines changed: 2 additions & 2 deletions
```diff
@@ -144,7 +144,7 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
 }
 
 func onnxPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
-	containers := operator.ONNXPredictorContainers(api)
+	containers, volumes := operator.ONNXPredictorContainers(api)
 
 	for i, container := range containers {
 		if container.Name == operator.APIContainerName {
@@ -186,7 +186,7 @@ func onnxPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
 					"workload": "true",
 				},
 				Tolerations:        operator.Tolerations,
-				Volumes:            operator.DefaultVolumes(),
+				Volumes:            volumes,
 				ServiceAccountName: "default",
 			},
 		},
```

pkg/operator/resources/realtimeapi/k8s_specs.go

Lines changed: 2 additions & 2 deletions
```diff
@@ -152,7 +152,7 @@ func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment {
 }
 
 func onnxAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment {
-	containers := operator.ONNXPredictorContainers(api)
+	containers, volumes := operator.ONNXPredictorContainers(api)
 
 	if config.Provider == types.AWSProviderType {
 		containers = append(containers, operator.RequestMonitorContainer(api))
@@ -196,7 +196,7 @@ func onnxAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment {
 					"workload": "true",
 				},
 				Tolerations:        operator.Tolerations,
-				Volumes:            operator.DefaultVolumes(),
+				Volumes:            volumes,
 				ServiceAccountName: "default",
 			},
 		},
```

pkg/types/spec/errors.go

Lines changed: 10 additions & 0 deletions
```diff
@@ -23,6 +23,7 @@ import (
 	"github.com/cortexlabs/cortex/pkg/consts"
 	"github.com/cortexlabs/cortex/pkg/lib/errors"
 	"github.com/cortexlabs/cortex/pkg/lib/files"
+	"github.com/cortexlabs/cortex/pkg/lib/k8s"
 	libmath "github.com/cortexlabs/cortex/pkg/lib/math"
 	"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
 	s "github.com/cortexlabs/cortex/pkg/lib/strings"
@@ -52,6 +53,8 @@
 
 	ErrModelCachingNotSupportedWhenMultiprocessingEnabled = "spec.model_caching_not_supported_when_multiprocessing_enabled"
 
+	ErrShmSizeCannotExceedMem = "spec.shm_size_cannot_exceed_mem"
+
 	ErrFileNotFound              = "spec.file_not_found"
 	ErrDirIsEmpty                = "spec.dir_is_empty"
 	ErrMustBeRelativeProjectPath = "spec.must_be_relative_project_path"
@@ -228,6 +231,13 @@ func ErrorSurgeAndUnavailableBothZero() error {
 	})
 }
 
+func ErrorShmSizeCannotExceedMem(shmSize k8s.Quantity, mem k8s.Quantity) error {
+	return errors.WithStack(&errors.Error{
+		Kind:    ErrShmSizeCannotExceedMem,
+		Message: fmt.Sprintf("predictor.shm_size (%s) cannot exceed compute.mem (%s)", shmSize.UserString, mem.UserString),
+	})
+}
+
 func ErrorModelCachingNotSupportedWhenMultiprocessingEnabled(desiredProcesses int32) error {
 	const maxNumProcesses int32 = 1
 	return errors.WithStack(&errors.Error{
```

pkg/types/spec/validations.go

Lines changed: 14 additions & 0 deletions
```diff
@@ -203,6 +203,14 @@ func predictorValidation() *cr.StructFieldValidation {
 					GreaterThanOrEqualTo: pointer.Int32(1),
 				},
 			},
+			{
+				StructField: "ShmSize",
+				StringPtrValidation: &cr.StringPtrValidation{
+					Default:           nil,
+					AllowExplicitNull: true,
+				},
+				Parser: k8s.QuantityParser(&k8s.QuantityValidation{}),
+			},
 			{
 				StructField: "LogLevel",
 				StringValidation: &cr.StringValidation{
@@ -795,6 +803,12 @@ func ValidateAPI(
 		}
 	}
 
+	if api.Predictor != nil && api.Predictor.ShmSize != nil && api.Compute.Mem != nil {
+		if api.Predictor.ShmSize.Cmp(api.Compute.Mem.Quantity) > 0 {
+			return ErrorShmSizeCannotExceedMem(*api.Predictor.ShmSize, *api.Compute.Mem)
+		}
+	}
+
 	return nil
 }
```
pkg/types/userconfig/api.go

Lines changed: 10 additions & 0 deletions
```diff
@@ -53,6 +53,7 @@ type Predictor struct {
 	ServerSideBatching  *ServerSideBatching `json:"server_side_batching" yaml:"server_side_batching"`
 	ProcessesPerReplica int32               `json:"processes_per_replica" yaml:"processes_per_replica"`
 	ThreadsPerProcess   int32               `json:"threads_per_process" yaml:"threads_per_process"`
+	ShmSize             *k8s.Quantity       `json:"shm_size" yaml:"shm_size"`
 	PythonPath          *string             `json:"python_path" yaml:"python_path"`
 	LogLevel            LogLevel            `json:"log_level" yaml:"log_level"`
 	Image               string              `json:"image" yaml:"image"`
@@ -370,6 +371,10 @@ func (predictor *Predictor) UserStr() string {
 	sb.WriteString(fmt.Sprintf("%s: %s\n", ProcessesPerReplicaKey, s.Int32(predictor.ProcessesPerReplica)))
 	sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerProcessKey, s.Int32(predictor.ThreadsPerProcess)))
 
+	if predictor.ShmSize != nil {
+		sb.WriteString(fmt.Sprintf("%s: %s\n", ShmSize, predictor.ShmSize.UserString))
+	}
+
 	if len(predictor.Config) > 0 {
 		sb.WriteString(fmt.Sprintf("%s:\n", ConfigKey))
 		d, _ := yaml.Marshal(&predictor.Config)
@@ -603,6 +608,11 @@ func (api *API) TelemetryEvent(provider types.ProviderType) map[string]interface{} {
 	event["predictor.type"] = api.Predictor.Type
 	event["predictor.processes_per_replica"] = api.Predictor.ProcessesPerReplica
 	event["predictor.threads_per_process"] = api.Predictor.ThreadsPerProcess
+
+	if api.Predictor.ShmSize != nil {
+		event["predictor.shm_size"] = api.Predictor.ShmSize.String()
+	}
+
 	event["predictor.log_level"] = api.Predictor.LogLevel
 
 	if api.Predictor.PythonPath != nil {
```

pkg/types/userconfig/config_key.go

Lines changed: 1 addition & 0 deletions
```diff
@@ -39,6 +39,7 @@
 	TensorFlowServingImageKey = "tensorflow_serving_image"
 	ProcessesPerReplicaKey    = "processes_per_replica"
 	ThreadsPerProcessKey      = "threads_per_process"
+	ShmSize                   = "shm_size"
 	LogLevelKey               = "log_level"
 	ConfigKey                 = "config"
 	EnvKey                    = "env"
```
