Java源码示例:org.apache.bookkeeper.client.LedgerEntry
示例1
/**
 * Reads the entries in the range passed to {@code lh.readEntries(fromEntryId, untilEntryId)}
 * and prints each of them.
 *
 * <p>For every entry a reader is built over the entry's buffer and handed to
 * {@code printEntry}. The buffer reference held by the ledger entry is released once the
 * reader has been built, and also when building the reader fails.
 *
 * @param lh ledger handle to read from
 * @param fromEntryId first entry id to read; also the starting value of the printed counter
 * @param untilEntryId last entry id to read
 * @throws Exception if reading the entries, building a reader or printing an entry fails
 */
private void simpleReadEntries(LedgerHandle lh, long fromEntryId, long untilEntryId) throws Exception {
    Enumeration<LedgerEntry> entries = lh.readEntries(fromEntryId, untilEntryId);
    long i = fromEntryId;
    System.out.println("Entries:");
    while (entries.hasMoreElements()) {
        LedgerEntry entry = entries.nextElement();
        System.out.println("\t" + i + "(eid=" + entry.getEntryId() + ")\t: ");
        Entry.Reader reader;
        try {
            reader = Entry.newBuilder()
                .setLogSegmentInfo(0L, 0L)
                .setEntryId(entry.getEntryId())
                .setEntry(entry.getEntryBuffer())
                .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
                .buildReader();
        } finally {
            // Release in finally so the buffer is not leaked when buildReader() throws.
            // NOTE(review): the reader is used after this release, exactly as before this
            // change, so the builder presumably keeps its own reference - confirm.
            entry.getEntryBuffer().release();
        }
        printEntry(reader);
        ++i;
    }
}
示例2
/**
 * Handles the result of a read that is expected to return exactly one entry.
 *
 * <p>Nothing happens when this request is already done or when
 * {@code checkReturnCodeAndHandleFailure} rejects the return code. Otherwise the request is
 * completed with the entry, or completed exceptionally with
 * {@code UnexpectedConditionException} when the enumeration yields no entry, more than one
 * entry, or an entry whose id differs from the expected {@code entryId}.
 *
 * @param rc return code of the read
 * @param lh ledger handle the read was issued on (not used here)
 * @param entries entries returned by the read
 * @param ctx callback context (not used here)
 */
void processReadEntries(int rc,
                        LedgerHandle lh,
                        Enumeration<LedgerEntry> entries,
                        Object ctx) {
    if (isDone() || !checkReturnCodeAndHandleFailure(rc, false)) {
        return;
    }
    LedgerEntry single = null;
    for (; entries.hasMoreElements(); single = entries.nextElement()) {
        if (single != null) {
            // an entry was already taken, so the read returned more than was asked for
            completeExceptionally(BKException.Code.UnexpectedConditionException);
            return;
        }
    }
    final boolean isExpectedEntry = single != null && single.getEntryId() == entryId;
    if (!isExpectedEntry) {
        completeExceptionally(BKException.Code.UnexpectedConditionException);
        return;
    }
    complete(single);
}
示例3
/**
 * Handles the result of a single-entry read.
 *
 * <p>Nothing happens when this request is already done or when
 * {@code checkReturnCodeAndHandleFailure} rejects the return code. When a non-null entry with
 * the expected id is delivered the request is completed with it; otherwise the read is
 * issued again.
 *
 * @param rc return code of the read
 * @param entryId entry id reported by the read
 * @param entry entry returned by the read, may be {@code null}
 * @param ctx callback context (not used here)
 */
void processReadEntry(int rc,
                      long entryId,
                      LedgerEntry entry,
                      Object ctx) {
    if (isDone() || !checkReturnCodeAndHandleFailure(rc, true)) {
        return;
    }
    final boolean gotExpectedEntry = entry != null && this.entryId == entryId;
    if (!gotExpectedEntry) {
        // the long poll timed out or was interrupted, so retry the read
        issueRead(this);
        return;
    }
    complete(entry);
}
示例4
/**
 * Records the outcome of this read, schedules the following read and fires callbacks.
 *
 * <p>The entry is kept only when the return code is OK and the enumeration has an element;
 * otherwise it is set to {@code null}. A new {@code PendingReadRequest} is created, queued
 * and started when the next entry id taken from {@code nextReadEntry} does not exceed
 * {@code lac}.
 */
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> enumeration, Object ctx) {
    this.rc = rc;
    final boolean hasEntry = rc == BKException.Code.OK && enumeration.hasMoreElements();
    entry = hasEntry ? enumeration.nextElement() : null;
    isDone = true;
    // construct a new read request for the following entry, if it is within lac
    final long followUpId = nextReadEntry.getAndIncrement();
    if (followUpId <= lac) {
        final PendingReadRequest followUp = new PendingReadRequest(followUpId);
        pendingReads.add(followUp);
        followUp.read();
    }
    triggerCallbacks();
}
示例5
/**
 * Asynchronously reads the last add confirmed together with an entry.
 *
 * <p>The read is delegated to the ledger handle registered for {@code ledgerDesc}. If no
 * handle is registered, an error is logged and a future failed with
 * {@code UnexpectedConditionException} is returned.
 *
 * @param ledgerDesc
 *          ledger descriptor
 * @param entryId
 *          entry id to read
 * @param timeOutInMillis
 *          time out if no newer entry available
 * @param parallel
 *          whether to read from replicas in parallel
 * @return future holding the pair (last add confirmed, entry) on success, or the
 *          {@code BKException} created from the return code on failure
 */
public Future<Pair<Long, LedgerEntry>> asyncReadLastConfirmedAndEntry(
        LedgerDescriptor ledgerDesc,
        long entryId,
        long timeOutInMillis,
        boolean parallel) {
    final RefCountedLedgerHandle cached = handlesMap.get(ledgerDesc);
    if (cached == null) {
        LOG.error("Accessing ledger {} without opening.", ledgerDesc);
        return Future.exception(BKException.create(BKException.Code.UnexpectedConditionException));
    }
    final Promise<Pair<Long, LedgerEntry>> result = new Promise<Pair<Long, LedgerEntry>>();
    final AsyncCallback.ReadLastConfirmedAndEntryCallback onRead =
        new AsyncCallback.ReadLastConfirmedAndEntryCallback() {
            @Override
            public void readLastConfirmedAndEntryComplete(int rc, long lac, LedgerEntry ledgerEntry, Object ctx) {
                if (rc != BKException.Code.OK) {
                    result.setException(BKException.create(rc));
                    return;
                }
                result.setValue(Pair.of(lac, ledgerEntry));
            }
        };
    cached.handle.asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel, onRead, null);
    return result;
}
示例6
/**
 * Asynchronously reads a range of entries.
 *
 * <p>The read is delegated to the ledger handle registered for {@code ledgerDesc}. If no
 * handle is registered, an error is logged and a future failed with
 * {@code UnexpectedConditionException} is returned.
 *
 * @param ledgerDesc
 *          ledger descriptor
 * @param first
 *          first entry
 * @param last
 *          last entry
 * @return future holding the enumeration of entries on success, or the
 *          {@code BKException} created from the return code on failure
 */
public Future<Enumeration<LedgerEntry>> asyncReadEntries(
        LedgerDescriptor ledgerDesc, long first, long last) {
    final RefCountedLedgerHandle cached = handlesMap.get(ledgerDesc);
    if (cached == null) {
        LOG.error("Accessing ledger {} without opening.", ledgerDesc);
        return Future.exception(BKException.create(BKException.Code.UnexpectedConditionException));
    }
    final Promise<Enumeration<LedgerEntry>> result = new Promise<Enumeration<LedgerEntry>>();
    final AsyncCallback.ReadCallback onRead = new AsyncCallback.ReadCallback() {
        @Override
        public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
            if (rc != BKException.Code.OK) {
                result.setException(BKException.create(rc));
                return;
            }
            result.setValue(entries);
        }
    };
    cached.handle.asyncReadEntries(first, last, onRead, null);
    return result;
}
示例7
/**
 * Reads the entries in the range passed to {@code lh.readEntries(fromEntryId, untilEntryId)}
 * and prints each of them through a reader built over the entry's input stream.
 *
 * @param lh ledger handle to read from
 * @param fromEntryId first entry id to read; also the starting value of the printed counter
 * @param untilEntryId last entry id to read
 * @throws Exception if reading the entries, building a reader or printing an entry fails
 */
private void simpleReadEntries(LedgerHandle lh, long fromEntryId, long untilEntryId) throws Exception {
    final Enumeration<LedgerEntry> fetched = lh.readEntries(fromEntryId, untilEntryId);
    System.out.println("Entries:");
    for (long position = fromEntryId; fetched.hasMoreElements(); ++position) {
        final LedgerEntry current = fetched.nextElement();
        final long eid = current.getEntryId();
        System.out.println("\t" + position + "(eid=" + eid + ")\t: ");
        final boolean enveloped = LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion);
        final Entry.Reader entryReader = Entry.newBuilder()
            .setLogSegmentInfo(0L, 0L)
            .setEntryId(eid)
            .setInputStream(current.getEntryInputStream())
            .setEnvelopeEntry(enveloped)
            .buildReader();
        printEntry(entryReader);
    }
}
示例8
/**
 * Stores the result of this read request, chains the next read and triggers callbacks.
 *
 * <p>When the return code is not OK, or no element was returned, the stored entry is
 * {@code null}. The next entry id is always taken from {@code nextReadEntry}; a new request
 * is only queued and started when that id is not beyond {@code lac}.
 */
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> enumeration, Object ctx) {
    this.rc = rc;
    if (BKException.Code.OK != rc || !enumeration.hasMoreElements()) {
        entry = null;
    } else {
        entry = enumeration.nextElement();
    }
    isDone = true;
    // chain the read of the next entry
    final long upcoming = nextReadEntry.getAndIncrement();
    if (upcoming <= lac) {
        final PendingReadRequest chained = new PendingReadRequest(upcoming);
        pendingReads.add(chained);
        chained.read();
    }
    triggerCallbacks();
}
示例9
/**
 * Reads a single entry from the ledger and extracts the message id data stored in it.
 *
 * <p>The returned future fails with the {@code BKException} created from the return code when
 * the read fails, with {@code NoSuchElementException} when the read returns no entry, and
 * with the thrown exception when the entry cannot be deserialized. The buffers of all
 * entries returned by the read are released on both the success and the failure path.
 *
 * @param lh ledger handle to read from
 * @param entryId id of the entry to read
 * @return future completed with the message id data of the entry
 */
private static CompletableFuture<MessageIdData> readOneMessageId(LedgerHandle lh, long entryId) {
    CompletableFuture<MessageIdData> promise = new CompletableFuture<>();
    lh.asyncReadEntries(entryId, entryId,
        (rc, _lh, seq, ctx) -> {
            if (rc != BKException.Code.OK) {
                promise.completeExceptionally(BKException.create(rc));
                return;
            }
            if (!seq.hasMoreElements()) {
                promise.completeExceptionally(new NoSuchElementException(
                    String.format("No such entry %d in ledger %d", entryId, lh.getId())));
                return;
            }
            LedgerEntry entry = seq.nextElement();
            try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) {
                promise.complete(m.getMessageIdData());
            } catch (RuntimeException e) {
                // Without this the caller would wait forever on a future nobody completes.
                promise.completeExceptionally(e);
            } finally {
                // Need to release buffers for all entries in the sequence,
                // including when deserialization failed.
                entry.getEntryBuffer().release();
                while (seq.hasMoreElements()) {
                    seq.nextElement().getEntryBuffer().release();
                }
            }
        }, null);
    return promise;
}
示例10
/**
 * Writes 1000 small entries to a newly created ledger, closes it, then looks the ledger up
 * by name, reopens it and logs every entry that can be read back.
 *
 * <p>Payloads are encoded and decoded explicitly as UTF-8 so the test does not depend on the
 * platform default charset, and the handle opened for reading is closed when done.
 */
@Test
void whenWriteAndReadEntries_thenSuccess() throws Exception {
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
    LedgerHandle lh = createLedger(bk, "myledger", ledgerPassword);
    long start = System.currentTimeMillis();
    for (int i = 0; i < 1000; i++) {
        byte[] data = ("message-" + i).getBytes(utf8);
        lh.append(data);
    }
    lh.close();
    long elapsed = System.currentTimeMillis() - start;
    LOG.info("Entries added to ledgerId " + lh.getId() + ", elapsed=" + elapsed);
    Long ledgerId = findLedgerByName(bk, "myledger").orElse(null);
    assertNotNull(ledgerId);
    lh = bk.openLedger(ledgerId, BookKeeper.DigestType.MAC, ledgerPassword);
    try {
        long lastId = lh.readLastConfirmed();
        Enumeration<LedgerEntry> entries = lh.readEntries(0, lastId);
        while (entries.hasMoreElements()) {
            LedgerEntry entry = entries.nextElement();
            String msg = new String(entry.getEntry(), utf8);
            LOG.info("Entry: id=" + entry.getEntryId() + ", data=" + msg);
        }
    } finally {
        // the handle opened for reading was previously never closed
        lh.close();
    }
}
示例11
/**
 * Repairs the last-record metadata of a completed log segment.
 *
 * <p>In-progress segments are skipped. Otherwise the entry at the ledger's last add confirmed
 * is read, its last record is determined, and - after interactive confirmation - the
 * segment metadata is updated with that record. The ledger handle opened here is closed
 * once the entry has been read, and the entry buffer is released even when building the
 * reader fails.
 *
 * @param bkAdmin admin client used to open the ledger
 * @param metadataUpdater updater used to write the last record into the segment metadata
 * @param segment log segment to repair
 * @throws Exception if the entry is missing, holds no record, or reading/updating fails
 */
protected void repairLogSegment(BookKeeperAdmin bkAdmin,
                                MetadataUpdater metadataUpdater,
                                LogSegmentMetadata segment) throws Exception {
    if (segment.isInProgress()) {
        System.out.println("Skip inprogress log segment " + segment);
        return;
    }
    LedgerHandle lh = bkAdmin.openLedger(segment.getLogSegmentId());
    final long lac = lh.getLastAddConfirmed();
    LogRecordWithDLSN lastRecord = null;
    try {
        Enumeration<LedgerEntry> entries = lh.readEntries(lac, lac);
        if (!entries.hasMoreElements()) {
            throw new IOException("Entry " + lac + " isn't found for " + segment);
        }
        LedgerEntry lastEntry = entries.nextElement();
        Entry.Reader reader;
        try {
            reader = Entry.newBuilder()
                .setLogSegmentInfo(segment.getLogSegmentSequenceNumber(), segment.getStartSequenceId())
                .setEntryId(lastEntry.getEntryId())
                .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion()))
                .setEntry(lastEntry.getEntryBuffer())
                .buildReader();
        } finally {
            // release even when buildReader() throws, so the buffer is not leaked
            lastEntry.getEntryBuffer().release();
        }
        // walk the records of the entry and keep the last one
        LogRecordWithDLSN record = reader.nextRecord();
        while (null != record) {
            lastRecord = record;
            record = reader.nextRecord();
        }
    } finally {
        // the handle is only needed for reading the last entry
        lh.close();
    }
    if (null == lastRecord) {
        throw new IOException("No record found in entry " + lac + " for " + segment);
    }
    System.out.println("Updating last record for " + segment + " to " + lastRecord);
    if (!IOUtils.confirmPrompt("Do you want to make this change (Y/N): ")) {
        return;
    }
    metadataUpdater.updateLastRecord(segment, lastRecord);
}
示例12
/**
 * Completes this request successfully with the given entry.
 *
 * <p>If the reader is already closed the entry is released first. Unless the request is
 * already done, the return code is set to OK, the entry is stored and the request is
 * marked done.
 *
 * @param entry entry that was read
 */
void complete(LedgerEntry entry) {
    final boolean readerClosed = isClosed();
    if (readerClosed) {
        // the reader is already closed
        // NOTE(review): execution continues after the release, so the released entry is
        // still stored below - confirm this is intended.
        release(entry);
    }
    synchronized (this) {
        if (!done) {
            this.rc = BKException.Code.OK;
            this.entry = entry;
        } else {
            return;
        }
    }
    setDone(true);
}
示例13
/**
 * Read callback that forwards to {@code processReadEntries}, replacing the return code with
 * {@code DigestMatchException} when the failure injector asks for corruption of this entry.
 */
@Override
public void readComplete(int rc,
                         LedgerHandle lh,
                         Enumeration<LedgerEntry> entries,
                         Object ctx) {
    final boolean corrupt = failureInjector.shouldInjectCorruption(entryId, entryId);
    final int effectiveRc = corrupt ? BKException.Code.DigestMatchException : rc;
    processReadEntries(effectiveRc, lh, entries, ctx);
}
示例14
/**
 * Long-poll read callback that forwards to {@code processReadEntry}, replacing the return
 * code with {@code DigestMatchException} when the failure injector asks for corruption of
 * the entry this request is waiting for.
 */
@Override
public void readLastConfirmedAndEntryComplete(int rc,
                                              long entryId,
                                              LedgerEntry entry,
                                              Object ctx) {
    final boolean corrupt = failureInjector.shouldInjectCorruption(this.entryId, this.entryId);
    final int effectiveRc = corrupt ? BKException.Code.DigestMatchException : rc;
    processReadEntry(effectiveRc, entryId, entry, ctx);
}
示例15
/**
 * Builds an entry reader over the buffer of the given ledger entry.
 *
 * <p>The reader is configured from {@code lssn}, {@code startSequenceId},
 * {@code envelopeEntries} and {@code deserializeRecordSet}. This method does not release
 * the entry buffer.
 *
 * @param entry ledger entry to wrap
 * @return reader over the records of the entry
 * @throws IOException if the reader cannot be built
 */
Entry.Reader processReadEntry(LedgerEntry entry) throws IOException {
    final long id = entry.getEntryId();
    final Entry.Reader entryReader = Entry.newBuilder()
        .setLogSegmentInfo(lssn, startSequenceId)
        .setEntryId(id)
        .setEnvelopeEntry(envelopeEntries)
        .deserializeRecordSet(deserializeRecordSet)
        .setEntry(entry.getEntryBuffer())
        .buildReader();
    return entryReader;
}
示例16
/**
 * Wraps a ledger entry in an {@code Entry.Reader}.
 *
 * <p>The log segment sequence number, start sequence id, envelope flag and record-set
 * deserialization flag come from this object's state; the entry id and the buffer come from
 * the ledger entry. The entry buffer is not released here.
 *
 * @param entry ledger entry whose buffer should be read
 * @return reader built over the entry
 * @throws IOException if building the reader fails
 */
Entry.Reader processReadEntry(LedgerEntry entry) throws IOException {
    final long sourceEntryId = entry.getEntryId();
    return Entry.newBuilder()
        .setLogSegmentInfo(lssn, startSequenceId)
        .setEntryId(sourceEntryId)
        .setEnvelopeEntry(envelopeEntries)
        .deserializeRecordSet(deserializeRecordSet)
        .setEntry(entry.getEntryBuffer())
        .buildReader();
}
示例17
/**
 * Read callback that turns the returned ledger entries into entry readers and completes the
 * promise passed as {@code ctx}.
 *
 * <p>On a non-OK return code the promise fails with a {@code BKTransmitException}. If an
 * entry cannot be processed, the buffers of the entries not yet visited are released and
 * the promise fails with the {@code IOException}. The buffer of each visited entry is
 * released after it has been processed, whether or not processing succeeded.
 */
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
    CompletableFuture<List<Entry.Reader>> promise = (CompletableFuture<List<Entry.Reader>>) ctx;
    if (rc != BKException.Code.OK) {
        FutureUtils.completeExceptionally(promise,
            new BKTransmitException("Failed to read entries :", rc));
        return;
    }
    final List<Entry.Reader> readers = Lists.newArrayList();
    while (entries.hasMoreElements()) {
        final LedgerEntry current = entries.nextElement();
        try {
            final Entry.Reader built = processReadEntry(current);
            readers.add(built);
        } catch (IOException ioe) {
            // release the buffers of the entries that will not be processed
            while (entries.hasMoreElements()) {
                entries.nextElement().getEntryBuffer().release();
            }
            FutureUtils.completeExceptionally(promise, ioe);
            return;
        } finally {
            current.getEntryBuffer().release();
        }
    }
    FutureUtils.complete(promise, readers);
}
示例18
/**
 * Reads the ledger from entry 0 up to its last add confirmed in batches of
 * {@code batchSize} and passes every entry to {@code readEntryListener}.
 *
 * <p>A batch whose read fails with a {@code BKException} is logged and read again. When the
 * thread is interrupted during a read, the interrupt flag is restored and the run stops.
 */
@Override
public void run() {
    final long lac = lh.getLastAddConfirmed();
    long batchStart = 0L;
    while (batchStart <= lac) {
        final long batchEnd = Math.min(batchStart + batchSize - 1, lac);
        Enumeration<LedgerEntry> batch = null;
        do {
            try {
                batch = lh.readEntries(batchStart, batchEnd);
            } catch (BKException bke) {
                logger.error("Encountered exceptions on reading [ {} - {} ] ",
                    new Object[] { batchStart, batchEnd, bke });
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                // nothing was read for this batch and nothing follows the loops: stop
                return;
            }
        } while (batch == null);
        while (batch.hasMoreElements()) {
            readEntryListener.onEntryComplete(BKException.Code.OK, lh, batch.nextElement(), null);
        }
        batchStart = batchEnd + 1;
    }
}
示例19
/**
 * Issues a read-last-confirmed-and-entry request for the next read-ahead position.
 *
 * <p>The request goes through a callback that is also registered as the metadata
 * notification. The outcome of the read is forwarded to that callback: on success with the
 * returned last add confirmed and entry, on failure with the result code derived from the
 * cause, the supplied {@code lastAddConfirmed} and no entry. Afterwards the callback is told
 * whether to call back immediately and the counter for these reads is incremented.
 *
 * @param parallel whether to read in parallel
 * @param lastAddConfirmed last add confirmed known when the read is issued
 */
private void issueReadLastConfirmedAndEntry(final boolean parallel,
                                            final long lastAddConfirmed) {
    final String ctx = String.format("ReadLastConfirmedAndEntry(%s, %d)", parallel? "Parallel":"Sequential", lastAddConfirmed);
    final ReadLastConfirmedAndEntryCallbackWithNotification callback =
        new ReadLastConfirmedAndEntryCallbackWithNotification(lastAddConfirmed, this, ctx);
    final boolean fireNow = setMetadataNotification(callback);
    final FutureEventListener<Pair<Long, LedgerEntry>> forwarder =
        new FutureEventListener<Pair<Long, LedgerEntry>>() {
            @Override
            public void onSuccess(Pair<Long, LedgerEntry> lacAndEntry) {
                final Long lac = lacAndEntry.getLeft();
                final LedgerEntry readEntry = lacAndEntry.getRight();
                callback.readLastConfirmedAndEntryComplete(BKException.Code.OK, lac, readEntry, ctx);
            }
            @Override
            public void onFailure(Throwable cause) {
                final int failureCode = FutureUtils.bkResultCode(cause);
                callback.readLastConfirmedAndEntryComplete(failureCode, lastAddConfirmed, null, ctx);
            }
        };
    handleCache.asyncReadLastConfirmedAndEntry(
            currentLH,
            nextReadAheadPosition.getEntryId(),
            conf.getReadLACLongPollTimeout(),
            parallel)
        .addEventListener(forwarder);
    callback.callbackImmediately(fireNow);
    readAheadReadLACAndEntryCounter.inc();
}
示例20
/**
 * Forwards the result to the wrapped callback at most once.
 *
 * <p>Only the first invocation wins the {@code called} flag; it clears the metadata
 * notification under {@code notificationLock} and then delegates. Later invocations are
 * ignored.
 */
@Override
public void readLastConfirmedAndEntryComplete(int rc, long lac, LedgerEntry entry, Object ctx) {
    if (!called.compareAndSet(false, true)) {
        return;
    }
    // clear the notification before calling back
    synchronized (notificationLock) {
        metadataNotification = null;
    }
    this.cb.readLastConfirmedAndEntryComplete(rc, lac, entry, ctx);
}
示例21
/**
 * Repairs the last-record metadata of a completed log segment.
 *
 * <p>In-progress segments are skipped. Otherwise the entry at the ledger's last add confirmed
 * is read, its last record is determined, and - after interactive confirmation - the
 * segment metadata is updated with that record. The ledger handle opened here is closed
 * once the entry has been read.
 *
 * @param bkAdmin admin client used to open the ledger
 * @param metadataUpdater updater used to write the last record into the segment metadata
 * @param segment log segment to repair
 * @throws Exception if the entry is missing, holds no record, or reading/updating fails
 */
protected void repairLogSegment(BookKeeperAdmin bkAdmin,
                                MetadataUpdater metadataUpdater,
                                LogSegmentMetadata segment) throws Exception {
    if (segment.isInProgress()) {
        System.out.println("Skip inprogress log segment " + segment);
        return;
    }
    LedgerHandle lh = bkAdmin.openLedger(segment.getLedgerId(), true);
    final long lac = lh.getLastAddConfirmed();
    LogRecordWithDLSN lastRecord = null;
    try {
        Enumeration<LedgerEntry> entries = lh.readEntries(lac, lac);
        if (!entries.hasMoreElements()) {
            throw new IOException("Entry " + lac + " isn't found for " + segment);
        }
        LedgerEntry lastEntry = entries.nextElement();
        Entry.Reader reader = Entry.newBuilder()
            .setLogSegmentInfo(segment.getLogSegmentSequenceNumber(), segment.getStartSequenceId())
            .setEntryId(lastEntry.getEntryId())
            .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion()))
            .setInputStream(lastEntry.getEntryInputStream())
            .buildReader();
        // walk the records of the entry and keep the last one
        LogRecordWithDLSN record = reader.nextRecord();
        while (null != record) {
            lastRecord = record;
            record = reader.nextRecord();
        }
    } finally {
        // the handle is only needed for reading the last entry; it was previously never closed
        lh.close();
    }
    if (null == lastRecord) {
        throw new IOException("No record found in entry " + lac + " for " + segment);
    }
    System.out.println("Updating last record for " + segment + " to " + lastRecord);
    if (!IOUtils.confirmPrompt("Do you want to make this change (Y/N): ")) {
        return;
    }
    metadataUpdater.updateLastRecord(segment, lastRecord);
}
示例22
/**
 * Feeds the records of a ledger entry to a {@code LogRecordSelector}.
 *
 * <p>Every record read from the entry is counted in {@code context.numRecordsScanned}.
 * Control records and end-of-stream records are passed to the selector only when the scan
 * context includes them.
 *
 * @param streamName
 *          fully qualified stream name (not used by this method)
 * @param metadata
 *          log segment metadata supplying the start sequence id and the envelope setting
 * @param logSegmentSeqNo
 *          log segment sequence number
 * @param entry
 *          ledger entry
 * @param context
 *          scan context
 * @param selector
 *          selector that receives each record that is not filtered out
 * @throws IOException if the entry reader cannot be built or a record cannot be read
 */
private static void visitEntryRecords(
        String streamName,
        LogSegmentMetadata metadata,
        long logSegmentSeqNo,
        LedgerEntry entry,
        ScanContext context,
        LogRecordSelector selector) throws IOException {
    final Entry.Reader entryReader = Entry.newBuilder()
        .setLogSegmentInfo(logSegmentSeqNo, metadata.getStartSequenceId())
        .setEntryId(entry.getEntryId())
        .setEnvelopeEntry(metadata.getEnvelopeEntries())
        .setInputStream(entry.getEntryInputStream())
        .buildReader();
    for (LogRecordWithDLSN pending = entryReader.nextRecord(); pending != null; ) {
        final LogRecordWithDLSN current = pending;
        // the following record is fetched before the current one is handed to the selector
        pending = entryReader.nextRecord();
        context.numRecordsScanned.incrementAndGet();
        final boolean filteredOut =
            (!context.includeControl && current.isControl())
                || (!context.includeEndOfStream && current.isEndOfStream());
        if (!filteredOut) {
            selector.process(current);
        }
    }
}
示例23
/**
 * Two allocators share one allocation path and start from the same version: once the first
 * has obtained a ledger, obtaining from the second must fail with BADVERSION. The ledger
 * obtained by the first allocator must still be writable and readable afterwards.
 *
 * <p>The payload and digest password are encoded with UTF_8, matching the charset used to
 * decode the entry, and the handle opened for reading is closed when done.
 */
@Test(timeout = 60000)
public void testBadVersionOnTwoAllocators() throws Exception {
    String allocationPath = "/allocation-bad-version";
    zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    Stat stat = new Stat();
    byte[] data = zkc.get().getData(allocationPath, false, stat);
    Versioned<byte[]> allocationData = new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()));
    SimpleLedgerAllocator allocator1 =
        new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
    SimpleLedgerAllocator allocator2 =
        new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
    allocator1.allocate();
    // wait until allocated
    ZKTransaction txn1 = newTxn();
    LedgerHandle lh = FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER));
    allocator2.allocate();
    ZKTransaction txn2 = newTxn();
    try {
        FutureUtils.result(allocator2.tryObtain(txn2, NULL_LISTENER));
        fail("Should fail allocating on second allocator as allocator1 is starting allocating something.");
    } catch (ZKException zke) {
        assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
    }
    FutureUtils.result(txn1.execute());
    Utils.close(allocator1);
    Utils.close(allocator2);
    // encode with the same charset that is used for decoding below
    long eid = lh.addEntry("hello world".getBytes(UTF_8));
    lh.close();
    LedgerHandle readLh = bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32,
        dlConf.getBKDigestPW().getBytes(UTF_8));
    try {
        Enumeration<LedgerEntry> entries = readLh.readEntries(eid, eid);
        int i = 0;
        while (entries.hasMoreElements()) {
            LedgerEntry entry = entries.nextElement();
            assertEquals("hello world", new String(entry.getEntry(), UTF_8));
            ++i;
        }
        assertEquals(1, i);
    } finally {
        // the handle opened for reading was previously never closed
        readLh.close();
    }
}
示例24
/**
 * Advances to the next log record.
 *
 * <p>While an entry reader exists, the next record is taken from it: when one is available
 * it becomes the current record and {@code readPos} is updated; when the reader is
 * exhausted the method returns {@code false}. While no reader exists yet, the entry id is
 * advanced and - unless it passed the ledger's last add confirmed - that entry is read and
 * a reader is built over it.
 * NOTE(review): once a reader exists it is never replaced here, so records of later
 * entries are not visited by this method - confirm this is intended.
 *
 * @return {@code true} if a record was made current, {@code false} otherwise
 * @throws IOException if reading the ledger or the entry fails
 * @throws InterruptedException if interrupted while reading the ledger
 */
@Override
public boolean nextKeyValue()
        throws IOException, InterruptedException {
    currentRecord = null;
    // Each pass either answers from the existing reader or tries to open the next entry.
    while (true) {
        if (reader != null) {
            final LogRecordWithDLSN fetched = reader.nextRecord();
            if (fetched == null) {
                return false;
            }
            currentRecord = fetched;
            readPos = fetched.getPositionWithinLogSegment();
            return true;
        }
        ++entryId;
        if (entryId > lh.getLastAddConfirmed()) {
            return false;
        }
        try {
            final Enumeration<LedgerEntry> fetchedEntries = lh.readEntries(entryId, entryId);
            if (fetchedEntries.hasMoreElements()) {
                final LedgerEntry fetchedEntry = fetchedEntries.nextElement();
                final boolean enveloped =
                    LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion());
                reader = Entry.newBuilder()
                    .setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(),
                        metadata.getStartSequenceId())
                    .setEntryId(fetchedEntry.getEntryId())
                    .setEnvelopeEntry(enveloped)
                    .deserializeRecordSet(true)
                    .setInputStream(fetchedEntry.getEntryInputStream())
                    .buildReader();
            }
        } catch (BKException e) {
            throw new IOException(e);
        }
    }
}
示例25
/**
 * Scans the ledger from entry 0 to its last add confirmed, {@code batchSize} entries at a
 * time, and reports each entry to {@code readEntryListener} with an OK return code.
 *
 * <p>A failed batch read ({@code BKException}) is logged and repeated. An interrupt during a
 * read restores the thread's interrupt flag and ends the scan.
 */
@Override
public void run() {
    final long lac = lh.getLastAddConfirmed();
    long endEntryId;
    for (long startEntryId = 0L; startEntryId <= lac; startEntryId = endEntryId + 1) {
        endEntryId = Math.min(startEntryId + batchSize - 1, lac);
        Enumeration<LedgerEntry> fetched = null;
        boolean interrupted = false;
        while (fetched == null && !interrupted) {
            try {
                fetched = lh.readEntries(startEntryId, endEntryId);
            } catch (BKException bke) {
                logger.error("Encountered exceptions on reading [ {} - {} ] ",
                    new Object[] { startEntryId, endEntryId, bke });
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                interrupted = true;
            }
        }
        if (fetched == null) {
            // only reachable after an interrupt: give up on the remaining batches
            break;
        }
        while (fetched.hasMoreElements()) {
            final LedgerEntry next = fetched.nextElement();
            readEntryListener.onEntryComplete(BKException.Code.OK, lh, next, null);
        }
    }
}
示例26
/**
 * Asynchronously reads a range of ledger entries and converts each of them into an
 * {@code Entry}.
 *
 * <p>Every ledger entry is deserialized as a {@code RawMessage}; the resulting entry is
 * created from the ledger id and entry id of the message id data and from the message's
 * headers and payload. The raw message is closed and the ledger entry's buffer released
 * after each conversion.
 *
 * @param lh ledger handle to read from
 * @param from first entry id of the range
 * @param to last entry id of the range
 * @return future with the converted entries, failed with the {@code BKException} created
 *         from the return code when the read fails
 */
private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, long to) {
    final CompletableFuture<Enumeration<LedgerEntry>> rawRead = new CompletableFuture<>();
    lh.asyncReadEntries(from, to,
        (rc, handle, seq, ctx) -> {
            if (rc == BKException.Code.OK) {
                rawRead.complete(seq);
                return;
            }
            rawRead.completeExceptionally(BKException.create(rc));
        }, null);
    return rawRead.thenApply(ledgerEntries -> {
        final List<Entry> decoded = new ArrayList<Entry>();
        while (ledgerEntries.hasMoreElements()) {
            final ByteBuf raw = ledgerEntries.nextElement().getEntryBuffer();
            try (RawMessage msg = RawMessageImpl.deserializeFrom(raw)) {
                decoded.add(EntryImpl.create(
                    msg.getMessageIdData().getLedgerId(),
                    msg.getMessageIdData().getEntryId(),
                    msg.getHeadersAndPayload()));
            } finally {
                raw.release();
            }
        }
        return decoded;
    });
}
示例27
/**
 * Asynchronously reads a single entry from a ledger.
 *
 * <p>The future fails with the exception built by {@code bkException} when the read returns
 * a non-OK code, and also when the read reports OK but yields no entry. In the second case
 * {@code NoSuchEntryException} is used as the code, so callers get a failed future rather
 * than one that never completes.
 *
 * @param ledger ledger handle to read from
 * @param entry id of the entry to read
 * @return future completed with the ledger entry
 */
static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry) {
    final CompletableFuture<LedgerEntry> future = new CompletableFuture<>();
    ledger.asyncReadEntries(entry, entry,
        (rc, handle, entries, ctx) -> {
            if (rc != BKException.Code.OK) {
                future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId(), entry));
            } else if (entries == null || !entries.hasMoreElements()) {
                // nextElement() would throw inside the callback and leave the future pending
                future.completeExceptionally(bkException("Failed to read entry",
                    BKException.Code.NoSuchEntryException, ledger.getId(), entry));
            } else {
                future.complete(entries.nextElement());
            }
        }, null
    );
    return future;
}
示例28
/**
 * Parses the payload of a ledger entry as a {@code SchemaEntry}.
 *
 * @param ledgerEntry ledger entry holding the serialized schema entry
 * @return an already-completed future with the parsed schema entry, or a future failed with
 *         the {@code IOException} raised while parsing
 */
static CompletableFuture<SchemaStorageFormat.SchemaEntry> parseSchemaEntry(LedgerEntry ledgerEntry) {
    try {
        final SchemaStorageFormat.SchemaEntry parsed =
            SchemaStorageFormat.SchemaEntry.parseFrom(ledgerEntry.getEntry());
        return CompletableFuture.completedFuture(parsed);
    } catch (IOException e) {
        final CompletableFuture<SchemaStorageFormat.SchemaEntry> failed = new CompletableFuture<>();
        failed.completeExceptionally(e);
        return failed;
    }
}
示例29
/**
 * Compacts the topic, reads the compacted ledger back and verifies it against the expected
 * key/value pairs.
 *
 * <p>The compacted ledger must hold exactly as many entries as there are expected keys, and
 * every entry's payload must equal the expected value for its key. Matched keys are removed
 * from {@code expected}, which must be empty at the end. Each raw message is closed and
 * each entry buffer released even when an assertion fails.
 *
 * @param topic topic to compact
 * @param expected expected payload per key; emptied by this method as keys are matched
 * @return the keys in the order they appear in the compacted ledger
 * @throws Exception if compaction, reading the ledger or verification fails
 */
private List<String> compactAndVerify(String topic, Map<String, byte[]> expected) throws Exception {
    BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
        this.conf, null, Optional.empty(), null);
    Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
    long compactedLedgerId = compactor.compact(topic).get();
    LedgerHandle ledger = bk.openLedger(compactedLedgerId,
        Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
        Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
    Assert.assertEquals(ledger.getLastAddConfirmed() + 1, // 0..lac
        expected.size(),
        "Should have as many entries as there is keys");
    List<String> keys = new ArrayList<>();
    Enumeration<LedgerEntry> entries = ledger.readEntries(0, ledger.getLastAddConfirmed());
    while (entries.hasMoreElements()) {
        ByteBuf buf = entries.nextElement().getEntryBuffer();
        try (RawMessage m = RawMessageImpl.deserializeFrom(buf)) {
            String key = extractKey(m);
            keys.add(key);
            ByteBuf payload = extractPayload(m);
            byte[] bytes = new byte[payload.readableBytes()];
            payload.readBytes(bytes);
            Assert.assertEquals(bytes, expected.remove(key),
                "Compacted version should match expected version");
        } finally {
            // the entry buffer was previously never released
            buf.release();
        }
    }
    Assert.assertTrue(expected.isEmpty(), "All expected keys should have been found");
    return keys;
}
示例30
/**
 * Releases the buffer of the given ledger entry; does nothing for {@code null}.
 *
 * @param entry entry whose buffer should be released, may be {@code null}
 */
void release(LedgerEntry entry) {
    if (entry == null) {
        return;
    }
    entry.getEntryBuffer().release();
}