Make changes to index.html and validate them using "tidy --version ; tidy --warn-proprietary-attributes no -e -q index.html"; this command is also installed as a pre-commit hook. Also use "python -m http.server" to check that the site looks okay. For writing raw HTML, go to editor.html.
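For reference, here is a minimal sketch of what that pre-commit hook could look like if written as a Python script; the actual hook may just be a one-line shell script, so treat the file name .git/hooks/pre-commit and this whole script as an illustration.
#!/usr/bin/env python3
# Hypothetical .git/hooks/pre-commit: run tidy on index.html and abort the
# commit if tidy reports errors. Assumes tidy is installed and on PATH.
import subprocess
import sys

result = subprocess.run(
    ["tidy", "--warn-proprietary-attributes", "no", "-e", "-q", "index.html"],
    capture_output=True, text=True)
sys.stderr.write(result.stderr)
# tidy exits with 0 when clean, 1 for warnings only, and 2 for errors.
sys.exit(1 if result.returncode >= 2 else 0)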
I joined Google DeepMind as a Research Engineer in 2023 and moved to the Bay Area from Seattle for the job. I was promoted to Senior Applied Scientist in 2021. I joined Amazon Prime Research in April 2020 to work on Plan Recommendation and Content Optimization. I joined the Dialog State Tracking team in Amazon Alexa in April 2019. I completed my Ph.D. in Computer Science at the Center for Language and Speech Processing, Johns Hopkins University. My advisor was Benjamin Van Durme. I TA'd graduate courses on representation learning and machine learning for three semesters during my Ph.D. studies, and I received the George Sommerman Graduate Teaching Assistant Award with a cash prize of $1000. I have reviewed for Transactions on Signal Processing-19, NeurIPS-19, ICML-19, ICLR-19, EMNLP-19, ACL-19, TPAMI-18, NeurIPS-18, KG4IR-18, EMNLP-18, ACL-18.
See my Google Scholar profile for a complete list of publications.
Introduction to Biology | MITx at edX | 2021 | Pass |
Ph.D. and M.S. in Computer Science | Johns Hopkins University | 2013-19 | 3.75/4.0 |
Thesis Topic: Representation Learning for Words and Entities. I presented new methods for unsupervised learning of word and entity embeddings from texts and knowledge bases. Courses and Grades: Natural Language Processing (A), Machine Learning in Complex Domains (A), Stochastic Search & Optimization (B), Parallel Programming (A-), Principles of Programming Languages (A-), Combinatorial Optimization (A+), Introduction to Convexity (A-) |
M.Tech. in Information and Communication Technology | IIT Delhi | 2010-11 | 8.77/10 |
B.Tech. in Electrical Engg. | IIT Delhi | 2006-10 | 8.86/10 |
import datetime
import time
from concurrent.futures import ProcessPoolExecutor

class InteractivePool:
    """Track a list of futures interactively from a REPL/notebook."""
    def __init__(self, J):
        self.J = J
        self.tic = time.time()

    def done(self):
        "Return (number of finished jobs, total number of jobs)."
        return sum(1 for j in self.J if j.done()), len(self.J)

    def collect(self, R=None):
        "Gather the available results into the dict R, keyed by job index."
        R = R or {}
        print(len(R))
        for i, j in enumerate(self.J):
            if i not in R and j.done() and not j.cancelled():
                R[i] = j.result()
        print(len(R))
        return R

    def time(self):
        "Return the elapsed time since the pool was created, as a string."
        return str(datetime.timedelta(seconds=time.time() - self.tic))

    def wait(self, interval=600):
        "Poll every `interval` seconds until every job has finished."
        while True:
            time.sleep(interval)
            if sum(1 for j in self.J if j.done()) == len(self.J):
                break
        return

sidejob = ProcessPoolExecutor(max_workers=4).submit
P = InteractivePool([sidejob(f, i) for i in range(80)])  # f is some function defined elsewhere
from functools import partial
from glob import glob
from tempfile import TemporaryDirectory
from typing import Iterable, List, NamedTuple
import pickle
import subprocess

import numpy as np
from scipy.sparse import csr_matrix, vstack, lil_matrix, load_npz, save_npz
import pyspark.ml as pm
from pyspark import TaskContext
def sparseVectorList_to_CSRMatrix(X: List[pm.linalg.SparseVector]) -> csr_matrix:
    """ Convert a list of pyspark sparse vectors to a scipy CSR matrix that
    a standard sklearn function / lightgbm can consume.
    """
    M = lil_matrix((len(X), X[0].size), dtype=np.float64)
    for i, x in enumerate(X):
        I = np.argsort(x.indices)
        M.rows[i] = x.indices[I]
        M.data[i] = x.values[I]
    return M.tocsr(copy=False)
class RowToPredict(NamedTuple):
    "This class was created just to facilitate linting and type hinting."
    customer_id: str
    features: pm.linalg.SparseVector
def save_features_in_spark_as_sparsescipy_to_hdfs(
        hdfs_dir,
        row_gen: Iterable[RowToPredict]):
    C, Flist = [], []
    for e in row_gen:
        Flist.append(e.features)
        C.append(e.customer_id)
    F = sparseVectorList_to_CSRMatrix(Flist)
    pid = TaskContext.get().partitionId()
    # Ideally I would upload the file directly to HDFS, but I don't know how to
    # write to HDFS directly. hdfscli didn't work for me. So the work-around is to
    # save to a local file on the task node, then upload to HDFS with a subprocess call.
    with TemporaryDirectory() as tmpdirname:
        print('created temporary directory', tmpdirname)
        fname = f'{tmpdirname}/F.{pid}.npz'
        cname = f'{tmpdirname}/C.{pid}.pkl'
        with open(fname, 'wb') as fh:
            save_npz(fh, F)
        with open(cname, 'wb') as fh:
            pickle.dump(C, fh)
        subprocess.getstatusoutput(f'hadoop fs -put {fname} {cname} {hdfs_dir}')
    return
# Make sure that each partition has a reasonable number of rows so that we don't OOM.
# sdf is a spark DataFrame whose rows carry customer_id and features columns.
npart = sdf.count() // 10000
sdf.rdd.repartition(npart).foreachPartition(partial(
    save_features_in_spark_as_sparsescipy_to_hdfs, '/data/'))
# After saving all the parts to HDFS, download the parts and open them on the master node.
subprocess.getstatusoutput('hadoop fs -copyToLocal /data/ /home/hadoop/')
# Sort both globs so that the feature rows stay aligned with the customer ids.
L = sorted(glob('data/*.npz'))
F = vstack([load_npz(e) for e in L])
C = [c for e in sorted(glob('data/*.pkl')) for c in pickle.load(open(e, 'rb'))]
1. Install virtualenv and pip-tools.
1. Create a workspace and download the package.
1. Go to where the requirements file is.
1. Create a fresh empty environment and activate it.
1. Install all requirements.
1. Generate hashes for all installed packages.
1. Close the shell and create a new one.
1. Create a fresh empty environment and activate it.
1. Check that the new requirements file can be installed with
pip install virtualenv pip-tools
python3 -m venv env; source env/bin/activate
pip list > before; pip install -r requirements.txt; pip list > after
pip-compile requirements.txt --generate-hashes # this overwrites the original file.
exit; bash
python3 -m venv env2; source env2/bin/activate
pip install --require-hashes -r requirements.txt
import subprocess

def hdfs_exists(path, flag='-e'):
    code, output = subprocess.getstatusoutput(f'hadoop fs -test {flag} {path}')
    if code != 0:
        print(output)
        return False
    else:
        return True

def copyFromLocal(src, dst):
    return subprocess.getstatusoutput(f'hadoop fs -copyFromLocal {src} {dst}')

def copyToLocal(src, dst):
    return subprocess.getstatusoutput(f'hadoop fs -copyToLocal {src} {dst}')
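A quick usage sketch for these helpers (the paths here are hypothetical):
# Push a local file to HDFS only if it is not already there.
if not hdfs_exists('/data/F.0.npz'):
    code, output = copyFromLocal('/tmp/F.0.npz', '/data/')
    print(code, output)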
import logging
import os
import re
import warnings
from io import StringIO

import yaml
from pyspark.sql import SparkSession

def setup(RUNDATE='2020-07-16', spark_setting_file=None):
    """ Construct the spark session, set up the logger, and read YAML files
    from prime-ml-repo in production EMR clusters. After reading the yaml files,
    format the paths with dates.
    RUNDATE is a date string like '2020-05-30'.
    """
    assert re.match(r'\d{4}-\d{2}-\d{2}', RUNDATE)
    if spark_setting_file is None:
        spark_setting_file = StringIO("""scoring:
            spark.executor.memory: '20G'
            spark.executor.memoryOverhead: '4G'
            spark.executor.cores: 4
            spark.task.cpus: 1
            spark.yarn.am.memory: '2G'
            spark.serializer: 'org.apache.spark.serializer.KryoSerializer'
            spark.driver.maxResultSize: 0
            spark.executor.extraJavaOptions: '-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'
            spark.kryoserializer.buffer.max: '512M'
            spark.cleaner.periodicGC.interval: '5min'
            spark.network.timeout: '600s'""")
    # Use the logger to log everything to a file and also to stderr.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
    fh = logging.FileHandler("/home/hadoop/scoring_log_file.log")
    fh.setLevel(logging.INFO)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.info(f"Initialize job parameters. {RUNDATE}")
    parameters = {}
    logger.info("Initialize spark settings and spark session.")
    spark = SparkSession.builder.appName("claire")
    for key, value in yaml.safe_load(spark_setting_file)["scoring"].items():
        logger.info(f'spark: {key}={value}')
        spark = spark.config(key, value)
    spark = spark.enableHiveSupport().getOrCreate()
    spark.sparkContext.setLogLevel(os.environ.get("SPARK_LOG_LEVEL", "DEBUG"))
    try:
        spark.sparkContext.setCheckpointDir("hdfs:///checkpoint/spark/")
    except Exception:
        warnings.warn("Unable to set spark checkpoint directory!")
    return spark, parameters, logger
import numpy as np
import sklearn.metrics as skm
from types import SimpleNamespace

def tabulate(label, proba, **kwargs):
    """ Compute common statistics on a binary classification problem
    given the true labels and the class probabilities.
    """
    assert proba.shape[1] == 2
    assert len(label) == len(proba)
    R = SimpleNamespace()
    R.accuracy = (label == proba.argmax(1)).mean()
    R.majority_rule_accuracy = max(1 - label.mean(), label.mean())
    R.log_loss = -np.log(np.select([label == 0, label == 1],
                                   [proba[:, 0], proba[:, 1]])).mean()
    fpr, tpr, thresholds = skm.roc_curve(label, proba[:, 1])
    R.roc_auc = skm.auc(fpr, tpr)
    try:
        idx = np.where(fpr < 0.05)[0].max()
        R.tp_at_fp_less_than_5_percent = tpr[idx]
        R.fp_at_fp_less_than_5_percent = fpr[idx]
        R.threshold_at_fp_less_than_5_percent = thresholds[idx]
    except ValueError as e:
        print(e)
    precision, recall, thresholds = skm.precision_recall_curve(label, proba[:, 1])
    R.prauc = skm.auc(recall, precision)
    R.precision = precision
    R.recall = recall
    R.thresholds = thresholds
    idx = np.where(precision > 0.9)[0].min()
    R.smallest_precision_greater_than_90pct = precision[idx]
    R.recall_at_precision_90pct = recall[idx]
    R.threshold_at_precision_90pct = thresholds[idx]
    idx = np.where(precision > 0.5)[0].min()
    R.smallest_precision_greater_than_50pct = precision[idx]
    R.recall_at_precision_50pct = recall[idx]
    R.threshold_at_precision_50pct = thresholds[idx]
    # Return a plain dict; any extra keyword arguments are stored as-is.
    R = R.__dict__
    R.update(kwargs)
    return R
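A minimal usage sketch for tabulate; the labels and probabilities below are synthetic and exist only to show the expected shapes, and model_name is an arbitrary extra kwarg.
import numpy as np
rng = np.random.default_rng(0)
label = rng.integers(0, 2, size=1000)
p1 = np.clip(0.7 * label + 0.3 * rng.random(1000), 1e-6, 1 - 1e-6)
proba = np.column_stack([1 - p1, p1])             # shape (n, 2): P(class 0), P(class 1)
stats = tabulate(label, proba, model_name='toy')  # extra kwargs are stored as-is
print(stats['accuracy'], stats['roc_auc'], stats['prauc'])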
import logging

def setup_logger(file_path=None):
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
    # Add a StreamHandler only if one is not already attached. Note that
    # FileHandler subclasses StreamHandler, so compare the exact type.
    if all(type(e) is not logging.StreamHandler for e in logger.handlers):
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        ch.setFormatter(formatter)
        logger.addHandler(ch)
        logger.info("Initialized logger with StreamHandler")
    # Add a FileHandler only if one is not already attached.
    if file_path and all(not isinstance(e, logging.FileHandler)
                         for e in logger.handlers):
        fh = logging.FileHandler(file_path, 'a')
        fh.setLevel(logging.INFO)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        logger.info(f"Initialized logger with FileHandler({file_path})")
    return logger
/* Java */
// Write a short, a long, and a float into a little-endian byte buffer and
// dump the 14 bytes to /tmp/tmp.dat (sizes: short=2, long=8, float=4).
short a = 3;           // 2 bytes
long b = 5;            // 8 bytes
float c = (float) 7.0; // 4 bytes
ByteBuffer bb = ByteBuffer.allocate(14);
bb.order(ByteOrder.LITTLE_ENDIAN);
bb.putShort(a);
pp(bb.position()); // 2
bb.putLong(b);
pp(bb.position()); // 10
bb.putFloat(c);
pp(bb.position()); // 14
bb.position(0); // crucial: rewind the buffer before writing it to the channel.
try (RandomAccessFile f = new RandomAccessFile("/tmp/tmp.dat", "rw");
     FileChannel fc = f.getChannel()) {
    pp(bb.order().toString());
    pp(fc.write(bb));
}
## Python
from mmap import mmap, PROT_READ
import os
import numpy as np
import sys
assert sys.byteorder == 'little'
fd = os.open('/tmp/tmp.dat', os.O_RDONLY)
buf = mmap(fd, 14, prot=PROT_READ)
# L = little endian.
arr1 = np.frombuffer(buf, dtype=np.dtype('int16').newbyteorder('L'), count=1, offset=0)
arr2 = np.frombuffer(buf, dtype=np.dtype('int64').newbyteorder('L'), count=1, offset=2)
arr3 = np.frombuffer(buf, dtype=np.dtype('float32').newbyteorder('L'), count=1, offset=10)
arr1, arr2, arr3
static void pp(Object format, Object... args) {
    System.out.printf(format.toString(), args);
    System.out.println();
}
import json
import numpy as np

class NumpyEncoder(json.JSONEncoder):
    "JSON encoder that converts numpy arrays and scalars to plain Python types."
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif type(obj).__module__ == 'numpy':
            if type(obj).__name__.startswith('float'):
                return float(obj)
            elif type(obj).__name__.startswith('int'):
                return int(obj)
            else:
                return bool(obj)
        return json.JSONEncoder.default(self, obj)
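A usage sketch for the encoder (the payload is made up):
# json.dumps cannot serialize numpy types by default; pass the encoder via cls=.
payload = {'weights': np.arange(3), 'lr': np.float32(0.1), 'converged': np.bool_(True)}
print(json.dumps(payload, cls=NumpyEncoder))  # -> {"weights": [0, 1, 2], "lr": 0.1000..., "converged": true}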
aws emr list-clusters --region us-east-1 --active > /tmp/tmp1
jq '.Clusters|.[]|.Id' /tmp/tmp1 -r | xargs -n 1 -I % sh -c 'aws emr list-instances --cluster-id % --instance-states RUNNING --region us-east-1' > /tmp/tmp2
jq '.Instances | .[] | .InstanceType' /tmp/tmp2 -r | sort | uniq -c
aws ec2 describe-reserved-instances --region us-east-1 | jq '.ReservedInstances | .[] | [.InstanceType, .InstanceCount, .State]' -c -r | fgrep -v retired | sort
'Two-sided z-test p-value from counts
Public Function CountsZTest(count1, nob1, count2, nob2)
    CountsZTest = RatioZTest(count1 * 1# / nob1, nob1, count2 * 1# / nob2, nob2)
End Function

Public Function RatioZTest(p1, nob1, p2, nob2)
    diff = p1 - p2
    p_pooled = (p1 * nob1 + p2 * nob2) * 1# / (nob1 + nob2)
    nobs_2xhm = 1# / nob1 + 1# / nob2
    var1 = p_pooled * (1 - p_pooled) * nobs_2xhm
    std_diff = Sqr(var1)
    RatioZTest = Application.WorksheetFunction.Norm_S_Dist(-Abs(diff / std_diff), True) * 2
End Function
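For reference, a sketch of the same two-proportion z-test in Python (my own translation of the VBA above, using scipy); counts_z_test and the example numbers are hypothetical names/values.
# Two-sided two-proportion z-test with a pooled variance estimate,
# mirroring CountsZTest/RatioZTest above.
from math import sqrt
from scipy.stats import norm

def counts_z_test(count1, nob1, count2, nob2):
    p1, p2 = count1 / nob1, count2 / nob2
    p_pooled = (count1 + count2) / (nob1 + nob2)
    std_diff = sqrt(p_pooled * (1 - p_pooled) * (1 / nob1 + 1 / nob2))
    return 2 * norm.cdf(-abs((p1 - p2) / std_diff))  # two-sided p-value

# e.g. counts_z_test(120, 1000, 150, 1000)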
Steps for adding a note: If you have a PDF, store it in the posts folder, clone posts/pdftemplate.html, search-replace any mention of note1.pdf in the html, and finally link that html file here. Otherwise copy posts/template.html and modify it. |
Note 12 Offline evaluation and learning in bandits. |
Note 11 - Sparsity in Deep Learning Layers (opens in a new page) |
Note 10 Exploration Scavenging in comparison to other off-policy estimators. |
Note 9 Using a PID controller to control the number of servers in a data center. [YC] [Gist] |
Note 8 Using the Kelly criterion for bankroll management. |
Note 7 The foundations for finding the MVUE via Rao-Blackwellization. |
Note 6 The VW FAQ. |
Note 5 The Basics of ZeroMQ. |
Note 4 A visual summary of the inequalities governing Entropy, Cross Entropy, Joint Entropy, KL Divergence, and Mutual Information, including the Data Processing Inequality. |
Note 3 [WIP] A visual proof of the UCB algorithm. |
Note 2 A video tutorial about the difference between PnL and cash flow, and how a company can have positive cash flow but still make a loss, without raising debt. (You may need to download the video and play it with VLC.) |
Note 1: Describes how the variance of an AB test can be reduced in the special case when we are comparing two policies with the same small, finite action space.
Hindi description: We want to find out how much difference there is between two treatments. The straightforward approach is AB testing / randomized controlled trials, in which we randomly assign treatment A to half the people and treatment B to the other half, and then measure the difference between the average outcomes of the two groups. This is the simplest method; suppose that using it we would need to test 10,000 people in order to detect a 10% difference between the two groups. The PDF I have shared presents an analysis of a special case that uses 25% fewer samples. Of course this is not a new technique; I only wrote it up for my own understanding. |
First compile a PDF, either in Overleaf or using latexmk, then add all the assets (the .tex and .pdf files) to the res/trivia folder. Then add the iframe with src, loading, width, and height attributes.
The formula for the change of distribution under one-to-one differentiable maps is well known. If \( f \) maps \( x \) to \( y \), then \( p_Y(y) = p_X(f^{-1}(y)) \left| \det J_{f^{-1}}(y) \right| \). But what happens in the more general case where the function is not invertible? This can happen in situations like Z = X/Y or Z = X + Y. In such situations the cleanest method is to compute the probability of the event underlying the CDF and then differentiate. Basically
\[ \begin{align}p_Z(z) &= \frac{d}{dz} \int_{\mathbf{x}} \mathbb{I}[f(\mathbf{x}) < z] \, p_{\mathbf{X}}(\mathbf{x})\, d\mathbf{x} = \frac{d}{dz} \int_{\{\mathbf{x}\,:\, f(\mathbf{x}) < z\}} p_{\mathbf{X}}(\mathbf{x})\, d\mathbf{x} \end{align} \]
Now recall that \( \frac{d}{dx}\int_a^{u(x)} f(t)\, dt = u'(x) f(u(x)) \), so if we can write the acceptable region as a function of \( z \) then we are in business. For example, if \( z = x_2/x_1 \) then the acceptable region is \( \{x_1 > 0, x_2 < zx_1\} \cup \{x_1 < 0, x_2 > zx_1\} \) and the result is \[ \int_0^\infty x_1 p(x_1, zx_1)\,dx_1 + \int_{-\infty}^0 -x_1 p(x_1, zx_1)\,dx_1, \] where the derivative has been exchanged with the integral, assuming the conditions for differentiation under the integral sign (Fubini's theorem) hold in this case.
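As a sanity check of this recipe (a worked example I am adding here, not part of the original note): take \( x_1, x_2 \) independent standard normals, so \( p(x_1, x_2) = \frac{1}{2\pi} e^{-(x_1^2 + x_2^2)/2} \). Then \[ p_Z(z) = \int_{-\infty}^{\infty} |x_1| \, \frac{1}{2\pi} e^{-x_1^2(1+z^2)/2}\, dx_1 = \frac{1}{\pi} \int_0^\infty x_1 e^{-x_1^2(1+z^2)/2}\, dx_1 = \frac{1}{\pi}\,\frac{1}{1+z^2}, \] which is the standard Cauchy density, as expected for the ratio of two independent standard normals.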
Stationarity simply means that the n-th order joint pdfs are independent of time. Wide-sense stationarity (or loose-sense stationarity) means that the mean is constant over time and that the correlation between any two observations depends only on the time difference between them. Stationarity does not mean that the correlation function R_X(t1, τ = t2 − t1) is zero or a delta function. No! Wide-sense stationarity just means that the correlation function is invariant with time and depends only on the difference. Note that stationarity / non-stationarity describes a process, not the actual observed signal; the observed signals can still exhibit periodicity while the process is wide-sense stationary. In an auto-regressive (AR) process, the current value of the signal is a linear function of past observations plus a noise term. This process can be non-stationary if the system has a so-called "unit root", but an AR process may still be stationary, with no trend. On the other hand, moving-average (MA) processes are defined as linear combinations of observations of some hidden i.i.d. error terms, so an MA process is always stationary. The Dickey-Fuller procedure tests whether a unit root is present in an AR process. There are different versions of and alternatives to this test, depending on exactly what type of AR process is assumed and even on what the alternative to the null hypothesis is, such as the Augmented Dickey-Fuller test and the KPSS test (whose null hypothesis is stationarity). But none of these tests can check whether a trend is present in the dataset, because these "stationarity" tests can give false positives for trend. If a *DF test or the KPSS test says that the signal is stationary, then rest assured no trend exists; but when these tests say the signal is non-stationary, it is possible that they are just detecting a unit-root random walk, which obviously does not have an actual trend.
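A small sketch of how one might run these tests in Python with statsmodels; the series y below is a simulated unit-root random walk, purely for illustration:
import numpy as np
from statsmodels.tsa.stattools import adfuller, kpss

rng = np.random.default_rng(0)
y = np.cumsum(rng.normal(size=1000))  # random walk => unit root, but no deterministic trend

adf_stat, adf_pvalue, *_ = adfuller(y, regression='c')               # null: a unit root is present
kpss_stat, kpss_pvalue, *_ = kpss(y, regression='c', nlags='auto')   # null: (level-)stationary

# Large ADF p-value => cannot reject a unit root; small KPSS p-value => reject stationarity.
# statsmodels may warn that the KPSS p-value lies outside its lookup table.
print(f'ADF p-value: {adf_pvalue:.3f}, KPSS p-value: {kpss_pvalue:.3f}')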
▷ Hosted JSmol: an open-source JavaScript viewer for chemical structures in 3D. link.
▷ Hosted JSME: an open-source JavaScript Molecule Editor. link.