Java源码示例:org.apache.bookkeeper.client.LedgerEntry

示例1
/**
 * Reads the entries {@code fromEntryId..untilEntryId} from the ledger and prints each of them.
 *
 * <p>For every entry a header line with a running position and the entry id is written to
 * standard output, an {@code Entry.Reader} is built over the entry buffer and handed to
 * {@code printEntry}.
 *
 * @param lh ledger handle to read from
 * @param fromEntryId id of the first entry to read; also the initial running position
 * @param untilEntryId id of the last entry to read
 * @throws Exception whatever the ledger read, the reader construction or the printing throws
 */
private void simpleReadEntries(LedgerHandle lh, long fromEntryId, long untilEntryId) throws Exception {
    Enumeration<LedgerEntry> ledgerEntries = lh.readEntries(fromEntryId, untilEntryId);
    System.out.println("Entries:");
    for (long position = fromEntryId; ledgerEntries.hasMoreElements(); position++) {
        LedgerEntry current = ledgerEntries.nextElement();
        long eid = current.getEntryId();
        System.out.println("\t" + position + "(eid=" + eid + ")\t: ");
        Entry.Reader entryReader = Entry.newBuilder()
                .setLogSegmentInfo(0L, 0L)
                .setEntryId(eid)
                .setEntry(current.getEntryBuffer())
                .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
                .buildReader();
        // The entry's buffer reference is dropped once the reader exists, before printing.
        current.getEntryBuffer().release();
        printEntry(entryReader);
    }
}
 
示例2
/**
 * Handles the result of a read that must yield exactly one entry whose id equals
 * {@code entryId}.
 *
 * <p>Nothing happens when this request is already done or when
 * {@code checkReturnCodeAndHandleFailure} rejects the return code. An empty result, more than
 * one entry, or an entry with a different id completes the request exceptionally with
 * {@code UnexpectedConditionException}.
 *
 * @param rc return code of the read
 * @param lh ledger handle the read was issued on (unused here)
 * @param entries entries returned by the read
 * @param ctx callback context (unused here)
 */
void processReadEntries(int rc,
                        LedgerHandle lh,
                        Enumeration<LedgerEntry> entries,
                        Object ctx) {
    if (isDone() || !checkReturnCodeAndHandleFailure(rc, false)) {
        return;
    }
    LedgerEntry found = null;
    while (entries.hasMoreElements()) {
        if (found != null) {
            // a second element means more entries were returned than requested
            completeExceptionally(BKException.Code.UnexpectedConditionException);
            return;
        }
        found = entries.nextElement();
    }
    boolean isExpectedEntry = found != null && found.getEntryId() == entryId;
    if (isExpectedEntry) {
        complete(found);
    } else {
        completeExceptionally(BKException.Code.UnexpectedConditionException);
    }
}
 
示例3
/**
 * Handles the result of a single-entry long-poll read.
 *
 * <p>Nothing happens when this request is already done or when
 * {@code checkReturnCodeAndHandleFailure} rejects the return code. A non-null entry whose id
 * matches this request completes it; anything else re-issues the read.
 *
 * @param rc return code of the read
 * @param entryId id reported by the read
 * @param entry entry returned by the read, possibly {@code null}
 * @param ctx callback context (unused here)
 */
void processReadEntry(int rc,
                      long entryId,
                      LedgerEntry entry,
                      Object ctx) {
    if (isDone() || !checkReturnCodeAndHandleFailure(rc, true)) {
        return;
    }
    boolean gotRequestedEntry = entry != null && entryId == this.entryId;
    if (!gotRequestedEntry) {
        // the long poll timed out or was interrupted, so issue it again
        issueRead(this);
        return;
    }
    complete(entry);
}
 
示例4
/**
 * Stores the outcome of this read, schedules the next read if one is still due, and triggers
 * the callbacks.
 *
 * @param rc return code of the read
 * @param lh ledger handle the read was issued on (unused here)
 * @param enumeration entries returned by the read; only the first one is kept
 * @param ctx callback context (unused here)
 */
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> enumeration, Object ctx) {
    this.rc = rc;
    boolean hasResult = BKException.Code.OK == rc && enumeration.hasMoreElements();
    entry = hasResult ? enumeration.nextElement() : null;
    isDone = true;

    // claim the next entry id and, while it is within the LAC, queue and start a read for it
    long claimedEntryId = nextReadEntry.getAndIncrement();
    if (claimedEntryId <= lac) {
        PendingReadRequest followUp = new PendingReadRequest(claimedEntryId);
        pendingReads.add(followUp);
        followUp.read();
    }
    triggerCallbacks();
}
 
示例5
/**
 * Asynchronously reads the last add confirmed together with the given entry.
 *
 * @param ledgerDesc
 *          descriptor of a ledger that must already be present in the handle map
 * @param entryId
 *          id of the entry to read
 * @param timeOutInMillis
 *          time out if no newer entry is available
 * @param parallel
 *          whether to read from replicas in parallel
 * @return a future carrying the (last add confirmed, entry) pair on success; it fails with a
 *         {@code BKException} built from the return code otherwise, or with
 *         {@code UnexpectedConditionException} when no handle is registered for the descriptor
 */
public Future<Pair<Long, LedgerEntry>> asyncReadLastConfirmedAndEntry(
        LedgerDescriptor ledgerDesc,
        long entryId,
        long timeOutInMillis,
        boolean parallel) {
    RefCountedLedgerHandle cachedHandle = handlesMap.get(ledgerDesc);
    if (cachedHandle == null) {
        LOG.error("Accessing ledger {} without opening.", ledgerDesc);
        return Future.exception(BKException.create(BKException.Code.UnexpectedConditionException));
    }
    final Promise<Pair<Long, LedgerEntry>> result = new Promise<Pair<Long, LedgerEntry>>();
    AsyncCallback.ReadLastConfirmedAndEntryCallback bridge =
            new AsyncCallback.ReadLastConfirmedAndEntryCallback() {
                @Override
                public void readLastConfirmedAndEntryComplete(int rc, long lac, LedgerEntry ledgerEntry, Object ctx) {
                    if (BKException.Code.OK != rc) {
                        result.setException(BKException.create(rc));
                        return;
                    }
                    result.setValue(Pair.of(lac, ledgerEntry));
                }
            };
    cachedHandle.handle.asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel, bridge, null);
    return result;
}
 
示例6
/**
 * Asynchronously reads a range of entries.
 *
 * @param ledgerDesc
 *          descriptor of a ledger that must already be present in the handle map
 * @param first
 *          id of the first entry to read
 * @param last
 *          id of the last entry to read
 * @return a future carrying the entries on success; it fails with a {@code BKException} built
 *         from the return code otherwise, or with {@code UnexpectedConditionException} when no
 *         handle is registered for the descriptor
 */
public Future<Enumeration<LedgerEntry>> asyncReadEntries(
        LedgerDescriptor ledgerDesc, long first, long last) {
    RefCountedLedgerHandle cachedHandle = handlesMap.get(ledgerDesc);
    if (cachedHandle == null) {
        LOG.error("Accessing ledger {} without opening.", ledgerDesc);
        return Future.exception(BKException.create(BKException.Code.UnexpectedConditionException));
    }
    final Promise<Enumeration<LedgerEntry>> result = new Promise<Enumeration<LedgerEntry>>();
    AsyncCallback.ReadCallback bridge = new AsyncCallback.ReadCallback() {
        @Override
        public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
            if (BKException.Code.OK != rc) {
                result.setException(BKException.create(rc));
                return;
            }
            result.setValue(entries);
        }
    };
    cachedHandle.handle.asyncReadEntries(first, last, bridge, null);
    return result;
}
 
示例7
/**
 * Reads the entries {@code fromEntryId..untilEntryId} from the ledger and prints each of them.
 *
 * <p>For every entry a header line with a running position and the entry id is written to
 * standard output, an {@code Entry.Reader} is built over the entry's input stream and handed
 * to {@code printEntry}.
 *
 * @param lh ledger handle to read from
 * @param fromEntryId id of the first entry to read; also the initial running position
 * @param untilEntryId id of the last entry to read
 * @throws Exception whatever the ledger read, the reader construction or the printing throws
 */
private void simpleReadEntries(LedgerHandle lh, long fromEntryId, long untilEntryId) throws Exception {
    Enumeration<LedgerEntry> ledgerEntries = lh.readEntries(fromEntryId, untilEntryId);
    System.out.println("Entries:");
    for (long position = fromEntryId; ledgerEntries.hasMoreElements(); position++) {
        LedgerEntry current = ledgerEntries.nextElement();
        long eid = current.getEntryId();
        System.out.println("\t" + position + "(eid=" + eid + ")\t: ");
        Entry.Reader entryReader = Entry.newBuilder()
                .setLogSegmentInfo(0L, 0L)
                .setEntryId(eid)
                .setInputStream(current.getEntryInputStream())
                .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
                .buildReader();
        printEntry(entryReader);
    }
}
 
示例8
/**
 * Records the result of this read, starts a read for the next entry id when it does not exceed
 * the LAC, and finally triggers the callbacks.
 *
 * @param rc return code of the read
 * @param lh ledger handle the read was issued on (unused here)
 * @param enumeration entries returned by the read; only the first one is kept
 * @param ctx callback context (unused here)
 */
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> enumeration, Object ctx) {
    this.rc = rc;
    if (BKException.Code.OK != rc || !enumeration.hasMoreElements()) {
        entry = null;
    } else {
        entry = enumeration.nextElement();
    }
    isDone = true;

    // build the next read request, if there is still something to read
    final long upcomingEntryId = nextReadEntry.getAndIncrement();
    if (upcomingEntryId <= lac) {
        PendingReadRequest upcomingRead = new PendingReadRequest(upcomingEntryId);
        pendingReads.add(upcomingRead);
        upcomingRead.read();
    }
    triggerCallbacks();
}
 
示例9
/**
 * Asynchronously reads one entry and extracts the message id data from it.
 *
 * @param lh ledger handle to read from
 * @param entryId id of the entry to read
 * @return a future completed with the message id data of the entry; it fails with a
 *         {@code BKException} on a non-OK return code or with a {@code NoSuchElementException}
 *         when the read returns no entry
 */
private static CompletableFuture<MessageIdData> readOneMessageId(LedgerHandle lh, long entryId) {
    final CompletableFuture<MessageIdData> result = new CompletableFuture<>();

    lh.asyncReadEntries(entryId, entryId,
                        (rc, _lh, seq, ctx) -> {
                            if (rc != BKException.Code.OK) {
                                result.completeExceptionally(BKException.create(rc));
                                return;
                            }
                            if (!seq.hasMoreElements()) {
                                result.completeExceptionally(new NoSuchElementException(
                                        String.format("No such entry %d in ledger %d", entryId, lh.getId())));
                                return;
                            }
                            LedgerEntry first = seq.nextElement();
                            try (RawMessage message = RawMessageImpl.deserializeFrom(first.getEntryBuffer())) {
                                // Buffers of every entry in the sequence need to be released
                                first.getEntryBuffer().release();
                                while (seq.hasMoreElements()) {
                                    seq.nextElement().getEntryBuffer().release();
                                }
                                result.complete(message.getMessageIdData());
                            }
                        }, null);
    return result;
}
 
示例10
/**
 * Writes 1000 small messages to a new ledger, closes it, then reopens the ledger by name and
 * logs every entry that was stored.
 *
 * <p>Payloads are encoded and decoded explicitly as UTF-8 so the test does not depend on the
 * platform-default charset, and the handle opened for reading is closed even if reading fails.
 *
 * @throws Exception if any ledger operation fails
 */
@Test
void whenWriteAndReadEntries_thenSuccess() throws Exception {
    LedgerHandle lh = createLedger(bk, "myledger", ledgerPassword);
    long start = System.currentTimeMillis();
    for (int i = 0; i < 1000; i++) {
        byte[] data = ("message-" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
        lh.append(data);
    }
    lh.close();
    long elapsed = System.currentTimeMillis() - start;
    LOG.info("Entries added to ledgerId " + lh.getId() + ", elapsed=" + elapsed);

    Long ledgerId = findLedgerByName(bk, "myledger").orElse(null);
    assertNotNull(ledgerId);
    lh = bk.openLedger(ledgerId, BookKeeper.DigestType.MAC, ledgerPassword);
    try {
        long lastId = lh.readLastConfirmed();
        Enumeration<LedgerEntry> entries = lh.readEntries(0, lastId);
        while (entries.hasMoreElements()) {
            LedgerEntry entry = entries.nextElement();
            String msg = new String(entry.getEntry(), java.nio.charset.StandardCharsets.UTF_8);
            LOG.info("Entry: id=" + entry.getEntryId() + ", data=" + msg);
        }
    } finally {
        // release the read handle; the original left it open
        lh.close();
    }
}
 
示例11
/**
 * Repairs the "last record" metadata of a completed log segment.
 *
 * <p>The entry at the ledger's last add confirmed is read, its last record is determined, and
 * after an interactive confirmation the segment metadata is updated with it. In-progress
 * segments are skipped.
 *
 * @param bkAdmin admin client used to open the segment's ledger
 * @param metadataUpdater updater that stores the new last record
 * @param segment log segment to repair
 * @throws Exception if the entry or a record in it cannot be found, or an underlying call fails
 */
protected void repairLogSegment(BookKeeperAdmin bkAdmin,
                                MetadataUpdater metadataUpdater,
                                LogSegmentMetadata segment) throws Exception {
    if (segment.isInProgress()) {
        System.out.println("Skip inprogress log segment " + segment);
        return;
    }
    LedgerHandle ledger = bkAdmin.openLedger(segment.getLogSegmentId());
    long lac = ledger.getLastAddConfirmed();
    Enumeration<LedgerEntry> lacEntries = ledger.readEntries(lac, lac);
    if (!lacEntries.hasMoreElements()) {
        throw new IOException("Entry " + lac + " isn't found for " + segment);
    }
    LedgerEntry tailEntry = lacEntries.nextElement();
    Entry.Reader tailReader = Entry.newBuilder()
            .setLogSegmentInfo(segment.getLogSegmentSequenceNumber(), segment.getStartSequenceId())
            .setEntryId(tailEntry.getEntryId())
            .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion()))
            .setEntry(tailEntry.getEntryBuffer())
            .buildReader();
    tailEntry.getEntryBuffer().release();

    // walk the entry to its end, remembering the most recent record seen
    LogRecordWithDLSN newest = null;
    for (LogRecordWithDLSN rec = tailReader.nextRecord(); rec != null; rec = tailReader.nextRecord()) {
        newest = rec;
    }
    if (newest == null) {
        throw new IOException("No record found in entry " + lac + " for " + segment);
    }
    System.out.println("Updating last record for " + segment + " to " + newest);
    if (IOUtils.confirmPrompt("Do you want to make this change (Y/N): ")) {
        metadataUpdater.updateLastRecord(segment, newest);
    }
}
 
示例12
/**
 * Records a successfully read entry on this object and marks it done.
 *
 * <p>If {@code isClosed()} reports true the entry is first passed to {@code release(entry)}.
 * Then, while holding the monitor of {@code this}, nothing is stored if {@code done} is already
 * set; otherwise {@code rc} becomes {@code BKException.Code.OK} and the entry is stored.
 * {@code setDone(true)} is called after the monitor is released.
 *
 * @param entry the entry that was read
 */
void complete(LedgerEntry entry) {
    // the reader is already closed
    if (isClosed()) {
        release(entry);
    }
    // NOTE(review): there is no return after release(entry), so a released entry is still
    // stored below when the reader is closed — confirm this is intended.
    synchronized (this) {
        if (done) {
            // NOTE(review): the entry is not released on this early-return path — confirm.
            return;
        }
        this.rc = BKException.Code.OK;
        this.entry = entry;
    }
    setDone(true);
}
 
示例13
/**
 * Read callback that lets the failure injector replace the return code with
 * {@code DigestMatchException} before delegating to {@code processReadEntries}.
 *
 * @param rc return code of the read
 * @param lh ledger handle the read was issued on
 * @param entries entries returned by the read
 * @param ctx callback context
 */
@Override
public void readComplete(int rc,
                         LedgerHandle lh,
                         Enumeration<LedgerEntry> entries,
                         Object ctx) {
    final int effectiveRc = failureInjector.shouldInjectCorruption(entryId, entryId)
            ? BKException.Code.DigestMatchException
            : rc;
    processReadEntries(effectiveRc, lh, entries, ctx);
}
 
示例14
/**
 * Long-poll read callback that lets the failure injector replace the return code with
 * {@code DigestMatchException} before delegating to {@code processReadEntry}.
 *
 * @param rc return code of the read
 * @param entryId id reported by the read; forwarded unchanged
 * @param entry entry returned by the read
 * @param ctx callback context
 */
@Override
public void readLastConfirmedAndEntryComplete(int rc,
                                              long entryId,
                                              LedgerEntry entry,
                                              Object ctx) {
    // the injector is consulted with this request's own entry id, not the reported one
    final int effectiveRc = failureInjector.shouldInjectCorruption(this.entryId, this.entryId)
            ? BKException.Code.DigestMatchException
            : rc;
    processReadEntry(effectiveRc, entryId, entry, ctx);
}
 
示例15
/**
 * Builds an {@code Entry.Reader} over the buffer of the given ledger entry, using this
 * object's log segment settings.
 *
 * @param entry ledger entry to wrap
 * @return reader over the entry's content
 * @throws IOException if the reader cannot be built
 */
Entry.Reader processReadEntry(LedgerEntry entry) throws IOException {
    final long id = entry.getEntryId();
    return Entry.newBuilder()
            .setLogSegmentInfo(lssn, startSequenceId)
            .setEntryId(id)
            .setEnvelopeEntry(envelopeEntries)
            .deserializeRecordSet(deserializeRecordSet)
            .setEntry(entry.getEntryBuffer())
            .buildReader();
}
 
示例16
/**
 * Wraps a ledger entry in an {@code Entry.Reader} configured with this object's log segment
 * sequence number, start sequence id, envelope flag and record-set deserialization flag.
 *
 * @param entry ledger entry whose buffer is read
 * @return reader over the entry's content
 * @throws IOException if the reader cannot be built
 */
Entry.Reader processReadEntry(LedgerEntry entry) throws IOException {
    final long idOfEntry = entry.getEntryId();
    return Entry.newBuilder()
            .setLogSegmentInfo(lssn, startSequenceId)
            .setEntryId(idOfEntry)
            .setEnvelopeEntry(envelopeEntries)
            .deserializeRecordSet(deserializeRecordSet)
            .setEntry(entry.getEntryBuffer())
            .buildReader();
}
 
示例17
/**
 * Read callback that converts the returned ledger entries into {@code Entry.Reader}s and
 * completes the future passed as {@code ctx}.
 *
 * <p>The buffer of each entry is released after it has been processed. If processing an entry
 * fails, the buffers of the remaining entries are released and the future fails with that
 * exception. A non-OK return code fails the future with a {@code BKTransmitException}.
 *
 * @param rc return code of the read
 * @param lh ledger handle the read was issued on (unused here)
 * @param entries entries returned by the read
 * @param ctx the {@code CompletableFuture<List<Entry.Reader>>} to complete
 */
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
    CompletableFuture<List<Entry.Reader>> result = (CompletableFuture<List<Entry.Reader>>) ctx;
    if (BKException.Code.OK != rc) {
        FutureUtils.completeExceptionally(result,
                new BKTransmitException("Failed to read entries :", rc));
        return;
    }
    List<Entry.Reader> readers = Lists.newArrayList();
    while (entries.hasMoreElements()) {
        LedgerEntry current = entries.nextElement();
        try {
            readers.add(processReadEntry(current));
        } catch (IOException ioe) {
            // release the buffers of the entries that will no longer be processed
            while (entries.hasMoreElements()) {
                entries.nextElement().getEntryBuffer().release();
            }
            FutureUtils.completeExceptionally(result, ioe);
            return;
        } finally {
            current.getEntryBuffer().release();
        }
    }
    FutureUtils.complete(result, readers);
}
 
示例18
/**
 * Reads the ledger from entry 0 up to its last add confirmed in batches of {@code batchSize}
 * and hands every entry to {@code readEntryListener}.
 *
 * <p>A batch whose read fails with a {@code BKException} is logged and read again. An
 * interruption restores the interrupt flag and stops the whole run.
 */
@Override
public void run() {
    final long lastConfirmed = lh.getLastAddConfirmed();

    long nextEntryId = 0L;

    while (nextEntryId <= lastConfirmed) {
        final long batchStart = nextEntryId;
        final long batchEnd = Math.min(batchStart + batchSize - 1, lastConfirmed);

        Enumeration<LedgerEntry> batch = null;
        do {
            try {
                batch = lh.readEntries(batchStart, batchEnd);
            } catch (BKException bke) {
                logger.error("Encountered exceptions on reading [ {} - {} ] ",
                        new Object[] { batchStart, batchEnd, bke });
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        } while (batch == null);
        if (batch == null) {
            // only reachable after an interruption
            break;
        }

        while (batch.hasMoreElements()) {
            LedgerEntry current = batch.nextElement();
            readEntryListener.onEntryComplete(BKException.Code.OK, lh, current, null);
        }

        nextEntryId = batchEnd + 1;
    }

}
 
示例19
/**
 * Issues an asynchronous "read last confirmed and entry" request for the next read-ahead
 * position and routes its outcome through a notification-aware callback.
 *
 * <p>The callback is registered as metadata notification before the read is issued. On
 * success the read's LAC and entry are forwarded with an OK code; on failure the code derived
 * from the cause is forwarded together with the supplied {@code lastAddConfirmed} and a
 * {@code null} entry.
 *
 * @param parallel whether to read from replicas in parallel
 * @param lastAddConfirmed last add confirmed known when the read is issued
 */
private void issueReadLastConfirmedAndEntry(final boolean parallel,
                                            final long lastAddConfirmed) {
    final String mode = parallel ? "Parallel" : "Sequential";
    final String readCtx = String.format("ReadLastConfirmedAndEntry(%s, %d)", mode, lastAddConfirmed);
    final ReadLastConfirmedAndEntryCallbackWithNotification notifyingCallback =
        new ReadLastConfirmedAndEntryCallbackWithNotification(lastAddConfirmed, this, readCtx);
    final boolean fireImmediately = setMetadataNotification(notifyingCallback);

    final FutureEventListener<Pair<Long, LedgerEntry>> relay =
        new FutureEventListener<Pair<Long, LedgerEntry>>() {
            @Override
            public void onSuccess(Pair<Long, LedgerEntry> lacAndEntry) {
                notifyingCallback.readLastConfirmedAndEntryComplete(
                        BKException.Code.OK,
                        lacAndEntry.getLeft(),
                        lacAndEntry.getRight(),
                        readCtx);
            }

            @Override
            public void onFailure(Throwable cause) {
                notifyingCallback.readLastConfirmedAndEntryComplete(
                        FutureUtils.bkResultCode(cause),
                        lastAddConfirmed,
                        null,
                        readCtx);
            }
        };

    handleCache.asyncReadLastConfirmedAndEntry(
            currentLH,
            nextReadAheadPosition.getEntryId(),
            conf.getReadLACLongPollTimeout(),
            parallel
    ).addEventListener(relay);
    notifyingCallback.callbackImmediately(fireImmediately);
    readAheadReadLACAndEntryCounter.inc();
}
 
示例20
/**
 * Forwards the result to the wrapped callback at most once.
 *
 * <p>Only the first invocation wins the {@code called} flag; it clears the metadata
 * notification under {@code notificationLock} and then delegates. Later invocations do nothing.
 *
 * @param rc return code of the read
 * @param lac last add confirmed reported by the read
 * @param entry entry returned by the read
 * @param ctx callback context
 */
@Override
public void readLastConfirmedAndEntryComplete(int rc, long lac, LedgerEntry entry, Object ctx) {
    if (!called.compareAndSet(false, true)) {
        return;
    }
    // the notification is no longer needed once the callback fires
    synchronized (notificationLock) {
        metadataNotification = null;
    }
    this.cb.readLastConfirmedAndEntryComplete(rc, lac, entry, ctx);
}
 
示例21
/**
 * Repairs the "last record" metadata of a completed log segment.
 *
 * <p>The entry at the ledger's last add confirmed is read through its input stream, its last
 * record is determined, and after an interactive confirmation the segment metadata is updated
 * with it. In-progress segments are skipped.
 *
 * @param bkAdmin admin client used to open the segment's ledger
 * @param metadataUpdater updater that stores the new last record
 * @param segment log segment to repair
 * @throws Exception if the entry or a record in it cannot be found, or an underlying call fails
 */
protected void repairLogSegment(BookKeeperAdmin bkAdmin,
                                MetadataUpdater metadataUpdater,
                                LogSegmentMetadata segment) throws Exception {
    if (segment.isInProgress()) {
        System.out.println("Skip inprogress log segment " + segment);
        return;
    }
    LedgerHandle ledger = bkAdmin.openLedger(segment.getLedgerId(), true);
    long lac = ledger.getLastAddConfirmed();
    Enumeration<LedgerEntry> lacEntries = ledger.readEntries(lac, lac);
    if (!lacEntries.hasMoreElements()) {
        throw new IOException("Entry " + lac + " isn't found for " + segment);
    }
    LedgerEntry tailEntry = lacEntries.nextElement();
    Entry.Reader tailReader = Entry.newBuilder()
            .setLogSegmentInfo(segment.getLogSegmentSequenceNumber(), segment.getStartSequenceId())
            .setEntryId(tailEntry.getEntryId())
            .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion()))
            .setInputStream(tailEntry.getEntryInputStream())
            .buildReader();

    // walk the entry to its end, remembering the most recent record seen
    LogRecordWithDLSN newest = null;
    for (LogRecordWithDLSN rec = tailReader.nextRecord(); rec != null; rec = tailReader.nextRecord()) {
        newest = rec;
    }
    if (newest == null) {
        throw new IOException("No record found in entry " + lac + " for " + segment);
    }
    System.out.println("Updating last record for " + segment + " to " + newest);
    if (IOUtils.confirmPrompt("Do you want to make this change (Y/N): ")) {
        metadataUpdater.updateLastRecord(segment, newest);
    }
}
 
示例22
/**
 * Feeds every eligible record of a ledger entry to a {@code LogRecordSelector}.
 *
 * <p>Each record read from the entry increments {@code context.numRecordsScanned}. Control
 * records and end-of-stream records are passed to the selector only when the scan context
 * includes them.
 *
 * @param streamName
 *          fully qualified stream name (not referenced by this method's body)
 * @param metadata
 *          log segment metadata supplying the start sequence id and the envelope flag
 * @param logSegmentSeqNo
 *          log segment sequence number
 * @param entry
 *          ledger entry whose records are visited
 * @param context
 *          scan context
 * @param selector
 *          selector that receives each eligible record
 * @throws IOException
 *          if the entry reader cannot be built or a record cannot be read
 */
private static void visitEntryRecords(
        String streamName,
        LogSegmentMetadata metadata,
        long logSegmentSeqNo,
        LedgerEntry entry,
        ScanContext context,
        LogRecordSelector selector) throws IOException {
    Entry.Reader entryReader = Entry.newBuilder()
            .setLogSegmentInfo(logSegmentSeqNo, metadata.getStartSequenceId())
            .setEntryId(entry.getEntryId())
            .setEnvelopeEntry(metadata.getEnvelopeEntries())
            .setInputStream(entry.getEntryInputStream())
            .buildReader();
    LogRecordWithDLSN upcoming = entryReader.nextRecord();
    while (upcoming != null) {
        final LogRecordWithDLSN current = upcoming;
        // the following record is fetched before the current one reaches the selector
        upcoming = entryReader.nextRecord();
        context.numRecordsScanned.incrementAndGet();
        boolean filteredOut = (!context.includeControl && current.isControl())
                || (!context.includeEndOfStream && current.isEndOfStream());
        if (!filteredOut) {
            selector.process(current);
        }
    }
}
 
示例23
/**
 * Verifies that when two allocators share the same versioned allocation data, the second one
 * fails with {@code BADVERSION} while the first one's ledger remains usable.
 *
 * <p>The ledger obtained from the first allocator is written to and read back. The payload is
 * encoded with the same UTF-8 charset it is decoded with, and the handle opened for reading is
 * closed when the verification finishes.
 *
 * @throws Exception if any ZooKeeper or BookKeeper operation fails unexpectedly
 */
@Test(timeout = 60000)
public void testBadVersionOnTwoAllocators() throws Exception {
    String allocationPath = "/allocation-bad-version";
    zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    Stat stat = new Stat();
    byte[] data = zkc.get().getData(allocationPath, false, stat);
    Versioned<byte[]> allocationData = new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()));

    SimpleLedgerAllocator allocator1 =
            new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
    SimpleLedgerAllocator allocator2 =
            new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
    allocator1.allocate();
    // wait until allocated
    ZKTransaction txn1 = newTxn();
    LedgerHandle lh = FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER));
    allocator2.allocate();
    ZKTransaction txn2 = newTxn();
    try {
        FutureUtils.result(allocator2.tryObtain(txn2, NULL_LISTENER));
        fail("Should fail allocating on second allocator as allocator1 is starting allocating something.");
    } catch (ZKException zke) {
        assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
    }
    FutureUtils.result(txn1.execute());
    Utils.close(allocator1);
    Utils.close(allocator2);

    // encode with the charset used for decoding below instead of the platform default
    long eid = lh.addEntry("hello world".getBytes(UTF_8));
    lh.close();
    LedgerHandle readLh = bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
    try {
        Enumeration<LedgerEntry> entries = readLh.readEntries(eid, eid);
        int i = 0;
        while (entries.hasMoreElements()) {
            LedgerEntry entry = entries.nextElement();
            assertEquals("hello world", new String(entry.getEntry(), UTF_8));
            ++i;
        }
        assertEquals(1, i);
    } finally {
        // the read handle was never closed in the original
        readLh.close();
    }
}
 
示例24
/**
 * Advances to the next log record of the ledger.
 *
 * <p>Records are served from the reader of the current entry. When that reader is exhausted
 * the next entry (up to the ledger's last add confirmed) is read and a new reader is built
 * over it. Previously an exhausted reader made this method return {@code false} permanently,
 * so only the records of the first entry were ever produced. Entries that yield no records are
 * skipped in a loop rather than by recursion.
 *
 * @return {@code true} if {@code currentRecord} now holds a record, {@code false} once all
 *         entries up to the last add confirmed have been consumed
 * @throws IOException if reading an entry from the ledger or a record from the entry fails
 * @throws InterruptedException if the ledger read is interrupted
 */
@Override
public boolean nextKeyValue()
        throws IOException, InterruptedException {
    currentRecord = null;
    while (true) {
        if (null != reader) {
            LogRecordWithDLSN record = reader.nextRecord();
            if (null != record) {
                currentRecord = record;
                readPos = record.getPositionWithinLogSegment();
                return true;
            }
            // the current entry is exhausted; drop its reader and move on to the next entry
            reader = null;
        }
        ++entryId;
        if (entryId > lh.getLastAddConfirmed()) {
            return false;
        }
        try {
            Enumeration<LedgerEntry> entries =
                    lh.readEntries(entryId, entryId);
            if (entries.hasMoreElements()) {
                LedgerEntry entry = entries.nextElement();
                reader = Entry.newBuilder()
                        .setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(),
                                metadata.getStartSequenceId())
                        .setEntryId(entry.getEntryId())
                        .setEnvelopeEntry(
                                LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion()))
                        .deserializeRecordSet(true)
                        .setInputStream(entry.getEntryInputStream())
                        .buildReader();
            }
        } catch (BKException e) {
            throw new IOException(e);
        }
    }
}
 
示例25
/**
 * Walks the ledger from entry 0 to its last add confirmed, one batch of {@code batchSize}
 * entries at a time, and reports every entry to {@code readEntryListener} with an OK code.
 *
 * <p>A batch read that throws {@code BKException} is logged and attempted again. If the thread
 * is interrupted, the interrupt flag is restored and the walk ends.
 */
@Override
public void run() {
    final long lastConfirmed = lh.getLastAddConfirmed();

    for (long cursor = 0L; cursor <= lastConfirmed; ) {
        final long firstInBatch = cursor;
        final long lastInBatch = Math.min(firstInBatch + batchSize - 1, lastConfirmed);

        Enumeration<LedgerEntry> batch = null;
        boolean interrupted = false;
        while (batch == null && !interrupted) {
            try {
                batch = lh.readEntries(firstInBatch, lastInBatch);
            } catch (BKException bke) {
                logger.error("Encountered exceptions on reading [ {} - {} ] ",
                        new Object[] { firstInBatch, lastInBatch, bke });
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                interrupted = true;
            }
        }
        if (batch == null) {
            // the read was interrupted before any result arrived
            break;
        }

        while (batch.hasMoreElements()) {
            LedgerEntry current = batch.nextElement();
            readEntryListener.onEntryComplete(BKException.Code.OK, lh, current, null);
        }

        cursor = lastInBatch + 1;
    }

}
 
示例26
/**
 * Asynchronously reads a range of ledger entries and converts each one into an {@code Entry}.
 *
 * <p>Every ledger entry buffer is deserialized into a {@code RawMessage}; the message is
 * closed and the buffer released once the corresponding {@code Entry} has been created.
 *
 * @param lh ledger handle to read from
 * @param from id of the first entry to read
 * @param to id of the last entry to read
 * @return a future with the converted entries; it fails with a {@code BKException} when the
 *         read returns a non-OK code
 */
private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, long to) {
    final CompletableFuture<Enumeration<LedgerEntry>> rawRead = new CompletableFuture<>();

    lh.asyncReadEntries(from, to,
                        (rc, _lh, seq, ctx) -> {
                            if (rc == BKException.Code.OK) {
                                rawRead.complete(seq);
                            } else {
                                rawRead.completeExceptionally(BKException.create(rc));
                            }
                        }, null);
    return rawRead.thenApply(
            (ledgerEntries) -> {
                List<Entry> converted = new ArrayList<>();
                while (ledgerEntries.hasMoreElements()) {
                    ByteBuf buffer = ledgerEntries.nextElement().getEntryBuffer();
                    try (RawMessage message = RawMessageImpl.deserializeFrom(buffer)) {
                        converted.add(EntryImpl.create(message.getMessageIdData().getLedgerId(),
                                                       message.getMessageIdData().getEntryId(),
                                                       message.getHeadersAndPayload()));
                    } finally {
                        buffer.release();
                    }
                }
                return converted;
            });
}
 
示例27
/**
 * Asynchronously reads a single entry from a ledger.
 *
 * <p>An OK read that returns no entry now fails the future with a
 * {@code NoSuchElementException}. Previously {@code nextElement()} was called unconditionally,
 * so an empty result threw inside the callback and left the future uncompleted.
 *
 * @param ledger ledger handle to read from
 * @param entry id of the entry to read
 * @return a future completed with the entry; it fails with the exception built by
 *         {@code bkException} on a non-OK return code, or with a
 *         {@code NoSuchElementException} when the read returns no entry
 */
static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry) {
    final CompletableFuture<LedgerEntry> future = new CompletableFuture<>();
    ledger.asyncReadEntries(entry, entry,
        (rc, handle, entries, ctx) -> {
            if (rc != BKException.Code.OK) {
                future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId(), entry));
            } else if (entries == null || !entries.hasMoreElements()) {
                // never leave the future dangling when the read yields nothing
                future.completeExceptionally(new java.util.NoSuchElementException(
                    String.format("No such entry %d in ledger %d", entry, ledger.getId())));
            } else {
                future.complete(entries.nextElement());
            }
        }, null
    );
    return future;
}
 
示例28
/**
 * Parses the content of a ledger entry as a {@code SchemaStorageFormat.SchemaEntry}.
 *
 * @param ledgerEntry ledger entry holding the serialized schema entry
 * @return an already completed future: the parsed schema entry on success, or the
 *         {@code IOException} raised while parsing
 */
static CompletableFuture<SchemaStorageFormat.SchemaEntry> parseSchemaEntry(LedgerEntry ledgerEntry) {
    final CompletableFuture<SchemaStorageFormat.SchemaEntry> parsed = new CompletableFuture<>();
    try {
        SchemaStorageFormat.SchemaEntry schemaEntry =
                SchemaStorageFormat.SchemaEntry.parseFrom(ledgerEntry.getEntry());
        parsed.complete(schemaEntry);
    } catch (IOException parseFailure) {
        parsed.completeExceptionally(parseFailure);
    }
    return parsed;
}
 
示例29
/**
 * Compacts a topic and checks the compacted ledger against the expected key/payload pairs.
 *
 * <p>The compacted ledger must contain exactly as many entries as {@code expected} has keys.
 * Each entry's payload must equal the value stored under its key; matched keys are removed
 * from {@code expected}, which must be empty at the end.
 *
 * @param topic topic to compact
 * @param expected expected payload per key; emptied as entries are matched
 * @return the keys in the order they appear in the compacted ledger
 * @throws Exception if compaction, reading the ledger or an assertion fails
 */
private List<String> compactAndVerify(String topic, Map<String, byte[]> expected) throws Exception {
    BookKeeper bookKeeper = pulsar.getBookKeeperClientFactory().create(
            this.conf, null, Optional.empty(), null);
    Compactor topicCompactor = new TwoPhaseCompactor(conf, pulsarClient, bookKeeper, compactionScheduler);
    long compactedLedgerId = topicCompactor.compact(topic).get();

    LedgerHandle compactedLedger = bookKeeper.openLedger(compactedLedgerId,
                                                         Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
                                                         Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
    Assert.assertEquals(compactedLedger.getLastAddConfirmed() + 1, // entries are 0..lac
                        expected.size(),
                        "Should have as many entries as there is keys");

    List<String> seenKeys = new ArrayList<>();
    Enumeration<LedgerEntry> compactedEntries =
            compactedLedger.readEntries(0, compactedLedger.getLastAddConfirmed());
    while (compactedEntries.hasMoreElements()) {
        ByteBuf entryBuffer = compactedEntries.nextElement().getEntryBuffer();
        RawMessage message = RawMessageImpl.deserializeFrom(entryBuffer);
        String key = extractKey(message);
        seenKeys.add(key);

        ByteBuf payload = extractPayload(message);
        byte[] actualBytes = new byte[payload.readableBytes()];
        payload.readBytes(actualBytes);
        Assert.assertEquals(actualBytes, expected.remove(key),
                            "Compacted version should match expected version");
        message.close();
    }
    Assert.assertTrue(expected.isEmpty(), "All expected keys should have been found");
    return seenKeys;
}
 
示例30
/**
 * Releases the buffer of the given entry; a {@code null} entry is ignored.
 *
 * @param entry entry whose buffer is released, may be {@code null}
 */
void release(LedgerEntry entry) {
    if (entry == null) {
        return;
    }
    entry.getEntryBuffer().release();
}