?? opcconnector.java
字號:
package com.zcsoft.opc;
/**
* <p>Title: 現場總線通訊 </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005-2006</p>
* <p>Company: Zhicheng Software&Service Co. Ltd.</p>
* @author 蔣智湘
* @version 1.0
*/
import java.util.*;
import java.net.*;
import java.io.*;
/**
* 同OPC控制中心應用程序(OpcCtrl.exe)進行控制通訊的連接類
* 使用方法示例:
* <pre>
* OpcConnector oc = new OpcConnector();
* oc.setProtocol("S7");
* oc.setTopic("demo");
* Item item1 = new Item();
* item1.setID("DB101,DBB1,1");
* Item item2 = new Item();
* item2.setID("DB101,DBB2,1");
* ...
* ItemGroup ig1 = new ItemGroup();
* ig1.setName("grp1");
* ig1.addItem(item1);
* ig1.addItem(item2);
* ...
* oc.addGroup(ig1);
* ...
* oc.connect();
* </pre>
* 當前版本支持的最大加入的變量組的個數上限為32765。對于組上的變量個數的上限也是32765。
*
*/
public class OpcConnector implements Runnable, CommandSent
{
/** 通信協議,如S7。該屬性值將作為實際變量名的一部分 */
private String protocol;
/** 通信主題,或應用程序名。該屬性值將作為實際變量名的一部分 */
private String topic;
/** 是否已經同OPC控制中心建立好了連接 */
private boolean connected = false;
/** 所有數據項組的集合 */
private List groups = new ArrayList(6);
/** OPC控制中心程序所在機器的IP地址 */
private InetAddress hostRequest;
/** OPC控制中心程序接收控制請求的TCP端口 */
private int portRequest = 8089;
/** 接收UDP報文的線程 */
private Thread threadRcv;
/** 接收OPC控制中心程序發送過來的UDP報文的套接字 */
private DatagramSocket socketRcvUDP;
/**
* 構造一個在8087端口上監聽UDP報文的實例
*/
public OpcConnector() throws SocketException
{
this(8087);
}
/**
*
* @param portReceive 該實例使用哪個本機端口接收控制中心發送過來的UDP通知報文
*/
public OpcConnector(int portReceive) throws SocketException
{
socketRcvUDP = new DatagramSocket(portReceive);
threadRcv = new Thread(this);
threadRcv.start();
}
public void run()
{
byte[] buffer;
try
{
buffer = new byte[Math.min(4096, socketRcvUDP.getReceiveBufferSize())];
}
catch (SocketException ex)
{
ex.printStackTrace();
return;
}
while (true)
{
try
{
DatagramPacket dp = new DatagramPacket(buffer, buffer.length);
socketRcvUDP.receive(dp);
int readed = dp.getLength();
int offset = dp.getOffset();
byte[] data = dp.getData();
byte firstByte = data[offset++];
if (firstByte == 'C')//最頻繁出現的數值變化通知
{
dataChanged(data, offset, readed - 1);
}
else if (firstByte == 'W')//異步寫完成
{
writeCompleted(data, offset, readed - 1);
}
else if (firstByte == 'R')//異步讀完成
{
dataChanged(data, offset, readed - 1);
}
else if(firstByte == 'E'//OPC控制中心退出
&& bytesABeginWithB(data, offset, new byte[]{'x', 'i', 't'}))
{
connected = false;
}
else if(firstByte == 'S'//OPC控制中心重新啟動了,就再連上它
&& bytesABeginWithB(data, offset, new byte[]{'t', 'a', 'r', 't', 'u', 'p'}))
{
try
{
connectGroups(groups.size(), true);
}
catch (IOException ex)
{
ex.printStackTrace();
}
}
else if (firstByte == 'N')//異步操作取消執行完成通知
{
}
else
{
System.err.println("Unknown packet");
}
}
catch (IOException ex)
{
System.err.println(ex);
break;
}
catch (Exception ex)
{
ex.printStackTrace();
}
}
threadRcv = null;
}
/**
* 判定字節數組A的前B.length各字節同字節數組B的對應字節相同
*/
static boolean bytesABeginWithB(byte[] A, int offset, byte[] B)
{
if (A.length - offset < B.length) return false;
for (int i = 0; i < B.length; i++)
{
if (A[offset + i] != B[i]) return false;
}
return true;
}
public void setProtocol(String protocol)
{
checkStatus();
this.protocol = protocol;
}
public void setTopic(String topic)
{
checkStatus();
this.topic = topic;
}
/**
* 在連接前添加一個變量值實例。
*
* @param aGroup 不為null的ItemGroup實例
*/
public final void addGroup(ItemGroup aGroup)
{
checkStatus();
aGroup.cmdSentHook = this;
this.groups.add(aGroup);
}
private void checkStatus() throws IllegalStateException
{
if (connected)
{
throw new IllegalStateException("connected");
}
}
/**
* 建立本實例同OPC控制中心之間的連接。
* @throws IOException 連接過程可能出現的套接字IO異常
*/
public void connect() throws IOException
{
if (this.topic == null)
{
throw new IllegalStateException("Not specify topic");
}
if (this.protocol == null)
{
throw new IllegalStateException("Not specify protocol");
}
int cntGroups = groups.size();
if (cntGroups == 0)
{
throw new OpcException("none group");
}
ItemGroup ig;
int groupIndex;
//首先查詢組是否被加入
TcpConnection conn = getConnection();
conn.writeHeader("Q?" + DELIMETER + cntGroups);
for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
{
ig = (ItemGroup)groups.get(groupIndex);
conn.writeHeader(ig.getName());
}
//conn.flushHeanders();
String hdr = conn.readHeader();
int index = getResultCode(hdr);
int cntGroupsNoFounded = 0;
if (index == -1)
{
cntGroupsNoFounded = cntGroups;
}
else if (index == 0)
{
for (groupIndex = 0; (hdr = conn.readHeader()) != null && groupIndex < cntGroups; groupIndex++)
{
ig = (ItemGroup)groups.get(groupIndex);
ig.index = index = Integer.parseInt(hdr);
// debug(ig.getName() + "->index = " + index);
if (index < 0) ++cntGroupsNoFounded;//累加未加入的組
//else 記錄已經加入的組,準備調用刷新操作
}
}
else
{
conn.close();
throw new OpcException(hdr);
}
conn.close();
//如果存在已經加入的組,則刷新變量值
if (cntGroupsNoFounded != cntGroups)
{
refreshGroups();
}
//如果還有沒加入組,則嘗試加入
if (cntGroupsNoFounded > 0)
{
connectGroups(cntGroupsNoFounded, false);
}
connected = true;
}
public TcpConnection getConnection() throws IOException
{
if (hostRequest == null) hostRequest = InetAddress.getByName(null);
return new TcpConnection(hostRequest, portRequest);
}
/**
* 將變量組加到OPC控制中心
* @param cntGroupsToConnect 待連接的組的個數
* @param refresh 是否對那些已經確定索引值的組的索引值進行再確定
*/
void connectGroups(int cntGroupsToConnect, boolean refresh) throws IOException
{
TcpConnection conn = getConnection();
String header;
header = "AD" + DELIMETER + cntGroupsToConnect + DELIMETER + this.socketRcvUDP.getLocalPort();
conn.writeHeader(header);
//System.out.println(header);
String idPrefix = this.protocol + ":[" + this.topic + "]";
Item item;
int itemIndex, itemCount;
int groupIndex, cntGroups = groups.size();
ItemGroup ig;
for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
{
ig = (ItemGroup)groups.get(groupIndex);
if (!refresh && ig.index != -1) continue;
itemCount = ig.itemCount();
header = ig.getName() + DELIMETER + 'D'//(ig.isActive()?'A':'D')
//只能在加入后單獨設定激活狀態才有效。不然在這個方法還沒有執行完成前,就收到了dataChanged報文
//此時getGroupAt(int)很可能返回null
+ DELIMETER + itemCount
+ DELIMETER + ig.getUpdateRate();
conn.writeHeader(header);
//System.out.println("\t" + header);
for (itemIndex = 0; itemIndex < itemCount; itemIndex++)
{
item = ig.getItem(itemIndex);
header = idPrefix.concat(item.ID)
+ DELIMETER + (item.active?'A':'D')
+ DELIMETER + item.value.vt;
conn.writeHeader(header);
//System.out.println("\t\t" + header);
}
}
//conn.flushHeanders();
String hdr = conn.readHeader();
if (getResultCode(hdr) != 0)
{
conn.close();
throw new OpcException(hdr);
}
for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
{
ig = (ItemGroup)groups.get(groupIndex);
if (!refresh && ig.index != -1) continue;
hdr = conn.readHeader();
if (hdr != null)
{
ig.index = Integer.parseInt(hdr);
// debug(ig.getName() + ".index = " + ig.index);
}
}
conn.close();
//激活實際需要激活的變量組
activateGroups();
}
void activateGroups() throws IOException
{
int groupIndex, cntGroups = groups.size();
ItemGroup ig;
TcpConnection conn = getConnection();
conn.writeHeader("AC A" + DELIMETER + cntGroups);
for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
{
ig = (ItemGroup)groups.get(groupIndex);
if (ig.index >= 0 && ig.isActive())
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -