/**
* Create the mpi channel
*
* @param config configuration
* @param wController controller
*/
public TWSMPIChannel(Config config,
IWorkerController wController) {
Object commObject = wController.getRuntimeObject("comm");
if (commObject == null) {
this.comm = MPI.COMM_WORLD;
} else {
this.comm = (Intracomm) commObject;
}
int pendingSize = CommunicationContext.networkChannelPendingSize(config);
this.pendingSends = new ArrayBlockingQueue<>(pendingSize);
this.registeredReceives = new ArrayList<>(1024);
this.groupedRegisteredReceives = new Int2ObjectArrayMap<>();
this.waitForCompletionSends = new IterativeLinkedList<>();
this.workerId = wController.getWorkerInfo().getWorkerID();
}
/**
 * Returns a map from each stored row index to the set of column indexes stored in that row.
 *
 * <p>Each column set is taken directly from that row's {@code keySet()} and cast to
 * {@link IntSet}; it is not copied. NOTE(review): it is therefore presumably a live view of
 * the row's storage. Confirm before callers mutate it.
 *
 * <p>{@code Int2ObjectArrayMap.put} does a linear key search, so inserting row by row would
 * cost O(rows^2). The row keys come from {@code sto.keySet()} and are therefore distinct, so
 * the map is built in a single O(rows) pass from its backing arrays instead.
 *
 * @return row index -> column indexes present in that row
 */
public Int2ObjectArrayMap<IntSet> indexesMap() {
    final int rowCount = sto.size();
    final int[] rows = new int[rowCount];
    final Object[] cols = new Object[rowCount];

    int filled = 0;
    for (Integer row : sto.keySet()) {
        rows[filled] = row;
        cols[filled] = (IntSet) sto.get(row).keySet();
        filled++;
    }

    // Keys are unique, so the arrays can back the map directly without per-entry lookups.
    return new Int2ObjectArrayMap<>(rows, cols, filled);
}
/**
 * Creates the TCP channel and connects it to every worker that has joined the job.
 *
 * <p>Sequence:
 * <ol>
 *   <li>start listening on this worker's own IP and port;</li>
 *   <li>synchronize on the worker controller's barrier;</li>
 *   <li>fetch the joined workers and open client connections to them;</li>
 *   <li>wait up to the configured maximum connection-establishment time for those
 *       connections.</li>
 * </ol>
 *
 * @param config configuration (connection-establishment time, pending-send queue size)
 * @param wController controller that provides this worker's info, the barrier and the joined
 *     workers
 * @throws Twister2RuntimeException if waiting on the barrier times out
 */
public TWSTCPChannel(Config config,
                     IWorkerController wController) {
  final int selfId = wController.getWorkerInfo().getWorkerID();
  final int selfPort = wController.getWorkerInfo().getPort();
  final String selfIp = wController.getWorkerInfo().getWorkerIP();

  workerController = wController;
  maxConnEstTime = TCPContext.maxConnEstTime(config);

  channel = createChannel(config, selfIp, selfPort, selfId);
  // Listen before synchronizing, so peers can reach this worker once they pass the barrier.
  channel.startListening();

  // Synchronize with the other workers through the controller's barrier.
  try {
    workerController.waitOnBarrier();
  } catch (TimeoutException barrierTimeout) {
    LOG.log(Level.SEVERE, barrierTimeout.getMessage(), barrierTimeout);
    throw new Twister2RuntimeException(barrierTimeout);
  }

  // The joined-worker list supplies each peer's ID, IP and port for the client connections.
  channel.startConnections(toNetworkInfos(workerController.getJoinedWorkers()));
  channel.waitForConnections(maxConnEstTime);

  final int pendingCapacity = CommunicationContext.networkChannelPendingSize(config);
  this.pendingSends = new ArrayBlockingQueue<>(pendingCapacity);
  this.registeredReceives = new ArrayList<>(1024);
  this.groupedRegisteredReceives = new Int2ObjectArrayMap<>();
  this.waitForCompletionSends = new IterativeLinkedList<>();
  this.executor = wController.getWorkerInfo().getWorkerID();
}

/**
 * Converts joined-worker descriptors into the {@link NetworkInfo} list that the channel uses to
 * open connections. Each entry carries the worker ID, port and IP as hostname.
 */
private static List<NetworkInfo> toNetworkInfos(List<JobMasterAPI.WorkerInfo> workers) {
  final List<NetworkInfo> infos = new ArrayList<>();
  for (JobMasterAPI.WorkerInfo worker : workers) {
    final NetworkInfo info = new NetworkInfo(worker.getWorkerID());
    info.addProperty(TCPContext.NETWORK_PORT, worker.getPort());
    info.addProperty(TCPContext.NETWORK_HOSTNAME, worker.getWorkerIP());
    infos.add(info);
  }
  return infos;
}
/**
 * Returns the row-to-column-indexes map of this matrix's storage.
 *
 * <p>The storage is cast to {@link SparseMatrixStorage}, so this throws
 * {@link ClassCastException} when the matrix is backed by any other storage type.
 *
 * @return the result of {@link SparseMatrixStorage#indexesMap()} for the backing storage
 */
public Int2ObjectArrayMap<IntSet> indexesMap() {
    final SparseMatrixStorage sparseSto = (SparseMatrixStorage) getStorage();
    return sparseSto.indexesMap();
}
/**
* Compute the probability of the alleles segregating given the genotype likelihoods of the samples in vc.
*
* <p>Allele frequencies are estimated iteratively:
* <ul>
* <li>Start from a flat log10 frequency of log10(1/numAlleles) for every allele.</li>
* <li>Each round computes effective allele counts and adds each allele's prior pseudocount.
* The pseudocount is refPseudocount for the reference allele, snpPseudocount for an allele
* whose length equals the reference length, and indelPseudocount otherwise.</li>
* <li>The round then sets the log10 frequencies to the log10 mean weights of the resulting Dirichlet.</li>
* <li>Rounds repeat until the largest change in any allele count is no greater than
* THRESHOLD_FOR_ALLELE_COUNT_CONVERGENCE.</li>
* </ul>
*
* <p>Using the converged frequencies, the method then sums over every genotype that has likelihoods:
* <ul>
* <li>the sample's log10 posterior of a non-variant genotype, giving log10PNoVariant;</li>
* <li>for more than two alleles, the sample's log10 posterior that each allele is absent.</li>
* </ul>
*
* @param vc the VariantContext holding the alleles and sample information. The VariantContext
* must have at least 1 alternative allele (validated via Utils.validateArg)
* @param defaultPloidy ploidy used for any genotype whose reported ploidy is 0
* @return an AFCalculationResult built from the rounded alt-allele counts, the alleles, log10PNoVariant,
* and a map from each alt allele to its accumulated log10 probability of being absent
*/
public AFCalculationResult calculate(final VariantContext vc, final int defaultPloidy) {
Utils.nonNull(vc, "VariantContext cannot be null");
final int numAlleles = vc.getNAlleles();
final List<Allele> alleles = vc.getAlleles();
Utils.validateArg( numAlleles > 1, () -> "VariantContext has only a single reference allele, but getLog10PNonRef requires at least one at all " + vc);
// per-allele prior pseudocounts: ref, same-length-as-ref ("SNP") or different-length ("indel")
final double[] priorPseudocounts = alleles.stream()
.mapToDouble(a -> a.isReference() ? refPseudocount : (a.length() == vc.getReference().length() ? snpPseudocount : indelPseudocount)).toArray();
double[] alleleCounts = new double[numAlleles];
final double flatLog10AlleleFrequency = -MathUtils.log10(numAlleles); // log10(1/numAlleles)
// initial (flat) frequencies used by the first iteration below
double[] log10AlleleFrequencies = new IndexRange(0, numAlleles).mapToDouble(n -> flatLog10AlleleFrequency);
// iterate until the allele counts converge; the body always runs at least once because the difference starts at +infinity.
// NOTE(review): there is no iteration cap, so termination relies on effectiveAlleleCounts converging -- confirm.
for (double alleleCountsMaximumDifference = Double.POSITIVE_INFINITY; alleleCountsMaximumDifference > THRESHOLD_FOR_ALLELE_COUNT_CONVERGENCE; ) {
final double[] newAlleleCounts = effectiveAlleleCounts(vc, log10AlleleFrequencies);
// max |old - new| over alleles; the stream is non-empty because numAlleles > 1 was validated above
alleleCountsMaximumDifference = Arrays.stream(MathArrays.ebeSubtract(alleleCounts, newAlleleCounts)).map(Math::abs).max().getAsDouble();
alleleCounts = newAlleleCounts;
final double[] posteriorPseudocounts = MathArrays.ebeAdd(priorPseudocounts, alleleCounts);
// first iteration uses flat prior (the initial value set above the loop) in order to avoid local minimum where the prior + no pseudocounts gives such a low
// effective allele frequency that it overwhelms the genotype likelihood of a real variant
// basically, we want a chance to get non-zero pseudocounts before using a prior that's biased against a variant
log10AlleleFrequencies = new Dirichlet(posteriorPseudocounts).log10MeanWeights();
}
// per-allele sum over samples of log10 P(allele absent in sample); only filled for numAlleles > 2 (biallelic case set after the loop)
double[] log10POfZeroCountsByAllele = new double[numAlleles];
double log10PNoVariant = 0;
final boolean spanningDeletionPresent = alleles.contains(Allele.SPAN_DEL);
// cache: ploidy -> indices of genotypes made only of ref and spanning-deletion alleles
final Map<Integer, int[]> nonVariantIndicesByPloidy = new Int2ObjectArrayMap<>();
// re-usable buffers of the log10 genotype posteriors of genotypes missing each allele
final List<DoubleArrayList> log10AbsentPosteriors = IntStream.range(0,numAlleles).mapToObj(n -> new DoubleArrayList()).collect(Collectors.toList());
for (final Genotype g : vc.getGenotypes()) {
if (!g.hasLikelihoods()) {
continue;
}
final int ploidy = g.getPloidy() == 0 ? defaultPloidy : g.getPloidy();
final GenotypeLikelihoodCalculator glCalc = GL_CALCS.getInstance(ploidy, numAlleles);
final double[] log10GenotypePosteriors = log10NormalizedGenotypePosteriors(g, glCalc, log10AlleleFrequencies);
// accumulate this sample's log10 posterior of being non-variant into the total
if (!spanningDeletionPresent) {
log10PNoVariant += log10GenotypePosteriors[HOM_REF_GENOTYPE_INDEX];
} else {
// with a spanning deletion present, genotypes mixing only ref and spanning deletion also count as non-variant
nonVariantIndicesByPloidy.computeIfAbsent(ploidy, p -> genotypeIndicesWithOnlyRefAndSpanDel(p, alleles));
final int[] nonVariantIndices = nonVariantIndicesByPloidy.get(ploidy);
final double[] nonVariantLog10Posteriors = MathUtils.applyToArray(nonVariantIndices, n -> log10GenotypePosteriors[n]);
// when the only alt allele is the spanning deletion the probability that the site is non-variant
// may be so close to 1 that finite precision error in log10SumLog10 yields a positive value,
// which is bogus. Thus we cap it at 0.
log10PNoVariant += Math.min(0,MathUtils.log10SumLog10(nonVariantLog10Posteriors));
}
// if the VC is biallelic the allele-specific qual equals the variant qual
if (numAlleles == 2) {
continue;
}
// for each allele, we collect the log10 probabilities of genotypes in which the allele is absent, then add (in log space)
// to get the log10 probability that the allele is absent in this sample
log10AbsentPosteriors.forEach(DoubleArrayList::clear); // clear the buffers. Note that this is O(1) due to the primitive backing array
for (int genotype = 0; genotype < glCalc.genotypeCount(); genotype++) {
final double log10GenotypePosterior = log10GenotypePosteriors[genotype];
glCalc.genotypeAlleleCountsAt(genotype).forEachAbsentAlleleIndex(a -> log10AbsentPosteriors.get(a).add(log10GenotypePosterior), numAlleles);
}
final double[] log10PNoAllele = log10AbsentPosteriors.stream()
.mapToDouble(buffer -> MathUtils.log10SumLog10(buffer.toDoubleArray()))
.map(x -> Math.min(0, x)).toArray(); // if prob of non hom ref > 1 due to finite precision, short-circuit to avoid NaN
// multiply the cumulative probabilities of alleles being absent, which is addition of logs
MathUtils.addToArrayInPlace(log10POfZeroCountsByAllele, log10PNoAllele);
}
// for biallelic the allele-specific qual equals the variant qual, and we short-circuited the calculation above
if (numAlleles == 2) {
log10POfZeroCountsByAllele[1] = log10PNoVariant;
}
// unfortunately AFCalculationResult expects integers for the MLE. We really should emit the EM no-integer values
// which are valuable (eg in CombineGVCFs) as the sufficient statistics of the Dirichlet posterior on allele frequencies
final int[] integerAlleleCounts = Arrays.stream(alleleCounts).mapToInt(x -> (int) Math.round(x)).toArray();
final int[] integerAltAlleleCounts = Arrays.copyOfRange(integerAlleleCounts, 1, numAlleles);
//skip the ref allele (index 0)
final Map<Allele, Double> log10PRefByAllele = IntStream.range(1, numAlleles).boxed()
.collect(Collectors.toMap(alleles::get, a -> log10POfZeroCountsByAllele[a]));
return new AFCalculationResult(integerAltAlleleCounts, alleles, log10PNoVariant, log10PRefByAllele);
}
public EntityToSpotListMap() {
map = new Int2ObjectArrayMap<String>(1000000);
}