import torch
import torch.nn as nn
import torch.nn.functional as F
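# NOTE(review): `RelGraphConv` and `fn` are used below but are not imported in
# the visible part of this file - presumably `dgl.nn.pytorch.conv.RelGraphConv`
# and `dgl.function`; confirm against the original module.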
class Embedder(nn.Module):
    """
    Example RGCN for unsupervised learning.

    The network maps the input node features of a graph through the successive
    dimensions given in ``dims``, stores the result in the ``h`` node attribute
    of the graph and returns the corresponding tensor.
    """

    def __init__(self,
                 dims,
                 infeatures_dim=0,
                 num_rels=20,
                 num_bases=None,
                 conv_output=True,
                 self_loop=True,
                 verbose=False):
        """
        :param dims: The successive dimensions of the embeddings, either a sequence of ints or a single int
        :param infeatures_dim: The dimension of the input features. If 0, a constant feature of dimension 1 is used
        :param num_rels: The number of relations that are to be found in the graphs. Defaults to the 20 base pair types
        :param num_bases: Forwarded to RelGraphConv, to use the basis sharing trick used in RGCN in general
        :param conv_output: Whether to use a convolution at the end of the embedding or simply a linear layer
        :param self_loop: Whether each node is also connected to itself
        :param verbose: Whether to print the layers and a few construction details
        """
        super(Embedder, self).__init__()
        # A bare int is documented as valid input: wrap it so that the slicing
        # done below (and `embedder.dims[-1]` in downstream code) keeps working.
        self.dims = [dims] if isinstance(dims, int) else dims
        self.use_node_features = (infeatures_dim != 0)
        self.in_dim = 1 if infeatures_dim == 0 else infeatures_dim
        self.conv_output = conv_output
        self.num_rels = num_rels
        self.num_bases = num_bases
        self.self_loop = self_loop
        self.verbose = verbose

        self.layers = self.build_model()

        if self.verbose:
            print(self.layers)
            print("Num rels: ", self.num_rels)

    def build_model(self):
        """
        Build the stack of layers going from ``in_dim`` through every entry of ``dims``.

        All layers but the last one are RelGraphConv layers with a ReLU activation.
        The last one is a RelGraphConv without activation if ``conv_output`` is set,
        and a plain linear layer otherwise.

        :return: an nn.ModuleList with one layer per entry of ``dims``
        :raises ValueError: if ``dims`` is empty
        """
        if len(self.dims) == 0:
            raise ValueError("dims should contain at least one dimension")

        # Full chain of feature sizes, input included. With a single entry in
        # dims, the chain is just (in_dim, dims[0]) and only the output layer is built.
        sizes = [self.in_dim, *self.dims]
        last_hidden, last = sizes[-2:]
        if self.verbose:
            print("short, ", self.dims[:-1])
            print("last_hidden, last ", last_hidden, last)

        layers = nn.ModuleList()

        # input to hidden and hidden to hidden
        for dim_in, dim_out in zip(sizes[:-2], sizes[1:-1]):
            layers.append(RelGraphConv(in_feat=dim_in,
                                       out_feat=dim_out,
                                       num_rels=self.num_rels,
                                       num_bases=self.num_bases,
                                       activation=F.relu,
                                       self_loop=self.self_loop))

        # hidden to output
        if self.conv_output:
            h2o = RelGraphConv(in_feat=last_hidden,
                               out_feat=last,
                               num_rels=self.num_rels,
                               num_bases=self.num_bases,
                               self_loop=self.self_loop,
                               activation=None)
        else:
            h2o = nn.Linear(last_hidden, last)
        layers.append(h2o)
        return layers

    @property
    def current_device(self):
        """
        :return: current device this model is on
        """
        return next(self.parameters()).device

    def forward(self, g):
        """
        Embed the nodes of a graph.

        :param g: The graph to embed. Its ``nt_features`` node data is read when node features are used,
            and its ``edge_type`` edge data is passed to every convolution layer
        :return: The tensor of node embeddings, which is also stored in ``g.ndata['h']``
        """
        if self.use_node_features:
            h = g.ndata['nt_features']
        else:
            # No input features: every node gets the same constant scalar feature
            h = torch.ones((len(g.nodes()), 1), device=self.current_device)

        last_index = len(self.layers) - 1
        for i, layer in enumerate(self.layers):
            if i == last_index and not self.conv_output:
                # The final linear layer only takes the node features
                h = layer(h)
            else:
                h = layer(g=g, feat=h, etypes=g.edata['edge_type'])
        g.ndata['h'] = h
        return g.ndata['h']
# Supervised model stacked on top of an Embedder.
class Classifier(nn.Module):
    """
    Example RGCN for supervised learning, stacked on top of an Embedder network.

    The embedder output is passed through additional layers whose sizes are
    given by ``classif_dims``. The result is stored in the ``h`` node attribute
    of the graph and returned.
    """

    def __init__(self,
                 embedder,
                 classif_dims=None,
                 num_rels=20,
                 num_bases=None,
                 conv_output=True,
                 self_loop=True,
                 verbose=False):
        """
        :param embedder: An embedder network; it is called on the graph and must expose a ``dims`` sequence
        :param classif_dims: An iterable of the successive embedding dimensions, similarly to the dims of the Embedder.
            If None or empty, no layer is added on top of the embedder
        :param num_rels: The number of relations that are to be found in the graphs. Defaults to the 20 base pair types
        :param num_bases: Forwarded to RelGraphConv, to use the basis sharing trick used in RGCN in general
        :param conv_output: Whether to use a convolution at the end of the embedding or simply a linear layer
        :param self_loop: Whether each node is also connected to itself
        :param verbose: Whether to print the classification layers after construction
        """
        super(Classifier, self).__init__()
        self.num_rels = num_rels
        self.num_bases = num_bases
        self.self_loop = self_loop
        self.conv_output = conv_output
        self.verbose = verbose

        self.embedder = embedder
        self.last_dim_embedder = embedder.dims[-1]
        self.classif_dims = classif_dims

        self.classif_layers = self.build_model()

        if self.verbose:
            # The layers of this module live in `classif_layers`
            print(self.classif_layers)
            print("Num rels: ", self.num_rels)

    def build_model(self):
        """
        Build the layers going from the embedder output size through every entry of ``classif_dims``.

        All layers but the last one are RelGraphConv layers with a ReLU activation.
        The last one is a RelGraphConv without activation if ``conv_output`` is set,
        and a plain linear layer otherwise.

        :return: an nn.ModuleList with one layer per entry of ``classif_dims`` (empty if there is none)
        """
        classif_layers = nn.ModuleList()
        if self.classif_dims is None or len(self.classif_dims) == 0:
            return classif_layers

        # Full chain of feature sizes, starting at the embedder output. With a
        # single classification dimension only the output layer gets built.
        sizes = [self.last_dim_embedder, *self.classif_dims]
        last_hidden, last = sizes[-2:]

        # embedding to hidden and hidden to hidden
        for dim_in, dim_out in zip(sizes[:-2], sizes[1:-1]):
            classif_layers.append(RelGraphConv(in_feat=dim_in,
                                               out_feat=dim_out,
                                               num_rels=self.num_rels,
                                               num_bases=self.num_bases,
                                               activation=F.relu,
                                               self_loop=self.self_loop))

        # hidden to output
        if self.conv_output:
            h2o = RelGraphConv(in_feat=last_hidden,
                               out_feat=last,
                               num_rels=self.num_rels,
                               num_bases=self.num_bases,
                               self_loop=self.self_loop,
                               activation=None)
        else:
            h2o = nn.Linear(last_hidden, last)
        classif_layers.append(h2o)
        return classif_layers

    @property
    def current_device(self):
        """
        :return: current device this model is on
        """
        return next(self.parameters()).device

    def forward(self, g):
        """
        Embed the graph with the embedder, then apply the classification layers.

        :param g: The graph to run on. Its ``edge_type`` edge data is passed to every convolution layer
        :return: The tensor of node outputs, which is also stored in ``g.ndata['h']``
        """
        h = self.embedder(g)
        last_index = len(self.classif_layers) - 1
        for i, layer in enumerate(self.classif_layers):
            if i == last_index and not self.conv_output:
                # The final linear layer only takes the node features
                h = layer(h)
            else:
                # Convolution layer
                h = layer(g, h, g.edata['edge_type'])
        g.ndata['h'] = h
        return g.ndata['h']
# Edge decoder used for link prediction.
class DotPredictor(nn.Module):
    """
    Given node embeddings and a connectivity, predict a dot product score for each edge.
    """

    def __init__(self):
        super().__init__()
        # Squashes the raw dot products before they are returned
        self.norm = nn.Sigmoid()

    def forward(self, g, h):
        """
        Score every edge of ``g`` from the embeddings of its two end nodes.

        :param g: The graph whose edges are scored
        :param h: The node embeddings, stored as the ``h`` node attribute for the computation
        :return: The first column of the sigmoid-normalized ``score`` edge attribute
        """
        # NOTE(review): `fn` is not imported in the visible part of this file,
        # presumably it is `dgl.function` - confirm.
        with g.local_scope():
            g.ndata['h'] = h
            # New edge feature 'score': dot product between the 'h' feature of
            # the source node and the 'h' feature of the destination node.
            edge_dot = fn.u_dot_v('h', 'h', 'score')
            g.apply_edges(edge_dot)
            g.edata['score'] = self.norm(g.edata['score'])
            # The score is read back as a 2D tensor, only its first column is returned.
            return g.edata['score'][:, 0]
# Link prediction model: node encoder followed by an edge decoder.
class BasePairPredictor(nn.Module):
    """
    Example RGCN for link prediction, built from an encoder and an edge decoder.

    Predict the probability that two nucleotides are base paired, based on the
    node embeddings computed by the encoder.
    """

    def __init__(self, encoder, decoder=None):
        """
        :param encoder: An Embedder network, called on the graph to get node embeddings
        :param decoder: A module called as ``decoder(graph, embeddings)`` to score the edges of a given
            connectivity. Defaults to a new DotPredictor
        """
        super(BasePairPredictor, self).__init__()
        self.encoder = encoder
        # Build the default decoder here rather than in the signature, so that
        # it is not created at definition time and shared by all instances.
        self.decoder = DotPredictor() if decoder is None else decoder

    def forward(self, g, negative_graph=None):
        """
        Predicts the probability that each edge exists.

        If negative graph is not None, we embed the real graph and then predict the negative graph connectivity

        :param g: The real graph to compute node embeddings and edge likelihood over
        :param negative_graph: A decoy connectivity to compute edge likelihood over
        :return: The score for the edge likelihood
        """
        # Node data written on g by the encoder stays local to this scope
        with g.local_scope():
            h = self.encoder(g)
        scored_graph = g if negative_graph is None else negative_graph
        return self.decoder(scored_graph, h)