Kafka

prdp8dxp  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(320)

我正在寻找参考,在那里我可以得到一个简单的程序发送snmp陷阱到ApacheKafka主题使用ApacheCamel。
请帮助我,如果有人可以解释它使用简单的java程序。
我的routebuilder配置

import org.apache.camel.builder.RouteBuilder;

public class SimpleRouteBuilder extends RouteBuilder{

    @Override
    public void configure() throws Exception {

        String topicName = "topic=first_topic";
        String kafkaServer = "kafka:localhost:9092";
        String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
        String serializerClass = "serializerClass=kafka.serializer.StringEncoder";

        String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&")
                .append(zooKeeperHost).append("&").append(serializerClass).toString();
    System.out.println(toKafka);

    from("snmp:127.0.0.1:161?protocol=udp&type=POLL&oids=1.3.6.1.2.1.1.5.0").split().tokenize("\n").to(toKafka);
    }
}

主要方法

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.snmp4j.Snmp;

public class MainApp {

public static void main(String[] args) {
    SimpleRouteBuilder routeBuilder = new SimpleRouteBuilder();
    CamelContext ctx = new DefaultCamelContext();
    try {
        ctx.addRoutes(routeBuilder);
        ctx.start();
        Thread.sleep(5 * 60 * 1000);
        ctx.stop();
    }
    catch (Exception e) {
        e.printStackTrace();
    }

}
}
nwlqm0z1

nwlqm0z11#

我走错了方向。写入方向如下-
创建陷阱发送器程序。
创建陷阱接收器/侦听器程序。
在trap接收器或侦听器内部,接收trap并通过apachecamel将其发送给apachekafka主题。
pom.xml文件
添加以下依赖项-
Camel 核
snmp4j型
Camel Kafka
陷阱发送器程序

package <>;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.snmp4j.*;
import org.snmp4j.event.ResponseEvent;
import org.snmp4j.mp.MPv2c;
import org.snmp4j.mp.MPv3;
import org.snmp4j.mp.SnmpConstants;
import org.snmp4j.security.*;
import org.snmp4j.smi.*;
import org.snmp4j.transport.DefaultUdpTransportMapping;

import java.util.Date;

public class Trapsender {

public static final String community = "public";

public static final String Oid = ".1.3.6.1.2.1.1.8";
public static final String ipAddress = "127.0.0.1";
public static final int port = 162;

public static void main(String[] args) {
    Trapsender trapv3 = new Trapsender();
    trapv3.sendTrap_Version3();
}

public void sendTrap_Version3() {
    try {
        // Create Transport Mapping
        TransportMapping transport = new DefaultUdpTransportMapping();
        transport.listen();

        // Create Target
        CommunityTarget cTarget = new CommunityTarget();
        cTarget.setCommunity(new OctetString(community));
        cTarget.setVersion(SnmpConstants.version2c);
        cTarget.setAddress(new UdpAddress(ipAddress + "/" + port));
        cTarget.setRetries(2);
        cTarget.setTimeout(10000);

        // Create PDU for V3
        PDU pdu = new PDU();
        pdu.setType(PDU.TRAP);

        // need to specify the system up time
        pdu.add(new VariableBinding(SnmpConstants.sysUpTime, new OctetString(new Date().toString())));
        pdu.add(new VariableBinding(SnmpConstants.snmpTrapOID, new OID(Oid)));
        pdu.add(new VariableBinding(new OID(Oid), new OctetString("Major")));

        // Send the PDU
        Snmp snmp = new Snmp(transport);
        System.out.println("Sending V2 Trap... Check Wheather NMS is Listening or not? ");
        ResponseEvent send = snmp.send(pdu, cTarget);
                  snmp.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

使用apache camel的接收器陷阱

package <>;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.snmp4j.*;
 import org.snmp4j.mp.MPv1;
 import org.snmp4j.mp.MPv2c;
 import org.snmp4j.security.Priv3DES;
 import org.snmp4j.security.SecurityProtocols;
 import org.snmp4j.smi.OctetString;
 import org.snmp4j.smi.TcpAddress;
 import org.snmp4j.smi.TransportIpAddress;
 import org.snmp4j.smi.UdpAddress;
 import org.snmp4j.transport.AbstractTransportMapping;
 import org.snmp4j.transport.DefaultTcpTransportMapping;
 import org.snmp4j.transport.DefaultUdpTransportMapping;
 import org.snmp4j.util.MultiThreadedMessageDispatcher;
 import org.snmp4j.util.ThreadPool;

 import java.io.IOException;

 public class Trapreceiver implements CommandResponder {

public static CamelContext ctx=null;
public static ProducerTemplate producer=null;

public static void main(String[] args) {

   Trapreceiver snmp4jTrapReceiver = new Trapreceiver();
   SimpleRouteBuilder routeBuilder = new SimpleRouteBuilder();
   ctx = new DefaultCamelContext();
   producer = ctx.createProducerTemplate();
   try {
       ctx.addRoutes(routeBuilder);
       ctx.start();
   }
   catch (Exception e) {
       e.printStackTrace();
   }

  // producer.sendBody("direct:start", snmp);
    try {
        snmp4jTrapReceiver.listen(new UdpAddress("localhost/162"), producer);
    } catch (IOException e) {
        e.printStackTrace();
    }

}

/**
 * Trap Listner
 */
public synchronized void listen(TransportIpAddress address, ProducerTemplate producer)
        throws IOException {
    AbstractTransportMapping transport;
    if (address instanceof TcpAddress) {
        transport = new DefaultTcpTransportMapping((TcpAddress) address);
    } else {
        transport = new DefaultUdpTransportMapping((UdpAddress) address);
    }

    ThreadPool threadPool = ThreadPool.create("DispatcherPool", 10);
    MessageDispatcher mDispathcher = new MultiThreadedMessageDispatcher(
            threadPool, new MessageDispatcherImpl());

    // add message processing models
    mDispathcher.addMessageProcessingModel(new MPv1());
    mDispathcher.addMessageProcessingModel(new MPv2c());

    // add all security protocols
    SecurityProtocols.getInstance().addDefaultProtocols();
    SecurityProtocols.getInstance().addPrivacyProtocol(new Priv3DES());

    // Create Target
    CommunityTarget target = new CommunityTarget();
    target.setCommunity(new OctetString("public"));

    Snmp snmp = new Snmp(mDispathcher, transport);
    snmp.addCommandResponder(this);

    transport.listen();
    System.out.println("Listening on " + address);

    try {
        this.wait();
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

/**
 * This method will be called whenever a pdu is received on the given port
 * specified in the listen() method
 */
public synchronized void processPdu(CommandResponderEvent cmdRespEvent) {
    System.out.println("Received PDU...");
    PDU pdu = cmdRespEvent.getPDU();
    if (pdu != null) {
        System.out.println("Trap Type = " + pdu.getType());
        System.out.println("Variables = " + pdu.getVariableBindings());
        producer.sendBody("direct:start","Variables = " + pdu.getVariableBindings() );
    }
}

}

相关问题