Commit d0da741

Added replicationcontroller permissions to permissions-fix.yaml. Uploaded new examples with datasets from tensorflow_datasets.
1 parent 15dfb6b commit d0da741

9 files changed: +240 -1 lines changed
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
# EUROSAT and VGG16

The following VGG16 TensorFlow deep learning model has been used in Kafka-ML for this example with the EUROSAT dataset:

```
base_model = tf.keras.applications.VGG16(include_top=False, weights='imagenet', input_shape=(64,64,3))
x = tf.keras.layers.Flatten()(base_model.output)
x = tf.keras.layers.Dense(1000, activation='relu')(x)
x = tf.keras.layers.Dense(512, activation='relu')(x)
x = tf.keras.layers.Dense(128, activation='relu')(x)
predictions = tf.keras.layers.Dense(10, activation='softmax')(x)

model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

model.compile(optimizer=tf.keras.optimizers.SGD(0.001),
              # the output layer already applies softmax, so probabilities (not logits) reach the loss
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
```

The batch_size used is 256 and the training configuration is (epochs=50, shuffle=True).
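Outside Kafka-ML, a minimal sketch of the equivalent standalone training call with these settings (the tf.data pipeline below is an illustrative assumption, not part of this commit):

```
import tensorflow as tf
import tensorflow_datasets as tfds

# Stand-in for the Kafka-ML training configuration: shuffle the EuroSAT
# train split, batch at 256, and train for 50 epochs.
train = tfds.load('eurosat', split='train', as_supervised=True)
train = train.shuffle(1000).batch(256).prefetch(tf.data.AUTOTUNE)
model.fit(train, epochs=50)
```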
In the PyTorch case, the following VGG16 deep learning model has been used in Kafka-ML for the EUROSAT dataset example:

```
import torch
from torch import nn
from torchvision import models
from ignite.metrics import Accuracy, Loss  # metrics assumed to come from pytorch-ignite

class VGG16(nn.Module):
    def __init__(self):
        super(VGG16, self).__init__()
        self.pretrained = models.vgg16(pretrained=True)
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(1000, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        x = self.pretrained(x)
        x = self.flatten(x)
        output = self.linear_relu_stack(x)
        return output

    def loss_fn(self):
        return nn.CrossEntropyLoss()

    def optimizer(self):
        return torch.optim.SGD(self.parameters(), lr=1e-3)

    def metrics(self):
        val_metrics = {
            "accuracy": Accuracy(),
            "loss": Loss(self.loss_fn())
        }
        return val_metrics

model = VGG16()
```

The batch_size used is 256 and the training configuration is (max_epochs=50, shuffle=True).
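Continuing from the block above, a quick shape check on a dummy batch can confirm that the pretrained torchvision VGG16 feeds its 1000 ImageNet logits into the classifier head as intended; a minimal sketch, not part of the commit:

```
dummy = torch.randn(2, 3, 64, 64)   # EuroSAT patches are 64x64 RGB
print(model(dummy).shape)           # expected: torch.Size([2, 10])
```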
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
```
import tensorflow as tf
import tensorflow_datasets as tfds
import logging
from kafka import KafkaProducer, KafkaConsumer

logging.basicConfig(level=logging.INFO)

INPUT_TOPIC = 'eurosat-in'
OUTPUT_TOPIC = 'eurosat-out'
BOOTSTRAP_SERVERS = '127.0.0.1:9094'
ITEMS_TO_PREDICT = 10

eurosat = tfds.load('eurosat', as_supervised=True, shuffle_files=True,
                    split=[f"train[:{ITEMS_TO_PREDICT}]"], data_dir='datasets/eurosat')

# Create a producer to send the values to predict
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)

# Send each value to predict to Kafka as raw bytes
for image, _ in eurosat[0]:
    producer.send(INPUT_TOPIC, image.numpy().tobytes())
producer.flush()
producer.close()

# Create an output consumer to receive the predictions
output_consumer = KafkaConsumer(OUTPUT_TOPIC, bootstrap_servers=BOOTSTRAP_SERVERS, group_id="output_group")

print('\n')
print('Output consumer: ')
for msg in output_consumer:
    print(msg.value.decode())
```
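The exact payload on the output topic depends on how the deployed Kafka-ML inference service serializes its results. Assuming it returns the 10-element class distribution as a JSON-encoded list (an assumption, not confirmed by this commit), picking the winning class inside the consumer loop could look like:

```
import json
import numpy as np

probs = np.array(json.loads(msg.value.decode()))  # assumed: JSON list of 10 class scores
print('predicted class:', int(probs.argmax()))
```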
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
```
import sys
sys.path.append(sys.path[0] + "/../..")   # to allow importing datasources

from datasources.raw_sink import RawSink
import tensorflow as tf
import tensorflow_datasets as tfds
import logging

logging.basicConfig(level=logging.INFO)

# 'boostrap_servers' (sic) is the keyword expected by RawSink
eurosat = RawSink(boostrap_servers='localhost:9094', topic='automl', deployment_id=1,
                  description='eurosat dataset', validation_rate=0.1, test_rate=0.1)

ds = tfds.load('eurosat', as_supervised=True, shuffle_files=True, data_dir='datasets/eurosat')
ds['train'] = ds['train'].shuffle(buffer_size=1000)

for image, label in ds['train']:
    eurosat.send(data=image.numpy(), label=label.numpy())

eurosat.close()
```
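Before streaming, it can help to confirm what the sink will receive; a minimal sketch using the tfds metadata (counts correspond to the standard EuroSAT RGB configuration):

```
import tensorflow_datasets as tfds

ds, info = tfds.load('eurosat', as_supervised=True, with_info=True,
                     data_dir='datasets/eurosat')
print(info.splits['train'].num_examples)  # 27000 examples in the train split
print(ds['train'].element_spec)           # 64x64x3 uint8 images, int64 labels
```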
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
```
tensorflow==2.7.0
tensorflow-datasets==4.5.2
kafka-python==2.0.2
```
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
# SO2SAT and VGG16

The following VGG16 TensorFlow deep learning model has been used in Kafka-ML for this example with the SO2SAT dataset:

```
base_model = tf.keras.applications.VGG16(include_top=False, weights='imagenet', input_shape=(32,32,3))
x = tf.keras.layers.Flatten()(base_model.output)
x = tf.keras.layers.Dense(1000, activation='relu')(x)
x = tf.keras.layers.Dense(512, activation='relu')(x)
x = tf.keras.layers.Dense(128, activation='relu')(x)
predictions = tf.keras.layers.Dense(17, activation='softmax')(x)

model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

model.compile(optimizer=tf.keras.optimizers.SGD(0.001),
              # the output layer already applies softmax, so probabilities (not logits) reach the loss
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
```

The batch_size used is 256 and the training configuration is (epochs=50, shuffle=True).
In the PyTorch case, the following VGG16 deep learning model has been used in Kafka-ML for the SO2SAT dataset example:

```
import torch
from torch import nn
from torchvision import models
from ignite.metrics import Accuracy, Loss  # metrics assumed to come from pytorch-ignite

class VGG16(nn.Module):
    def __init__(self):
        super(VGG16, self).__init__()
        self.pretrained = models.vgg16(pretrained=True)
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(1000, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 17),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        x = self.pretrained(x)
        x = self.flatten(x)
        output = self.linear_relu_stack(x)
        return output

    def loss_fn(self):
        return nn.CrossEntropyLoss()

    def optimizer(self):
        return torch.optim.SGD(self.parameters(), lr=1e-3)

    def metrics(self):
        val_metrics = {
            "accuracy": Accuracy(),
            "loss": Loss(self.loss_fn())
        }
        return val_metrics

model = VGG16()
```

The batch_size used is 256 and the training configuration is (max_epochs=50, shuffle=True).
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
```
tensorflow==2.7.0
tensorflow-datasets==4.5.2
kafka-python==2.0.2
```
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
```
import tensorflow as tf
import tensorflow_datasets as tfds
import logging
from kafka import KafkaProducer, KafkaConsumer

logging.basicConfig(level=logging.INFO)

INPUT_TOPIC = 'so2sat-in'
OUTPUT_TOPIC = 'so2sat-out'
BOOTSTRAP_SERVERS = '127.0.0.1:9094'
ITEMS_TO_PREDICT = 10

so2sat = tfds.load('so2sat', as_supervised=True, shuffle_files=True,
                   split=[f"validation[:{ITEMS_TO_PREDICT}]"], data_dir='datasets/so2sat')

# Create a producer to send the values to predict
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)

# Send each value to predict to Kafka as raw bytes
for image, _ in so2sat[0]:
    producer.send(INPUT_TOPIC, image.numpy().tobytes())
producer.flush()
producer.close()

# Create an output consumer to receive the predictions
output_consumer = KafkaConsumer(OUTPUT_TOPIC, bootstrap_servers=BOOTSTRAP_SERVERS, group_id="output_group")

print('\n')
print('Output consumer: ')
for msg in output_consumer:
    print(msg.value.decode())
```
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
```
import sys
sys.path.append(sys.path[0] + "/../..")   # to allow importing datasources

from datasources.raw_sink import RawSink
import tensorflow as tf
import tensorflow_datasets as tfds
import logging

logging.basicConfig(level=logging.INFO)

# 'boostrap_servers' (sic) is the keyword expected by RawSink
so2sat = RawSink(boostrap_servers='localhost:9094', topic='automl', deployment_id=1,
                 description='so2sat dataset', validation_rate=0.1, test_rate=0.1)

ds = tfds.load('so2sat', as_supervised=True, shuffle_files=True, data_dir='datasets/so2sat')

ds['train'] = ds['train'].shuffle(buffer_size=1000)
ds['validation'] = ds['validation'].shuffle(buffer_size=1000)

for image, label in ds['train']:
    so2sat.send(data=image.numpy(), label=label.numpy())

for image, label in ds['validation']:
    so2sat.send(data=image.numpy(), label=label.numpy())

so2sat.close()
```
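Because this sink pushes two splits through the same topic, a compact variant of the two loops that also counts what goes out may be easier to keep consistent; a sketch continuing from the script above, with the counter added for illustration only:

```
sent = 0
for split in ('train', 'validation'):
    for image, label in ds[split]:
        so2sat.send(data=image.numpy(), label=label.numpy())
        sent += 1
logging.info('sent %d records to topic automl', sent)
```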

permissions-fix.yaml

Lines changed: 1 addition & 1 deletion
```
@@ -6,7 +6,7 @@ metadata:
   namespace: kafkaml
 rules:
 - apiGroups: ["", "apps", "batch"]
-  resources: [ "deployments", "jobs", pods", "replicasets", services" ]
+  resources: [ "deployments", "jobs", "pods", "replicasets", "services", "replicationcontrollers"]
   verbs: [ "create", "get", "list", "delete", "watch"]
 ---
 apiVersion: rbac.authorization.k8s.io/v1
```
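After this change, the rule grants the Kafka-ML role access to replication controllers in addition to the existing resources; the resulting block in permissions-fix.yaml reads:

```
rules:
- apiGroups: ["", "apps", "batch"]
  resources: [ "deployments", "jobs", "pods", "replicasets", "services", "replicationcontrollers"]
  verbs: [ "create", "get", "list", "delete", "watch"]
```

Re-applying the manifest (e.g. `kubectl apply -f permissions-fix.yaml`) makes the updated Role take effect.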
