Skip to content

Commit 72cb1ce

Browse files
author
EC2 Default User
committed
Merge remote-tracking branch 'redis-keyspaces-cache/master'
2 parents 85f27b7 + 4d8cd25 commit 72cb1ce

File tree

4 files changed

+235
-0
lines changed

4 files changed

+235
-0
lines changed
3.67 KB
Binary file not shown.
3.75 KB
Binary file not shown.

ks_redis.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
from cassandra.cluster import *
2+
from ssl import SSLContext, PROTOCOL_TLSv1_2 , CERT_REQUIRED
3+
from cassandra.auth import PlainTextAuthProvider
4+
from cassandra_sigv4.auth import SigV4AuthProvider
5+
from cassandra.query import SimpleStatement
6+
from rediscluster import RedisCluster
7+
import logging
8+
import time
9+
import boto3
10+
11+
#Keyspaces connection
12+
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
13+
ssl_context.load_verify_locations('/home/ec2-user/sf-class2-root.crt')
14+
ssl_context.verify_mode = CERT_REQUIRED
15+
boto_session = boto3.Session()
16+
auth_provider = SigV4AuthProvider(boto_session)
17+
cluster = Cluster(['cassandra.us-east-1.amazonaws.com'],
18+
ssl_context=ssl_context,
19+
auth_provider=auth_provider,
20+
port=9142)
21+
session = cluster.connect()
22+
23+
24+
#Amazon Elasticache connection
25+
logging.basicConfig(level=logging.ERROR)
26+
redis = RedisCluster(startup_nodes=[{"host": "keyspaces-cache.ebnqkc.clustercfg.use1.cache.amazonaws.com",
27+
"port": "6379"}],
28+
decode_responses=True,
29+
skip_full_coverage_check=True)
30+
31+
if redis.ping():
32+
logging.info("Connected to Redis")
33+
34+
35+
#Global variables
36+
keyspace_name="catalog"
37+
table_name="book_awards"
38+
39+
#Write a row
40+
def write_award(book_award):
41+
write_award_to_keyspaces(book_award)
42+
write_award_to_cache(book_award)
43+
44+
#write row to the Keyspaces table
45+
def write_award_to_keyspaces(book_award):
46+
stmt=SimpleStatement(f"INSERT INTO {keyspace_name}.{table_name} (award, year, category, rank, author, book_title, publisher) VALUES (%s, %s, %s, %s, %s, %s, %s)",
47+
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
48+
session.execute(stmt,(book_award["award"],
49+
book_award["year"],
50+
book_award["category"],
51+
book_award["rank"],
52+
book_award["author"],
53+
book_award["book_title"],
54+
book_award["publisher"]))
55+
56+
#write row to the cache
57+
def write_award_to_cache(book_award):
58+
#construct Redis key name
59+
key_name=book_award["award"]+str(book_award["year"])+book_award["category"]+str(book_award["rank"])
60+
61+
#write to cache using Redis set, ex=300 sets TTL for this row to 300 seconds
62+
redis.set(key_name, str(book_award), ex=300)
63+
64+
65+
#Delete a row
66+
def delete_award(award, year, category, rank):
67+
delete_award_from_keyspaces(award, year, category, rank)
68+
delete_award_from_cache(award, year, category, rank)
69+
70+
#delete row from Keyspaces table
71+
def delete_award_from_keyspaces(award, year, category, rank):
72+
stmt = SimpleStatement(f"DELETE FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank=%s;",
73+
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
74+
session.execute(stmt, (award, int(year), category, int(rank)))
75+
76+
#delete row from cache
77+
def delete_award_from_cache(award, year, category, rank):
78+
#construct Redis key name
79+
key_name=award+str(year)+category+str(rank)
80+
81+
#delete the row from cache if it exists
82+
if redis.get(key_name) is not None:
83+
book_award=redis.delete(key_name)
84+
85+
#Read a row
86+
def get_award(award, year, category, rank):
87+
#construct Redis key name from parameters
88+
key_name=award+str(year)+category+str(rank)
89+
book_award=redis.get(key_name)
90+
91+
#if row not in cache, fetch it from Keyspaces table
92+
if not book_award:
93+
print("Fetched from Cache: ", book_award)
94+
stmt = SimpleStatement(f"SELECT * FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank=%s;")
95+
res=session.execute(stmt, (award, int(year), category, int(rank)))
96+
if not res.current_rows:
97+
print("Fetched from Database: None")
98+
return None
99+
else:
100+
#lazy-load into cache
101+
book_award=redis.set(key_name, str(res.current_rows), ex=300)
102+
print("Fetched from Database: ")
103+
return res.current_rows
104+
else:
105+
print("Fetched from Cache: ")
106+
return book_award
107+
108+
109+
#Read one or more rows based on parameters
110+
def get_awards(parameters):
111+
#construct key name from parameters
112+
key_name=""
113+
for p in parameters:
114+
key_name=key_name+str(p)
115+
book_awards=redis.lrange(key_name, 0, -1)
116+
117+
#if result set not in cache, fetch it from Keyspaces table
118+
if not book_awards:
119+
print("Fetched from Cache: ", book_awards)
120+
stmt = SimpleStatement(f"SELECT * FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank<=%s;")
121+
res=session.execute(stmt, parameters)
122+
if not res.current_rows:
123+
print("Fetched from Database: None")
124+
return None
125+
else:
126+
#lazy-load into cache
127+
redis.rpush(key_name, str(res.current_rows))
128+
redis.expire(key_name, 300)
129+
print("Fetched from Database: ")
130+
return res.current_rows
131+
else:
132+
print("Fetched from Cache: ")
133+
return book_awards
134+

test_functions.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from ks_redis import *
2+
3+
def test_case_1():
4+
book_award={"award": "Golden Read",
5+
"year": 2021,
6+
"category": "sci-fi",
7+
"rank": 2,
8+
"author": "John Doe",
9+
"book_title": "Tomorrow is here",
10+
"publisher": "Ostrich books"}
11+
12+
#insert an award to the DB and cache
13+
write_award(book_award)
14+
print("Test Case 1:")
15+
print("New book award inserted.")
16+
17+
#cache hit - get award from cache
18+
print("\n")
19+
print("Verify cache hit:")
20+
res=get_award(book_award["award"],
21+
book_award["year"],
22+
book_award["category"],
23+
book_award["rank"])
24+
print(res)
25+
26+
#let the cache entry expire
27+
print("\n")
28+
print("Waiting for cached entry to expire, sleeping for 300 seconds...")
29+
time.sleep(300)
30+
31+
#cache miss - get award from DB and lazy load to cache
32+
print("\n")
33+
print("Entry expired in cache, award expected to be fetched from DB:")
34+
res=get_award(book_award["award"],
35+
book_award["year"],
36+
book_award["category"],
37+
book_award["rank"])
38+
print(res)
39+
40+
#cache hit - get award from cache
41+
print("\n")
42+
print("Verify that award is lazy loaded into cache:")
43+
res=get_award(book_award["award"],
44+
book_award["year"],
45+
book_award["category"],
46+
book_award["rank"])
47+
print(res)
48+
49+
#delete the award from cache and DB
50+
print("\n")
51+
print("Deleting book award.")
52+
delete_award(book_award["award"],
53+
book_award["year"],
54+
book_award["category"],
55+
book_award["rank"])
56+
57+
#confirm the award was deleted from cache and DB
58+
print("\n")
59+
print("Verify that the award was deleted from cache and DB:")
60+
res=get_award(book_award["award"],
61+
book_award["year"],
62+
book_award["category"],
63+
book_award["rank"])
64+
if res:
65+
print(res)
66+
67+
68+
def test_case_2():
69+
print("Test Case 2:")
70+
print("Get top 3 Must Read book awards for year 2021 in the Sci-Fi category")
71+
print("\n")
72+
res=get_awards(["Must Read", 2021, "Sci-Fi", 23])
73+
print(res)
74+
75+
#cache-hit - get awards from cache
76+
print("\n")
77+
print("Verify cache hit on subsequent query with same parameters:")
78+
res=get_awards(["Must Read", 2021, "Sci-Fi", 23])
79+
print(res)
80+
81+
#let the cache entry expire
82+
print("\n")
83+
print("Waiting for cached entry to expire, sleeping for 300 seconds...")
84+
time.sleep(300)
85+
86+
#cache miss - get award from DB and lazy load to cache
87+
print("\n")
88+
print("Entry expired in cache, awards expected to be fetched from DB.")
89+
res=get_awards(["Must Read", 2021, "Sci-Fi", 23])
90+
print(res)
91+
92+
#cache hit - get award from cache
93+
print("\n")
94+
print("Verify that awards are lazy loaded into cache:")
95+
res=get_awards(["Must Read", 2021, "Sci-Fi", 23])
96+
print(res)
97+
98+
test_case_1()
99+
print(" ")
100+
101+
test_case_2()

0 commit comments

Comments
 (0)