1. added utility command to export call logs

2. mongo conn accepts port
Malar Kannan 2020-05-21 10:43:26 +05:30
parent 8e79bbb571
commit 2d5b720284
2 changed files with 93 additions and 38 deletions

View File

@ -12,15 +12,52 @@ app = typer.Typer()
@app.command()
def export_logs(call_logs_file: Path = Path("./call_sia_logs.yaml")):
from pymongo import MongoClient
def export_all_logs(call_logs_file: Path = Path("./call_sia_logs.yaml")):
from .utils import get_mongo_conn
from collections import defaultdict
from ruamel.yaml import YAML
yaml = YAML()
mongo_collection = MongoClient("mongodb://localhost:27017/").test.calls
mongo_coll = get_mongo_conn().test.calls
caller_calls = defaultdict(lambda: [])
for call in mongo_collection.find():
for call in mongo_coll.find():
sysid = call["SystemID"]
call_uri = f"http://sia-data.agaralabs.com/calls/{sysid}"
caller = call["Caller"]
caller_calls[caller].append(call_uri)
caller_list = []
for caller in caller_calls:
caller_list.append({"name": caller, "calls": caller_calls[caller]})
output_yaml = {"users": caller_list}
typer.echo("exporting call logs to yaml file")
with call_logs_file.open("w") as yf:
yaml.dump(output_yaml, yf)
@app.command()
def export_calls_between(
start_cid: str,
end_cid: str,
call_logs_file: Path = Path("./call_sia_logs.yaml"),
mongo_port: int = 27017,
):
from collections import defaultdict
from ruamel.yaml import YAML
from .utils import get_mongo_conn
yaml = YAML()
mongo_coll = get_mongo_conn(port=mongo_port).test.calls
start_meta = mongo_coll.find_one({"SystemID": start_cid})
end_meta = mongo_coll.find_one({"SystemID": end_cid})
caller_calls = defaultdict(lambda: [])
call_query = mongo_coll.find(
{
"StartTS": {"$gte": start_meta["StartTS"]},
"EndTS": {"$lte": end_meta["EndTS"]},
}
)
for call in call_query:
sysid = call["SystemID"]
call_uri = f"http://sia-data.agaralabs.com/calls/{sysid}"
caller = call["Caller"]
@ -39,13 +76,14 @@ def analyze(
leaderboard: bool = False,
plot_calls: bool = False,
extract_data: bool = False,
download_only: bool = False,
call_logs_file: Path = Path("./call_logs.yaml"),
output_dir: Path = Path("./data"),
mongo_port: int = 27017,
):
from urllib.parse import urlsplit
from functools import reduce
from pymongo import MongoClient
import boto3
from io import BytesIO
@ -64,7 +102,7 @@ def analyze(
import matplotlib.pyplot as plt
import matplotlib
from tqdm import tqdm
from .utils import asr_data_writer
from .utils import asr_data_writer, get_mongo_conn
from pydub import AudioSegment
from natural.date import compress
@ -80,7 +118,7 @@ def analyze(
# logger = logging.getLogger(__name__)
yaml = YAML()
s3 = boto3.client("s3")
mongo_collection = MongoClient("mongodb://localhost:27017/").test.calls
mongo_collection = get_mongo_conn(port=mongo_port).test.calls
call_media_dir: Path = output_dir / Path("call_wavs")
call_media_dir.mkdir(exist_ok=True, parents=True)
call_meta_dir: Path = output_dir / Path("call_metas")
@ -118,15 +156,32 @@ def analyze(
return get_timedelta
def chunk_n(evs, n):
return [evs[i * n : (i + 1) * n] for i in range((len(evs) + n - 1) // n)]
def get_data_points(utter_events, td_fn):
data_points = []
for evs in chunk_n(utter_events, 3):
assert evs[0]["Type"] == "CONV_RESULT"
assert evs[1]["Type"] == "STARTED_SPEAKING"
assert evs[2]["Type"] == "STOPPED_SPEAKING"
start_time = td_fn(evs[1]).total_seconds() - 1.5
end_time = td_fn(evs[2]).total_seconds()
code = evs[0]["Msg"]
data_points.append(
{"start_time": start_time, "end_time": end_time, "code": code}
)
return data_points
def process_call(call_obj):
call_meta = get_call_meta(call_obj)
call_events = call_meta["Events"]
def is_writer_event(ev):
return ev["Author"] == "AUDIO_WRITER"
def is_writer_uri_event(ev):
return ev["Author"] == "AUDIO_WRITER" and 's3://' in ev["Msg"]
writer_events = list(filter(is_writer_event, call_events))
s3_wav_url = re.search(r"saved to: (.*)", writer_events[0]["Msg"]).groups(0)[0]
writer_events = list(filter(is_writer_uri_event, call_events))
s3_wav_url = re.search(r"(s3://.*)", writer_events[0]["Msg"]).groups(0)[0]
s3_wav_url_p = urlsplit(s3_wav_url)
def is_first_audio_ev(state, ev):
@ -157,22 +212,6 @@ def analyze(
)
# %config InlineBackend.figure_format = "retina"
def chunk_n(evs, n):
return [evs[i * n : (i + 1) * n] for i in range((len(evs) + n - 1) // n)]
def get_data_points(utter_events):
data_points = []
for evs in chunk_n(utter_events, 3):
assert evs[0]["Type"] == "CONV_RESULT"
assert evs[1]["Type"] == "STARTED_SPEAKING"
assert evs[2]["Type"] == "STOPPED_SPEAKING"
start_time = get_ev_fev_timedelta(evs[1]).total_seconds() - 1.5
end_time = get_ev_fev_timedelta(evs[2]).total_seconds()
code = evs[0]["Msg"]
data_points.append(
{"start_time": start_time, "end_time": end_time, "code": code}
)
return data_points
def plot_events(y, sr, utter_events, file_path):
plt.figure(figsize=(16, 12))
@ -202,22 +241,36 @@ def analyze(
plt.title("Monophonic")
plt.savefig(file_path, format="png")
data_points = get_data_points(utter_events)
return {
"wav_path": saved_wav_path,
"num_samples": len(utter_events) // 3,
"meta": call_obj,
"data_points": data_points,
"first_event_fn": get_ev_fev_timedelta,
"utter_events": utter_events,
}
def retrieve_callmeta(uri):
cid = Path(urlsplit(uri).path).stem
def get_cid(uri):
return Path(urlsplit(uri).path).stem
def ensure_call(uri):
cid = get_cid(uri)
meta = mongo_collection.find_one({"SystemID": cid})
process_meta = process_call(meta)
return process_meta
def retrieve_processed_callmeta(uri):
cid = get_cid(uri)
meta = mongo_collection.find_one({"SystemID": cid})
duration = meta["EndTS"] - meta["StartTS"]
process_meta = process_call(meta)
data_points = get_data_points(process_meta['utter_events'], process_meta['first_event_fn'])
process_meta['data_points'] = data_points
return {"url": uri, "meta": meta, "duration": duration, "process": process_meta}
def download_meta_audio():
call_lens = lens["users"].Each()["calls"].Each()
call_lens.modify(ensure_call)(call_logs)
# @plot_app.command()
def plot_calls_data():
def plot_data_points(y, sr, data_points, file_path):
@ -252,9 +305,8 @@ def analyze(
plot_data_points(y, sr, data_points, str(file_path))
return file_path
# plot_call(retrieve_callmeta("http://saasdev.agaralabs.com/calls/JOR9V47L03AGUEL"))
call_lens = lens["users"].Each()["calls"].Each()
call_stats = call_lens.modify(retrieve_callmeta)(call_logs)
call_stats = call_lens.modify(retrieve_processed_callmeta)(call_logs)
# call_plot_data = call_lens.collect()(call_stats)
call_plots = call_lens.modify(plot_call)(call_stats)
# with ThreadPoolExecutor(max_workers=20) as exe:
@ -285,7 +337,7 @@ def analyze(
yield extracted_code, code_seg.duration_seconds, code_wav
call_lens = lens["users"].Each()["calls"].Each()
call_stats = call_lens.modify(retrieve_callmeta)(call_logs)
call_stats = call_lens.modify(retrieve_processed_callmeta)(call_logs)
call_objs = call_lens.collect()(call_stats)
def data_source():
@ -315,7 +367,7 @@ def analyze(
}
call_lens = lens["users"].Each()["calls"].Each()
call_stats = call_lens.modify(retrieve_callmeta)(call_logs)
call_stats = call_lens.modify(retrieve_processed_callmeta)(call_logs)
user_stats = lens["users"].Each().modify(compute_user_stats)(call_stats)
leader_df = (
pd.DataFrame(user_stats["users"])
@ -338,6 +390,9 @@ def analyze(
)
print(leader_board.to_string(index=False))
if download_only:
download_meta_audio()
return
if leaderboard:
show_leaderboard()
if plot_calls:

View File

@ -104,9 +104,9 @@ class ExtendedPath(type(Path())):
return json.dump(data, jf, indent=2)
def get_mongo_conn(host=''):
def get_mongo_conn(host='', port=27017):
mongo_host = host if host else os.environ.get("MONGO_HOST", "localhost")
mongo_uri = f"mongodb://{mongo_host}:27017/"
mongo_uri = f"mongodb://{mongo_host}:{port}/"
return pymongo.MongoClient(mongo_uri)