Source code for indictrans.trunk.perceptron

#! /usr/bin/env python
# encoding: utf-8

from __future__ import division, print_function, absolute_import

import sys

import numpy as np
from six.moves import xrange
from scipy.sparse import csc_matrix, issparse

from indictrans._decode import DECODERS
from indictrans._utils import count_tranxn, sparse_add


class StructuredPerceptron():
    """Structured perceptron for sequence classification.

    The implementation is based on the averaged structured perceptron
    algorithm of M. Collins.

    Parameters
    ----------
    lr_exp : float, default: 0.1
        The exponent used for inverse scaling of the learning rate. Given
        iteration number t, the effective learning rate is
        ``1. / (t ** lr_exp)``.

    n_iter : int, default: 15
        Maximum number of epochs of the structured perceptron algorithm.

    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random number
        generator; if RandomState instance, random_state is the random
        number generator; if None, the random number generator is the
        RandomState instance used by ``np.random``.

    verbose : int, default: 0 (quiet mode)
        Verbosity mode. ``>= 1`` prints the epoch number, ``>= 2`` also
        prints the per-epoch train-set error, ``>= 3`` also prints a
        prediction/target comparison for the first sequence of each epoch.

    Attributes
    ----------
    coef_ : scipy.sparse.csc_matrix, shape (n_classes, n_features)
        Averaged feature weights (set by ``fit``).

    intercept_init_ : array, shape (n_classes,)
        Averaged scores for the label of the first position.

    intercept_trans_ : array, shape (n_classes, n_classes)
        Averaged first-order transition scores.

    intercept_final_ : array, shape (n_classes,)
        Averaged scores for the label of the last position.

    classes_ : dict
        Mapping from internal class index to the original label.

    References
    ----------
    M. Collins (2002). Discriminative training methods for hidden Markov
    models: Theory and experiments with perceptron algorithms. EMNLP.
    """

    def __init__(self, lr_exp=0.1, n_iter=15, random_state=None, verbose=0):
        self.lr_exp = lr_exp
        self.n_iter = n_iter
        self.verbose = verbose
        self.random_state = random_state

    def _get_random_state(self):
        """Resolve ``self.random_state`` into a ``RandomState`` instance.

        Returns
        -------
        random_state : numpy.random.RandomState
            The global NumPy generator for ``None``, a freshly seeded
            generator for an integer seed, or the given instance itself.

        Raises
        ------
        TypeError
            If ``self.random_state`` is none of the supported types.
        """
        seed = self.random_state
        if seed is None:
            return np.random.mtrand._rand
        if isinstance(seed, (int, np.integer)):
            return np.random.RandomState(seed)
        # The check must be against the RandomState *type*; `_rand` is an
        # instance and cannot be used as the second isinstance argument.
        if isinstance(seed, np.random.RandomState):
            return seed
        raise TypeError("Type {0} not supported.".format(type(seed)))

    def fit(self, X, y):
        """Fit the model to the given set of sequences.

        Parameters
        ----------
        X : {array-like, sparse matrix}, shape (n_sequences, sequence_length,
            n_features)
            Feature matrix of train sequences.

        y : list of arrays, shape (n_sequences, sequence_length)
            Target labels.

        Returns
        -------
        self : object
            Returns self.
        """
        # NOTE(review): for a list of per-sequence (sparse) matrices this is
        # expected to produce a (1, n_sequences) object array, so that
        # X[0][i] is the i-th sequence -- confirm against callers.
        X = np.atleast_2d(X)
        sequences = X[0]
        n_sequences = len(sequences)
        decoder = DECODERS['viterbi']

        # np.unique returns the sorted distinct labels; indexing classes in
        # that order keeps the label <-> index mapping deterministic.
        classes = np.unique(np.hstack(y))
        n_classes = len(classes)
        class_range = np.arange(n_classes)
        class_id = {label: idx for idx, label in enumerate(classes)}
        id_class = {idx: label for idx, label in enumerate(classes)}

        # Keep per-sequence arrays in plain lists: sequences may differ in
        # length, and NumPy refuses to build ragged arrays implicitly.
        y = [np.array([class_id[label] for label in seq]) for seq in y]
        # One-hot (boolean) indicator matrix per sequence.
        Y_true = [seq.reshape(-1, 1) == class_range for seq in y]

        n_features = sequences[0].shape[1]

        w = np.zeros((n_classes, n_features), order='F')
        # first order transition probabilities
        b_init = np.zeros(n_classes)
        b_final = np.zeros(n_classes)
        b_trans = np.zeros((n_classes, n_classes))

        # Accumulators for the averaged parameters.
        w_avg = np.zeros_like(w)
        b_init_avg = np.zeros_like(b_init)
        b_final_avg = np.zeros_like(b_final)
        b_trans_avg = np.zeros_like(b_trans)

        sequence_ids = np.arange(n_sequences)
        rnd = self._get_random_state()

        avg_count = 1.0
        lr_exp = self.lr_exp

        for it in range(1, self.n_iter + 1):
            lr = 1.0 / (it ** lr_exp)

            if self.verbose >= 1:
                print("Iteration {0}".format(it), end=" ... \n")
                sys.stdout.flush()

            rnd.shuffle(sequence_ids)
            sum_loss = 0

            for id_i, i in enumerate(sequence_ids):
                X_i = sequences[i]
                # `.dot` is a matrix product for both dense arrays and
                # every scipy sparse container.
                if issparse(X_i):
                    score = X_i.dot(w.T)
                else:
                    score = np.dot(X_i, w.T)

                y_pred = decoder.decode(score, b_trans, b_init, b_final)
                y_t_i = y[i]
                loss = (y_pred != y_t_i).sum()  # number of wrong predictions

                if self.verbose >= 3 and not id_i:
                    comp = ' '.join(
                        '-'.join(pair)
                        for pair in zip(map(str, y_pred), map(str, y_t_i)))
                    print("First sequence comparision: "
                          "{0} ... \nloss: {1}".format(comp, loss))

                if not loss:
                    # Perfect prediction: the perceptron makes no update.
                    continue

                sum_loss += loss

                Y_t_i = Y_true[i]
                Y_pred = (y_pred.reshape(-1, 1) == class_range)
                Y_pred = Y_pred.astype(np.float64)

                # Scaled (truth - prediction) indicator difference, projected
                # onto the features to give the weight update.
                Y_diff = csc_matrix(Y_pred - Y_t_i)
                Y_diff *= -lr
                w_update = Y_diff.T.dot(X_i)

                # presumably count_tranxn returns an (n_classes, n_classes)
                # matrix of label-transition counts -- verify in _utils.
                t_trans = count_tranxn(y_t_i, n_classes)
                p_trans = count_tranxn(y_pred, n_classes)
                b_trans_update = lr * (p_trans - t_trans)
                b_init_update = lr * (Y_pred[0] - Y_t_i[0])
                b_final_update = lr * (Y_pred[-1] - Y_t_i[-1])

                if isinstance(w_update, np.ndarray):
                    w += w_update
                else:
                    sparse_add(w, w_update)
                b_trans -= b_trans_update
                b_init -= b_init_update
                b_final -= b_final_update

                # Weight each update by the current epoch counter so that
                # the average can be recovered with one subtraction at the
                # end of training.
                w_update *= avg_count
                b_trans_update *= avg_count
                b_init_update *= avg_count
                b_final_update *= avg_count

                if isinstance(w_update, np.ndarray):
                    w_avg += w_update
                else:
                    sparse_add(w_avg, w_update)
                b_trans_avg -= b_trans_update
                b_init_avg -= b_init_update
                b_final_avg -= b_final_update

            if self.verbose >= 2:
                print("Train-set error = {0:.4f}".format(
                    sum_loss / n_sequences))
                sys.stdout.flush()

            avg_count += 1.

        # Turn the final parameters into their running averages.
        w -= w_avg / avg_count
        b_init -= b_init_avg / avg_count
        b_trans -= b_trans_avg / avg_count
        b_final -= b_final_avg / avg_count

        self.coef_ = csc_matrix(w)
        self.intercept_init_ = b_init
        self.intercept_trans_ = b_trans
        self.intercept_final_ = b_final
        self.classes_ = id_class

        return self

    def predict(self, X):
        """Predict output sequences for input sequences in X.

        Parameters
        ----------
        X : {array-like, sparse matrix}, shape (n_sequences, sequence_length,
            n_features)
            Feature matrix of test sequences. A list or tuple is treated as
            a collection of sequences; anything else as a single sequence.

        Returns
        -------
        y : list of lists, shape (n_sequences, sequence_length)
            Labels per sequence in X.
        """
        decoder = DECODERS['viterbi']
        if not isinstance(X, (list, tuple)):
            X = [X]

        y = []
        for x in X:
            if issparse(x):
                scores = x.dot(self.coef_.T).toarray()
            else:
                scores = self.coef_.dot(x.T).T
            path = decoder.decode(scores, self.intercept_trans_,
                                  self.intercept_init_,
                                  self.intercept_final_)
            # Map internal class indices back to the original labels.
            y.append([self.classes_[pred] for pred in path])
        return y