Java源码示例:io.aeron.FragmentAssembler
示例1
/**
 * Connects an Aeron client with the given context, subscribes to three streams on
 * {@code libraryAeronChannel}, and builds the fragment assembler used to read them.
 * <p>
 * The assembler wraps a {@code LogEntryHandler} whose callback forwards
 * {@code message.body()} to {@code fixMessageConsumer}.
 *
 * @param fixMessageConsumer     receives the body of every message the log entry handler reports.
 * @param context                Aeron context used to connect the client.
 * @param libraryAeronChannel    channel all three subscriptions are added on.
 * @param inboundStreamId        stream id of the inbound subscription.
 * @param outboundStreamId       stream id of the outbound subscription.
 * @param outboundReplayStreamId stream id of the replay subscription.
 */
public FixMessageLogger(
    final Consumer<String> fixMessageConsumer,
    final Aeron.Context context,
    final String libraryAeronChannel,
    final int inboundStreamId,
    final int outboundStreamId,
    final int outboundReplayStreamId)
{
    this.aeron = Aeron.connect(context);

    // One subscription per stream, all on the same channel.
    this.inboundSubscription = this.aeron.addSubscription(libraryAeronChannel, inboundStreamId);
    this.outboundSubscription = this.aeron.addSubscription(libraryAeronChannel, outboundStreamId);
    this.replaySubscription = this.aeron.addSubscription(libraryAeronChannel, outboundReplayStreamId);

    // Only the message body is of interest; buffer, offset, length and header are ignored.
    this.fragmentAssembler = new FragmentAssembler(
        new LogEntryHandler(
            (message, buffer, offset, length, header) -> fixMessageConsumer.accept(message.body())));
}
示例2
/**
 * Polls {@code subscription} until {@code count} messages have been handled, asserting that
 * each message read as ASCII equals {@code prefix} followed by the number of messages
 * handled before it (starting at zero).
 *
 * @param subscription source to poll.
 * @param count        number of messages to wait for.
 * @param prefix       text every message is expected to start with, followed by its index.
 */
static void consume(final Subscription subscription, final int count, final String prefix)
{
    final MutableInteger messageIndex = new MutableInteger(0);

    final FragmentHandler assembler = new FragmentAssembler(
        (buffer, offset, length, header) ->
        {
            assertEquals(prefix + messageIndex.value, buffer.getStringWithoutLengthAscii(offset, length));
            messageIndex.value++;
        });

    while (messageIndex.value < count)
    {
        final int fragments = subscription.poll(assembler, FRAGMENT_LIMIT);
        if (fragments == 0)
        {
            // Nothing was read on this pass; yield before polling again.
            Tests.yield();
        }
    }

    assertEquals(count, messageIndex.get());
}
示例3
/**
 * Creates the benchmark server: installs a SIGINT hook that clears {@code running},
 * optionally launches an embedded media driver, connects an Aeron client, and adds the
 * reply publication and the request subscription.
 */
@SuppressWarnings("PMD.NullAssignment")
public BenchServer() {
    this.running = new AtomicBoolean(true);
    SigInt.register(() -> this.running.set(false));

    // No driver is held when an embedded one is not requested.
    if (EMBEDDED_MEDIA_DRIVER) {
        this.driver = MediaDriver.launchEmbedded();
    } else {
        this.driver = null;
    }

    this.ctx = new Aeron.Context();
    if (EMBEDDED_MEDIA_DRIVER) {
        // Point the client at the directory of the driver launched above.
        this.ctx.aeronDirectoryName(this.driver.aeronDirectoryName());
    }

    this.fragmentHandler = new FragmentAssembler(this::onMessage);
    this.aeron = Aeron.connect(this.ctx);
    this.publication = this.aeron.addPublication(REP_CHAN, REP_STREAM_ID);
    this.subscription = this.aeron.addSubscription(REQ_CHAN, REQ_STREAM_ID);
}
示例4
/**
 * Creates the benchmark client: optionally launches an embedded media driver, builds an
 * Aeron context that reports available images to {@code imageHandler}, connects, and adds
 * the request publication and the reply subscription.
 */
@SuppressWarnings("PMD.NullAssignment")
public BenchClient() {
    // No driver is held when an embedded one is not requested.
    if (EMBEDDED_MEDIA_DRIVER) {
        this.driver = MediaDriver.launchEmbedded();
    } else {
        this.driver = null;
    }

    this.ctx = new Aeron.Context().availableImageHandler(this::imageHandler);
    if (EMBEDDED_MEDIA_DRIVER) {
        // Point the client at the directory of the driver launched above.
        this.ctx.aeronDirectoryName(this.driver.aeronDirectoryName());
    }

    this.fragmentHandler = new FragmentAssembler(this::onMessage);
    this.aeron = Aeron.connect(this.ctx);
    this.publication = this.aeron.addPublication(REQ_CHAN, REQ_STREAM_ID);
    this.subscription = this.aeron.addSubscription(REP_CHAN, REP_STREAM_ID);
}
示例5
/**
 * Subscribes to {@code CHANNEL} on two stream ids and polls both subscriptions, each through
 * its own {@link FragmentAssembler}, until SIGINT clears the running flag.
 *
 * @param args ignored.
 */
public static void main(final String[] args)
{
    System.out.format("Subscribing to %s on stream ID %d and stream ID %d%n",
        CHANNEL, STREAM_ID_1, STREAM_ID_2);

    final Aeron.Context ctx = new Aeron.Context()
        .availableImageHandler(MultipleSubscribersWithFragmentAssembly::eventAvailableImage)
        .unavailableImageHandler(MultipleSubscribersWithFragmentAssembly::eventUnavailableImage);

    // Each stream gets its own assembler.
    final FragmentAssembler dataHandler1 = new FragmentAssembler(reassembledStringMessage1(STREAM_ID_1));
    final FragmentAssembler dataHandler2 = new FragmentAssembler(reassembledStringMessage2(STREAM_ID_2));

    final AtomicBoolean running = new AtomicBoolean(true);
    SigInt.register(() -> running.set(false));

    // The client and both subscriptions are closed when the try block exits.
    try (Aeron aeron = Aeron.connect(ctx);
        Subscription subscription1 = aeron.addSubscription(CHANNEL, STREAM_ID_1);
        Subscription subscription2 = aeron.addSubscription(CHANNEL, STREAM_ID_2))
    {
        final IdleStrategy idleStrategy = new BackoffIdleStrategy(
            100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100));

        while (running.get())
        {
            final int fragmentsRead1 = subscription1.poll(dataHandler1, FRAGMENT_COUNT_LIMIT);
            final int fragmentsRead2 = subscription2.poll(dataHandler2, FRAGMENT_COUNT_LIMIT);

            // idle(workCount) takes the amount of work done in this cycle: zero lets the
            // strategy back off, a positive count resets it. Passing an ever-increasing idle
            // counter (as before) reset the back-off on every empty poll after the first.
            idleStrategy.idle(fragmentsRead1 + fragmentsRead2);
        }

        System.out.println("Shutting down...");
    }
}
示例6
/**
 * Runs the ping side: prints the configuration, adds the pong subscription and the ping
 * publication, waits on {@code PONG_IMAGE_LATCH}, performs the warm-up iterations, and then
 * repeats measured runs for as long as the "Execute again?" barrier returns true.
 *
 * @param aeron client used to add the subscription and publication.
 * @throws InterruptedException if interrupted while waiting on the latch or sleeping.
 */
private static void runPing(final Aeron aeron) throws InterruptedException
{
    System.out.println("Publishing Ping at " + PING_CHANNEL + " on stream id " + PING_STREAM_ID);
    System.out.println("Subscribing Pong at " + PONG_CHANNEL + " on stream id " + PONG_STREAM_ID);
    System.out.println("Message payload length of " + MESSAGE_LENGTH + " bytes");
    System.out.println("Using exclusive publications: " + EXCLUSIVE_PUBLICATIONS);

    final FragmentAssembler pongAssembler = new FragmentAssembler(EmbeddedPingPong::pongHandler);

    // Subscription and publication are closed when the try block exits.
    try (Subscription pongSubscription = aeron.addSubscription(
        PONG_CHANNEL, PONG_STREAM_ID, EmbeddedPingPong::availablePongImageHandler, null);
        Publication pingPublication = EXCLUSIVE_PUBLICATIONS ?
            aeron.addExclusivePublication(PING_CHANNEL, PING_STREAM_ID) :
            aeron.addPublication(PING_CHANNEL, PING_STREAM_ID))
    {
        System.out.println("Waiting for new image from Pong...");
        PONG_IMAGE_LATCH.await();

        System.out.format("Warming up... %d iterations of %,d messages%n",
            WARMUP_NUMBER_OF_ITERATIONS, WARMUP_NUMBER_OF_MESSAGES);

        for (int iteration = 0; iteration < WARMUP_NUMBER_OF_ITERATIONS; iteration++)
        {
            roundTripMessages(pongAssembler, pingPublication, pongSubscription, WARMUP_NUMBER_OF_MESSAGES);
            Thread.yield();
        }

        Thread.sleep(100);

        final ContinueBarrier repeatBarrier = new ContinueBarrier("Execute again?");
        boolean runAgain;
        do
        {
            // Clear the histogram so each measured run is reported on its own.
            HISTOGRAM.reset();
            System.out.format("Pinging %,d messages%n", NUMBER_OF_MESSAGES);

            roundTripMessages(pongAssembler, pingPublication, pongSubscription, NUMBER_OF_MESSAGES);

            System.out.println("Histogram of RTT latencies in microseconds.");
            // NOTE(review): 1000.0 is the output scaling ratio; presumably values are recorded
            // in nanoseconds - confirm where HISTOGRAM is written.
            HISTOGRAM.outputPercentileDistribution(System.out, 1000.0);

            runAgain = repeatBarrier.await();
        }
        while (runAgain);
    }
}
示例7
/**
 * Subscribes to {@code channel}/{@code streamId} and hands the subscription to the
 * consumer built by {@code AeronUtil.subscriberLoop}, using an
 * {@code NDArrayResponseFragmentHandler} wrapped in a {@link FragmentAssembler}.
 * <p>
 * Does nothing if {@code init} is already set. Subscribing (and running the loop consumer)
 * is attempted up to three times; every failure is logged and retried.
 * <p>
 * NOTE(review): no thread is created in this method; the loop consumer is invoked on the
 * calling thread. Whether it blocks depends on {@code AeronUtil.subscriberLoop} - confirm.
 *
 * @throws IllegalStateException if every attempt failed; the last failure is attached as the cause
 * @throws Exception declared by the signature; exceptions raised while subscribing are
 *         caught and retried inside this method
 */
public void launch() throws Exception {
    if (init.get()) {
        return;
    }
    // The flag is read again so init() is skipped if it was set since the check above.
    if (!init.get()) {
        init();
    }
    log.info("Subscribing to " + channel + " on stream Id " + streamId);

    // Register a SIGINT handler for graceful shutdown.
    SigInt.register(() -> running.set(false));

    final int maxTries = 3;
    int numTries = 0;
    boolean started = false;
    Exception lastFailure = null;
    while (!started && numTries < maxTries) {
        // Subscription implements AutoCloseable; try-with-resources closes it when the
        // loop consumer returns or throws.
        try (final Subscription subscription = aeron.addSubscription(channel, streamId)) {
            log.info("Beginning subscribe on channel " + channel + " and stream " + streamId);
            AeronUtil.subscriberLoop(new FragmentAssembler(NDArrayResponseFragmentHandler.builder().aeron(aeron)
                            .context(ctx).streamId(responseStreamId).holder(ndArrayHolder).build()),
                            fragmentLimitCount, running, launched).accept(subscription);
            started = true;
        } catch (Exception e) {
            numTries++;
            // Remember the failure so it can be reported as the cause if all attempts fail.
            lastFailure = e;
            log.warn("Failed to connect..trying again", e);
        }
    }

    if (!started) {
        throw new IllegalStateException(
                        "Was unable to start responder after " + numTries + " tries", lastFailure);
    }
}
示例8
/**
 * Subscribes to {@code channel}/{@code streamId} and hands the subscription to the
 * consumer built by {@code AeronUtil.subscriberLoop}, using an {@code NDArrayFragmentHandler}
 * around {@code ndArrayCallback} wrapped in a {@link FragmentAssembler}.
 * <p>
 * Does nothing if {@code init} is already set. Any exception while subscribing or running
 * the loop consumer is logged and the attempt is repeated, with no limit on retries.
 * <p>
 * NOTE(review): no thread is created in this method; the loop consumer is invoked on the
 * calling thread. Whether it blocks depends on {@code AeronUtil.subscriberLoop} - confirm.
 *
 * @throws IllegalStateException if {@code channel} is null, {@code streamId} is not positive,
 *         or {@code aeron} is null
 * @throws Exception declared by the signature
 */
public void launch() throws Exception {
    if (init.get()) {
        return;
    }
    // The flag is read again so init() is skipped if it was set since the check above.
    if (!init.get()) {
        init();
    }

    log.info("Subscribing to " + channel + " on stream Id " + streamId);
    log.info("Using aeron directory " + ctx.aeronDirectoryName());

    // Register a SIGINT handler for graceful shutdown.
    SigInt.register(() -> running.set(false));

    // Validate the configuration before attempting to subscribe.
    if (channel == null) {
        throw new IllegalStateException("No channel for subscriber defined");
    } else if (streamId <= 0) {
        throw new IllegalStateException("No stream for subscriber defined");
    } else if (aeron == null) {
        throw new IllegalStateException("No aeron instance defined");
    }

    boolean subscribed = false;
    do {
        // Subscription implements AutoCloseable; try-with-resources closes it when the
        // loop consumer returns or throws.
        try (final Subscription sub = aeron.addSubscription(channel, streamId)) {
            // NOTE(review): the field still refers to this subscription after it is closed.
            this.subscription = sub;
            log.info("Beginning subscribe on channel " + channel + " and stream " + streamId);
            AeronUtil.subscriberLoop(new FragmentAssembler(new NDArrayFragmentHandler(ndArrayCallback)),
                            fragmentLimitCount, running, launched).accept(sub);
            subscribed = true;
        } catch (Exception ex) {
            log.warn("Unable to connect...trying again on channel " + channel, ex);
        }
    } while (!subscribed);
}
示例9
/**
* Launch a background thread
* that subscribes to the aeron context
* @throws Exception
*/
public void launch() throws Exception {
if (init.get())
return;
// Register a SIGINT handler for graceful shutdown.
if (!init.get())
init();
log.info("Subscribing to " + channel + " on stream Id " + streamId);
// Register a SIGINT handler for graceful shutdown.
SigInt.register(() -> running.set(false));
// Create an Aeron instance with client-provided context configuration, connect to the
// media driver, and add a subscription for the given channel and stream using the supplied
// dataHandler method, which will be called with new messages as they are received.
// The Aeron and Subscription classes implement AutoCloseable, and will automatically
// clean up resources when this try block is finished.
//Note here that we are either creating 1 or 2 subscriptions.
//The first one is a normal 1 subscription listener.
//The second one is when we want to send responses
boolean started = false;
int numTries = 0;
while (!started && numTries < 3) {
try {
try (final Subscription subscription = aeron.addSubscription(channel, streamId)) {
log.info("Beginning subscribe on channel " + channel + " and stream " + streamId);
AeronUtil.subscriberLoop(new FragmentAssembler(NDArrayResponseFragmentHandler.builder().aeron(aeron)
.context(ctx).streamId(responseStreamId).holder(ndArrayHolder).build()),
fragmentLimitCount, running, launched).accept(subscription);
started = true;
}
} catch (Exception e) {
numTries++;
log.warn("Failed to connect..trying again", e);
}
}
if (numTries >= 3)
throw new IllegalStateException("Was unable to start responder after " + numTries + "tries");
}
示例10
/**
* Launch a background thread
* that subscribes to the aeron context
* @throws Exception
*/
public void launch() throws Exception {
if (init.get())
return;
// Register a SIGINT handler for graceful shutdown.
if (!init.get())
init();
log.info("Subscribing to " + channel + " on stream Id " + streamId);
log.info("Using aeron directory " + ctx.aeronDirectoryName());
// Register a SIGINT handler for graceful shutdown.
SigInt.register(() -> running.set(false));
// Create an Aeron instance with client-provided context configuration, connect to the
// media driver, and add a subscription for the given channel and stream using the supplied
// dataHandler method, which will be called with new messages as they are received.
// The Aeron and Subscription classes implement AutoCloseable, and will automatically
// clean up resources when this try block is finished.
//Note here that we are either creating 1 or 2 subscriptions.
//The first one is a normal 1 subscription listener.
//The second one is when we want to send responses
if (channel == null)
throw new IllegalStateException("No channel for subscriber defined");
if (streamId <= 0)
throw new IllegalStateException("No stream for subscriber defined");
if (aeron == null)
throw new IllegalStateException("No aeron instance defined");
boolean started = false;
while (!started) {
try (final Subscription subscription = aeron.addSubscription(channel, streamId)) {
this.subscription = subscription;
log.info("Beginning subscribe on channel " + channel + " and stream " + streamId);
AeronUtil.subscriberLoop(new FragmentAssembler(new NDArrayFragmentHandler(ndArrayCallback)),
fragmentLimitCount, running, launched).accept(subscription);
started = true;
} catch (Exception e) {
log.warn("Unable to connect...trying again on channel " + channel, e);
}
}
}