From 2d5b7202844530fec9cbaaf3ea7865ce9a1a96a9 Mon Sep 17 00:00:00 2001 From: Malar Kannan Date: Thu, 21 May 2020 10:43:26 +0530 Subject: [PATCH] 1. added utility command to export call logs 2. mongo conn accepts port --- jasper/data_utils/call_recycler.py | 127 +++++++++++++++++++++-------- jasper/data_utils/utils.py | 4 +- 2 files changed, 93 insertions(+), 38 deletions(-) diff --git a/jasper/data_utils/call_recycler.py b/jasper/data_utils/call_recycler.py index ae966d5..5989d33 100644 --- a/jasper/data_utils/call_recycler.py +++ b/jasper/data_utils/call_recycler.py @@ -12,15 +12,52 @@ app = typer.Typer() @app.command() -def export_logs(call_logs_file: Path = Path("./call_sia_logs.yaml")): - from pymongo import MongoClient +def export_all_logs(call_logs_file: Path = Path("./call_sia_logs.yaml")): + from .utils import get_mongo_conn from collections import defaultdict from ruamel.yaml import YAML yaml = YAML() - mongo_collection = MongoClient("mongodb://localhost:27017/").test.calls + mongo_coll = get_mongo_conn().test.calls caller_calls = defaultdict(lambda: []) - for call in mongo_collection.find(): + for call in mongo_coll.find(): + sysid = call["SystemID"] + call_uri = f"http://sia-data.agaralabs.com/calls/{sysid}" + caller = call["Caller"] + caller_calls[caller].append(call_uri) + caller_list = [] + for caller in caller_calls: + caller_list.append({"name": caller, "calls": caller_calls[caller]}) + output_yaml = {"users": caller_list} + typer.echo("exporting call logs to yaml file") + with call_logs_file.open("w") as yf: + yaml.dump(output_yaml, yf) + + +@app.command() +def export_calls_between( + start_cid: str, + end_cid: str, + call_logs_file: Path = Path("./call_sia_logs.yaml"), + mongo_port: int = 27017, +): + from collections import defaultdict + from ruamel.yaml import YAML + from .utils import get_mongo_conn + + yaml = YAML() + mongo_coll = get_mongo_conn(port=mongo_port).test.calls + start_meta = mongo_coll.find_one({"SystemID": start_cid}) + end_meta = mongo_coll.find_one({"SystemID": end_cid}) + + caller_calls = defaultdict(lambda: []) + call_query = mongo_coll.find( + { + "StartTS": {"$gte": start_meta["StartTS"]}, + "EndTS": {"$lte": end_meta["EndTS"]}, + } + ) + for call in call_query: sysid = call["SystemID"] call_uri = f"http://sia-data.agaralabs.com/calls/{sysid}" caller = call["Caller"] @@ -39,13 +76,14 @@ def analyze( leaderboard: bool = False, plot_calls: bool = False, extract_data: bool = False, + download_only: bool = False, call_logs_file: Path = Path("./call_logs.yaml"), output_dir: Path = Path("./data"), + mongo_port: int = 27017, ): from urllib.parse import urlsplit from functools import reduce - from pymongo import MongoClient import boto3 from io import BytesIO @@ -64,7 +102,7 @@ def analyze( import matplotlib.pyplot as plt import matplotlib from tqdm import tqdm - from .utils import asr_data_writer + from .utils import asr_data_writer, get_mongo_conn from pydub import AudioSegment from natural.date import compress @@ -80,7 +118,7 @@ def analyze( # logger = logging.getLogger(__name__) yaml = YAML() s3 = boto3.client("s3") - mongo_collection = MongoClient("mongodb://localhost:27017/").test.calls + mongo_collection = get_mongo_conn(port=mongo_port).test.calls call_media_dir: Path = output_dir / Path("call_wavs") call_media_dir.mkdir(exist_ok=True, parents=True) call_meta_dir: Path = output_dir / Path("call_metas") @@ -118,15 +156,32 @@ def analyze( return get_timedelta + def chunk_n(evs, n): + return [evs[i * n : (i + 1) * n] for i in range((len(evs) + n - 1) // n)] + + def get_data_points(utter_events, td_fn): + data_points = [] + for evs in chunk_n(utter_events, 3): + assert evs[0]["Type"] == "CONV_RESULT" + assert evs[1]["Type"] == "STARTED_SPEAKING" + assert evs[2]["Type"] == "STOPPED_SPEAKING" + start_time = td_fn(evs[1]).total_seconds() - 1.5 + end_time = td_fn(evs[2]).total_seconds() + code = evs[0]["Msg"] + data_points.append( + {"start_time": start_time, "end_time": end_time, "code": code} + ) + return data_points + def process_call(call_obj): call_meta = get_call_meta(call_obj) call_events = call_meta["Events"] - def is_writer_event(ev): - return ev["Author"] == "AUDIO_WRITER" + def is_writer_uri_event(ev): + return ev["Author"] == "AUDIO_WRITER" and 's3://' in ev["Msg"] - writer_events = list(filter(is_writer_event, call_events)) - s3_wav_url = re.search(r"saved to: (.*)", writer_events[0]["Msg"]).groups(0)[0] + writer_events = list(filter(is_writer_uri_event, call_events)) + s3_wav_url = re.search(r"(s3://.*)", writer_events[0]["Msg"]).groups(0)[0] s3_wav_url_p = urlsplit(s3_wav_url) def is_first_audio_ev(state, ev): @@ -157,22 +212,6 @@ def analyze( ) # %config InlineBackend.figure_format = "retina" - def chunk_n(evs, n): - return [evs[i * n : (i + 1) * n] for i in range((len(evs) + n - 1) // n)] - - def get_data_points(utter_events): - data_points = [] - for evs in chunk_n(utter_events, 3): - assert evs[0]["Type"] == "CONV_RESULT" - assert evs[1]["Type"] == "STARTED_SPEAKING" - assert evs[2]["Type"] == "STOPPED_SPEAKING" - start_time = get_ev_fev_timedelta(evs[1]).total_seconds() - 1.5 - end_time = get_ev_fev_timedelta(evs[2]).total_seconds() - code = evs[0]["Msg"] - data_points.append( - {"start_time": start_time, "end_time": end_time, "code": code} - ) - return data_points def plot_events(y, sr, utter_events, file_path): plt.figure(figsize=(16, 12)) @@ -202,22 +241,36 @@ def analyze( plt.title("Monophonic") plt.savefig(file_path, format="png") - data_points = get_data_points(utter_events) - return { "wav_path": saved_wav_path, "num_samples": len(utter_events) // 3, "meta": call_obj, - "data_points": data_points, + "first_event_fn": get_ev_fev_timedelta, + "utter_events": utter_events, } - def retrieve_callmeta(uri): - cid = Path(urlsplit(uri).path).stem + def get_cid(uri): + return Path(urlsplit(uri).path).stem + + def ensure_call(uri): + cid = get_cid(uri) + meta = mongo_collection.find_one({"SystemID": cid}) + process_meta = process_call(meta) + return process_meta + + def retrieve_processed_callmeta(uri): + cid = get_cid(uri) meta = mongo_collection.find_one({"SystemID": cid}) duration = meta["EndTS"] - meta["StartTS"] process_meta = process_call(meta) + data_points = get_data_points(process_meta['utter_events'], process_meta['first_event_fn']) + process_meta['data_points'] = data_points return {"url": uri, "meta": meta, "duration": duration, "process": process_meta} + def download_meta_audio(): + call_lens = lens["users"].Each()["calls"].Each() + call_lens.modify(ensure_call)(call_logs) + # @plot_app.command() def plot_calls_data(): def plot_data_points(y, sr, data_points, file_path): @@ -252,9 +305,8 @@ def analyze( plot_data_points(y, sr, data_points, str(file_path)) return file_path - # plot_call(retrieve_callmeta("http://saasdev.agaralabs.com/calls/JOR9V47L03AGUEL")) call_lens = lens["users"].Each()["calls"].Each() - call_stats = call_lens.modify(retrieve_callmeta)(call_logs) + call_stats = call_lens.modify(retrieve_processed_callmeta)(call_logs) # call_plot_data = call_lens.collect()(call_stats) call_plots = call_lens.modify(plot_call)(call_stats) # with ThreadPoolExecutor(max_workers=20) as exe: @@ -285,7 +337,7 @@ def analyze( yield extracted_code, code_seg.duration_seconds, code_wav call_lens = lens["users"].Each()["calls"].Each() - call_stats = call_lens.modify(retrieve_callmeta)(call_logs) + call_stats = call_lens.modify(retrieve_processed_callmeta)(call_logs) call_objs = call_lens.collect()(call_stats) def data_source(): @@ -315,7 +367,7 @@ def analyze( } call_lens = lens["users"].Each()["calls"].Each() - call_stats = call_lens.modify(retrieve_callmeta)(call_logs) + call_stats = call_lens.modify(retrieve_processed_callmeta)(call_logs) user_stats = lens["users"].Each().modify(compute_user_stats)(call_stats) leader_df = ( pd.DataFrame(user_stats["users"]) @@ -338,6 +390,9 @@ def analyze( ) print(leader_board.to_string(index=False)) + if download_only: + download_meta_audio() + return if leaderboard: show_leaderboard() if plot_calls: diff --git a/jasper/data_utils/utils.py b/jasper/data_utils/utils.py index da6ec5e..9ff26eb 100644 --- a/jasper/data_utils/utils.py +++ b/jasper/data_utils/utils.py @@ -104,9 +104,9 @@ class ExtendedPath(type(Path())): return json.dump(data, jf, indent=2) -def get_mongo_conn(host=''): +def get_mongo_conn(host='', port=27017): mongo_host = host if host else os.environ.get("MONGO_HOST", "localhost") - mongo_uri = f"mongodb://{mongo_host}:27017/" + mongo_uri = f"mongodb://{mongo_host}:{port}/" return pymongo.MongoClient(mongo_uri)