## Naive Bayes example
Let's explore implementation of Naive Bayes!

In [2]:
# create a simple function to tokenize messages into distinct words
from typing import Set
import re

def tokenize(text: str) -> Set[str]:
 text = text.lower() # Convert to lowercase,
 all_words = re.findall("[a-z0-9']+", text) # extract the words, and
 return set(all_words) # remove duplicates.

assert tokenize("Data Science is science") == {"data", "science", "is"}


In [6]:
# define a type for our training data
from typing import NamedTuple

class Message(NamedTuple):
 text: str
 is_spam: bool

In [12]:
# As our classifier needs to keep track of tokens, counts, and labels from the training data, we’ll make it a class.
from typing import List, Tuple, Dict, Iterable
import math
from collections import defaultdict

class NaiveBayesClassifier:
 def __init__(self, k: float = 0.5) -> None:
 self.k = k # smoothing factor

 self.tokens: Set[str] = set()
 self.token_spam_counts: Dict[str, int] = defaultdict(int)
 self.token_ham_counts: Dict[str, int] = defaultdict(int) # we refer to nonspam emails as ham emails
 self.spam_messages = self.ham_messages = 0

 # Next, we’ll give it a method to train it on a bunch of messages
 def train(self, messages: Iterable[Message]) -> None:
 for message in messages:
 # Increment message counts
 if message.is_spam:
 self.spam_messages += 1
 else:
 self.ham_messages += 1

 # Increment word counts
 for token in tokenize(message.text):
 self.tokens.add(token)
 if message.is_spam:
 self.token_spam_counts[token] += 1
 else:
 self.token_ham_counts[token] += 1
 
 # Ultimately we’ll want to predict P(spam | token). 
 #As we saw earlier, to apply Bayes’s theorem we need to know P(token | spam) and P(token | ham) 
 #for each token in the vocabulary. So we’ll create a “private” helper function to compute those:
 
 def _probabilities(self, token: str) -> Tuple[float, float]:
 """returns P(token | spam) and P(token | ham)"""
 spam = self.token_spam_counts[token]
 ham = self.token_ham_counts[token]

 p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
 p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)

 return p_token_spam, p_token_ham 
 
 # finally we have the predict function
 def predict(self, text: str) -> float:
 text_tokens = tokenize(text)
 log_prob_if_spam = log_prob_if_ham = 0.0

 # Iterate through each word in our vocabulary
 for token in self.tokens:
 prob_if_spam, prob_if_ham = self._probabilities(token)

 # If *token* appears in the message,
 # add the log probability of seeing it
 if token in text_tokens:
 log_prob_if_spam += math.log(prob_if_spam)
 log_prob_if_ham += math.log(prob_if_ham)

 # Otherwise add the log probability of _not_ seeing it,
 # which is log(1 - probability of seeing it)
 else:
 log_prob_if_spam += math.log(1.0 - prob_if_spam)
 log_prob_if_ham += math.log(1.0 - prob_if_ham)

 prob_if_spam = math.exp(log_prob_if_spam)
 prob_if_ham = math.exp(log_prob_if_ham)
 return prob_if_spam / (prob_if_spam + prob_if_ham)

## Testing Our Model
Let’s make sure our model works by writing some unit tests for it.

In [13]:
messages = [Message("spam rules", is_spam=True),
 Message("ham rules", is_spam=False),
 Message("hello ham", is_spam=False)]

model = NaiveBayesClassifier(k=0.5)
model.train(messages)

### First, let’s check that it got the counts right:

In [14]:
assert model.tokens == {"spam", "ham", "rules", "hello"}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts == {"spam": 1, "rules": 1}
assert model.token_ham_counts == {"ham": 2, "rules": 1, "hello": 1}

### Now let’s make a prediction. 
We’ll also (laboriously) go through our Naive Bayes logic by hand, and make sure that we get the same result

In [17]:
text = "hello spam"

probs_if_spam = [
 (1 + 0.5) / (1 + 2 * 0.5), # "spam" (present in the text)
 1 - (0 + 0.5) / (1 + 2 * 0.5), # "ham" (not present in the text)
 1 - (1 + 0.5) / (1 + 2 * 0.5), # "rules" (not present in the text)
 (0 + 0.5) / (1 + 2 * 0.5) # "hello" (present in the text)
]

probs_if_ham = [
 (0 + 0.5) / (2 + 2 * 0.5), # "spam" (present in the text)
 1 - (2 + 0.5) / (2 + 2 * 0.5), # "ham" (not present in the text)
 1 - (1 + 0.5) / (2 + 2 * 0.5), # "rules" (not present in the text)
 (1 + 0.5) / (2 + 2 * 0.5), # "hello" (present in the text)
]

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))


# Should be about 0.83
assert model.predict(text) - p_if_spam / (p_if_spam + p_if_ham) <= 0.0001 
# If this test passes, it seems like our model is doing what we think it is.

## Using Our Model
A popular (if somewhat old) dataset is the SpamAssassin public corpus. We’ll look at the files prefixed with 20021010.
Here is a script that will download and unpack them to the directory of your choice (or you can do it manually):

In [18]:
from io import BytesIO # So we can treat bytes as a file.
import requests # To download the files, which
import tarfile # are in .tar.bz format.

BASE_URL = "https://spamassassin.apache.org/old/publiccorpus"
FILES = ["20021010_easy_ham.tar.bz2",
 "20021010_hard_ham.tar.bz2",
 "20021010_spam.tar.bz2"]

# This is where the data will end up,
# in /spam, /easy_ham, and /hard_ham subdirectories.
# Change this to where you want the data.
OUTPUT_DIR = 'spam_data'

for filename in FILES:
 # Use requests to get the file contents at each URL.
 content = requests.get(f"{BASE_URL}/{filename}").content

 # Wrap the in-memory bytes so we can use them as a "file."
 fin = BytesIO(content)

 # And extract all the files to the specified output dir.
 with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
 tf.extractall(OUTPUT_DIR)

### After downloading the data you should have three folders: spam, easy_ham, and hard_ham. Check it by yourself
To keep things really simple, we’ll just look at the subject lines of each email.

### How do we identify the subject line? 

In [19]:
import glob, re

# modify the path to wherever you've put the files
path = 'spam_data/*/*'

data: List[Message] = []

# glob.glob returns every filename that matches the wildcarded path
for filename in glob.glob(path):
 is_spam = "ham" not in filename

 # There are some garbage characters in the emails; the errors='ignore'
 # skips them instead of raising an exception.
 with open(filename, errors='ignore') as email_file:
 for line in email_file:
 if line.startswith("Subject:"):
 subject = line.lstrip("Subject: ")
 data.append(Message(subject, is_spam))
 break # done with this file

### Now we can split the data into training data and test data, and then we’re ready to build a classifier:

In [21]:
import random
from typing import TypeVar, List, Tuple
X = TypeVar('X') # generic type to represent a data point

def split_data(data: List[X], prob: float) -> Tuple[List[X], List[X]]:
 """Split data into fractions [prob, 1 - prob]"""
 data = data[:] # Make a shallow copy
 random.shuffle(data) # because shuffle modifies the list.
 cut = int(len(data) * prob) # Use prob to find a cutoff
 return data[:cut], data[cut:] # and split the shuffled list there.

random.seed(0) # just so you get the same answers as me
train_messages, test_messages = split_data(data, 0.75)

model = NaiveBayesClassifier()
model.train(train_messages)

### Let’s generate some predictions and check how our model does

In [22]:
from collections import Counter

predictions = [(message, model.predict(message.text))
 for message in test_messages]

# Assume that spam_probability > 0.5 corresponds to spam prediction
# and count the combinations of (actual is_spam, predicted is_spam)
confusion_matrix = Counter((message.is_spam, spam_probability > 0.5)
 for message, spam_probability in predictions)

print(confusion_matrix)



Counter({(False, False): 669, (True, True): 86, (True, False): 40, (False, True): 30})


In [26]:
recall = confusion_matrix[(True,True)] / (confusion_matrix[(True,True)] + confusion_matrix[(True,False)])
print(recall)

precision = confusion_matrix[(True,True)] / (confusion_matrix[(True,True)] + confusion_matrix[(False,True)])
print(precision)

0.6825396825396826
0.7413793103448276


### We can also inspect the model’s innards to see which words are least and most indicative of spam:

In [27]:
def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
 # We probably shouldn't call private methods, but it's for a good cause.
 prob_if_spam, prob_if_ham = model._probabilities(token)

 return prob_if_spam / (prob_if_spam + prob_if_ham)

words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))

print("spammiest_words", words[-10:])
print("hammiest_words", words[:10])

spammiest_words ['assistance', '95', 'attn', 'clearance', 'per', 'money', 'sale', 'rates', 'systemworks', 'adv']
hammiest_words ['spambayes', 'users', 'razor', 'zzzzteana', 'sadev', 'apt', 'perl', 'ouch', 'spamassassin', 'bliss']


## How could we get better performance? 
Get more data? Or other ideas? 