public class RabbitMQTransactionDemo { private static final String QUEUE_NAME = "transaction_queue"; public static void main(String[] args) { try { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); // 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); try { // 开启事务 channel.txSelect(); // 发送消息 String message = "Hello, RabbitMQ!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 提交事务 channel.txCommit(); } catch (Exception e) { // 事务回滚 channel.txRollback(); e.printStackTrace(); } // 关闭信道和连接 channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } }}
public class RocketMQTransactionDemo { public static void main(String[] args) throws Exception { // 创建事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("group_name"); producer.setNamesrvAddr("localhost:9876"); // 设置事务监听器 producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行本地事务逻辑,根据业务逻辑结果返回相应的状态 // 返回 LocalTransactionState.COMMIT_MESSAGE 表示事务提交 // 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事务回滚 // 返回 LocalTransactionState.UNKNOW 表示事务状态未知 } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据消息的状态,来判断本地事务的最终状态 // 返回 LocalTransactionState.COMMIT_MESSAGE 表示事务提交 // 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事务回滚 // 返回 LocalTransactionState.UNKNOW 表示事务状态未知 } }); // 启动事务消息生产者 producer.start(); // 构造消息 Message msg = new Message("topic_name", "tag_name", "Hello, RocketMQ!".getBytes()); // 发送事务消息 TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println("Send Result: " + sendResult); // 关闭事务消息生产者 producer.shutdown(); }}
public class KafkaTransactionDemo { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id"); Producer<String, String> producer = new KafkaProducer<>(props); // 初始化事务 producer.initTransactions(); try { // 开启事务 producer.beginTransaction(); // 发送消息 ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "Hello, Kafka!"); producer.send(record); // 提交事务 producer.commitTransaction(); } catch (ProducerFencedException e) { // 处理异常情况 producer.close(); } finally { producer.close(); } }}
RabbitMQ使用ACK(消息确认)机制来确保消息的可靠传递。消费者收到消息后,需要向RabbitMQ发送ACK来确认消息的处理状态。只有在收到ACK后,RabbitMQ才会将消息标记为已成功传递,否则会将消息重新投递给其他消费者或者保留在队列中。
以下是RabbitMQ ACK的Java示例:
public class RabbitMQAckDemo { public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); // 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); // 声明队列 String queueName = "queue_name"; channel.queueDeclare(queueName, false, false, false, null); // 创建消费者 String consumerTag = "consumer_tag"; boolean autoAck = false; // 关闭自动ACK // 消费消息 channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 消费消息 String message = new String(body, "UTF-8"); System.out.println("Received message: " + message); try { // 模拟处理消息的业务逻辑 processMessage(message); // 手动发送ACK确认消息 long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag, false); } catch (Exception e) { // 处理消息异常,可以选择重试或者记录日志等操作 System.out.println("Failed to process message: " + message); e.printStackTrace(); // 手动发送NACK拒绝消息,并可选是否重新投递 long deliveryTag = envelope.getDeliveryTag(); boolean requeue = true; // 重新投递消息 channel.basicNack(deliveryTag, false, requeue); } } }); } private static void processMessage(String message) { // 模拟处理消息的业务逻辑 }}
RocketMQ的ACK机制由消费者控制,消费者从消息队列中消费消息后,可以手动发送ACK确认消息的处理状态。只有在收到ACK后,RocketMQ才会将消息标记为已成功消费,否则会将消息重新投递给其他消费者。
public class RocketMQAckDemo { public static void main(String[] args) throws Exception { // 创建消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name"); consumer.setNamesrvAddr("localhost:9876"); // 订阅消息 consumer.subscribe("topic_name", "*"); // 注册消息监听器 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt message : msgs) { try { // 消费消息 String msgBody = new String(message.getBody(), "UTF-8"); System.out.println("Received message: " + msgBody); // 模拟处理消息的业务逻辑 processMessage(msgBody); // 手动发送ACK确认消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 处理消息异常,可以选择重试或者记录日志等操作 System.out.println("Failed to process message: " + new String(message.getBody())); e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动消费者 consumer.start(); } private static void processMessage(String message) { // 模拟处理消息的业务逻辑 }}
Kafka的ACK机制用于控制生产者在发送消息后,需要等待多少个副本确认才视为消息发送成功。这个机制可以通过设置acks参数来进行配置。
下面是一个使用Java编写的Kafka生产者示例代码:
public class KafkaProducerDemo { public static void main(String[] args) { // 配置Kafka生产者的参数 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // Kafka集群的地址和端口 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器 props.put("acks", "all"); // 设置ACK机制为所有副本都确认 // 创建生产者实例 KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 构造消息 String topic = "my_topic"; String key = "my_key"; String value = "Hello, Kafka!"; // 创建消息记录 ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); // 发送消息 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("发送消息出现异常:" + exception.getMessage()); } else { System.out.println("消息发送成功!位于分区 " + metadata.partition() + ",偏移量 " + metadata.offset()); } } }); // 关闭生产者 producer.close(); }}
本文链接:http://www.28at.com/showinfo-26-10561-0.htmlMQ黄金三剑客 Rabbit Rocket Kafka深入解密常见问题及功能对比指南
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
上一篇: Github的一个奇技淫巧,你学会了吗?
下一篇: 图解「正向代理」的原理 + 实践应用