Changeset 41


Ignore:
Timestamp:
Aug 31, 2012 3:09:57 PM (5 years ago)
Author:
psaiteja
Message:

new push server with minor changes

File:
1 edited

Legend:

Unmodified
Added
Removed
  • VSNPushServer/src/PushServer.java

    r22 r41  
    1414public class PushServer extends Thread 
    1515{ 
    16         public static final int DEFAULT_PORT = 8080; 
     16        public static final int DEFAULT_PORT = 8081; 
     17        public static final int DEFAULT_UDP_PORT = 8091; 
    1718         
    1819        private ServerSocket server = null; 
     
    2526        private float alpha = 1; 
    2627         
     28        private int UDPport = DEFAULT_UDP_PORT; 
     29        UDPServerThread udpthread; 
     30         
    2731        /* here's a main method, in case you want to run this by itself */ 
    2832        public static void main (String args[]) 
    2933        { 
    3034                int port = 5556; 
     35                int udp_port = 5601; 
    3136                 
    3237                // create and start the jProxy thread, using a 20 second timeout 
    3338                // value to keep the threads from piling up too much 
    3439                System.err.println("  **  Starting Server on port " + port + ". Press CTRL-C to end.  **\n"); 
    35                 PushServer jp = new PushServer(port, 5); 
     40                PushServer jp = new PushServer(port,udp_port, 5); 
    3641                jp.setDebug(1, System.out);             // or set the debug level to 2 for tons of output 
    3742                jp.start(); 
     
    5964         */ 
    6065 
    61         public PushServer (int port) 
     66        public PushServer (int port, int udp_port) 
    6267        { 
    6368                thisPort = port; 
    64         } 
    65          
    66         public PushServer (int port, int timeout) 
     69                UDPport = udp_port; 
     70        } 
     71         
     72        public PushServer (int port, int udp_port, int timeout) 
    6773        { 
    6874                thisPort = port; 
     75                UDPport = udp_port; 
    6976                ptTimeout = timeout; 
    7077        } 
     
    8794                return thisPort; 
    8895        } 
    89           
     96         
     97        public int getUDPPort() 
     98        { 
     99                return UDPport; 
     100        } 
    90101         
    91102        /* return whether or not the socket is currently open 
     
    108119                        // close the open server socket 
    109120                        server.close(); 
    110                         // send it a message to make it stop waiting immediately 
    111                         // (not really necessary) 
    112                         /*Socket s = new Socket("localhost", thisPort); 
    113                         OutputStream os = s.getOutputStream(); 
    114                         os.write((byte)0); 
    115                         os.close(); 
    116                         s.close();*/ 
    117121                }  catch(Exception e)  {  
    118122                        if (debugLevel > 0) 
     
    187191                                        System.out.println("Client database Table does not exist"); 
    188192                                        Statement stmt = conn.createStatement(); 
    189                                         stmt.executeUpdate("CREATE TABLE ClientDatabase (IP VARCHAR(16) NOT NULL PRIMARY KEY, Added TIMESTAMP NOT NULL)"); 
     193                                        stmt.executeUpdate("CREATE TABLE ClientDatabase (IP VARCHAR(16) NOT NULL PRIMARY KEY, Port INT DEFAULT 9000, Added TIMESTAMP NOT NULL)"); 
    190194                                        Statement stmt2 = conn.createStatement(); 
    191195                                        stmt2.executeUpdate("CREATE INDEX IPIndex on ClientDatabase(IP ASC)"); 
     
    196200                                        conn = DriverManager.getConnection("jdbc:derby:serverdatabase;create=true"); 
    197201                                        Statement stmt = conn.createStatement(); 
    198                                         stmt.executeUpdate("CREATE TABLE ClientDatabase (IP VARCHAR(16) NOT NULL PRIMARY KEY, Added TIMESTAMP NOT NULL)"); 
     202                                        stmt.executeUpdate("CREATE TABLE ClientDatabase (IP VARCHAR(16) NOT NULL PRIMARY KEY, Port INT DEFAULT 9000, Added TIMESTAMP NOT NULL)"); 
    199203                                        Statement stmt2 = conn.createStatement(); 
    200204                                        stmt2.executeUpdate("CREATE INDEX IPIndex on ClientDatabase(IP ASC)"); 
     
    208212                                debugOut.println("Started server on port " + thisPort); 
    209213                         
    210                          
     214                        try{ 
     215                                System.out.println("Starting UDP server"); 
     216                                udpthread = new UDPServerThread(UDPport,conn); 
     217                                udpthread.start(); 
     218                        } 
     219                        catch(SocketException e){ 
     220                                System.out.println("Could not create UDP server socket"); 
     221                        } 
     222                        catch(Exception e){ 
     223                                e.printStackTrace(); 
     224                        } 
    211225                         
    212226                        while (true) 
     
    220234                                                System.out.println("Client entry exists -  IP: " + rs.getString(1)); 
    221235                                        } else { 
    222                                                 PreparedStatement psInsert = conn.prepareStatement("insert into ClientDatabase values (?,?)"); 
     236                                                PreparedStatement psInsert = conn.prepareStatement("insert into ClientDatabase values (?,?,?)"); 
    223237 
    224238                                                psInsert.setString(1, clientip); 
     239                                                psInsert.setInt(2, DEFAULT_UDP_PORT); 
    225240                                                java.sql.Timestamp currentTimestamp = new java.sql.Timestamp(Calendar.getInstance().getTime().getTime()); 
    226                                                 psInsert.setTimestamp(2, currentTimestamp); 
     241                                                psInsert.setTimestamp(3, currentTimestamp); 
    227242 
    228243                                                psInsert.executeUpdate(); 
     244                                                 
     245                                                udpthread.sendoldhashes(clientip, DEFAULT_UDP_PORT); 
    229246                                        } 
    230247                                        rs.close(); 
     
    232249                                        e.printStackTrace(); 
    233250                                } 
    234                                 PullServerProxyThread t = new PullServerProxyThread(client, conn); 
     251                                PullServerProxyThread t = new PullServerProxyThread(client, conn, udpthread); 
    235252                                t.setDebug(debugLevel, debugOut); 
    236253                                t.setTimeout(ptTimeout); 
     
    247264} 
    248265 
    249  
     266class UDPServerThread extends Thread 
     267{ 
     268        public static final int DEFAULT_UDP_PORT = 9000; 
     269        private int UDPport = DEFAULT_UDP_PORT; 
     270        private DatagramSocket UDPserver = null; 
     271         
     272        private Connection database_conn; 
     273         
     274        public UDPServerThread(int port, Connection conn) throws SocketException{ 
     275                UDPport = port; 
     276                database_conn = conn; 
     277                 
     278                UDPserver = new DatagramSocket(UDPport); 
     279        } 
     280         
     281        public int getUDPPort() 
     282        { 
     283                return UDPport; 
     284        } 
     285         
     286        public void closeserver () 
     287        { 
     288                try { 
     289                        UDPserver.close(); 
     290                }  catch(Exception e)  {  
     291                        e.printStackTrace(); 
     292                }        
     293                UDPserver = null; 
     294        } 
     295         
     296        public void run(){ 
     297                byte[] receiveData;  
     298             
     299                while(true){ 
     300                        receiveData = new byte[1024]; 
     301                        DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); 
     302                        try{ 
     303                                UDPserver.receive(receivePacket); 
     304                                String IPAddress = new String(receivePacket.getAddress().getAddress());  
     305                                int port = receivePacket.getPort();  
     306                                 
     307                                try {            
     308                                        Statement stmt2 = database_conn.createStatement(); 
     309                                        ResultSet rs = stmt2.executeQuery("select * from ClientDatabase where IP='"+IPAddress+"'"); 
     310                                        if (! rs.next()) { 
     311                                                PreparedStatement psInsert = database_conn.prepareStatement("insert into ClientDatabase values (?,?,?)"); 
     312 
     313                                                psInsert.setString(1, IPAddress); 
     314                                                psInsert.setInt(2, port); 
     315                                                java.sql.Timestamp currentTimestamp = new java.sql.Timestamp(Calendar.getInstance().getTime().getTime()); 
     316                                                psInsert.setTimestamp(3, currentTimestamp); 
     317 
     318                                                psInsert.executeUpdate(); 
     319                                                 
     320                                                sendoldhashes(IPAddress, port); 
     321                                        } 
     322                                        rs.close(); 
     323                                } catch (SQLException e) { 
     324                                        System.out.println("Unable to add client details for the UDP packet with IP:"+IPAddress+" port:"+port); 
     325                                        e.printStackTrace(); 
     326                                } 
     327                        } 
     328                        catch(IOException e){ 
     329                                System.out.println("Unable to receive UDP packet"); 
     330                                e.printStackTrace(); 
     331                        } 
     332                } 
     333        } 
     334         
     335        public void sendoldhashes(String ip, int port){ 
     336                try {            
     337                        Statement stmt2 = database_conn.createStatement(); 
     338                        ResultSet rs = stmt2.executeQuery("select * from ServerHashlist"); 
     339                        int num = 0; 
     340                        int MAX_per_packet = 30; //considering 42 bytes (2 sha1 digests+tab+newline) per hash entry, and max 1400 MTU for the UDP packet 
     341                        String sendhashdata = ""; 
     342                        while (rs.next()) { 
     343                                System.out.println("ENTRY EXISTS -  UrlHash: " + rs.getString(1) 
     344                                                + " ObjectHash:" + rs.getString(2) ); 
     345                                sendhashdata += rs.getString(1)+"\t"+rs.getString(2)+"\n"; 
     346                                num++; 
     347                                if(num==MAX_per_packet){ 
     348                                        byte [] sendData = sendhashdata.getBytes(); 
     349                                        DatagramPacket sendPacket = new DatagramPacket(sendData,sendData.length,InetAddress.getByAddress(ip.getBytes()),port); 
     350                                        UDPserver.send(sendPacket); 
     351                                        sendhashdata = ""; 
     352                                        num = 0; 
     353                                } 
     354                        } 
     355                        if(num>0){ 
     356                                byte [] sendData = sendhashdata.getBytes(); 
     357                                DatagramPacket sendPacket = new DatagramPacket(sendData,sendData.length,InetAddress.getByAddress(ip.getBytes()),port); 
     358                                UDPserver.send(sendPacket); 
     359                                sendhashdata = ""; 
     360                                num = 0; 
     361                        } 
     362                         
     363                        rs.close(); 
     364                } catch (Exception e) { 
     365                        System.out.println("Exception in sending old hashes to IP:"+ip); 
     366                        e.printStackTrace(); 
     367                } 
     368        } 
     369         
     370        public void sendhashtoall(String newhash){ 
     371                String clientip; 
     372                int port; 
     373                 
     374                try {            
     375                        Statement stmt = database_conn.createStatement(); 
     376                        ResultSet rs = stmt.executeQuery("select * from ClientDatabase"); 
     377                        while(rs.next()) { 
     378                                clientip = rs.getString(1); 
     379                                port = rs.getInt(2); 
     380                                byte [] sendData = newhash.getBytes(); 
     381                                try{ 
     382                                        DatagramPacket sendPacket = new DatagramPacket(sendData,sendData.length,InetAddress.getByAddress(clientip.getBytes()),port); 
     383                                        UDPserver.send(sendPacket); 
     384                                } 
     385                                catch(Exception e){ 
     386                                        System.out.println("Something wrong in pushing new hash entry to a user"); 
     387                                        e.printStackTrace(); 
     388                                } 
     389                        } 
     390                        rs.close(); 
     391                } catch (SQLException e) { 
     392                        System.out.println("Unable to read data from clientdatabase while sending new hash entry to all users"); 
     393                        e.printStackTrace(); 
     394                } 
     395        } 
     396} 
    250397/*  
    251398 * The ProxyThread will take an HTTP request from the client 
     
    259406        private PrintStream debugOut = System.out; 
    260407        private Connection conn; 
     408        private UDPServerThread udpthread; 
     409         
    261410        // the socketTimeout is used to time out the connection to 
    262411        // the remote server after a certain period of inactivity; 
     
    267416         
    268417         
    269         public PullServerProxyThread(Socket s) 
    270         { 
    271                 pSocket = s; 
    272         } 
    273  
    274         public PullServerProxyThread(Socket s, Connection con2) 
     418//      public PullServerProxyThread(Socket s) 
     419//      { 
     420//              pSocket = s; 
     421//      } 
     422 
     423        public PullServerProxyThread(Socket s, Connection con2, UDPServerThread udp) 
    275424        { 
    276425                pSocket = s; 
    277426                conn = con2; 
     427                udpthread = udp; 
    278428        } 
    279429         
     
    339489                                hostName = hostName.substring(0, pos); 
    340490                        } 
    341                         System.out.println("gotrequest url:"+url+"\nhost:"+host+"\nhostport:"+hostPort); 
    342                         // either forward this request to another proxy server or 
    343                         // send it straight to the Host 
    344                         //System.out.println("contactedorigin:"+contactedorigin.toString()); 
    345                         if(contactedorigin.toString().equalsIgnoreCase("false")){ 
    346                                 MessageDigest sha1 = MessageDigest.getInstance("SHA-1"); 
    347                                 byte [] udigest = sha1.digest((host.toString()+url.toString()).getBytes()); 
    348                          
    349                         StringBuffer urldigest = new StringBuffer(); 
    350                         for (int i=0;i<udigest.length;i++) { 
    351                                 urldigest.append(Integer.toHexString(0xFF & udigest[i])); 
    352                         } 
    353                          
    354                         //System.out.println("computed urlhash:"+urldigest.toString()); 
    355                         try {            
    356                                         Statement stmt2 = conn.createStatement(); 
    357                                         ResultSet rs = stmt2.executeQuery("select * from ServerHashlist where UrlHash='"+urldigest+"'"); 
    358                                         int num = 0; 
    359                                         if (rs.next()) { 
    360                                                 System.out.println("ENTRY EXISTS -  UrlHash: " + rs.getString(1) 
    361                                                                 + " ObjectHash:" + rs.getString(2) + " Time:" 
    362                                                                 + rs.getString(3)+ " Count:"+rs.getInt(4)); 
    363                                                 System.out.println("redirecting user to origin"); 
    364                                                 int occur = rs.getInt(4); 
    365                                                 Statement stmt3 = conn.createStatement(); 
    366                                                 int updatecount = stmt3.executeUpdate("UPDATE ServerHashlist SET Occurrence="+(occur+1)+" WHERE UrlHash='"+urldigest+"'"); 
    367                                                  
    368                                                 String redirectmsg ="VSNRedirect:true\r\nVSNurldigest:"+rs.getString(1)+"\r\nVSNobjectdigest:"+rs.getString(2)+"\r\ncontent-length:0\r\n\r\n"; 
    369                                                 clientOut.write(redirectmsg.getBytes()); 
    370                                                 clientOut.flush(); 
    371                                                  
    372                                                 clientOut.close(); 
    373                                                 clientIn.close(); 
    374                                                 pSocket.close(); 
    375                                                 return; 
    376                                         } 
    377                                         rs.close(); 
    378                                 } catch (Exception e) { 
    379                                         e.printStackTrace(); 
    380                                 } 
    381                         } 
    382                          
     491                        System.out.println("gotrequest url:"+url+"\nhost:"+host+"\nhostport:"+hostPort);                         
    383492                 
    384493                        try 
     
    407516                                BufferedOutputStream serverOut = new BufferedOutputStream(server.getOutputStream()); 
    408517                                try{ 
    409                                          
    410                                          
    411                                         // send the request out 
    412518                                        serverOut.write(request, 0, requestLength); 
    413519                                        serverOut.flush(); 
    414520                                 
    415                                 // and get the response; if we're not at a debug level that 
    416                                 // requires us to return the data in the response, just stream 
    417                                 // it back to the client to save ourselves from having to 
    418                                 // create and destroy an unnecessary byte array. Also, we 
    419                                 // should set the waitForDisconnect parameter to 'true', 
    420                                 // because some servers (like Google) don't always set the 
    421                                 // Content-Length header field, so we have to listen until 
    422                                 // they decide to disconnect (or the connection times out). 
    423                                 //if (debugLevel > 1) 
    424                                 //{ 
    425                                         //response = getHTTPData(serverIn, true); 
    426                                         //responseLength = Array.getLength(response); 
    427                                 //}  else  { 
    428521                                        responseLength = streamHTTPData2(serverIn, clientOut,host,url, true); 
    429522                                } 
     
    431524                                { 
    432525                                        System.out.println ("Socket timeout occurred - killing connection"); 
    433                                         String errMsg = "HTTP/1.0 504 Gateway Time-out\r\nContent-Type: text/html\r\n\r\n" +  
    434                                                         "<html><body>Error connecting to the server:\n" + ste + "\n</body></html>\r\n"; 
     526                                        String errMsg_body = "<html><head><title>504 Gateway Time-out</title></head>"+ 
     527                                                "<body><h1>VSN ERROR MESSAGE: Gateway Time-out</h1>"+ 
     528                                                "<p>Connecting to the server("+hostName+") timedout as the server was not responsive. Please try again later.</p>"+ 
     529                                                "</body></html>\r\n"; 
     530                                        String errMsg_header = "HTTP/1.0 504 Gateway Time-out\r\n"+ 
     531                                        "Content-Type: text/html\r\n"+ 
     532                                                        "Content-Length: "+errMsg_body.length()+"\r\n"+ 
     533                                                "Connection: close\r\n\r\n"; 
     534                                        String errMsg = errMsg_header+errMsg_body; 
     535//                                      String errMsg = "HTTP/1.0 504 Gateway Time-out\r\nContent-Type: text/html\r\n\r\n" +  
     536//                                                      "<html><body>Error connecting to the server:\n" + ste + "\n</body></html>\r\n"; 
    435537                                        clientOut.write(errMsg.getBytes(), 0, errMsg.length()); 
    436538                                } 
    437                                 //} 
    438539                                 
    439540                                serverIn.close(); 
    440541                                serverOut.close(); 
    441542                        } 
    442                          
    443                         // send the response back to the client, if we haven't already 
    444                         //if (debugLevel > 1){ 
    445                         //      clientOut.write(response, 0, responseLength); 
    446                         //      clientOut.flush(); 
    447                         //} 
    448543                         
    449544                         
     
    477572        } 
    478573         
    479          
    480         private byte[] getHTTPData (InputStream in, boolean waitForDisconnect) 
    481         { 
    482                 // get the HTTP data from an InputStream, and return it as 
    483                 // a byte array 
    484                 // the waitForDisconnect parameter tells us what to do in case 
    485                 // the HTTP header doesn't specify the Content-Length of the 
    486                 // transmission 
    487                 StringBuffer host = new StringBuffer(""); 
    488                 StringBuffer url = new StringBuffer(""); 
    489                 StringBuffer contactedorigin = new StringBuffer(""); 
    490                 return getHTTPData(in, host, url,contactedorigin, waitForDisconnect); 
    491         } 
    492          
    493          
    494574        private byte[] getHTTPData (InputStream in, StringBuffer host, StringBuffer url, StringBuffer contactedorigin, boolean waitForDisconnect) 
    495575        { 
     
    502582                streamHTTPData(in, bs, host, url, contactedorigin, waitForDisconnect); 
    503583                return bs.toByteArray(); 
    504         } 
    505          
    506  
    507         private int streamHTTPData (InputStream in, OutputStream out, boolean waitForDisconnect) 
    508         { 
    509                 StringBuffer host = new StringBuffer(""); 
    510                 StringBuffer url = new StringBuffer(""); 
    511                 StringBuffer contactedorigin = new StringBuffer(""); 
    512                 return streamHTTPData(in, out, host, url, contactedorigin, waitForDisconnect); 
    513584        } 
    514585         
     
    533604                                //temp_header.append(data + "\r\n"); 
    534605                                pos = data.indexOf(" "); 
    535                                 if ((data.toLowerCase().startsWith("http")) && (pos >= 0) 
    536                                                 && (data.indexOf(" ", pos + 1) >= 0)) { 
    537                                         String rcString = data.substring(pos + 1, 
    538                                                         data.indexOf(" ", pos + 1)); 
    539                                         try { 
    540                                                 responseCode = Integer.parseInt(rcString); 
    541                                         } catch (Exception e) { 
    542                                                 if (debugLevel > 0) 
    543                                                         debugOut.println("Error parsing response code " 
    544                                                                         + rcString); 
    545                                         } 
    546                                 } else { 
     606//                              if ((data.toLowerCase().startsWith("http")) && (pos >= 0) 
     607//                                              && (data.indexOf(" ", pos + 1) >= 0)) { 
     608//                                      String rcString = data.substring(pos + 1, 
     609//                                                      data.indexOf(" ", pos + 1)); 
     610//                                      try { 
     611//                                              responseCode = Integer.parseInt(rcString); 
     612//                                      } catch (Exception e) { 
     613//                                              if (debugLevel > 0) 
     614//                                                      debugOut.println("Error parsing response code " 
     615//                                                                      + rcString); 
     616//                                      } 
     617//                              } else { 
    547618                                        if ((pos >= 0) && (data.indexOf(" ", pos + 1) >= 0)) { 
    548619                                                pre_url.setLength(0); 
     
    553624                                                post_url.append(data.substring(data.indexOf(" ", pos + 1)+1)); 
    554625                                        } 
    555                                 } 
     626                                //} 
    556627                        } 
    557628 
     
    781852                                                                                        //              + " ObjectHash:" + rs.getString(2) + " Time:" 
    782853                                                                                        //              + rs.getString(3)+ " Count:"+rs.getInt(4)); 
     854                                                                                        String old_objecthash = rs.getString(2); 
    783855                                                                                        int occur = rs.getInt(4); 
    784856                                                                                                                                                                                 
     
    791863                                                                                        psUpdate.executeUpdate(); 
    792864                                                                                         
     865                                                                                        if(!old_objecthash.equalsIgnoreCase(objectdigest.toString())){ 
     866                                                                                                udpthread.sendhashtoall(urldigest.toString()+"\t"+objectdigest.toString()); 
     867                                                                                        } 
    793868                                                                                } else { 
    794869                                                                                        PreparedStatement psInsert = conn 
     
    801876                                            psInsert.setInt(4, 0); 
    802877                                                                                        psInsert.executeUpdate(); 
     878                                                                                         
     879                                                                                        udpthread.sendhashtoall(urldigest.toString()+"\t"+objectdigest.toString()); 
    803880                                                                                } 
    804881                                                                                rs.close(); 
     882                                                                                 
    805883                                                                        } catch (Exception e) { 
    806884                                                                                e.printStackTrace(); 
Note: See TracChangeset for help on using the changeset viewer.