亚洲欧美第一页_禁久久精品乱码_粉嫩av一区二区三区免费野_久草精品视频

? 歡迎來到蟲蟲下載站! | ?? 資源下載 ?? 資源專輯 ?? 關于我們
? 蟲蟲下載站

?? collector.java

?? 接收網絡設備上NetFlow工具導出的NetFlow數據
?? JAVA
字號:
package cai.flow.collector;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.ResourceBundle;
import java.util.StringTokenizer;

import cai.flow.collector.interpretator.IpSegmentManager;
import cai.flow.packets.FlowPacket;
import cai.flow.packets.V1_Packet;
import cai.flow.packets.V5_Packet;
import cai.flow.packets.V7_Packet;
import cai.flow.packets.V8_Packet;
import cai.flow.packets.V9_Packet;
import cai.utils.DoneException;
import cai.utils.Params;
import cai.utils.Resources;
import cai.utils.ServiceThread;
import cai.utils.SuperString;
import cai.utils.Syslog;
import cai.utils.Util;

class Collector {

	static Resources resources;

	static InetAddress localHost;

	static int localPort;

	static int receiveBufferSize;

	static boolean[] isVersionEnabled;

	static int max_queue_length;

	static int collector_thread;

	static final int MAX_VERION = 9;

	static Hashtable routers;

	static {
		resources = new Resources("NetFlow");
		IpSegmentManager.getInstance();
		receiveBufferSize = resources.integer("net.receive.buffer.size");
		localPort = resources.integer("net.bind.port");

		String local = resources.get("net.bind.host");
		Params.v9TemplateOverwrite = resources
				.isTrue("flow.collector.V9.template.overwrite");

		Params.template_refreshFromHD = resources
				.isTrue("flow.collector.template.refreshFromHD");
		Params.ip2ipsConvert=resources.isTrue("flow.ip2ipsConvert");
		String ipSrcEx = resources.getAndTrim("ip.source.excludes");
		String ipSrcIn = resources.getAndTrim("ip.source.includes");
		StringTokenizer tknz = new StringTokenizer(ipSrcEx,",");
		Params.ipSrcExcludes=new long[tknz.countTokens()];
		int idxOfEx = 0;
		while(tknz.hasMoreElements()){
			long tmpl=Util.convertIPS2Long(tknz.nextToken());
			Params.ipSrcExcludes[idxOfEx++]=tmpl;//注意,非法地址會將所有地址打開--0.0.0.0
		}
		tknz=new StringTokenizer(ipSrcIn,",");
		Params.ipSrcIncludes=new long[tknz.countTokens()];
		int idxOfIn = 0;
		while(tknz.hasMoreElements()){
			long tmpl=Util.convertIPS2Long(tknz.nextToken());
			Params.ipSrcIncludes[idxOfIn++]=tmpl;//注意,非法地址會將所有地址打開--0.0.0.0
		}

		String ipDstEx = resources.getAndTrim("ip.dst.excludes");
		String ipDstIn = resources.getAndTrim("ip.dst.includes");
		tknz = new StringTokenizer(ipDstEx,",");
		Params.ipDstExcludes=new long[tknz.countTokens()];
		idxOfEx = 0;
		while(tknz.hasMoreElements()){
			long tmpl=Util.convertIPS2Long(tknz.nextToken());
			Params.ipDstExcludes[idxOfEx++]=tmpl;//注意,非法地址會將所有地址打開--0.0.0.0
		}
		tknz=new StringTokenizer(ipDstIn,",");
		Params.ipDstIncludes=new long[tknz.countTokens()];
		idxOfIn = 0;
		while(tknz.hasMoreElements()){
			long tmpl=Util.convertIPS2Long(tknz.nextToken());
			Params.ipDstIncludes[idxOfIn++]=tmpl;//注意,非法地址會將所有地址打開--0.0.0.0
		}

		if (local.equals("any"))
			localHost = null;
		else {
			try {
				localHost = InetAddress.getByName(local);
			} catch (UnknownHostException e) {
				localHost = null;
			}

			if (localHost == null)
				resources.error("unknown host `" + local + "'");
		}

		isVersionEnabled = new boolean[MAX_VERION];
		isVersionEnabled[0] = resources.isTrue("flow.collector.V1.enabled");
		isVersionEnabled[1] = false;
		isVersionEnabled[2] = false;
		isVersionEnabled[3] = false;
		isVersionEnabled[4] = resources.isTrue("flow.collector.V5.enabled");
		isVersionEnabled[5] = false;
		isVersionEnabled[6] = resources.isTrue("flow.collector.V7.enabled");
		isVersionEnabled[7] = resources.isTrue("flow.collector.V8.enabled");
		isVersionEnabled[8] = resources.isTrue("flow.collector.V9.enabled");

		max_queue_length = resources.integer("flow.collector.max_queue_length");
		collector_thread = resources.integer("flow.collector.collector.thread");

		if (collector_thread < 1)
			resources.error("key `" + collector_thread + "' bust be great one");

		routers = new Hashtable();

		ResourceBundle bundle = resources.getResourceBundle();
		String prefix = "flow.collector.router.group.";
		int prefix_len = prefix.length();

		for (Enumeration e = bundle.getKeys(); e.hasMoreElements();) {
			String entry = (String) e.nextElement();

			if (!entry.startsWith(prefix))
				continue;

			InetAddress router_group = null;
			boolean putted = false;

			try {
				router_group = InetAddress.getByName(entry
						.substring(prefix_len));
			} catch (UnknownHostException e1) {
				resources.error("unknown host `" + entry.substring(prefix_len)
						+ "' in `" + entry + "'");
			}

			String the_routers = (String) bundle.getString(entry);

			for (StringTokenizer st = new StringTokenizer(the_routers); st
					.hasMoreElements();) {
				String router_name = st.nextToken();
				InetAddress router = null;

				try {
					router = InetAddress.getByName(router_name);
				} catch (UnknownHostException e2) {
					resources.error("unknown host `" + router_name + "' in `"
							+ entry + "'");
				}

				routers.put(router, router_group);
				putted = true;
			}

			if (!putted)
				resources.error("key `" + the_routers
						+ "' -- no routers in group");
		}
	}

	Syslog syslog;

	LinkedList data_queue;

	Aggregate aggregator;

	long queued = 0, processed = 0;

	int sampleRate = 1;

	int stat_interval;

	public Collector() {
		sampleRate = resources.integer("sample.rate");
		if (sampleRate == 0) {
			sampleRate = 1;
		}
		byte logLevel = Syslog.translatePriority(resources
				.get("flow.collector.syslog.level"));
		byte logOptions = Syslog.translateOptions(resources
				.get("flow.collector.syslog.options"));
		short logFacility = Syslog.translateFacility(resources
				.get("flow.collector.syslog.facility"));

		stat_interval = resources
				.getInterval("flow.collector.statistics.interval");

		if (logLevel == Syslog.LOG_ILLEGAL_P)
			resources.error("illegal flow.collector.syslog.level value");

		if (logOptions == Syslog.LOG_ILLEGAL_O)
			resources.error("illegal flow.collector.syslog.options value");

		if (logFacility == Syslog.LOG_ILLEGAL_F)
			resources.error("illegal flow.collector.syslog.facility value");

		syslog = new Syslog("NetFlow", logOptions, logFacility);
		syslog.setlogmask(Syslog.LOG_UPTO(logLevel));
		syslog.syslog(Syslog.LOG_DEBUG, "Syslog created: " + syslog.toString());

		aggregator = new Aggregate(resources);// 所有的歸并線程和SQL
		data_queue = new LinkedList();
	}

	/**
	 * 創好庫以后,主要工作在這里
	 *
	 */
	void go() {
		/**
		 * 最高優先級,讀UDP包線程
		 */
		ServiceThread rdr = new ServiceThread(this, syslog, "Reader at "
				+ (localHost == null ? "any" : "" + localHost) + ":"
				+ localPort, "Reader") {
			public void exec() throws Throwable {
				((Collector) o).reader_loop();
			}
		};
		rdr.setPriority(Thread.MAX_PRIORITY);
		rdr.setDaemon(true);
		rdr.start();

		ServiceThread statistics;
		/**
		 * 統計線程僅僅做統計和log
		 */
		if (stat_interval != 0 && syslog.need(Syslog.LOG_NOTICE)) {
			statistics = new ServiceThread(this, syslog, "Statistics over "
					+ Util.toInterval(stat_interval), "Statistics") {
				public void exec() throws Throwable {
					((Collector) o).statistics_loop();
				}
			};

			statistics.setDaemon(true);
			statistics.start();
		}

		ServiceThread[] cols = new ServiceThread[collector_thread];

		for (int i = 0; i < collector_thread; i++) {
			String title = new String("Collector #" + (i + 1));
			ServiceThread col = new ServiceThread(this, syslog, title, title) {
				public void exec() {
					((Collector) o).collector_loop();
				}
			};

			cols[i] = col;
			col.start();
		}

		try {
			for (int i = 0; i < collector_thread; i++)
				cols[i].join();
		} catch (InterruptedException e) {
			syslog.syslog(Syslog.LOG_CRIT,
					"Collector - InterruptedException in main thread, exit");
		}
	}

	/**
	 * 統計線程的主方法
	 *
	 * @throws Throwable
	 */
	public void statistics_loop() throws Throwable {
		long start = System.currentTimeMillis();

		while (true) {
			try {
				Thread.sleep(stat_interval * 1000);
			} catch (InterruptedException e) {
			}

			long u = System.currentTimeMillis() - start;
			String s = "" + ((float) queued * 1000 / u);
			int i = s.indexOf('.') + 3;

			if (i < s.length())
				s = s.substring(0, i);

			syslog.syslog(Syslog.LOG_NOTICE, "Pkts " + queued + "/" + processed
					+ ", " + s + " pkts/sec, " + Util.uptime_short(u / 1000));
		}
	}

	// 僅僅做實驗
	// static DatagramPacket tmpPacket = null;
	// 僅僅做實驗
	SampleManager sampler = null;
	{
		sampler = new SampleManager(sampleRate);
	}

	/**
	 * 讀取UDP包
	 *
	 * @throws Throwable
	 */
	public void reader_loop() throws Throwable {
		DatagramSocket socket;

		try {
			try {
				socket = new DatagramSocket(localPort, localHost);
				socket.setReceiveBufferSize(receiveBufferSize);
			} catch (IOException exc) {
				syslog.syslog(Syslog.LOG_CRIT, "Reader - socket create error: "
						+ localHost + " - "
						+ SuperString.exceptionMsg(exc.toString()));
				throw exc;
			}

			while (true) {
				byte[] buf = new byte[2048];// 效率在這個地方可以提高
				DatagramPacket p = null;
				// 僅僅做實驗
				// if (tmpPacket!=null) {
				// System.out.println("直接從"+tmpPacket+"中取數據");
				// p = tmpPacket;
				// }
				// 僅僅做實驗
				if (p == null) {
					p = new DatagramPacket(buf, buf.length);

					try {
						socket.receive(p);
					} catch (IOException exc) {
						syslog.syslog(Syslog.LOG_CRIT,
								"Reader - socket read error: "
										+ SuperString.exceptionMsg(exc
												.toString()));
						exc.printStackTrace();
						put_to_queue(null);// 表示notifyAll
						break;
					}
				}
				// 僅僅做實驗
				// if (tmpPacket==null) {
				// tmpPacket=p;
				// }
				// Thread.sleep(1000);
				// 僅僅做實驗
				if (this.sampler.shouldDue()) {
					put_to_queue(p);
				}
				p = null;
			}
		} catch (Throwable e) {
			syslog.syslog(Syslog.LOG_CRIT,
					"Reader: exception, trying to abort collector");
			e.printStackTrace();
			put_to_queue(null);
			throw e;
		}
	}

	/**
	 * UDP包的緩存
	 *
	 * @param p
	 */
	void put_to_queue(final DatagramPacket p) {
		InetAddress router = p.getAddress();
		InetAddress group = (InetAddress) routers.get(router);

		if (group == null) {
			syslog.syslog(Syslog.LOG_ERR,
					"A packet from an unauthorized router " + router
							+ " is ignored");
			return;
		}

		syslog.syslog(Syslog.LOG_DEBUG, "Packet from " + router
				+ " is moved to group " + group);
		p.setAddress(group);// 把真實router的地址改成group的地址

		if (data_queue.size() > max_queue_length)
			syslog.syslog(Syslog.LOG_WARNING,
					"Reader - the queue is bigger then max_queue_length "
							+ data_queue.size() + "/" + max_queue_length);

		synchronized (data_queue) {
			data_queue.addLast(p);
			queued++;

			if (p == null)
				data_queue.notifyAll();// 如果出錯了,那么
			else
				data_queue.notify();// 喚醒
		}
	}

	/**
	 * 眾多采集線程的主方法
	 *
	 */
	void collector_loop() {
		boolean no_data = true;

		while (true) {
			Object p = null;

			synchronized (data_queue) {
				try {
					if (data_queue.getFirst() != null)
						p = data_queue.removeFirst();// 取出第一個UDP包

					no_data = false;
				} catch (NoSuchElementException ex) {
				}
			}

			if (no_data) {
				synchronized (data_queue) {
					try {
						data_queue.wait();// 等待被reader_loop notify
					} catch (InterruptedException e) {
					}
				}
			} else {
				no_data = true;

				if (p == null)// UDP出現了錯誤
					break;

				processPacket((DatagramPacket) p);
			}
		}
	}

	/**
	 * 處理一個UDP包,由多線程無狀態的并發
	 *
	 * @param p
	 */
	private synchronized void processPacket(final DatagramPacket p) {
		final byte[] buf = p.getData();
		int len = p.getLength();
		String addr = p.getAddress().getHostAddress().trim();
//                p.getAddress().getAddress();
		boolean need_log = syslog.need(Syslog.LOG_INFO);

		synchronized (data_queue) {
			processed++;
		}

		if (need_log)
			syslog.syslog(Syslog.LOG_INFO, addr + "("
					+ p.getAddress().getHostName() + ") " + len + " bytes");

		try {
			if (len < 2)
				throw new DoneException("  * too short packet *");

			short version = (short) Util.to_number(buf, 0, 2);

			if (version > MAX_VERION || version <= 0)
				throw new DoneException("  * unsupported version *");

			if (!isVersionEnabled[version - 1])
				throw new DoneException("  * version " + version
						+ " disabled *");

			if (need_log)
				syslog.syslog(Syslog.LOG_INFO, "  version: " + version);

			FlowPacket packet;

			switch (version) {
			case 1:
				packet = (FlowPacket) new V1_Packet(addr, buf, len);
				break;
			case 5:
				packet = (FlowPacket) new V5_Packet(addr, buf, len);
				break;
			case 7:
				packet = (FlowPacket) new V7_Packet(addr, resources, buf, len);
				break;
			case 8:
				packet = (FlowPacket) new V8_Packet(addr, buf, len);
				break;
			case 9:
				packet = (FlowPacket) new V9_Packet(addr, buf, len);
				break;
			default:
				syslog.syslog(Syslog.LOG_CRIT,
						"Collector - BUG: Version problem, version=" + version);
				return;
			}

			aggregator.process(packet);
		} catch (DoneException e) {
			e.printStackTrace();
			if (need_log)
				syslog.syslog(Syslog.LOG_INFO, e.toString());
		}
	}
}

?? 快捷鍵說明

復制代碼 Ctrl + C
搜索代碼 Ctrl + F
全屏模式 F11
切換主題 Ctrl + Shift + D
顯示快捷鍵 ?
增大字號 Ctrl + =
減小字號 Ctrl + -
亚洲欧美第一页_禁久久精品乱码_粉嫩av一区二区三区免费野_久草精品视频
久久久影院官网| 韩国av一区二区三区在线观看| 日韩精品成人一区二区三区| 国产风韵犹存在线视精品| 91行情网站电视在线观看高清版| 欧美成人官网二区| 亚洲aaa精品| 色综合中文字幕| 国产网站一区二区| 国内精品伊人久久久久av影院| 欧美亚洲综合一区| 中文字幕亚洲一区二区av在线| 久久精品国产一区二区三区免费看| 91麻豆文化传媒在线观看| 久久久午夜精品| 国内一区二区在线| 日韩欧美国产电影| 日本在线观看不卡视频| 欧美日韩国产一级| 亚洲高清在线视频| 欧美日韩一区二区在线观看 | 精品日韩一区二区三区免费视频| 一区二区三区在线视频免费| 高清不卡一区二区在线| 久久影音资源网| 精品一区二区三区香蕉蜜桃| 欧美一区二区人人喊爽| 亚洲成国产人片在线观看| 欧美在线观看一区二区| 亚洲乱码中文字幕综合| 99精品一区二区三区| 国产精品久久毛片av大全日韩| 高清不卡一二三区| 最新热久久免费视频| 国产成人午夜精品5599| 中文字幕国产一区二区| 99久久婷婷国产精品综合| 亚洲人成网站影音先锋播放| 在线免费亚洲电影| 偷拍自拍另类欧美| 精品精品欲导航| 国产在线精品不卡| 中文字幕第一页久久| 成人自拍视频在线观看| 1024亚洲合集| 欧美日韩一二三区| 久久精品国产亚洲一区二区三区| 精品电影一区二区| 99久免费精品视频在线观看| 亚洲五月六月丁香激情| 91精品欧美综合在线观看最新| 麻豆精品在线播放| 国产精品国产三级国产有无不卡 | 中文av字幕一区| 99国产精品久久| 天天做天天摸天天爽国产一区| 日韩三级电影网址| 成人av在线影院| 亚洲成a人片在线不卡一二三区| 4438成人网| 大胆欧美人体老妇| 亚洲成人在线网站| 国产午夜亚洲精品理论片色戒| 91视频在线看| 蜜臀av性久久久久蜜臀av麻豆| 国产欧美精品一区| 欧美男人的天堂一二区| 国产精品2024| 亚洲国产精品尤物yw在线观看| 日韩免费性生活视频播放| 波多野结衣亚洲一区| 日韩av高清在线观看| 中文字幕乱码久久午夜不卡 | 国产精品每日更新| 欧美精品777| a4yy欧美一区二区三区| 久久精品免费看| 一区二区三区视频在线看| 久久久亚洲精品一区二区三区| 欧美最猛黑人xxxxx猛交| 国产超碰在线一区| 久久精品999| 亚洲国产日韩av| 亚洲欧美怡红院| 久久免费的精品国产v∧| 欧美片网站yy| 色94色欧美sute亚洲13| 成人在线视频一区二区| 久久精品国产精品亚洲综合| 亚洲午夜私人影院| 亚洲欧美激情视频在线观看一区二区三区| 日韩精品自拍偷拍| 91麻豆精品国产自产在线 | 日韩欧美中文字幕制服| 欧洲一区在线电影| 99re热视频这里只精品| 国产成人精品午夜视频免费| 久久国产精品色| 午夜精品久久久久| 一区二区三区成人| 亚洲精品精品亚洲| 亚洲日穴在线视频| 国产精品乱码人人做人人爱| 国产网站一区二区| 国产日韩av一区| 久久免费美女视频| 久久久久久久久久久久久女国产乱| 91精品国产一区二区三区蜜臀 | 亚洲永久免费视频| 亚洲精品一二三| 亚洲美女电影在线| 成人欧美一区二区三区白人| 国产精品天美传媒| 国产欧美一区二区精品久导航 | 亚洲男人都懂的| 综合在线观看色| 亚洲免费av网站| 亚洲一区在线免费观看| 亚洲一区二区在线免费观看视频| 一区二区三区在线免费播放| 亚洲第一狼人社区| 麻豆91精品91久久久的内涵| 美女视频黄 久久| 国产一区二区三区四| 成人激情动漫在线观看| 91热门视频在线观看| 欧美在线视频不卡| 日韩精品专区在线| 欧美激情在线一区二区三区| 亚洲色图一区二区三区| 亚洲国产精品久久人人爱| 香蕉影视欧美成人| 国产一区二区在线观看视频| 成人网页在线观看| 欧美最猛黑人xxxxx猛交| 欧美一区二区三区爱爱| 国产亚洲成aⅴ人片在线观看| 国产欧美精品区一区二区三区| 国产精品成人一区二区三区夜夜夜| 亚洲免费观看高清完整版在线观看熊| 亚洲一级二级三级| 精品一区二区三区免费观看| jlzzjlzz国产精品久久| 欧美日韩电影一区| 欧美国产乱子伦| 五月婷婷激情综合| 粉嫩aⅴ一区二区三区四区 | 久久美女高清视频| 亚洲美女屁股眼交| 黄网站免费久久| 欧美性猛交xxxx黑人交| 欧美精品一区二区在线播放 | 欧美视频一区二区三区在线观看| 欧美一级免费观看| 日韩毛片一二三区| 美女mm1313爽爽久久久蜜臀| 成人黄色av电影| 91精品国产91热久久久做人人 | 久久婷婷国产综合精品青草| 中文字幕综合网| 国产乱人伦偷精品视频不卡| 欧美日韩日日摸| 国产精品欧美一级免费| 美国毛片一区二区| 欧洲精品一区二区| 国产精品欧美综合在线| 看电视剧不卡顿的网站| 一本一本久久a久久精品综合麻豆| 日韩欧美中文一区二区| 夜夜嗨av一区二区三区| 国产.精品.日韩.另类.中文.在线.播放| 在线中文字幕不卡| 国产精品免费视频观看| 国产一区二区视频在线播放| 欧美日韩国产美| 夜色激情一区二区| 91亚洲精品久久久蜜桃| 久久久国产精品不卡| 美女看a上一区| 欧美一区三区二区| 亚洲国产欧美日韩另类综合| 91影视在线播放| 欧美激情在线观看视频免费| 狠狠色丁香久久婷婷综合丁香| 欧美日本一道本| 一区二区三区.www| 一本大道av一区二区在线播放| 亚洲欧美一区二区视频| 成人97人人超碰人人99| 国产精品乱码人人做人人爱| 国产精品一区二区在线观看不卡| 欧美一级国产精品| 日韩成人午夜精品| 宅男噜噜噜66一区二区66| 日韩主播视频在线| 欧美一级日韩免费不卡| 蜜桃在线一区二区三区| 日韩精品一区二区三区swag| 精品综合久久久久久8888| 精品久久五月天|