Java源码示例:com.alibaba.rocketmq.store.config.StorePathConfigHelper
示例1
private boolean loadConsumeQueue() {
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(lStorePath));
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
for (File fileTopic : fileTopicList) {
String topic = fileTopic.getName();
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {
int queueId = Integer.parseInt(fileQueueId.getName());
ConsumeQueue logic = new ConsumeQueue(//
topic,//
queueId,//
StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),//
lSize,//
null);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
}
}
}
}
}
System.out.println("load logics queue all over, OK");
return true;
}
示例2
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentHashMap<Integer, ConsumeQueue> newMap =
new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
}
else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(//
topic,//
queueId,//
StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),//
lSize,//
null);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
}
else {
logic = newLogic;
}
}
return logic;
}
示例3
private String diskUtil() {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
String storePathLogis =
StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);
String storePathIndex =
StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);
return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
}
示例4
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
this.storePath =
StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
示例5
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
this.scheduledExecutorService.shutdown();
try {
Thread.sleep(1000 * 3);
}
catch (InterruptedException e) {
log.error("shutdown Exception, ", e);
}
if (this.scheduleMessageService != null) {
this.scheduleMessageService.shutdown();
}
this.haService.shutdown();
this.storeStatsService.shutdown();
this.indexService.shutdown();
this.flushConsumeQueueService.shutdown();
this.commitLog.shutdown();
this.reputMessageService.shutdown();
this.allocateMapedFileService.shutdown();
this.storeCheckpoint.flush();
this.storeCheckpoint.shutdown();
if (this.runningFlags.isWriteable()) {
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
}
else {
log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
}
}
}
示例6
public void destroy() {
this.destroyLogics();
this.commitLog.destroy();
this.indexService.destroy();
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
}
示例7
@Override
public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
{
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
//获取commitlog文件夹占用磁盘资源百分百
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
}
{
String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
//consumeQueue文件夹占用磁盘资源百分比
result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
}
{ //获取存在于延时队列offset中还未到时间的消息的offset,以及把到时间到消息移到commitlog和consumeQueue队列后的offset获取出来
if (this.scheduleMessageService != null) {
this.scheduleMessageService.buildRunningStats(result);
}
}
//commilog目录文件的最小offset和最大offset
result.put(RunningStats.commitLogMinOffset.name(), String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
result.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));
return result;
}
示例8
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
}
else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(//
topic,//
queueId,//
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),//
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
}
else {
logic = newLogic;
}
}
return logic;
}
示例9
private void createTempFile() throws IOException {
String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
File file = new File(fileName);
MapedFile.ensureDirOK(file.getParent());
boolean result = file.createNewFile();
log.info(fileName + (result ? " create OK" : " already exists"));
}
示例10
/**
*
[[email protected] topic-prod-xxxxxservice-xxxxx]# pwd
/data/store/consumequeue/topic-prod-xxxxxservice-xxxxx
[[email protected] topic-prod-xxxxxservice-xxxxx]# ls
0 1 2 3 4 5 6 7 8 9
[[email protected] topic-prod-xxxxxservice-xxxxx]# ls 0/
00000000000048000000 00000000000054000000 00000000000060000000 00000000000066000000 00000000000072000000 00000000000078000000 00000000000084000000 00000000000090000000 00000000000096000000 00000000000102000000
[[email protected] topic-prod-xxxxxservice-xxxxx]#
* 消费队列的目录结构
* /topic/queueid/mapfile 其中mapfile的文件名是?
* Consume Queue存储消息在Commit Log中的位置信息
* @return
*/
// DefaultMessageStore.loadConsumeQueue broker起来后,从/data/store/consumequeue路径读取对应topic中各个队列的commit log索引信息
private boolean loadConsumeQueue() {
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
for (File fileTopic : fileTopicList) {
String topic = fileTopic.getName();
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {
int queueId = Integer.parseInt(fileQueueId.getName());
ConsumeQueue logic = new ConsumeQueue(//
topic,//
queueId,//
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),//
this);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
}
}
}
}
}
log.info("load logics queue all over, OK");
return true;
}
示例11
private boolean loadConsumeQueue() {
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(lStorePath));
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
for (File fileTopic : fileTopicList) {
String topic = fileTopic.getName();
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {
int queueId = Integer.parseInt(fileQueueId.getName());
ConsumeQueue logic = new ConsumeQueue(//
topic,//
queueId,//
StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),//
lSize,//
null);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
}
}
}
}
}
System.out.println("load logics queue all over, OK");
return true;
}
示例12
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentHashMap<Integer, ConsumeQueue> newMap =
new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(//
topic,//
queueId,//
StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),//
lSize,//
null);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
示例13
private String diskUtil() {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
String storePathLogis =
StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);
String storePathIndex =
StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);
return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
}
示例14
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
this.storePath =
StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
示例15
/**
*/
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
this.scheduledExecutorService.shutdown();
try {
Thread.sleep(1000 * 3);
} catch (InterruptedException e) {
log.error("shutdown Exception, ", e);
}
if (this.scheduleMessageService != null) {
this.scheduleMessageService.shutdown();
}
this.haService.shutdown();
this.storeStatsService.shutdown();
this.indexService.shutdown();
this.flushConsumeQueueService.shutdown();
this.commitLog.shutdown();
this.reputMessageService.shutdown();
this.allocateMapedFileService.shutdown();
this.storeCheckpoint.flush();
this.storeCheckpoint.shutdown();
if (this.runningFlags.isWriteable()) {
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
} else {
log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
}
}
}
示例16
public void destroy() {
this.destroyLogics();
this.commitLog.destroy();
this.indexService.destroy();
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
}
示例17
@Override
public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
{
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
}
{
String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
}
{
if (this.scheduleMessageService != null) {
this.scheduleMessageService.buildRunningStats(result);
}
}
result.put(RunningStats.commitLogMinOffset.name(), String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
result.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));
return result;
}
示例18
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(//
topic, //
queueId, //
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
示例19
/**
*
* @throws IOException
*/
private void createTempFile() throws IOException {
String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
File file = new File(fileName);
MapedFile.ensureDirOK(file.getParent());
boolean result = file.createNewFile();
log.info(fileName + (result ? " create OK" : " already exists"));
}
示例20
private boolean loadConsumeQueue() {
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
for (File fileTopic : fileTopicList) {
String topic = fileTopic.getName();
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {
int queueId;
try {
queueId = Integer.parseInt(fileQueueId.getName());
}catch (NumberFormatException e) {
continue;
}
ConsumeQueue logic = new ConsumeQueue(//
topic, //
queueId, //
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
this);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
}
}
}
}
}
log.info("load logics queue all over, OK");
return true;
}
示例21
private boolean loadConsumeQueue() {
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(lStorePath));
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
// TOPIC 遍历
for (File fileTopic : fileTopicList) {
String topic = fileTopic.getName();
// TOPIC 下队列遍历
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {
int queueId = Integer.parseInt(fileQueueId.getName());
ConsumeQueue logic = new ConsumeQueue(//
topic, //
queueId, //
StorePathConfigHelper.getStorePathConsumeQueue(lStorePath), //
lSize, //
null);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
}
}
}
}
}
System.out.println("load logics queue all over, OK");
return true;
}
示例22
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentHashMap<Integer, ConsumeQueue> newMap =
new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
}
else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(//
topic, //
queueId, //
StorePathConfigHelper.getStorePathConsumeQueue(lStorePath), //
lSize, //
null);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
}
else {
logic = newLogic;
}
}
return logic;
}
示例23
private String diskUtil() {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
String storePathLogis = StorePathConfigHelper
.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);
String storePathIndex = StorePathConfigHelper
.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);
return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
}
示例24
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
this.storePath =
StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
示例25
/**
* 关闭存储服务
*/
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
this.scheduledExecutorService.shutdown();
try {
// 等待其他调用停止
Thread.sleep(1000 * 3);
}
catch (InterruptedException e) {
log.error("shutdown Exception, ", e);
}
if (this.scheduleMessageService != null) {
this.scheduleMessageService.shutdown();
}
this.haService.shutdown();
this.storeStatsService.shutdown();
this.dispatchMessageService.shutdown();
this.indexService.shutdown();
this.flushConsumeQueueService.shutdown();
this.commitLog.shutdown();
this.allocateMapedFileService.shutdown();
if (this.reputMessageService != null) {
this.reputMessageService.shutdown();
}
this.storeCheckpoint.flush();
this.storeCheckpoint.shutdown();
// 正常关闭 删除存储根目录下的abort文件
this.deleteFile(
StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
}
}
示例26
public void destroy() {
this.destroyLogics();
this.commitLog.destroy();
this.indexService.destroy();
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
this.deleteFile(
StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
}
示例27
@Override
public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
// 检测物理文件磁盘空间
{
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
}
// 检测逻辑文件磁盘空间
{
String storePathLogics = StorePathConfigHelper
.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
}
// 延时进度
{
if (this.scheduleMessageService != null) {
this.scheduleMessageService.buildRunningStats(result);
}
}
result.put(RunningStats.commitLogMinOffset.name(),
String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
result.put(RunningStats.commitLogMaxOffset.name(),
String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));
return result;
}
示例28
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentHashMap<Integer, ConsumeQueue> newMap =
new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
}
else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(//
topic, //
queueId, //
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
}
else {
logic = newLogic;
}
}
return logic;
}
示例29
/**
* 启动服务后,在存储根目录创建临时文件,类似于 UNIX VI编辑工具
*
* @throws IOException
*/
private void createTempFile() throws IOException {
String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
File file = new File(fileName);
MapedFile.ensureDirOK(file.getParent());
boolean result = file.createNewFile();
log.info(fileName + (result ? " create OK" : " already exists"));
}
示例30
private boolean loadConsumeQueue() {
File dirLogic = new File(
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
// TOPIC 遍历
for (File fileTopic : fileTopicList) {
String topic = fileTopic.getName();
// TOPIC 下队列遍历
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {
int queueId = Integer.parseInt(fileQueueId.getName());
ConsumeQueue logic = new ConsumeQueue(//
topic, //
queueId, //
StorePathConfigHelper
.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
this);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
}
}
}
}
}
log.info("load logics queue all over, OK");
return true;
}