|
| 1 | +from cassandra.cluster import * |
| 2 | +from ssl import SSLContext, PROTOCOL_TLSv1_2 , CERT_REQUIRED |
| 3 | +from cassandra.auth import PlainTextAuthProvider |
| 4 | +from cassandra_sigv4.auth import SigV4AuthProvider |
| 5 | +from cassandra.query import SimpleStatement |
| 6 | +from rediscluster import RedisCluster |
| 7 | +import logging |
| 8 | +import time |
| 9 | +import boto3 |
| 10 | + |
| 11 | +#Keyspaces connection |
| 12 | +ssl_context = SSLContext(PROTOCOL_TLSv1_2) |
| 13 | +ssl_context.load_verify_locations('/home/ec2-user/sf-class2-root.crt') |
| 14 | +ssl_context.verify_mode = CERT_REQUIRED |
| 15 | +boto_session = boto3.Session() |
| 16 | +auth_provider = SigV4AuthProvider(boto_session) |
| 17 | +cluster = Cluster(['cassandra.us-east-1.amazonaws.com'], |
| 18 | + ssl_context=ssl_context, |
| 19 | + auth_provider=auth_provider, |
| 20 | + port=9142) |
| 21 | +session = cluster.connect() |
| 22 | + |
| 23 | + |
| 24 | +#Amazon Elasticache connection |
| 25 | +logging.basicConfig(level=logging.ERROR) |
| 26 | +redis = RedisCluster(startup_nodes=[{"host": "keyspaces-cache.ebnqkc.clustercfg.use1.cache.amazonaws.com", |
| 27 | + "port": "6379"}], |
| 28 | + decode_responses=True, |
| 29 | + skip_full_coverage_check=True) |
| 30 | + |
| 31 | +if redis.ping(): |
| 32 | + logging.info("Connected to Redis") |
| 33 | + |
| 34 | + |
| 35 | +#Global variables |
| 36 | +keyspace_name="catalog" |
| 37 | +table_name="book_awards" |
| 38 | + |
| 39 | +#Write a row |
| 40 | +def write_award(book_award): |
| 41 | + write_award_to_keyspaces(book_award) |
| 42 | + write_award_to_cache(book_award) |
| 43 | + |
| 44 | +#write row to the Keyspaces table |
| 45 | +def write_award_to_keyspaces(book_award): |
| 46 | + stmt=SimpleStatement(f"INSERT INTO {keyspace_name}.{table_name} (award, year, category, rank, author, book_title, publisher) VALUES (%s, %s, %s, %s, %s, %s, %s)", |
| 47 | + consistency_level=ConsistencyLevel.LOCAL_QUORUM) |
| 48 | + session.execute(stmt,(book_award["award"], |
| 49 | + book_award["year"], |
| 50 | + book_award["category"], |
| 51 | + book_award["rank"], |
| 52 | + book_award["author"], |
| 53 | + book_award["book_title"], |
| 54 | + book_award["publisher"])) |
| 55 | + |
| 56 | +#write row to the cache |
| 57 | +def write_award_to_cache(book_award): |
| 58 | + #construct Redis key name |
| 59 | + key_name=book_award["award"]+str(book_award["year"])+book_award["category"]+str(book_award["rank"]) |
| 60 | + |
| 61 | + #write to cache using Redis set, ex=300 sets TTL for this row to 300 seconds |
| 62 | + redis.set(key_name, str(book_award), ex=300) |
| 63 | + |
| 64 | + |
| 65 | +#Delete a row |
| 66 | +def delete_award(award, year, category, rank): |
| 67 | + delete_award_from_keyspaces(award, year, category, rank) |
| 68 | + delete_award_from_cache(award, year, category, rank) |
| 69 | + |
| 70 | +#delete row from Keyspaces table |
| 71 | +def delete_award_from_keyspaces(award, year, category, rank): |
| 72 | + stmt = SimpleStatement(f"DELETE FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank=%s;", |
| 73 | + consistency_level=ConsistencyLevel.LOCAL_QUORUM) |
| 74 | + session.execute(stmt, (award, int(year), category, int(rank))) |
| 75 | + |
| 76 | +#delete row from cache |
| 77 | +def delete_award_from_cache(award, year, category, rank): |
| 78 | + #construct Redis key name |
| 79 | + key_name=award+str(year)+category+str(rank) |
| 80 | + |
| 81 | + #delete the row from cache if it exists |
| 82 | + if redis.get(key_name) is not None: |
| 83 | + book_award=redis.delete(key_name) |
| 84 | + |
| 85 | +#Read a row |
| 86 | +def get_award(award, year, category, rank): |
| 87 | + #construct Redis key name from parameters |
| 88 | + key_name=award+str(year)+category+str(rank) |
| 89 | + book_award=redis.get(key_name) |
| 90 | + |
| 91 | + #if row not in cache, fetch it from Keyspaces table |
| 92 | + if not book_award: |
| 93 | + print("Fetched from Cache: ", book_award) |
| 94 | + stmt = SimpleStatement(f"SELECT * FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank=%s;") |
| 95 | + res=session.execute(stmt, (award, int(year), category, int(rank))) |
| 96 | + if not res.current_rows: |
| 97 | + print("Fetched from Database: None") |
| 98 | + return None |
| 99 | + else: |
| 100 | + #lazy-load into cache |
| 101 | + book_award=redis.set(key_name, str(res.current_rows), ex=300) |
| 102 | + print("Fetched from Database: ") |
| 103 | + return res.current_rows |
| 104 | + else: |
| 105 | + print("Fetched from Cache: ") |
| 106 | + return book_award |
| 107 | + |
| 108 | + |
| 109 | +#Read one or more rows based on parameters |
| 110 | +def get_awards(parameters): |
| 111 | + #construct key name from parameters |
| 112 | + key_name="" |
| 113 | + for p in parameters: |
| 114 | + key_name=key_name+str(p) |
| 115 | + book_awards=redis.lrange(key_name, 0, -1) |
| 116 | + |
| 117 | + #if result set not in cache, fetch it from Keyspaces table |
| 118 | + if not book_awards: |
| 119 | + print("Fetched from Cache: ", book_awards) |
| 120 | + stmt = SimpleStatement(f"SELECT * FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank<=%s;") |
| 121 | + res=session.execute(stmt, parameters) |
| 122 | + if not res.current_rows: |
| 123 | + print("Fetched from Database: None") |
| 124 | + return None |
| 125 | + else: |
| 126 | + #lazy-load into cache |
| 127 | + redis.rpush(key_name, str(res.current_rows)) |
| 128 | + redis.expire(key_name, 300) |
| 129 | + print("Fetched from Database: ") |
| 130 | + return res.current_rows |
| 131 | + else: |
| 132 | + print("Fetched from Cache: ") |
| 133 | + return book_awards |
| 134 | + |
0 commit comments