Skip to content

Commit 342a7d0

Browse files
authored
Merge pull request #52 from juhpatil/main
Sample code to use Amazon Elasticache Redis as a cache for Amazon Keyspaces data - added sample data, requirements, readme files
2 parents 85f27b7 + 614a353 commit 342a7d0

File tree

8 files changed

+339
-0
lines changed

8 files changed

+339
-0
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
dist/
2+
build/
3+
write-through-cache.egg-info
4+
write-through-cache/__pycache__/
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
## Using Amazon Elasticache with Redis OSS compatibility as a cache for Amazon Keyspaces
2+
3+
This sample project shows the use of the Amazon Elasticache Redis as a cache for book awards data stored in Amazon Keyspaces. The sample uses DataStax Python Driver for Apache Cassandra to connect to Amazon Keyspaces, and Redis Client to connect to Amazon Elasticache Redis. SigV4 has been used for authentication using short-term credentials.
4+
5+
### Prerequisites
6+
You should have Python and pip installed. This sample works with Python 3.x.
7+
8+
You should also setup Amazon Keyspaces with an IAM user. See [Accessing Amazon Keyspaces](https://docs.aws.amazon.com/keyspaces/latest/devguide/accessing.html) for more.
9+
10+
You need the Starfield digital certificate 'sf-class2-root.crt' downloaded locally or in your home directory - provide the certificate path on line 13 of write_through_caching_sample/ks_redis.py. The certificate can be found at write_through_caching_sample/resources.
11+
12+
You should have the connection string for your Amazon Elasticache Redis cluster which you will provide on line of write_through_caching_sample/ks_redis.py.
13+
14+
You should have the keyspace name and table name for your Amazon Keyspaces resource which you will provide on line 36 and line 37 of write_through_caching_sample/ks_redis.py respectively.
15+
16+
Sample data can be found at write_through_caching_sample/resources/ and can be loaded using [CQLSH](https://docs.aws.amazon.com/keyspaces/latest/devguide/bulk-upload.html) or [DSBulk](https://docs.aws.amazon.com/keyspaces/latest/devguide/dsbulk-upload.html).
17+
18+
19+
### Running the sample
20+
21+
This sample uses Boto3 which will find credentials based on environment variables, config or credentials file on your client, see [Boto3 Credentials Guide](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for how to set this up.
22+
23+
You can quickly get this to run by explicitly setting the following environment variables...
24+
25+
- AWS_REGION (for example 'us-east-1)
26+
- AWS_ACCESS_KEY_ID (ex: 'AKIAIOSFODNN7EXAMPLE')
27+
- AWS_SECRET_ACCESS_KEY (ex: 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')
28+
29+
### Install the dependencies
30+
31+
From the project directory...
32+
```
33+
pip install -r requirements.txt
34+
```
35+
36+
### Testing
37+
From this project directory...
38+
```
39+
python3 write_through_caching_sample/test_functions.py
40+
```
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
cassandra-driver
2+
cassandra-sigv4
3+
redis-py-cluster
4+
boto3
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from setuptools import setup, find_packages
2+
3+
setup(
4+
name='write_through_caching_sample',
5+
version='1.0',
6+
include_package_data=True,
7+
packages=['write_through_cachine_sample'],
8+
install_requires=[
9+
'cassandra-driver',
10+
'cassandra-sigv4',
11+
'redis-py-cluster',
12+
'boto3'
13+
],
14+
15+
)
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
from cassandra.cluster import *
2+
from ssl import SSLContext, PROTOCOL_TLSv1_2 , CERT_REQUIRED
3+
from cassandra.auth import PlainTextAuthProvider
4+
from cassandra_sigv4.auth import SigV4AuthProvider
5+
from cassandra.query import SimpleStatement
6+
from rediscluster import RedisCluster
7+
import logging
8+
import time
9+
import boto3
10+
11+
#Keyspaces connection
12+
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
13+
ssl_context.load_verify_locations('/home/ec2-user/sf-class2-root.crt')
14+
ssl_context.verify_mode = CERT_REQUIRED
15+
boto_session = boto3.Session()
16+
auth_provider = SigV4AuthProvider(boto_session)
17+
cluster = Cluster(['cassandra.us-east-1.amazonaws.com'],
18+
ssl_context=ssl_context,
19+
auth_provider=auth_provider,
20+
port=9142)
21+
session = cluster.connect()
22+
23+
24+
#Amazon Elasticache connection
25+
logging.basicConfig(level=logging.ERROR)
26+
redis = RedisCluster(startup_nodes=[{"host": "YOUR_CLUSTER.clustercfg.use1.cache.amazonaws.com",
27+
"port": "6379"}],
28+
decode_responses=True,
29+
skip_full_coverage_check=True)
30+
31+
if redis.ping():
32+
logging.info("Connected to Redis")
33+
34+
35+
#Global variables
36+
keyspace_name="catalog"
37+
table_name="book_awards"
38+
39+
#Write a row
40+
def write_award(book_award):
41+
write_award_to_keyspaces(book_award)
42+
write_award_to_cache(book_award)
43+
44+
#write row to the Keyspaces table
45+
def write_award_to_keyspaces(book_award):
46+
stmt=SimpleStatement(f"INSERT INTO {keyspace_name}.{table_name} (award, year, category, rank, author, book_title, publisher) VALUES (%s, %s, %s, %s, %s, %s, %s)",
47+
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
48+
session.execute(stmt,(book_award["award"],
49+
book_award["year"],
50+
book_award["category"],
51+
book_award["rank"],
52+
book_award["author"],
53+
book_award["book_title"],
54+
book_award["publisher"]))
55+
56+
#write row to the cache
57+
def write_award_to_cache(book_award):
58+
#construct Redis key name
59+
key_name=book_award["award"]+str(book_award["year"])+book_award["category"]+str(book_award["rank"])
60+
61+
#write to cache using Redis set, ex=300 sets TTL for this row to 300 seconds
62+
redis.set(key_name, str(book_award), ex=300)
63+
64+
65+
#Delete a row
66+
def delete_award(award, year, category, rank):
67+
delete_award_from_keyspaces(award, year, category, rank)
68+
delete_award_from_cache(award, year, category, rank)
69+
70+
#delete row from Keyspaces table
71+
def delete_award_from_keyspaces(award, year, category, rank):
72+
stmt = SimpleStatement(f"DELETE FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank=%s;",
73+
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
74+
session.execute(stmt, (award, int(year), category, int(rank)))
75+
76+
#delete row from cache
77+
def delete_award_from_cache(award, year, category, rank):
78+
#construct Redis key name
79+
key_name=award+str(year)+category+str(rank)
80+
81+
#delete the row from cache if it exists
82+
if redis.get(key_name) is not None:
83+
book_award=redis.delete(key_name)
84+
85+
#Read a row
86+
def get_award(award, year, category, rank):
87+
#construct Redis key name from parameters
88+
key_name=award+str(year)+category+str(rank)
89+
book_award=redis.get(key_name)
90+
91+
#if row not in cache, fetch it from Keyspaces table
92+
if not book_award:
93+
print("Fetched from Cache: ", book_award)
94+
stmt = SimpleStatement(f"SELECT * FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank=%s;")
95+
res=session.execute(stmt, (award, int(year), category, int(rank)))
96+
if not res.current_rows:
97+
print("Fetched from Database: None")
98+
return None
99+
else:
100+
#lazy-load into cache
101+
book_award=redis.set(key_name, str(res.current_rows), ex=300)
102+
print("Fetched from Database: ")
103+
return res.current_rows
104+
else:
105+
print("Fetched from Cache: ")
106+
return book_award
107+
108+
109+
#Read one or more rows based on parameters
110+
def get_awards(parameters):
111+
#construct key name from parameters
112+
key_name=""
113+
for p in parameters:
114+
key_name=key_name+str(p)
115+
book_awards=redis.lrange(key_name, 0, -1)
116+
117+
#if result set not in cache, fetch it from Keyspaces table
118+
if not book_awards:
119+
print("Fetched from Cache: ", book_awards)
120+
stmt = SimpleStatement(f"SELECT * FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank<=%s;")
121+
res=session.execute(stmt, parameters)
122+
if not res.current_rows:
123+
print("Fetched from Database: None")
124+
return None
125+
else:
126+
#lazy-load into cache
127+
redis.rpush(key_name, str(res.current_rows))
128+
redis.expire(key_name, 300)
129+
print("Fetched from Database: ")
130+
return res.current_rows
131+
else:
132+
print("Fetched from Cache: ")
133+
return book_awards
134+
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
"award","year","book_title","author","rank","publisher","category"
2+
"Must Read",2021,"The mystery of the 7th dimension","Polly Gon",1,"PublishGo","Sci-Fi"
3+
"Must Read",2021,"Time travellers guide","Kai K",2,"Publish123","Sci-Fi"
4+
"Must Read",2021,"Key to the teleporter","Mick Key",3,"Penguins","Sci-Fi"
5+
"Must Read",2021,"Victors Mars","Anonymous",4,"PinkPublish","Sci-Fi"
6+
"Must Read1",2021,"Tomorrow is here","John Doe",2,"Ostrich books1","Sci-Fi"
7+
"Must Read1",2021,"Tomorrow is here","John Doe",3,"Ostrich books1","Sci-Fi"
8+
"International Best Seller",2023,"Adventures of Ji","Ji Jill",1,"PublishGo","Young Adult"
9+
"International Best Seller",2023,"The Chronicles of Myrtlini","Jane Doe",2,"Penguins","Young Adult"
10+
"International Best Seller",2023,"Dreams","Gigi",3,"WellPublishers","Young Adult"
11+
"International Best Seller",2023,"Vision of Utopia","Anonymous",4,"ABCPublish","Young Adult"
12+
"International Best Seller",2023,"The sailor said..","Pyter P",5,"PublishGo","Young Adult"
13+
"Must read",2056,"Postcard from Andromeda","James Doe",1,"Publish123","sci-fi"
14+
"Must read",2056,"Postcard from Almeda","James Doug",2,"Publish123","sci-fi"
15+
"Must read",2056,"Postcard from Mars","James Doug",5,"Publish123","sci-fi"
16+
"Golden Read",2056,"Postcard from Andromeda","James Doe",1,"Publish123","sci-fi"
17+
"Golden Read",2056,"Tomorrow is here","John Doe",2,"Ostrich books","sci-fi"
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIIEDzCCAvegAwIBAgIBADANBgkqhkiG9w0BAQUFADBoMQswCQYDVQQGEwJVUzEl
3+
MCMGA1UEChMcU3RhcmZpZWxkIFRlY2hub2xvZ2llcywgSW5jLjEyMDAGA1UECxMp
4+
U3RhcmZpZWxkIENsYXNzIDIgQ2VydGlmaWNhdGlvbiBBdXRob3JpdHkwHhcNMDQw
5+
NjI5MTczOTE2WhcNMzQwNjI5MTczOTE2WjBoMQswCQYDVQQGEwJVUzElMCMGA1UE
6+
ChMcU3RhcmZpZWxkIFRlY2hub2xvZ2llcywgSW5jLjEyMDAGA1UECxMpU3RhcmZp
7+
ZWxkIENsYXNzIDIgQ2VydGlmaWNhdGlvbiBBdXRob3JpdHkwggEgMA0GCSqGSIb3
8+
DQEBAQUAA4IBDQAwggEIAoIBAQC3Msj+6XGmBIWtDBFk385N78gDGIc/oav7PKaf
9+
8MOh2tTYbitTkPskpD6E8J7oX+zlJ0T1KKY/e97gKvDIr1MvnsoFAZMej2YcOadN
10+
+lq2cwQlZut3f+dZxkqZJRRU6ybH838Z1TBwj6+wRir/resp7defqgSHo9T5iaU0
11+
X9tDkYI22WY8sbi5gv2cOj4QyDvvBmVmepsZGD3/cVE8MC5fvj13c7JdBmzDI1aa
12+
K4UmkhynArPkPw2vCHmCuDY96pzTNbO8acr1zJ3o/WSNF4Azbl5KXZnJHoe0nRrA
13+
1W4TNSNe35tfPe/W93bC6j67eA0cQmdrBNj41tpvi/JEoAGrAgEDo4HFMIHCMB0G
14+
A1UdDgQWBBS/X7fRzt0fhvRbVazc1xDCDqmI5zCBkgYDVR0jBIGKMIGHgBS/X7fR
15+
zt0fhvRbVazc1xDCDqmI56FspGowaDELMAkGA1UEBhMCVVMxJTAjBgNVBAoTHFN0
16+
YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xMjAwBgNVBAsTKVN0YXJmaWVsZCBD
17+
bGFzcyAyIENlcnRpZmljYXRpb24gQXV0aG9yaXR5ggEAMAwGA1UdEwQFMAMBAf8w
18+
DQYJKoZIhvcNAQEFBQADggEBAAWdP4id0ckaVaGsafPzWdqbAYcaT1epoXkJKtv3
19+
L7IezMdeatiDh6GX70k1PncGQVhiv45YuApnP+yz3SFmH8lU+nLMPUxA2IGvd56D
20+
eruix/U0F47ZEUD0/CwqTRV/p2JdLiXTAAsgGh1o+Re49L2L7ShZ3U0WixeDyLJl
21+
xy16paq8U4Zt3VekyvggQQto8PT7dL5WXXp59fkdheMtlb71cZBDzI0fmgAKhynp
22+
VSJYACPq4xJDKVtHCN2MQWplBqjlIapBtJUhlbl90TSrE9atvNziPTnNvT51cKEY
23+
WQPJIrSPnNVeKtelttQKbfi3QBFGmh95DmK/D5fs4C8fF5Q=
24+
-----END CERTIFICATE-----
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from ks_redis import *
2+
3+
def test_case_1():
4+
book_award={"award": "Golden Read",
5+
"year": 2021,
6+
"category": "sci-fi",
7+
"rank": 2,
8+
"author": "John Doe",
9+
"book_title": "Tomorrow is here",
10+
"publisher": "Ostrich books"}
11+
12+
#insert an award to the DB and cache
13+
write_award(book_award)
14+
print("Test Case 1:")
15+
print("New book award inserted.")
16+
17+
#cache hit - get award from cache
18+
print("\n")
19+
print("Verify cache hit:")
20+
res=get_award(book_award["award"],
21+
book_award["year"],
22+
book_award["category"],
23+
book_award["rank"])
24+
print(res)
25+
26+
#let the cache entry expire
27+
print("\n")
28+
print("Waiting for cached entry to expire, sleeping for 300 seconds...")
29+
time.sleep(300)
30+
31+
#cache miss - get award from DB and lazy load to cache
32+
print("\n")
33+
print("Entry expired in cache, award expected to be fetched from DB:")
34+
res=get_award(book_award["award"],
35+
book_award["year"],
36+
book_award["category"],
37+
book_award["rank"])
38+
print(res)
39+
40+
#cache hit - get award from cache
41+
print("\n")
42+
print("Verify that award is lazy loaded into cache:")
43+
res=get_award(book_award["award"],
44+
book_award["year"],
45+
book_award["category"],
46+
book_award["rank"])
47+
print(res)
48+
49+
#delete the award from cache and DB
50+
print("\n")
51+
print("Deleting book award.")
52+
delete_award(book_award["award"],
53+
book_award["year"],
54+
book_award["category"],
55+
book_award["rank"])
56+
57+
#confirm the award was deleted from cache and DB
58+
print("\n")
59+
print("Verify that the award was deleted from cache and DB:")
60+
res=get_award(book_award["award"],
61+
book_award["year"],
62+
book_award["category"],
63+
book_award["rank"])
64+
if res:
65+
print(res)
66+
67+
68+
def test_case_2():
69+
print("Test Case 2:")
70+
print("Get top 3 Must Read book awards for year 2021 in the Sci-Fi category")
71+
print("\n")
72+
res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
73+
print(res)
74+
75+
#cache-hit - get awards from cache
76+
print("\n")
77+
print("Verify cache hit on subsequent query with same parameters:")
78+
res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
79+
print(res)
80+
81+
#let the cache entry expire
82+
print("\n")
83+
print("Waiting for cached entry to expire, sleeping for 300 seconds...")
84+
time.sleep(300)
85+
86+
#cache miss - get award from DB and lazy load to cache
87+
print("\n")
88+
print("Entry expired in cache, awards expected to be fetched from DB.")
89+
res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
90+
print(res)
91+
92+
#cache hit - get award from cache
93+
print("\n")
94+
print("Verify that awards are lazy loaded into cache:")
95+
res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
96+
print(res)
97+
98+
test_case_1()
99+
print(" ")
100+
101+
test_case_2()

0 commit comments

Comments
 (0)