
Links

Stack Exchange profile

GitHub

LinkedIn


How to edit this page

Make changes to index.html and validate them with "tidy --warn-proprietary-attributes no -e -q index.html"; the same command also runs as a pre-commit hook.
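A minimal sketch of such a hook, assuming it lives at .git/hooks/pre-commit and is executable; the actual hook in the repository may differ.
#!/usr/bin/env python3
# Hypothetical .git/hooks/pre-commit: run tidy on index.html and abort the
# commit when tidy reports errors (tidy exits 2 on errors, 1 on warnings only).
import subprocess
import sys

result = subprocess.run(
    ['tidy', '--warn-proprietary-attributes', 'no', '-e', '-q', 'index.html'],
    capture_output=True, text=True)
if result.returncode > 1:
    sys.stderr.write(result.stdout + result.stderr)
    sys.exit(1)  # a non-zero exit aborts the commit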

Pushpendre Rastogi

pushpendre at gmail

Introduction

I joined Amazon Prime Research in April 2020 to work on Plan Recommendation and Content Optimization. Before that, I joined the Dialog State Tracking team in Amazon Alexa in April 2019. I completed my Ph.D. in Computer Science at The Center for Language and Speech Processing, Johns Hopkins University, where my advisor was Benjamin Van Durme. I TA'd graduate courses on representation learning and machine learning for three semesters during my Ph.D. studies, and I received the George Sommerman Graduate Teaching Assistant Award, which carries a $1000 cash prize. I have reviewed for Transactions on Signal Processing-19, NeurIPS-19, ICML-19, ICLR-19, EMNLP-19, ACL-19, TPAMI-18, NeurIPS-18, KG4IR-18, EMNLP-18, and ACL-18.

Selected Publications

See my Google Scholar profile for a complete list of publications.

Education

Ph.D. and M.S. in Computer Science, Johns Hopkins University, 2013-19, GPA 3.75/4.0
Thesis Topic: Representation Learning for Words and Entities. I presented new methods for unsupervised learning of word and entity embeddings from texts and knowledge bases.
Courses and Grades: Natural Language Processing (A), Machine Learning in Complex Domains (A), Stochastic Search & Optimization (B), Parallel Programming (A-), Principles of Programming Languages (A-), Combinatorial Optimization (A+), Introduction to Convexity (A-)
M.Tech. in Information and Communication Technology, IIT Delhi, 2010-11, GPA 8.77/10
B.Tech. in Electrical Engg., IIT Delhi, 2006-10, GPA 8.86/10

Code Snippets

Interactive pool for running jobs on the side in Python.
class InteractivePool:
    """Track a batch of concurrent.futures futures submitted as side jobs."""

    def __init__(self, J):
        import time
        self.J = J  # list of futures
        self.tic = time.time()

    def done(self):
        # (number of finished jobs, total number of jobs)
        return sum(1 for j in self.J if j.done()), len(self.J)

    def collect(self, R=None):
        # Gather results of finished, non-cancelled jobs into R keyed by job
        # index, skipping jobs that were already collected.
        R = R or {}
        print(len(R))
        for i, j in enumerate(self.J):
            if i not in R and j.done() and not j.cancelled():
                R[i] = j.result()
        print(len(R))
        return R

    def time(self):
        # Elapsed wall-clock time since the pool was created.
        import datetime, time as time_module
        return str(datetime.timedelta(seconds=time_module.time() - self.tic))

    def wait(self, interval=600):
        # Poll every `interval` seconds until all jobs are done.
        import time
        while True:
            time.sleep(interval)
            if sum(1 for j in self.J if j.done()) == len(self.J):
                break
        return

from concurrent.futures import ProcessPoolExecutor
sidejob = ProcessPoolExecutor(max_workers=4).submit
P = InteractivePool([sidejob(f, i) for i in range(80)])  # f: any picklable function
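For concreteness, a self-contained usage sketch; square is a stand-in for whatever picklable function you want to run on the side (on spawn-based platforms, wrap the executor setup in an if __name__ == '__main__': guard).
from concurrent.futures import ProcessPoolExecutor

def square(i):  # stand-in for any picklable function
    return i * i

sidejob = ProcessPoolExecutor(max_workers=4).submit
P = InteractivePool([sidejob(square, i) for i in range(16)])
print(P.done())        # e.g. (3, 16): finished jobs out of total
P.wait(interval=1)     # block, polling every second, until every job is done
results = P.collect()  # {job index: result} for completed, non-cancelled jobs
print(P.time())        # elapsed wall-clock time, e.g. '0:00:02.001553'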
Save a Spark dataframe to sparse SciPy arrays
import pickle
import subprocess
from functools import partial
from glob import glob
from tempfile import TemporaryDirectory
from typing import Iterable, List, NamedTuple

import numpy as np
import pyspark.ml as pm
import pyspark.ml.linalg  # ensure pm.linalg is importable
from pyspark import TaskContext
from scipy.sparse import csr_matrix, lil_matrix, load_npz, save_npz, vstack

def sparseVectorList_to_CSRMatrix(X: List[pm.linalg.SparseVector]) -> csr_matrix:
    """ Convert a list of pyspark sparse vectors to a scipy CSR matrix that
    a standard sklearn function/lightgbm can consume.
    """
    M = lil_matrix((len(X), X[0].size), dtype=np.float64)
    for i, x in enumerate(X):
        I = np.argsort(x.indices)
        M.rows[i] = x.indices[I].tolist()
        M.data[i] = x.values[I].tolist()
    return M.tocsr(copy=False)

class RowToPredict(NamedTuple):
    "This class was created just to facilitate linting and type hinting."
    customer_id: str
    features: pm.linalg.SparseVector

def save_features_in_spark_as_sparsescipy_to_hdfs(
        hdfs_dir,
        row_gen: Iterable[RowToPredict]):
    C, Flist = [], []
    for e in row_gen:
        Flist.append(e.features)
        C.append(e.customer_id)
    F = sparseVectorList_to_CSRMatrix(Flist)
    pid = TaskContext().partitionId()
    # Ideally I would upload the file directly to HDFS, but I don't know how to
    # write to HDFS directly; hdfscli didn't work for me. The work-around is to
    # save to a local file on the task node, then upload to HDFS with a
    # subprocess call.
    with TemporaryDirectory() as tmpdirname:
        print('created temporary directory', tmpdirname)
        fname = f'{tmpdirname}/F.{pid}.npz'
        cname = f'{tmpdirname}/C.{pid}.pkl'
        save_npz(fname, F)
        with open(cname, 'wb') as fh:
            pickle.dump(C, fh)
        subprocess.getstatusoutput(f'hadoop fs -put {fname} {cname} {hdfs_dir}')
    return

# sdf is the pyspark DataFrame to be saved. Make sure that each partition has a
# reasonable number of rows so that we don't OOM.
npart = max(1, sdf.count() // 10000)
sdf.rdd.repartition(npart).foreachPartition(partial(
    save_features_in_spark_as_sparsescipy_to_hdfs, '/data/'))

# After saving all the parts to HDFS, download the parts and open them on the master node.
subprocess.getstatusoutput('hadoop fs -copyToLocal /data/ /home/hadoop/')
F = vstack([load_npz(e) for e in sorted(glob('/home/hadoop/data/F.*.npz'))])
C = [c for e in sorted(glob('/home/hadoop/data/C.*.pkl'))
     for c in pickle.load(open(e, 'rb'))]
How to hash-check pip requirements files
1. Install virtualenv and pip-tools.
2. Create a workspace and download the package.
3. Go to where the requirements file is.
4. Create a fresh empty environment and activate it.
5. Install all the requirements.
6. Generate hashes for all installed packages.
7. Close the shell and create a new one.
8. Create another fresh empty environment and activate it.
9. Check that the new requirements file installs with hashes required.

pip install virtualenv pip-tools
python3 -m venv env; source env/bin/activate
pip list > before; pip install -r requirements.txt; pip list > after
pip-compile requirements.txt --generate-hashes # this overwrites the original file.
exit; bash
python3 -m venv env2; source env2/bin/activate
pip install --require-hashes -r requirements.txt
HDFS file system functionality exposed to Python
import subprocess

def hdfs_exists(path, flag='-e'):
    # `hadoop fs -test -e` exits 0 iff the path exists.
    code, output = subprocess.getstatusoutput(f'hadoop fs -test {flag} {path}')
    if code != 0:
        print(output)
        return False
    else:
        return True

def copyFromLocal(src, dst):
    return subprocess.getstatusoutput(f'hadoop fs -copyFromLocal {src} {dst}')

def copyToLocal(src, dst):
    return subprocess.getstatusoutput(f'hadoop fs -copyToLocal {src} {dst}')
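A brief usage sketch of these helpers; the paths below are hypothetical.
if not hdfs_exists('/data/F.0.npz'):
    code, output = copyFromLocal('/home/hadoop/F.0.npz', '/data/')
    print(code, output)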
Spark Setup
import logging
import os
import re
import warnings
from io import StringIO

import yaml
from pyspark.sql import SparkSession

def setup(RUNDATE='2020-07-16', spark_setting_file=None):
    """ Construct the spark session, set up the logger, and read YAML files
    from prime-ml-repo in production EMR clusters. After reading the yaml files,
    format the paths with dates.

    RUNDATE is a date string like '2020-05-30'.
    """
    assert re.match(r'\d{4}-\d{2}-\d{2}', RUNDATE)
    if spark_setting_file is None:
        spark_setting_file = StringIO("""scoring:
     spark.executor.memory: '20G'
     spark.executor.memoryOverhead: '4G'
     spark.executor.cores: 4
     spark.task.cpus: 1
     spark.yarn.am.memory: '2G'
     spark.serializer: 'org.apache.spark.serializer.KryoSerializer'
     spark.driver.maxResultSize: 0
     spark.executor.extraJavaOptions: '-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'
     spark.kryoserializer.buffer.max: '512M'
     spark.cleaner.periodicGC.interval: '5min'
     spark.network.timeout: '600s'""")
    # Use logger to log everything to file and also to stderr.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )
    fh = logging.FileHandler("/home/hadoop/scoring_log_file.log")
    fh.setLevel(logging.INFO)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.info(f"Initialize job parameters. {RUNDATE}")

    parameters = {}

    logger.info("Initialize spark settings and spark session.")
    spark = SparkSession.builder.appName("claire")
    for key, value in yaml.safe_load(spark_setting_file)["scoring"].items():
        logger.info(f'spark: {key}={value}')
        spark = spark.config(key, value)
    spark = spark.enableHiveSupport().getOrCreate()
    spark.sparkContext.setLogLevel(os.environ.get("SPARK_LOG_LEVEL", "DEBUG")) 
    try:
        spark.sparkContext.setCheckpointDir("hdfs:///checkpoint/spark/")
    except Exception as exception:
        warnings.warn("Unable to set spark checkpoint directory !")
    return spark, parameters, logger 
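A usage sketch of the function above; the date is just an example value.
spark, parameters, logger = setup(RUNDATE='2020-07-16')
logger.info(f'Spark version {spark.version}')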
Common model inspections on a binary classification test set
import numpy as np
import sklearn.metrics as skm
from types import SimpleNamespace

def tabulate(label, proba, **kwargs):
    """ Compute common statistics on a binary classification problem
    given the true labels and the class probabilities.
    """
    assert proba.shape[1] == 2
    assert len(label) == len(proba)
    R = SimpleNamespace()
    R.accuracy = (label == proba.argmax(1)).mean()
    R.majority_rule_accuracy = max(1 - label.mean(), label.mean())
    R.log_loss = -np.log(np.select([label==0, label==1],
                                   [proba[:, 0], proba[:, 1]])).mean()
    fpr, tpr, thresholds = skm.roc_curve(label, proba[:, 1])
    R.roc_auc = skm.auc(fpr, tpr)
    try:
        idx = np.where(fpr < 0.05)[0].max()
        R.tp_at_fp_less_than_5_percent = tpr[idx]
        R.fp_at_fp_less_than_5_percent = fpr[idx]
        R.threshold_at_fp_less_than_5_percent = thresholds[idx]
    except ValueError as e:
        print(e)
        pass
    precision, recall, thresholds = skm.precision_recall_curve(label, proba[:, 1])
    R.prauc = skm.auc(recall, precision)
    R.precision = precision
    R.recall = recall
    R.thresholds = thresholds
    # Clamp the index: precision_recall_curve appends a final precision of 1.0
    # that has no corresponding threshold.
    idx = min(np.where(precision > 0.9)[0].min(), len(thresholds) - 1)
    R.smallest_precision_greater_than_90pct = precision[idx]
    R.recall_at_precision_90pct = recall[idx]
    R.threshold_at_precision_90pct = thresholds[idx]

    idx = min(np.where(precision > 0.5)[0].min(), len(thresholds) - 1)
    R.smallest_precision_greater_than_50pct = precision[idx]
    R.recall_at_precision_50pct = recall[idx]
    R.threshold_at_precision_50pct = thresholds[idx]

    R = R.__dict__
    R.update(kwargs)
    return R 
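A hedged usage sketch with scikit-learn; the dataset and classifier below are stand-ins, and any estimator with predict_proba would work.
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Illustrative data and model; substitute your own test set and classifier.
X, y = make_classification(n_samples=2000, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
clf = LogisticRegression(max_iter=1000).fit(X_train, y_train)
stats = tabulate(y_test, clf.predict_proba(X_test), model='logreg')
print(stats['roc_auc'], stats['prauc'], stats['accuracy'])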
Boilerplate for configuring a logger in Python
import logging

def setup_logger(file_path=None):
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    )

    # Add a console handler only once; FileHandler subclasses StreamHandler,
    # so compare types exactly.
    if all(type(e) is not logging.StreamHandler for e in logger.handlers):
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        ch.setFormatter(formatter)
        logger.addHandler(ch)
        logger.info("Initialized logger with StreamHandler")

    # Add a file handler only if a path was given and none exists yet.
    if file_path and all(not isinstance(e, logging.FileHandler)
                         for e in logger.handlers):
        fh = logging.FileHandler(file_path, 'a')
        fh.setLevel(logging.INFO)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        logger.info(f"Initialized logger with FileHandler({file_path})")

    return logger
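Illustrative call; the log path is arbitrary.
logger = setup_logger('/tmp/example.log')
logger.info('hello from the configured logger')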
Java - Python/Numpy Fast Copy-Free Exchange
/* Java */

import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;

        short a = 3; // 2 bytes
        long b = 5; // 8 bytes
        float c = (float)7.0; // 4 bytes
        ByteBuffer bb = ByteBuffer.allocate(14);
        bb.order(ByteOrder.LITTLE_ENDIAN);
        bb.putShort(a);
        pp(bb.position()); // 2
        bb.putLong(b);
        pp(bb.position()); // 10
        bb.putFloat(c);
        pp(bb.position()); // 14
        bb.position(0); // crucial: rewind before writing.
        try (RandomAccessFile f = new RandomAccessFile("/tmp/tmp.dat", "rw");
             FileChannel fc = f.getChannel()) {
            pp(bb.order().toString());
            pp(fc.write(bb));
        }

# Python

from mmap import mmap, PROT_READ
import os
import numpy as np
import sys
assert sys.byteorder == 'little'
fd = os.open('/tmp/tmp.dat', os.O_RDONLY)
buf = mmap(fd, 14, prot=PROT_READ)
# 'L' = little endian.
arr1 = np.frombuffer(buf, dtype=np.dtype('int16').newbyteorder('L'), count=1, offset=0)
arr2 = np.frombuffer(buf, dtype=np.dtype('int64').newbyteorder('L'), count=1, offset=2)
arr3 = np.frombuffer(buf, dtype=np.dtype('float32').newbyteorder('L'), count=1, offset=10)
arr1, arr2, arr3
Java - Simple printing function.
static void pp(Object format, Object... args) {
    System.out.printf(format.toString(), args);
    System.out.println();
}
JSON-encode numpy objects
import json
import numpy as np

class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif type(obj).__module__ == 'numpy':
            # Convert numpy scalars to the closest builtin type.
            if type(obj).__name__.startswith('float'):
                return float(obj)
            elif type(obj).__name__.startswith('int'):
                return int(obj)
            else:
                return bool(obj)
        return json.JSONEncoder.default(self, obj)
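Illustrative usage; the payload is arbitrary.
payload = {'weights': np.arange(3), 'lr': np.float32(0.1), 'epochs': np.int64(5)}
print(json.dumps(payload, cls=NumpyEncoder))
# -> {"weights": [0, 1, 2], "lr": 0.10000000149011612, "epochs": 5}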
List busy EMR machines and the total reserved machines
 aws emr list-clusters --region us-east-1 --active  > /tmp/tmp1
jq '.Clusters|.[]|.Id' /tmp/tmp1 -r | xargs -n 1 -I % sh -c 'aws emr list-instances --cluster-id % --instance-states RUNNING --region us-east-1' > /tmp/tmp2
jq '.Instances | .[] | .InstanceType' /tmp/tmp2 -r | sort | uniq -c
aws ec2 describe-reserved-instances --region us-east-1 | jq '.ReservedInstances | .[] | [.InstanceType, .InstanceCount, .State]' -c -r | fgrep -v retired | sort 
Excel VBA functions for testing the significance of binomial A/B tests: two-sided z-test p-value from ratios and counts
'Two-sided ZTest Pvalue from counts
Public Function CountsZTest(count1, nob1, count2, nob2)
    CountsZTest = RatioZTest(count1 * 1# / nob1, nob1, count2 * 1# / nob2, nob2)
End Function
Public Function RatioZTest(p1, nob1, p2, nob2)
    diff = p1 - p2
    p_pooled = (p1 * nob1 + p2 * nob2) * 1# / (nob1 + nob2)
    nobs_2xhm = 1# / nob1 + 1# / nob2
    var1 = p_pooled * (1 - p_pooled) * nobs_2xhm
    std_diff = Sqr(var1)
    RatioZTest = Application.WorksheetFunction.Norm_S_Dist(-Abs(diff / std_diff), True) * 2
End Function 
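For reference, a Python sketch of the same pooled two-proportion z-test, handy for cross-checking the VBA; the function names and example counts are mine.
from math import sqrt
from scipy.stats import norm

def ratio_z_test(p1, nob1, p2, nob2):
    # Pooled two-proportion z-test, two-sided p-value.
    diff = p1 - p2
    p_pooled = (p1 * nob1 + p2 * nob2) / (nob1 + nob2)
    var = p_pooled * (1 - p_pooled) * (1 / nob1 + 1 / nob2)
    return 2 * norm.cdf(-abs(diff / sqrt(var)))

def counts_z_test(count1, nob1, count2, nob2):
    return ratio_z_test(count1 / nob1, nob1, count2 / nob2, nob2)

print(counts_z_test(120, 1000, 150, 1000))  # e.g. 120/1000 vs 150/1000 conversions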

Technical Notes

Steps for adding a note

Upload a PDF, say note5.pdf, to the res folder. Clone res/test.html and search-replace any mention of note1.pdf in the cloned html. Finally, link that html file here.

Note 6: The VW FAQ.
Note 5: The Basics of ZeroMQ.
Note 4: A visual summary of the inequalities governing Entropy, Cross Entropy, Joint Entropy, KL Divergence, and Mutual Information, including the Data Processing Inequality.
Note 3: [WIP] A visual proof of the UCB algorithm.
Note 2: A video tutorial about the difference between PnL and cash flow, and how a company can have positive cash flow but still make a loss without raising debt. (You may need to download the video and play it with VLC.)
Note 1: Describes how the variance of an A/B test can be reduced in the special case when we are comparing two policies over the same small, finite action space.
Informal description

We want to find out how big the difference between two treatments is. The standard approach is A/B testing / randomized controlled trials, in which we randomly assign half the people to treatment A and the other half to treatment B, and then estimate the difference between the two groups' average outcomes. This is the simplest method; suppose that using it we would need to run the experiment on 10000 people to detect a 10% difference between the two groups. The pdf analyzes a special case in which 25% fewer samples suffice. Of course this is not a new technique; I only wrote it up for my own understanding.

Trivia

How to add trivia

First compile a pdf, either in Overleaf or using latexmk, then add all the assets (the .tex and .pdf files) to the res/trivia folder. Then add an iframe with src, loading, width, and height attributes.

Hessian with Backprop.

The delta method.

Variational characterization of the absolute value function.

The T distribution and its relation to sampling.