import urllib.parse
import dask
import dask.threaded
import logging
import asyncio
from dask import delayed
from sqlalchemy import create_engine, text
from datetime import datetime, timezone
from datamanwithvan.config.config import datamanwithvanConfig
from datamanwithvan.utils import messages
from datamanwithvan.utils.bidirectional_queue import BidirectionalQueue
from datamanwithvan.utils.statuscodes import StatusCodes
from datamanwithvan.replication import replicationtask
logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler()
con_frmt = logging.Formatter(
'%(asctime)s - %(name)s (%(levelname)s) : %(message)s')
console_handler.setFormatter(con_frmt)
logger.addHandler(console_handler)
try:
# TODO: How to get a default path from somewhere?
    file_handler = logging.FileHandler("/var/log/datamanwithvan/dmwv.log")
    file_format = logging.Formatter(
        '%(asctime)s - %(name)s (%(levelname)s) : %(message)s')
    file_handler.setFormatter(file_format)
    logger.addHandler(file_handler)
except Exception as e:
print(f"Error while trying to open log file: {e}")
console_handler.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
class ReplicationJob:
"""_summary_
Returns:
_type_: _description_
"""
replication_job_id = None
configuration = datamanwithvanConfig(config_file=None, runtime_params=None)
content = None
query_engine = None
StatusCodesObj = StatusCodes()
verbosity = True
tasks = []
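    # NOTE: job_metadata is a class-level dict, so job_started is stamped at
    # import time and the dict is shared between instances; it is the status
    # payload reported to the main program via produce().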
job_metadata = {
"job_id": 0,
"job_friendly_name": "",
"job_status": "",
"job_started": datetime.now(timezone.utc).timestamp(),
"job_finished": "",
"job_tasks": [],
"comments": ""
}
main_to_job_channel = None
job_task_channel = BidirectionalQueue()
mainprogram_job_channel = BidirectionalQueue()
def __init__(
self,
replication_job_id,
configuration,
verbosity,
mainprogram_job_channel):
"""_summary_
Args:
replication_job_id (_type_): _description_
configuration (_type_): _description_
verbosity (_type_): _description_
mainprogram_job_channel (_type_): _description_
"""
self.replication_job_id = replication_job_id
self.configuration = configuration
self.mainprogram_job_channel = mainprogram_job_channel
self.content = messages.DatamanwithvanMessages()
self.query_engine = self._getQueryEngine()
# Section 84 : Set up the logger
self.verbosity = verbosity
if verbosity:
console_handler.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
# End of Section 84
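        # Run the produce/consume exchange with the main program; this call
        # blocks until the job has been received, executed, and reported.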
asyncio.run(self._start_job())
async def produce(
self,
communication: BidirectionalQueue,
sender,
message):
"""_summary_
Args:
communication (BidirectionalQueue): _description_
sender (_type_): _description_
message (_type_): _description_
"""
logger.info(f"{sender} is about to send {self.job_metadata} to main")
await communication.queue_to_producer.put(self.job_metadata)
logger.info(f"{sender} sent job status to main program")
async def consume(self, communication: BidirectionalQueue, whoami):
"""_summary_
Args:
communication (BidirectionalQueue): _description_
whoami (_type_): _description_
"""
logger.info(f"{whoami} is waiting to receive from main prog")
message = await communication.queue_to_consumer.get()
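        # Extract the trailing job id from the message; this assumes the
        # main program sends messages of the form "<command>_<job_id>".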
self.job_metadata = self.run_job(int(message.split("_")[-1]))
logger.info(f"{whoami} received {message}")
async def _start_job(self):
"""_summary_
"""
await asyncio.gather(
self.produce(
self.mainprogram_job_channel,
"JOB",
"status-from-job-to-main"),
self.consume(self.mainprogram_job_channel, "JOB"))
def start_replication_tasks(self):
"""_summary_
Returns:
_type_: _description_
"""
results = dask.compute(*self.tasks, scheduler='threads')
logger.debug(results)
return results
def _initiate_replication_tasks(self,
replication_rule,
configuration,
verbosity):
"""_summary_
Args:
replication_rule (_type_): _description_
"""
logger.info(f"_initiate_replication_tasks for {replication_rule}")
repTaskObj = replicationtask.ReplicationTask(replication_rule,
configuration,
verbosity)
task_metadata = repTaskObj.run_task()
self.job_metadata["job_tasks"].append(task_metadata)
return task_metadata
def assign_replication_rules(self, replication_rules):
"""
For every replication rule in the job, make a separate thread
to handle it - dummy change 2
"""
status = None
try:
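            # delayed() only wraps each call; nothing executes until
            # dask.compute(...) runs in start_replication_tasks().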
            self.tasks = [
                delayed(self._initiate_replication_tasks)(
                    rule, self.configuration, self.verbosity)
                for rule in replication_rules]
status = 35
except Exception as e:
logger.error(f" failed to make replication tasks: {e}")
status = 36
return [status, self.tasks]
def _checkin_replication_job(self, replication_job_id):
"""This function registers that a job with id replication_job_id
has just started. If there's a backend DB set, it will
Args:
replication_job_id (int): The ID of the job to run
Returns:
dict: A dictionary containing the operation name, the status
and any other useful, textual comments
"""
result = {
"operation": "_checkin_replication_job",
"status": "STARTED",
"checkin_time": datetime.now(timezone.utc).timestamp(),
"comments": messages.DatamanwithvanMessages.warn_not_implemented
}
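        # The engine is obtained and logged but not yet used: persisting
        # the check-in to the backend database is still a TODO.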
query_engine = self._getQueryEngine()
logger.info(query_engine)
return result
def _checkout_replication_job(self, jobMetadata):
"""_summary_
Args:
jobMetadata (_type_): _description_
Returns:
_type_: _description_
"""
logger.info(messages.DatamanwithvanMessages.warn_not_implemented)
return messages.DatamanwithvanMessages.warn_not_implemented
def _getQueryEng_mysql(self, configuration):
"""_summary_
Args:
configuration (_type_): _description_
Returns:
_type_: _description_
"""
return messages.DatamanwithvanMessages.warn_not_implemented
    # TODO: Implement this method
def _getQueryEng_cosmosdb(self, configuration):
"""_summary_
Args:
configuration (_type_): _description_
Returns:
_type_: _description_
"""
return messages.DatamanwithvanMessages.warn_not_implemented
    # TODO: Implement this method
def _getQueryEng_dynamodb(self, configuration):
"""_summary_
Args:
configuration (_type_): _description_
Returns:
_type_: _description_
"""
engine = ""
return engine
# TODO: Implement this method
def _getQueryEng_postgresql(self, configuration):
"""_summary_
Args:
configuration (_type_): _description_
Returns:
_type_: _description_
"""
return messages.DatamanwithvanMessages.warn_not_implemented
def _getQueryEng_azuresql(self, configuration):
"""_summary_
Args:
configuration (_type_): _description_
Returns:
_type_: _description_
"""
server = configuration.backenddatabase_server
database = configuration.backenddatabase_database
uid = configuration.backenddatabase_uid
pwd = configuration.backenddatabase_pwd
# credential = DefaultAzureCredential()
# token = credential.get_token(
# "https://database.windows.net/.default").token
# Construct connection string
connection_string = (
f"Driver={{ODBC Driver 18 for SQL Server}};"
f"Server={server};"
f"Database={database};"
f"Uid={uid};"
f"Pwd={pwd};"
f"Encrypt=yes;" # Ensure encryption
f"TrustServerCertificate=no;"
)
        # Encode the connection string for SQLAlchemy
params = urllib.parse.quote_plus(connection_string)
engine_url = f"mssql+pyodbc:///?odbc_connect={params}"
# Create SQLAlchemy engine
engine = create_engine(engine_url)
return engine
def _getQueryEngine(self):
"""
Returns an SQLAlchemy's QueryEngine Object.
Parameters:
- configuration (datamanwithvanConfig): The object containing
Datamanwithvan's master configuration
Returns:
- Engine: A class `_engine.Engine` instance.
"""
funcname = f"_getQueryEng_{self.configuration.backenddatabase_dbtype}"
func = getattr(self,
funcname,
None)
if callable(func):
return func(self.configuration)
else:
            logger.error(messages.DatamanwithvanMessages.err_fn_no_found)
            return None
def _getReplicationRules(self, replication_job_id, query_engine):
"""_summary_
Args:
replication_job_id (_type_): _description_
Returns:
_type_: _description_
"""
result = []
status = self.StatusCodesObj.stat_code_generic
rules = []
try:
with query_engine.connect() as connection:
sch = self.configuration.backenddatabase_schema
tbl = self.configuration.backenddatabase_repl_rules_table
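                # Bind the job id as a query parameter instead of
                # interpolating it into the SQL, to avoid injection.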
query = f"""SELECT *
from {sch}.{tbl}
where id={replication_job_id} and enabled=1"""
ruless = connection.execute(text(query))
for rr in ruless:
rules.append(rr)
if len(rules) == 0:
status = 1
else:
status = 0
except Exception as e:
logger.error(f"Could not fetch replication rules"
f"for Job ({replication_job_id}) : {e}")
status = 2
result = [status, rules]
return result
def run_job(self, replication_job_id):
"""_summary_
Args:
replication_job_id (_type_): _description_
Returns:
_type_: _description_
"""
jobMetadata = {
"status": -1,
"operations": []
}
logger.info(f"Starting replication job {replication_job_id}")
# runjob step 1: Check-in the new job
x = self._checkin_replication_job(replication_job_id)
jobMetadata["operations"].append(x)
# end of runjob step 1
# runjob step 2: Get Replication Rules
replication_rules = self._getReplicationRules(
replication_job_id, self.query_engine)
logger.info(replication_rules)
# end of runjob step 2
# runjob step 3: Validate the Replication Rules
if replication_rules[0] == 0:
mg = messages.DatamanwithvanMessages.msg_info_repl_rule_exist
logger.info(mg)
dt = datetime.now(timezone.utc).timestamp()
self.job_metadata["job_id"] = replication_job_id
self.job_metadata["job_status"] = "TBD"
self.job_metadata["job_finished"] = dt
self.job_metadata["comments"] += f"{mg} // "
if replication_rules[0] == 1:
logger.warning(
messages.DatamanwithvanMessages.msg_warn_no_rep_rule)
dt = datetime.now(timezone.utc).timestamp()
mg = messages.DatamanwithvanMessages.msg_warn_no_rep_rule
self.job_metadata["job_id"] = replication_job_id
self.job_metadata["job_status"] = "NOT RUN"
self.job_metadata["job_finished"] = dt
self.job_metadata["comments"] = mg
return self.job_metadata
if replication_rules[0] == 2:
mg = messages.DatamanwithvanMessages.msg_error_cant_fetch_rep_rule
dt = datetime.now(timezone.utc).timestamp()
logger.error(mg)
self.job_metadata["job_id"] = replication_job_id
self.job_metadata["job_status"] = "FAILED"
self.job_metadata["job_finished"] = dt
self.job_metadata["comments"] = mg
return self.job_metadata
# end of runjob step 3
# runjob step 4: Assign the rules of the replication job
# to individual threads/tasks
x = replication_rules[1]
result_assign_rules = self.assign_replication_rules(x)
logger.info(result_assign_rules)
# end of runjob step 4
# runjob step 5: Kick off the replication tasks
result_start_tasks = self.start_replication_tasks()
logger.debug(result_start_tasks)
# end of runjob step 5
# runjob step 6: At this point, the replication has completed.
# Time to checkout and return all the metadata it gathered.
self._checkout_replication_job(jobMetadata)
# end of runjob step 6
logger.info(jobMetadata)
return jobMetadata
if __name__ == "__main__":
pass
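    # Minimal usage sketch (assumes a populated datamanwithvanConfig, a main
    # program servicing the other end of the channel, and the hypothetical
    # config path "dmwv.conf"; constructing the job blocks until it is done):
    #
    #   config = datamanwithvanConfig(config_file="dmwv.conf",
    #                                 runtime_params=None)
    #   channel = BidirectionalQueue()
    #   ReplicationJob(1, config, True, channel)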