/**
 * Writes the payload of one data-transfer packet to the block data stream and
 * the matching checksum bytes to the checksum stream.
 *
 * <p>The checksum header is emitted once, before the first packet is handled.
 * A packet whose {@code dataLength} is not positive is logged and otherwise
 * ignored. For any other packet the length is capped at the number of bytes
 * still expected ({@code length - writtenLength}), and {@code offsetInBlock}
 * and {@code writtenLength} are advanced before any I/O is attempted. When
 * {@code finalized} is already set nothing is written, although those
 * counters have still been advanced.
 *
 * @param packet the packet to persist; its {@code buffer} is read starting at
 *               index 0 for the (capped) data length
 * @throws IOException if writing or flushing fails, or if a pending partial
 *                     chunk is followed by a packet carrying more than
 *                     {@code bytesPerChecksum} bytes
 */
@Override
void writePacket(DataTransferPacket packet) throws IOException {
  if (!writtenHeader) {
    writeChecksumHeader(checksum);
    writtenHeader = true;
  }
  boolean forceSync = packet.isForceSync();
  int dataLength = packet.dataLength;
  if (dataLength <= 0) {
    LOG.warn("NTar: writePacket: Receiving empty packet:" +
        packet + " for block " + block);
  } else {
    // Never write past the number of bytes this writer was asked to produce.
    // NOTE(review): if the remaining byte count is already 0 the code below
    // still runs with a zero-length payload -- confirm that is intended.
    dataLength = (int) Math.min(dataLength, this.length - this.writtenLength);
    setBlockPosition(offsetInBlock); // adjust file position
    offsetInBlock += dataLength;
    this.writtenLength += dataLength;

    // One checksum of checksumSize bytes per (possibly partial) chunk of
    // bytesPerChecksum data bytes.
    int checksumLength = (dataLength + bytesPerChecksum - 1) / bytesPerChecksum
        * checksumSize;

    // Buffer layout: [checksum bytes | data bytes]. A new byte[] is already
    // zero-filled by the JVM, so no explicit clearing is required.
    byte[] pktBuf = new byte[checksumLength + dataLength];
    System.arraycopy(packet.buffer, 0, pktBuf,
        checksumLength, dataLength);
    // Presumably fills pktBuf[0, checksumLength) with the checksums of the
    // data region -- verify against ChecksumUtil.
    ChecksumUtil.updateChunkChecksum(pktBuf, 0,
        checksumLength, dataLength, checksum);
    try {
      if (!finalized) {
        // Monotonic clock: elapsed time must not be affected by wall-clock
        // adjustments, which could otherwise yield negative latencies.
        long writeStartNanos = System.nanoTime();
        //finally write to the disk :
        out.write(pktBuf, checksumLength, dataLength);
        // If this is a partial chunk, then verify that this is the only
        // chunk in the packet. Calculate new crc for this chunk.
        if (partialCrc != null) {
          if (dataLength > bytesPerChecksum) {
            throw new IOException("Got wrong length during mergeBlock(" +
                block + ") from " + childAddrs + " " +
                "A packet can have only one partial chunk."+
                " len = " + dataLength +
                " bytesPerChecksum " + bytesPerChecksum);
          }
          partialCrc.update(pktBuf, checksumLength, dataLength);
          byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
          checksumOut.write(buf);
          LOG.debug("Writing out partial crc for data len " + dataLength);
          // The partial chunk has been completed; later packets use the
          // regular checksum path.
          partialCrc = null;
        } else {
          checksumOut.write(pktBuf, 0, checksumLength);
        }
        datanode.myMetrics.bytesWritten.inc(dataLength);
        flush(forceSync);
        this.replicaBeingWritten.setBytesOnDisk(offsetInBlock);
        // Record time taken to write packet (milliseconds)
        long writePacketDuration =
            (System.nanoTime() - writeStartNanos) / 1000000L;
        datanode.myMetrics.writePacketLatency.inc(writePacketDuration);
      }
    } catch (ClosedByInterruptException cix) {
      // Interrupt-related failures are rethrown without a disk check.
      LOG.warn( "NTar: Thread interrupted when flushing bytes to disk."
          + "Might cause inconsistent sates", cix);
      throw cix;
    } catch (InterruptedIOException iix) {
      LOG.warn(
          "NTar: InterruptedIOException when flushing bytes to disk."
          + "Might cause inconsistent sates", iix);
      throw iix;
    } catch (IOException iex) {
      // Any other I/O failure triggers a disk check before being rethrown.
      datanode.checkDiskError(iex);
      throw iex;
    }
  }
}