Skip to content

Commit c6d4b35

Browse files
committed
refactor: project structure and oop
1 parent 400cd47 commit c6d4b35

File tree

13 files changed

+268
-219
lines changed

13 files changed

+268
-219
lines changed

create-synth-data/create_collection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import os
66
from dotenv import load_dotenv
77

8-
load_dotenv()
8+
load_dotenv('../.env')
99
# get database name
1010
FIRESTORE_DATABASE = os.environ.get('FIRESTORE_DATABASE')
1111

@@ -49,7 +49,7 @@ def populate_firestore(rows):
4949
for _ in range(rows):
5050
product = generate_random_product()
5151
collection_ref.document(product["_id"]).set(product)
52-
print("Inserted {rows} random products into Firestore.")
52+
print(f"Inserted {rows} random products into Firestore.")
5353

5454

5555
populate_firestore(100)

create-synth-data/create_dynamo_table.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
dynamodb = session.resource('dynamodb')
2121

2222
# Define the table schema
23-
table_name = 'Products'
23+
table_name = os.environ.get('DYNAMODB_TABLE')
2424
attribute_definitions = [
2525
{'AttributeName': '_id', 'AttributeType': 'S'}, # Partition key
2626
{'AttributeName': 'category', 'AttributeType': 'S'} # Sort key (optional)

dynamodb_firestore.py

Lines changed: 0 additions & 54 deletions
This file was deleted.

dynamodb_to_firebase.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from migration import source, destination, migrate
2+
from google.cloud import firestore
3+
import boto3
4+
from dotenv import load_dotenv
5+
import os
6+
import logging
7+
import sys
8+
9+
logging.basicConfig(
10+
level=logging.INFO,
11+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
12+
handlers=[
13+
logging.StreamHandler(sys.stdout) # Specify sys.stdout for standard output
14+
],
15+
)
16+
17+
load_dotenv()
18+
19+
if __name__ == "__main__":
20+
21+
BATCH_SIZE = os.environ.get("BATCH_SIZE")
22+
# Set up Firestore and Dynamodb configurations
23+
FIRESTORE_DATABASE = os.environ.get("FIRESTORE_DATABASE")
24+
FIRESTORE_COLLECTION = os.environ.get("FIRESTORE_COLLECTION")
25+
DYNAMODB_TABLE = os.environ.get("DYNAMODB_TABLE")
26+
27+
# Initialize Firestore
28+
firestore_client = firestore.Client(database=FIRESTORE_DATABASE)
29+
firestore_collection = firestore_client.collection(FIRESTORE_COLLECTION)
30+
31+
# Initialize DynamoDB
32+
session = boto3.Session(
33+
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
34+
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
35+
region_name=os.environ.get("AWS_REGION"),
36+
)
37+
dynamodb = session.resource("dynamodb")
38+
table = dynamodb.Table(DYNAMODB_TABLE)
39+
40+
source_dynamodb = source.DynamoDB(table, BATCH_SIZE)
41+
destination_firebase = destination.Firebase(firestore_client, firestore_collection)
42+
43+
migrate = migrate.DatabaseMigration(source_dynamodb, destination_firebase)
44+
migrate.run()

firebase_to_dynamodb.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from migration import source, destination, migrate
2+
from google.cloud import firestore
3+
import boto3
4+
from dotenv import load_dotenv
5+
import os
6+
import logging
7+
import sys
8+
9+
logging.basicConfig(
10+
level=logging.INFO,
11+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
12+
handlers=[
13+
logging.StreamHandler(sys.stdout) # Specify sys.stdout for standard output
14+
],
15+
)
16+
17+
load_dotenv()
18+
19+
if __name__ == "__main__":
20+
21+
BATCH_SIZE = os.environ.get("BATCH_SIZE")
22+
# Set up Firestore and Dynamodb configurations
23+
FIRESTORE_DATABASE = os.environ.get("FIRESTORE_DATABASE")
24+
FIRESTORE_COLLECTION = os.environ.get("FIRESTORE_COLLECTION")
25+
DYNAMODB_TABLE = os.environ.get("DYNAMODB_TABLE")
26+
27+
# Initialize Firestore
28+
firestore_client = firestore.Client(database=FIRESTORE_DATABASE)
29+
firestore_collection = firestore_client.collection(FIRESTORE_COLLECTION)
30+
31+
# Initialize DynamoDB
32+
session = boto3.Session(
33+
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
34+
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
35+
region_name=os.environ.get("AWS_REGION"),
36+
)
37+
dynamodb = session.resource("dynamodb")
38+
table = dynamodb.Table(DYNAMODB_TABLE)
39+
40+
source_firebase = source.Firebase(firestore_collection, BATCH_SIZE)
41+
destination_dynamodb = destination.DynamoDB(table)
42+
43+
migrate = migrate.DatabaseMigration(source_firebase, destination_dynamodb)
44+
migrate.run()

firestore_dynamodb.py

Lines changed: 0 additions & 57 deletions
This file was deleted.

migrate/extract.py

Lines changed: 0 additions & 39 deletions
This file was deleted.

migrate/load.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

migration/destination.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from abc import ABC, abstractmethod
2+
import logging
3+
from migration.utils.convert import floats_to_decimals, decimals_to_floats
4+
5+
logger = logging.getLogger(__name__)
6+
7+
8+
class Loader(ABC):
9+
@abstractmethod
10+
def load(self, data):
11+
pass
12+
13+
14+
class Firebase(Loader):
15+
def __init__(self, client, collection) -> None:
16+
self.client = client
17+
self.collection = collection
18+
19+
def load(self, data):
20+
"""Load data into Firestore in batches"""
21+
logger.info(f"Loading to Firestore")
22+
23+
batch = self.client.batch()
24+
data = decimals_to_floats(data)
25+
26+
for item in data:
27+
doc_ref = self.colletion.document(item.get("_id"))
28+
batch.set(doc_ref, item)
29+
30+
batch.commit()
31+
logger.info(f"Loaded {len(data)} documents")
32+
33+
34+
class DynamoDB(Loader):
35+
def __init__(self, table) -> None:
36+
self.table = table
37+
38+
39+
def load(self, data):
40+
"""Load data into DynamoDB in batches"""
41+
logger.info(f"Loading to dynamodb")
42+
43+
data = floats_to_decimals(data)
44+
with self.table.batch_writer() as batch:
45+
for item in data:
46+
batch.put_item(Item=item)
47+
logger.info(f"Loaded {len(data)} documents")

migration/migrate.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import logging
2+
import traceback
3+
4+
from migration.destination import Loader
5+
from migration.source import Extractor
6+
7+
8+
class DatabaseMigration:
9+
def __init__(self, extractor: Extractor, loader: Loader) -> None:
10+
self.extractor = extractor
11+
self.loader = loader
12+
self.logger = logging.getLogger(__name__)
13+
14+
def run(self):
15+
last_document = False
16+
try:
17+
while True:
18+
data, last_document = self.extractor.extract(last_document)
19+
if data:
20+
self.loader.load(data)
21+
else:
22+
break
23+
self.logger.info("Migration complete")
24+
except Exception as e:
25+
stack_trace = traceback.format_exc()
26+
logging.error(stack_trace)
27+
logging.error(f"Error when migrating data: {e}")

0 commit comments

Comments
 (0)