RocketMQ 批量消息
为什么批量发送?
成批发送消息提高了传递消息的性能。
使用限制
同一批消息应该有:相同的主题、相同的waitStoreMsgOK并且不支持定时消息。
此外,一批消息的总大小不应超过1mb。
如何使用批量发送
如果你一次只发送不超过1mB的信息,那么批量发送很容易:
String topic = "BatchTest"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); try { producer.send(messages); } catch (Exception e) { e.printStackTrace(); //handle the error }
拆分消息
只有当您发送大量批量消息的时候,复杂性才会增加,并且您可能不确定是否超出了大小限制( 1mB )。
此时,你最好用列表将消息拆分开:
public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1000 * 1000; private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; //for log overhead if (tmpSize > SIZE_LIMIT) { //it is unexpected that single message exceeds the SIZE_LIMIT //here just let it go, otherwise it will block the splitting process if (nextIndex - currIndex == 0) { //if the next sublist has no element, add this one and then break, otherwise just break nextIndex++; } break; } if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } } //then you could split the large list into small ones: ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); //handle the error } }