Java源码示例:io.nats.client.NUID
示例1
/**
* Properties-based constructor for StanBench.
*
* @param properties configuration properties
*/
public StanBench(Properties properties) {
urls = properties.getProperty("bench.stan.servers", urls);
clientId = properties.getProperty("bench.streaming.client.id", clientId);
clusterId = properties.getProperty("bench.stan.cluster.id", clusterId);
secure = Boolean.parseBoolean(
properties.getProperty("bench.stan.secure", Boolean.toString(secure)));
numMsgs = Integer.parseInt(
properties.getProperty("bench.stan.msg.count", Integer.toString(numMsgs)));
maxPubAcksInFlight = Integer.parseInt(properties.getProperty("bench.stan.pub.maxpubacks",
Integer.toString(maxPubAcksInFlight)));
size = Integer
.parseInt(properties.getProperty("bench.stan.msg.size", Integer.toString(numSubs)));
numPubs = Integer
.parseInt(properties.getProperty("bench.stan.pubs", Integer.toString(numPubs)));
numSubs = Integer
.parseInt(properties.getProperty("bench.stan.subs", Integer.toString(numSubs)));
csvFileName = properties.getProperty("bench.stan.csv.filename", null);
subject = properties.getProperty("bench.stan.subject", NUID.nextGlobal());
async = Boolean.parseBoolean(
properties.getProperty("bench.stan.pub.async", Boolean.toString(async)));
ignoreOld = Boolean.parseBoolean(
properties.getProperty("bench.stan.sub.ignoreold", Boolean.toString(ignoreOld)));
}
示例2
StreamingConnectionImpl(String clusterId, String clientId, Options opts) {
this.clusterId = clusterId;
this.clientId = clientId;
this.nuid = new NUID();
this.opts = opts;
this.connectionId = this.nuid.next();
if (opts == null) {
opts = new Options.Builder().build();
}
// Check if the user has provided a connection as an option
if (this.opts.getNatsConn() != null) {
setNatsConnection(this.opts.getNatsConn());
}
}
示例3
void onMessage(String subject, byte[] data) {
String payload = new String(data);
logger.info(String.format("Received message for %s: %s", subject, payload));
Message dstMsg = new Message();
dstMsg.setId(NUID.nextGlobal());
dstMsg.setPayload(payload);
messages.add(dstMsg);
}
示例4
@Test
public void testStanBenchProperties() {
try (NatsStreamingTestServer srv = new NatsStreamingTestServer(clusterId, false)) {
Properties props = new Properties();
String client = NUID.nextGlobal();
props.setProperty("bench.stan.servers", srv.getURI());
props.setProperty("bench.stan.cluster.id", "my_test_cluster");
props.setProperty("bench.streaming.client.id", client);
props.setProperty("bench.stan.secure", "false");
props.setProperty("bench.stan.msg.count", "100");
props.setProperty("bench.stan.msg.size", "0");
props.setProperty("bench.stan.secure", "false");
props.setProperty("bench.stan.pubs", "1");
props.setProperty("bench.stan.subs", "0");
props.setProperty("bench.stan.subject", "foo");
props.setProperty("bench.stan.pub.maxpubacks", "1000");
props.setProperty("bench.stan.sub.ignoreold", Boolean.toString(true));
props.setProperty("bench.streaming.async", Boolean.toString(true));
final StanBench bench = new StanBench(props);
try {
bench.run();
} catch (Exception e) {
fail(e.getMessage());
}
}
}
示例5
public Benchmark(String name) {
this(name, NUID.nextGlobal());
}
示例6
private String publish(String subject, byte[] data, AckHandler ah, BlockingQueue<String> ch)
throws IOException, InterruptedException, TimeoutException {
String subj;
String ackSubject;
Duration ackTimeout = opts.getAckTimeout();
BlockingQueue<PubAck> pac;
final AckClosure a= new AckClosure(ah, subject, (ah != null && ah.includeDataWithAck()) ? data : null, ch);
final PubMsg pe;
String guid;
byte[] bytes;
this.lock();
try {
if (getNatsConnection() == null) {
throw new IllegalStateException(NatsStreaming.ERR_CONNECTION_CLOSED);
}
subj = pubPrefix + "." + subject;
guid = NUID.nextGlobal();
PubMsg.Builder pb =
PubMsg.newBuilder().setClientID(clientId).setGuid(guid).setSubject(subject);
if (data != null) {
pb = pb.setData(ByteString.copyFrom(data));
}
pe = pb.build();
bytes = pe.toByteArray();
// Map ack to guid
pubAckMap.put(guid, a);
// snapshot
ackSubject = this.ackSubject;
pac = pubAckChan;
} finally {
this.unlock();
}
// Use the buffered channel to control the number of outstanding acks.
try {
pac.put(PubAck.getDefaultInstance());
} catch (InterruptedException e) {
// TODO: Reevaluate this.
// Eat this because you can't really do anything with it
}
nc.publish(subj, ackSubject, bytes);
// Setup the timer for expiration.
this.lock();
try {
a.ackTask = createAckTimerTask(guid);
ackTimer.schedule(a.ackTask, ackTimeout.toMillis());
} finally {
this.unlock();
}
return guid;
}
示例7
@Test
public void testStanBenchStringArray() {
try (NatsStreamingTestServer srv = new NatsStreamingTestServer(clusterId, false)) {
final String urls = srv.getURI();
final String clientId = NUID.nextGlobal();
final int count = 100;
final int numPubs = 1;
final int numSubs = 1;
final int msgSize = 256;
final boolean secure = false;
final boolean ignoreOld = true;
final boolean async = true;
final String subject = "foo";
List<String> argList = new ArrayList<String>();
argList.addAll(Arrays.asList("-s", urls));
argList.addAll(Arrays.asList("-c", clusterId));
argList.addAll(Arrays.asList("-id", clientId));
argList.addAll(Arrays.asList("-np", Integer.toString(numPubs)));
argList.addAll(Arrays.asList("-ns", Integer.toString(numSubs)));
argList.addAll(Arrays.asList("-n", Integer.toString(count)));
argList.addAll(Arrays.asList("-ms", Integer.toString(msgSize)));
if (secure) {
argList.add("-tls");
}
if (ignoreOld) {
argList.add("-io");
}
if (async) {
argList.add("-a");
}
argList.add(subject);
String[] args = new String[argList.size()];
args = argList.toArray(args);
final StanBench bench = new StanBench(args);
try {
bench.run();
} catch (Exception e) {
fail(e.getMessage());
}
}
}
示例8
public Benchmark(String name) {
this(name, NUID.nextGlobal());
}
示例9
NatsConnection(Options options) {
boolean trace = options.isTraceConnection();
timeTrace(trace, "creating connection object");
this.options = options;
this.statistics = new NatsStatistics(this.options.isTrackAdvancedStats());
this.statusLock = new ReentrantLock();
this.statusChanged = this.statusLock.newCondition();
this.status = Status.DISCONNECTED;
this.reconnectWaiter = new CompletableFuture<>();
this.reconnectWaiter.complete(Boolean.TRUE);
this.dispatchers = new ConcurrentHashMap<>();
this.subscribers = new ConcurrentHashMap<>();
this.responses = new ConcurrentHashMap<>();
this.serverAuthErrors = new HashMap<>();
this.nextSid = new AtomicLong(1);
long start = System.nanoTime();
this.nuid = new NUID();
if (trace) {
long seconds = TimeUnit.SECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
if (seconds > 1L) {
// If you see this trace check: https://github.com/nats-io/nats.java#linux-platform-note
timeTrace(trace, "NUID initialization took long: %d (s)", seconds);
}
}
this.mainInbox = createInbox() + ".*";
this.lastError = new AtomicReference<>();
this.connectError = new AtomicReference<>();
this.serverInfo = new AtomicReference<>();
this.inboxDispatcher = new AtomicReference<>();
this.pongQueue = new ConcurrentLinkedDeque<>();
this.draining = new AtomicReference<>();
this.blockPublishForDrain = new AtomicBoolean();
timeTrace(trace, "creating executors");
this.callbackRunner = Executors.newSingleThreadExecutor();
this.executor = options.getExecutor();
this.connectExecutor = Executors.newSingleThreadExecutor();
timeTrace(trace, "creating reader and writer");
this.reader = new NatsConnectionReader(this);
this.writer = new NatsConnectionWriter(this);
this.needPing = new AtomicBoolean(true);
timeTrace(trace, "connection object created");
}