
Links

Stack Exchange profile

GitHub

LinkedIn

Edit this page

Search this website

See deployment status

how to edit this page

Make changes to index.html and validate them using "tidy --version ; tidy --warn-proprietary-attributes no -e -q index.html"; this command is also added as a pre-commit hook. Also use "python -m http.server" to check that the site looks okay. For writing raw HTML, go to editor.html.

Pushpendre Rastogi

pushpendre at gmail

Introduction

I joined Google DeepMind as a Research Engineer in 2023 and moved from Seattle to the Bay Area for the job. I was promoted to Senior Applied Scientist in 2021. I joined Amazon Prime Research in April 2020 to work on Plan Recommendation and Content Optimization, and before that I joined the Dialog State Tracking team in Amazon Alexa in April 2019. I completed my Ph.D. in Computer Science at the Center for Language and Speech Processing, Johns Hopkins University, advised by Benjamin Van Durme. I TA'd graduate courses on representation learning and machine learning for three semesters during my Ph.D. studies and received the George Sommerman Graduate Teaching Assistant Award, which came with a cash prize of $1000. I have reviewed for Transactions on Signal Processing-19, NeurIPS-19, ICML-19, ICLR-19, EMNLP-19, ACL-19, TPAMI-18, NeurIPS-18, KG4IR-18, EMNLP-18, and ACL-18.

Selected Publications

See my Google Scholar profile for a complete list of publications.

Education

Introduction to Biology | MITx at edX | 2021 | Pass
Ph.D. and M.S. in Computer Science | Johns Hopkins University | 2013-19 | 3.75/4.0
Thesis Topic: Representation Learning for Words and Entities. I presented new methods for unsupervised learning of word and entity embeddings from texts and knowledge bases.
Courses and Grades: Natural Language Processing (A), Machine Learning in Complex Domains (A), Stochastic Search & Optimization (B), Parallel Programming (A-), Principles of Programming Languages (A-), Combinatorial Optimization (A+), Introduction to Convexity (A-)
M.Tech. in Information and Communication Technology | IIT Delhi | 2010-11 | 8.77/10
B.Tech. in Electrical Engg. | IIT Delhi | 2006-10 | 8.86/10

Code Snippets

Quick and dirty experiment tracker (open in new page)
Problem dependent reparameterization of a knapsack problem for asymptotic efficiency (open in new page)
Parsing using a grammar (abstraction + latent variables) vs actions on a stack (open in new page)
Some Graph Algorithms (open in new page)
Bayesian Optimization in AX with constraints (open in new page)
Simplify a jupyter notebook (open in new page)
Interactive pool for running jobs on the side in python.
class InteractivePool:
    """Track a list of concurrent.futures Future objects interactively."""
    def __init__(self, J):
        import time
        self.J = J
        self.tic = time.time()

    def done(self):
        return sum(1 for j in self.J if j.done()), len(self.J)

    def collect(self,R=None):
        R = R or {}
        print(len(R))
        for i, j in enumerate(self.J):
            if i not in R and j.done() and not j.cancelled():
                R[i] = j.result()
        print(len(R))
        return R

    def time(self):
        import datetime, time as time_module
        return str(datetime.timedelta(seconds=time_module.time() - self.tic))

    def wait(self, interval=600):
        import time
        while True:
            time.sleep(interval)
            if sum(1 for j in self.J if j.done()) == len(self.J):
                break
        return

# Usage: `f` is a placeholder for any picklable function of one argument.
from concurrent.futures import ProcessPoolExecutor
sidejob = ProcessPoolExecutor(max_workers=4).submit
P = InteractivePool([sidejob(f, i) for i in range(80)])
Save spark dataframe to sparse scipy arrays
from functools import partial
import pickle
import subprocess
import numpy as np
import pyspark.ml as pm
from typing import *
from scipy.sparse import csr_matrix, vstack, lil_matrix, load_npz, save_npz
from pyspark import TaskContext
from tempfile import TemporaryDirectory
from glob import glob

def sparseVectorList_to_CSRMatrix(X: List[pm.linalg.SparseVector]) -> csr_matrix:
    """ Convert a list of pyspark sparse vectors to a scipy CSR matrix that
    a standard sklearn function/lightgbm can consume.
    """
    M = lil_matrix((len(X), X[0].size), dtype=np.float64)
    for i, x in enumerate(X):
        I = np.argsort(x.indices)
        M.rows[i] = x.indices[I].tolist()
        M.data[i] = x.values[I].tolist()
    return M.tocsr(copy=False)

class RowToPredict(NamedTuple):
    "This class was created just to facilitate linting and type hinting."
    customer_id: str
    features: pm.linalg.SparseVector

def save_features_in_spark_as_sparsescipy_to_hdfs(
        hdfs_dir,
        row_gen: Iterable[RowToPredict]):
    C, Flist = [], []
    for e in row_gen:
        Flist.append(e.features)
        C.append(e.customer_id)
    F = sparseVectorList_to_CSRMatrix(Flist)
    pid = TaskContext().partitionId()
    # Ideally I would upload the file directly to HDFS, but I don't know how to
    # write to HDFS directly. hdfscli didn't work for me. The work-around is to save
    # to a local file on the task node, then upload to HDFS with a subprocess call.
    with TemporaryDirectory() as tmpdirname:
        print('created temporary directory', tmpdirname)
        fname = f'{tmpdirname}/F.{pid}.npz'
        cname = f'{tmpdirname}/C.{pid}.pkl'
        with open(fname, 'wb') as fh:
            save_npz(fh, F)
        with open(cname, 'wb') as fh:
            pickle.dump(C, fh)
        subprocess.getstatusoutput(f'hadoop fs -put {fname} {cname} {hdfs_dir}')
    return

# Make sure that each partition has a reasonable number of rows so that we don't OOM.
npart = sdf.count() // 10000
sdf.rdd.repartition(npart).foreachPartition(partial(
    save_features_in_spark_as_sparsescipy_to_hdfs, '/data/'))

# After saving all the parts to HDFS, download the parts and open them on the master node.
subprocess.getstatusoutput('hadoop fs -copyToLocal /data/ /home/hadoop/')
F = vstack([load_npz(e) for e in sorted(glob('data/*.npz'))])
C = [c for e in sorted(glob('data/*.pkl')) for c in pickle.load(open(e, 'rb'))]
How to hash check pip files
1. Install virtualenv and pip-tools.
2. Create a workspace and download the package.
3. Go to where the requirements file is.
4. Create a fresh empty environment and activate it.
5. Install all the requirements.
6. Generate hashes for all installed packages.
7. Close the shell and create a new one.
8. Create another fresh empty environment and activate it.
9. Check that the new requirements file can be installed with hashes required.

pip install virtualenv pip-tools
python3 -m venv env; source env/bin/activate
pip list > before; pip install -r requirements.txt; pip list > after
pip-compile requirements.txt --generate-hashes  # this overwrites the original file.
exit; bash
python3 -m venv env2; source env2/bin/activate
pip install --require-hashes -r requirements.txt
HDFS file system functionality exposed to Python
import subprocess

def hdfs_exists(path, flag='-e'):
    code, output = subprocess.getstatusoutput(f'hadoop fs -test {flag} {path}')
    if code != 0:
        print(output)
        return False
    else:
        return True

def copyFromLocal(src, dst):
    return subprocess.getstatusoutput(f'hadoop fs -copyFromLocal {src} {dst}')

def copyToLocal(src, dst):
    return subprocess.getstatusoutput(f'hadoop fs -copyToLocal {src} {dst}') 
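A hypothetical usage sketch of the helpers above; the paths are placeholders, not real datasets.

if not hdfs_exists('/data/features'):
    copyFromLocal('/tmp/features', '/data/')
code, output = copyToLocal('/data/features', '/home/hadoop/')
print(code, output)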
Spark Setup
import logging
import os
import re
import warnings
from io import StringIO

import yaml
from pyspark.sql import SparkSession

def setup(RUNDATE='2020-07-16', spark_setting_file=None):
    """ Construct spark session, setup logger, and read YAML files
    from prime-ml-repo in production EMR clusters. After reading yaml files
    format the paths with dates.

    RUNDATE is a date string like '2020-05-30'.
    """
    assert re.match(r'\d{4}-\d{2}-\d{2}', RUNDATE)
    if spark_setting_file is None:
        spark_setting_file = StringIO("""scoring:
     spark.executor.memory: '20G'
     spark.executor.memoryOverhead: '4G'
     spark.executor.cores: 4
     spark.task.cpus: 1
     spark.yarn.am.memory: '2G'
     spark.serializer: 'org.apache.spark.serializer.KryoSerializer'
     spark.driver.maxResultSize: 0
     spark.executor.extraJavaOptions: '-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'
     spark.kryoserializer.buffer.max: '512M'
     spark.cleaner.periodicGC.interval: '5min'
     spark.network.timeout: '600s'""")
    # Use logger to log everything to file and also to stderr.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
    fh = logging.FileHandler("/home/hadoop/scoring_log_file.log")
    fh.setLevel(logging.INFO)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.info(f"Initialize job parameters. {RUNDATE}")

    parameters = {}

    logger.info("Initialize spark settings and spark session.")
    spark = SparkSession.builder.appName("claire")
    for key, value in yaml.safe_load(spark_setting_file)["scoring"].items():
        logger.info(f'spark: {key}={value}')
        spark = spark.config(key, value)
    spark = spark.enableHiveSupport().getOrCreate()
    spark.sparkContext.setLogLevel(os.environ.get("SPARK_LOG_LEVEL", "DEBUG"))
    try:
        spark.sparkContext.setCheckpointDir("hdfs:///checkpoint/spark/")
    except Exception as exception:
        warnings.warn("Unable to set spark checkpoint directory !")
    return spark, parameters, logger 
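A minimal usage sketch on the EMR master node; the date is illustrative.

spark, parameters, logger = setup(RUNDATE='2020-07-16')
logger.info(f'Spark version: {spark.version}')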
Common model inspections on binary classification test set
import numpy as np
import sklearn.metrics as skm
from types import SimpleNamespace

def tabulate(label, proba, **kwargs):
    """ Compute common statistics on a binary classification problem
    given the true labels and the class probabilities.
    """
    assert proba.shape[1] == 2
    assert len(label) == len(proba)
    R = SimpleNamespace()
    R.accuracy = (label == proba.argmax(1)).mean()
    R.majority_rule_accuracy = max(1 - label.mean(), label.mean())
    R.log_loss = -np.log(np.select([label==0, label==1],
                                   [proba[:, 0], proba[:, 1]])).mean()
    fpr, tpr, thresholds = skm.roc_curve(label, proba[:, 1])
    R.roc_auc = skm.auc(fpr, tpr)
    try:
        idx = np.where(fpr < 0.05)[0].max()
        R.tp_at_fp_less_than_5_percent = tpr[idx]
        R.fp_at_fp_less_than_5_percent = fpr[idx]
        R.threshold_at_fp_less_than_5_percent = thresholds[idx]
    except ValueError as e:
        print(e)
        pass
    precision, recall, thresholds = skm.precision_recall_curve(label, proba[:, 1])
    R.prauc = skm.auc(recall, precision)
    R.precision = precision
    R.recall = recall
    R.thresholds = thresholds
    idx = np.where(precision > 0.9)[0].min()
    R.smallest_precision_greater_than_90pct = precision[idx]
    R.recall_at_precision_90pct = recall[idx]
    R.threshold_at_precision_90pct = thresholds[idx]

    idx = np.where(precision > 0.5)[0].min()
    R.smallest_precision_greater_than_50pct = precision[idx]
    R.recall_at_precision_50pct = recall[idx]
    R.threshold_at_precision_50pct = thresholds[idx]

    R = R.__dict__
    R.update(kwargs)
    return R 
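A hypothetical usage sketch; clf, X_test, and y_test are placeholders for a fitted sklearn-style classifier and a held-out test set.

stats = tabulate(y_test, clf.predict_proba(X_test), model_name='lightgbm-v1')
print({k: v for k, v in stats.items() if np.isscalar(v)})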
Boilerplate for configuring logger in python
import logging
def setup_logger(file_path=None):
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )

    if not any(type(e) is logging.StreamHandler for e in logger.handlers):
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        ch.setFormatter(formatter)
        logger.addHandler(ch)
        logger.info(f"Initialized logger with StreamHandler")

    if file_path and all(not isinstance(e, logging.FileHandler)
                         for e in logger.handlers):
        fh = logging.FileHandler(file_path, 'a')
        fh.setLevel(logging.INFO)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        logger.info(f"Initialized logger with FileHandler({file_path})")

    return logger
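A minimal usage sketch: configure once at program start, then log as usual (the file path is illustrative).

logger = setup_logger('/tmp/app.log')
logger.info('pipeline started')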
Java - Python/Numpy Fast Copy-Free Exchange
/* Java */
// Required imports: java.nio.ByteBuffer, java.nio.ByteOrder,
// java.io.RandomAccessFile, java.nio.channels.FileChannel.

        short a = 3; // 2
        long b = 5; // 8
        float c = (float)7.0; // 4
        ByteBuffer bb = ByteBuffer.allocate(14);
        bb.order(ByteOrder.LITTLE_ENDIAN);
        bb.putShort(a);
        pp(bb.position()); // 2
        bb.putLong(b);
        pp(bb.position()); // 10
        bb.putFloat(c);
        pp(bb.position()); // 14
        bb.position(0); // crucial.
        try(RandomAccessFile f = new RandomAccessFile("/tmp/tmp.dat", "rw");
            FileChannel fc = (f).getChannel();) {
                pp(bb.order().toString());
                pp(fc.write(bb));
        }


# Python

from mmap import mmap, PROT_READ
import os
import numpy as np
import sys
assert sys.byteorder == 'little'
fd = os.open('/tmp/tmp.dat', os.O_RDONLY)
buf = mmap(fd, 14, prot=PROT_READ)
# 'L' = little endian.
arr1 = np.frombuffer(buf, dtype=np.dtype('int16').newbyteorder('L'), count=1, offset=0)
arr2 = np.frombuffer(buf, dtype=np.dtype('int64').newbyteorder('L'), count=1, offset=2)
arr3 = np.frombuffer(buf, dtype=np.dtype('float32').newbyteorder('L'), count=1, offset=10)
arr1, arr2, arr3
Java - Simple printing function.
 static void pp(Object format, Object... args) {
        System.out.printf(format.toString(), args);
        System.out.println();
    } 
Json encode numpy objects
import json
import numpy as np

class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif type(obj).__module__ == 'numpy':
            if type(obj).__name__.startswith('float'):
                return float(obj)
            elif type(obj).__name__.startswith('int'):
                return int(obj)
            else:
                return bool(obj)
        return json.JSONEncoder.default(self, obj) 
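A hypothetical usage sketch; the payload below is made up. Pass the encoder through the cls argument of json.dumps so that numpy arrays and scalars serialize cleanly.

payload = {'weights': np.arange(3, dtype=np.float32), 'count': np.int64(7)}
print(json.dumps(payload, cls=NumpyEncoder))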
List busy EMR machines and the total reserved machines
 aws emr list-clusters --region us-east-1 --active  > /tmp/tmp1
jq '.Clusters|.[]|.Id' /tmp/tmp1 -r | xargs -n 1 -I % sh -c 'aws emr list-instances --cluster-id % --instance-states RUNNING --region us-east-1' > /tmp/tmp2
jq '.Instances | .[] | .InstanceType' /tmp/tmp2 -r | sort | uniq -c
aws ec2 describe-reserved-instances --region us-east-1 | jq '.ReservedInstances | .[] | [.InstanceType, .InstanceCount, .State]' -c -r | fgrep -v retired | sort 
Excel VBA functions for testing the significance of binomial A/B tests: two-sided z-test p-value from ratios and counts
 'Two-sided ZTest Pvalue from counts
Public Function CountsZTest(count1, nob1, count2, nob2)
    CountsZTest = RatioZTest(count1 * 1# / nob1, nob1, count2 * 1# / nob2, nob2)
End Function
Public Function RatioZTest(p1, nob1, p2, nob2)
    diff = p1 - p2
    p_pooled = (p1 * nob1 + p2 * nob2) * 1# / (nob1 + nob2)
    nobs_2xhm = 1# / nob1 + 1# / nob2
    var1 = p_pooled * (1 - p_pooled) * nobs_2xhm
    std_diff = Sqr(var1)
    RatioZTest = Application.WorksheetFunction.Norm_S_Dist(-Abs(diff / std_diff), True) * 2
End Function 
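A Python cross-check of the same pooled two-proportion z-test (my own sketch, not part of the workbook); it should agree with the VBA functions above.

from math import sqrt
from scipy.stats import norm

def counts_z_test(count1, nob1, count2, nob2):
    p1, p2 = count1 / nob1, count2 / nob2
    p_pooled = (count1 + count2) / (nob1 + nob2)
    std_diff = sqrt(p_pooled * (1 - p_pooled) * (1 / nob1 + 1 / nob2))
    return 2 * norm.cdf(-abs((p1 - p2) / std_diff))  # two-sided p-value

print(counts_z_test(120, 1000, 150, 1000))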

Technical Notes

Steps for adding a note

If you have a PDF then store it in the posts folder. Clone posts/pdftemplate.html and search-replace any mention of note1.pdf in the html. Finally, link that html file here. Otherwise copy posts/template.html and modify it.

Note 12 Offline evaluation and learning in bandits.
Note 11 Sparsity in Deep Learning Layers (open in new page)
Note 10 Exploration Scavenging in comparison to other off-policy estimators.
Note 9 Using a PID Controller for controlling the number of servers in a data-center.[YC][Gist]
Note 8 Using kelly criterion for bankroll management.
Note 7 The foundations for finding MVUE via the use of Rao-Blackwellization.
Note 6 The VW FAQ.
Note 5 The Basics of ZeroMQ.
Note 4 A visual summary of the inequalities governing Entropy, Cross Entropy, Joint Entropy, KL Divergence, and Mutual Information including the Data Processing Inequality.
Note 3 [WIP] A visual proof of the UCB algorithm.
Note 2 A video tutorial about the difference between PnL and Cashflow, and how a company can have positive cash flow but still make a loss, without raising debt. (You may need to download the video and play it with VLC.)
Note 1: Describes how the variance of an AB test can be reduced in the special case when we are comparing two policies with the same small-finite action space.
Summary in plain language

We want to find out how large the difference between two methods/treatments is. The simple approach is AB testing / randomized controlled trials, in which we randomly assign half the people to treatment A and the other half to treatment B, and then measure the difference between the average outcomes of the two groups. This is the easiest procedure; suppose that with it we would need to test 10000 people in order to detect a 10% difference between the two groups. The pdf I shared analyzes a special situation that uses 25% fewer samples. Of course this is not a new technique; I only wrote it up for my own understanding.

Trivia

how to add trivia

First compile a pdf, either in Overleaf or using latexmk, then add all the assets, i.e. the .tex and .pdf files, to the res/trivia folder. Then add the iframe with src, loading, width, and height attributes.

Poses, quaternions, and the SE(3) Manifold (open in new page)
The interpretation of nu in nu-SVM (open in new page)
Sampling methods for Bayesian Statistics (open in new page)
Basics of Hamiltonian Dynamics (open in new page)
High Contrast ML (open in new page)
Lessons from the Open Pre-Trained Transformer Logbook (open in new page)
The optimization landscape (open in new page)
The distribution of many-to-one functions of random variables

The formula for the change of distribution under one-to-one differentiable maps is well known: if \( f \) maps \( x \) to \( y \), then \( p_Y(y) = p_X(f^{-1}(y)) \, \lvert \det J_{f^{-1}}(y) \rvert \). But what happens in the more general case where the function is not invertible? This can happen in situations like Z = X/Y or Z = X + Y. In such situations the cleanest method is to compute the probability of the event underlying the CDF and then differentiate. Basically

\[ \begin{align} p_Z(z) &= \frac{d}{dz} \int_{\mathbf{x}} \mathbb{I}[f(\mathbf{x}) < z] \, p_{\mathbf{X}}(\mathbf{x}) \, d\mathbf{x} = \frac{d}{dz} \int_{\{\mathbf{x} : f(\mathbf{x}) < z\}} p_{\mathbf{X}}(\mathbf{x}) \, d\mathbf{x} \end{align} \]

Now recall that \( \frac{d}{dx}\int_a^{u(x)} f(t) dt = u'(x) f(u(x)) \); therefore if we can write the acceptable region as a function of \( z \) then we are in business. For example, if \( z = x_2/x_1 \) then the acceptable region is \( \{x_1 > 0, x_2 < zx_1\} \cup \{x_1 < 0, x_2 > zx_1\} \) and the density is \[ \int_0^\infty x_1 p(x_1, zx_1)dx_1 + \int_{-\infty}^0 -x_1 p(x_1, zx_1)dx_1 \] where the derivative has been moved inside the outer integral, assuming that differentiation under the integral sign (the Leibniz rule) is justified in this case.
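As a quick sanity check of the last formula (my own sketch, not part of the original note): for independent standard normal \( x_1, x_2 \) it yields the standard Cauchy density \( 1/(\pi(1+z^2)) \), and a Monte Carlo histogram reproduces this.

# Monte Carlo check of the ratio-distribution example above.
# For independent standard normals X1, X2, Z = X2/X1 is standard Cauchy.
import numpy as np

rng = np.random.default_rng(0)
x1, x2 = rng.standard_normal((2, 1_000_000))
z = x2 / x1

edges = np.linspace(-5, 5, 41)
counts, _ = np.histogram(z, bins=edges)
centers = (edges[:-1] + edges[1:]) / 2
empirical_pdf = counts / (z.size * np.diff(edges))  # normalize by all samples
cauchy_pdf = 1.0 / (np.pi * (1.0 + centers ** 2))
print(np.abs(empirical_pdf - cauchy_pdf).max())     # roughly 1e-3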

Calculus Theorems
  • MVT - a function continuous on [a, b] and differentiable on (a, b) has at least one point between a and b where the derivative equals the average slope between these two points.
  • EVT - continuous functions on compact sets achieve their extrema.
  • Intermediate Value T - If f is a continuous function then the image of an interval is also an interval.
  • Taylor's Expansion - This is the best polynomial approximation \( f(x) = f(a) + f'(a)(x - a) + \frac{f''(a)}{2!} (x - a)^2 + \cdots + \frac{f^{(k)}(a)}{k!}(x - a)^k + h_k(x) (x - a)^k \) with \( \lim_{x \to a}h_k(x) = 0 \) for sufficiently smooth functions. Taylor's theorem also has a multivariate version.
  • Newton's method for finding a root of f updates \( x \leftarrow x - f(x)/f'(x) \), derived by Taylor expansion (see the sketch after this list). Newton's method for optimization therefore involves the inverse of the derivative of the gradient, i.e. the inverse of the Hessian.
  • Darboux's theorem - Every function that is a derivative, i.e. it resulted from the differentiation of another function, satisfies the IVT property even if it is not continuous. See this page for an example of a function with a non-continuous derivative and see this page for a more exotic function.
  • Clairaut's theorem for the symmetry of second derivatives, i.e. when can we interchange the order of differentiation? The second partial derivatives need to exist and be continuous.
  • Fubini's theorem, i.e. when can we interchange the order of integration? Absolute integrability implies the interchange is possible.
  • Leibniz Integral rule, i.e. when can we interchange differentiation and integration? Let \( f(x,t) \) be a function such that both f and its partial derivative \( f_x \) are continuous, and suppose that the limits of integration \( a(x), b(x) \) are continuous and have continuous derivatives. Then
    \[ \frac{d}{dx}\Big(\int_{a(x)}^{b(x)} f(x,t) dt\Big) = f(x,b(x)) b'(x) - f(x,a(x))a'(x) + \int_{a(x)}^{b(x)} \frac{\partial f(x,t)}{\partial x}dt \]
  • Inverse function theorem -- If f is continuously differentiable in a nbhd of \( a \) and its derivative is nonzero at \( a \), then the function is invertible there, the inverse is continuously differentiable, and its derivative is \( 1/f'(f^{-1}(b)) \).
  • Implicit function theorem -- Given a system of m equations \( \{f_i(x_1, \ldots, x_n, y_1, \ldots, y_m) = 0 \mid i=1,\ldots,m\} \) satisfied at a point \( (\bar{a},\bar{b}) \), if the system is continuously differentiable in a nbhd of \( (\bar{a},\bar{b}) \) and a mild condition on the partial derivatives with respect to \( y_i \) holds (namely that the Jacobian of partial derivatives with respect to y is invertible), then the y variables are unique continuously differentiable functions of \( \{ x_j \} \) in some nbhd of the point. In fact this theorem also gives the formula for the Jacobian of this unique function.
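A tiny sketch of the two Newton updates mentioned in the list above (my own illustration; the example functions are placeholders).

# Newton root finding: x <- x - f(x)/f'(x).
# Newton optimization in 1-D applies the same update to the gradient,
# i.e. x <- x - f'(x)/f''(x); in higher dimensions f''(x) becomes the Hessian.
def newton_root(f, fprime, x0, iters=20):
    x = x0
    for _ in range(iters):
        x = x - f(x) / fprime(x)
    return x

def newton_minimize_1d(grad, hess, x0, iters=20):
    x = x0
    for _ in range(iters):
        x = x - grad(x) / hess(x)
    return x

print(newton_root(lambda x: x**2 - 2, lambda x: 2 * x, 1.0))          # ~= sqrt(2)
print(newton_minimize_1d(lambda x: 2 * (x - 3), lambda x: 2.0, 0.0))  # minimizer of (x-3)^2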
Difference between testing for non-stationarity and testing for trend

Stationarity simply means that the n-th order joint pdfs are independent of time. Wide-sense stationarity (also called weak-sense stationarity) means that the mean is constant over time and that the correlation between any two observations depends only on the time difference between them. Stationarity does not mean that the correlation function \( R_X(\tau) \), where \( \tau = t_2 - t_1 \), is zero or a delta function. No! Wide-sense stationarity just means that the correlation function is invariant with time and depends only on the lag. Note that stationarity / non-stationarity describes a process, not the actual observed signal; the observed signals can still exhibit periodicity while the process is wide-sense stationary.

Now, in an auto-regressive (AR) process the current value of the signal is a linear function of past observations plus a noise term. This process can be non-stationary if the system has a so-called "unit root", but AR processes may still be stationary without a trend. On the other hand, moving average (MA) processes are defined as linear combinations of a hidden iid error sequence, so an MA process is always stationary. The Dickey-Fuller procedure tests whether a unit root is present in an AR process. There are different versions of this test, such as the Augmented Dickey-Fuller test and KPSS, depending on exactly what type of AR process is assumed and even on what the alternative to the null hypothesis is. But none of these tests can check whether a trend is present in the dataset, because these "stationarity" tests can give false positives for trend. If a *DF test or the KPSS test says that the signal is stationary then rest assured we can say that no trend exists, but when these tests say that a trend exists, it is possible that they are just detecting a unit-root random walk, which obviously does not have an actual trend.
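A small illustration of the last point (my own sketch; it assumes statsmodels is installed and uses its standard adfuller and kpss tests): a pure random walk has a unit root but no deterministic trend, yet it can look "trending" to the eye.

import numpy as np
from statsmodels.tsa.stattools import adfuller, kpss

rng = np.random.default_rng(0)
random_walk = np.cumsum(rng.standard_normal(1000))             # unit root, no trend
trended = 0.05 * np.arange(1000) + rng.standard_normal(1000)   # trend, no unit root

for name, x in [('random walk', random_walk), ('deterministic trend', trended)]:
    adf_p = adfuller(x)[1]                             # H0: a unit root is present
    kpss_p = kpss(x, regression='c', nlags='auto')[1]  # H0: series is level-stationary
    print(f'{name}: ADF p={adf_p:.3f}, KPSS p={kpss_p:.3f}')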

Hessian with Backprop.

The delta method.

Variational characterization of the absolute value function.

The T distribution and its relation to sampling.

Hobby Software/Hardware Projects (Personal)

FlagPlanter / Timestamper App for deepfake prevention, Evidence Timestamping with working apk

The flag planter app helps to claim priority over an idea without revealing the idea itself to the world.

Hobby Software/Hardware Projects (Others)

▷ Hosted Jsmol: an open-source Javascript viewer for chemical structures in 3D. link.

▷ Hosted JSME: an open-source Javascript Molecule Editor. link.

Diary

Mom

My mom's untimely death.

Dad

How we navigated my father's diagnosis of cardiac ischemia.