# =XMPP4R - XMPP Library for Ruby
# License:: Ruby's license (see the LICENSE file) or GNU GPL, at your option.
# Website::http://home.gna.org/xmpp4r/

require 'callbacks'
require 'socket'
require 'thread'
Thread::abort_on_exception = true
require 'xmpp4r/streamparser'
require 'xmpp4r/presence'
require 'xmpp4r/message'
require 'xmpp4r/iq'
require 'xmpp4r/errorexception'
require 'xmpp4r/debuglog'
require 'xmpp4r/idgenerator'

module Jabber
  ##
  # The stream class manages a connection stream (a file descriptor using which
  # XML messages are read and sent)
  #
  # You may register callbacks for the three Jabber stanzas
  # (message, presence and iq) and use the send and send_with_id
  # methods.
  #
  # To ensure the order of received stanzas, callback blocks are
  # launched in the parser thread. If further blocking operations
  # are intended in those callbacks, run your own thread there.
  class Stream
    DISCONNECTED = 1
    CONNECTED = 2

    # file descriptor used
    attr_reader :fd

    # connection status
    attr_reader :status

    ##
    # Create a new stream
    # (just initializes)
    #
    # threaded:: [Boolean] requested mode; non-threaded mode is currently
    #            broken, so any false value is overridden with a warning
    #            on $stderr
    def initialize(threaded = true)
      @fd = nil
      @status = DISCONNECTED
      @xmlcbs = CallbackList::new
      @stanzacbs = CallbackList::new
      @messagecbs = CallbackList::new
      @iqcbs = CallbackList::new
      @presencecbs = CallbackList::new
      unless threaded
        $stderr.puts "Non-threaded mode is currently broken, re-enabling threaded"
        threaded = true
      end
      @threaded = threaded
      @stanzaqueue = []
      @stanzaqueue_lock = Mutex::new
      @exception_block = nil
      @threadblocks = []
      # @pollCounter = 10
      @waiting_thread = nil
      @wakeup_thread = nil
      @streamid = nil
      @features_lock = Mutex.new
    end

    ##
    # Start the XML parser on the fd
    #
    # A parser thread is spawned which feeds the received elements back
    # into this object. Exceptions escaping the parser are logged and
    # handed to the block registered with on_exception (tagged :start).
    #
    # fd:: the IO-like object to read from and write to
    def start(fd)
      @stream_mechanisms = []
      @stream_features = {}

      @fd = fd
      @parser = StreamParser.new(@fd, self)
      @parserThread = Thread.new do
        begin
          @parser.parse
        rescue Exception => e
          Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

          if @exception_block
            # Separate thread: close terminates the parser thread.
            Thread.new { close; @exception_block.call(e, self, :start) }
          else
            puts "Exception caught in Parser thread!"
            # close ends the calling thread when that thread is the parser
            # thread, so the re-raise is only reached if close returns.
            close
            raise
          end
        end
      end
      # @pollThread = Thread.new do
      #   begin
      #     poll
      #   rescue
      #     puts "Exception caught in Poll thread, dumping backtrace and" +
      #       " exiting...\n" + $!.exception + "\n"
      #     puts $!.backtrace
      #     exit
      #   end
      # end
      @status = CONNECTED
    end

    ##
    # Kill the parser thread (if one was started) and drop the parser.
    # The file descriptor and the connection status are left untouched;
    # use close for a full shutdown.
    def stop
      @parserThread.kill if @parserThread
      @parser = nil
    end

    ##
    # Mounts a block to handle exceptions if they occur during the
    # poll send. This will likely be the first indication that
    # the socket dropped in a Jabber Session.
    #
    # The block has to take three arguments:
    # * the Exception (nil when the parser reported the end of the stream)
    # * the Jabber::Stream object (self)
    # * a symbol where it happened, namely :start, :parser, :sending and :close
    def on_exception(&block)
      @exception_block = block
    end

    ##
    # This method is called by the parser when a failure occurs
    #
    # The failure is logged. With an exception block mounted, the stream is
    # closed and the block is called with :parser. Without one, the error
    # is printed, the stream is closed and the exception is re-raised.
    #
    # e:: [Exception] the failure reported by the parser
    def parse_failure(e)
      Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      if @exception_block
        # New thread, because close will kill the current thread
        Thread.new {
          close
          @exception_block.call(e, self, :parser)
        }
      else
        # Interpolate the message: String + Exception is a TypeError on
        # Ruby >= 1.9.
        puts "Stream#parse_failure was called by XML parser. Dumping " +
          "backtrace...\n#{e.message}\n"
        puts e.backtrace
        close
        # Raise the reported exception explicitly; a bare raise depends on
        # $! still being set by the caller.
        raise e
      end
    end

    ##
    # This method is called by the parser when parsing has ended
    # (NOTE(review): presumably upon receiving the closing stream tag;
    # confirm against StreamParser).
    #
    # Closes the stream and, if an exception block is mounted, calls it
    # with a nil exception and :close.
    def parser_end
      if @exception_block
        # New thread, because close will kill the current thread
        Thread.new {
          close
          @exception_block.call(nil, self, :close)
        }
      else
        close
      end
    end

    ##
    # Returns if this connection is connected to a Jabber service
    # return:: [Boolean] Connection status
    def is_connected?
      return @status == CONNECTED
    end

    ##
    # Returns if this connection is NOT connected to a Jabber service
    #
    # return:: [Boolean] Connection status
    def is_disconnected?
      return @status == DISCONNECTED
    end

    ##
    # Processes a received REXML::Element and executes
    # registered thread blocks and filters against it.
    #
    # No new thread is spawned here, regardless of the mode: an implicit
    # feature of XMPP is constant order of stanzas, so the element is
    # handled in the calling thread.
    #
    # element:: [REXML::Element] The received element
    def receive(element)
      receive_nonthreaded(element)
    end

    ##
    # Does the actual work for receive: interprets stream-level elements,
    # imports stanzas, offers the result to the blocks of pending
    # Stream#send calls and finally to the registered callbacks.
    #
    # element:: [REXML::Element] The received element
    def receive_nonthreaded(element)
      Jabber::debuglog("RECEIVED:\n#{element.to_s}")
      case element.prefix
      when 'stream'
        case element.name
        when 'stream'
          stanza = element
          @streamid = element.attributes['id']
          unless element.attributes['version']  # isn't XMPP compliant, so
            Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
            # NOTE(review): @features_lock is never locked in this file;
            # presumably a subclass locks it while waiting for the
            # features - confirm.
            @features_lock.unlock               # don't wait for <stream:features/>
          end
        when 'features'
          stanza = element
          # each_element skips text nodes, which carry no name/namespace
          element.each_element { |e|
            if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
              e.each_element('mechanism') { |mech|
                @stream_mechanisms.push(mech.text)
              }
            else
              @stream_features[e.name] = e.namespace
            end
          }
          Jabber::debuglog("FEATURES: received")
          @features_lock.unlock
        else
          stanza = element
        end
      else
        case element.name
        when 'message'
          stanza = Message::import(element)
        when 'iq'
          stanza = Iq::import(element)
        when 'presence'
          stanza = Presence::import(element)
        else
          stanza = element
        end
      end

      # Iterate through blocked threads (= waiting for an answer)
      #
      # We're dup'ping the @threadblocks here, so that we won't end up in an
      # endless loop if Stream#send is being nested. That means, the nested
      # threadblock won't receive the stanza currently processed, but the next
      # one.
      threadblocks = @threadblocks.dup
      threadblocks.each { |threadblock|
        exception = nil
        r = false
        begin
          r = threadblock.call(stanza)
        rescue Exception => e
          exception = e
        end

        if r == true
          # The block consumed the stanza: release the sender and stop here.
          @threadblocks.delete(threadblock)
          threadblock.wakeup
          return
        elsif exception
          # Hand the failure over to the thread that is waiting in send.
          @threadblocks.delete(threadblock)
          threadblock.raise(exception)
        end
      }

      if @threaded
        process_one(stanza)
      else
        # stanzaqueue will be read when the user call process
        @stanzaqueue_lock.synchronize { @stanzaqueue.push(stanza) }
        @waiting_thread.wakeup if @waiting_thread
      end
    end
    private :receive_nonthreaded

    ##
    # Offer a stanza to the callback lists until one of them consumes it:
    # first the XML callbacks, then the generic stanza callbacks, then the
    # list matching the stanza's class (Message, Iq or Presence).
    #
    # stanza:: The stanza to process
    # return:: true if a callback list reported the stanza as processed,
    #          nil otherwise
    def process_one(stanza)
      Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
      return true if @xmlcbs.process(stanza)
      return true if @stanzacbs.process(stanza)

      typed_cbs = case stanza
                  when Message  then @messagecbs
                  when Iq       then @iqcbs
                  when Presence then @presencecbs
                  end
      true if typed_cbs and typed_cbs.process(stanza)
    end
    private :process_one

    ##
    # Process |max| XML stanzas and call listeners for all of them.
    #
    # The queue lock is only held while a stanza is taken off the queue,
    # never while the listeners run.
    #
    # max:: [Integer] the number of stanzas to process (nil means process
    #       all available)
    # return:: [Integer] the number of stanzas processed
    def process(max = nil)
      n = 0
      while max.nil? or n < max
        e = @stanzaqueue_lock.synchronize { @stanzaqueue.shift }
        break if e.nil?   # queue drained
        process_one(e)
        n += 1
      end
      n
    end

    ##
    # Process an XML stanza and call the listeners for it. If no stanza is
    # currently available, wait for max |time| seconds before returning.
    #
    # time:: [Integer] time to wait in seconds. If nil, wait infinitely.
    # return:: [Integer] the number of stanzas processed (0 or 1)
    def wait_and_process(time = nil)
      return process(1) if time == 0

      # Something already queued? Then there is nothing to wait for.
      return 1 if process(1) > 0

      waiter = Thread.current
      @waiting_thread = waiter
      # The timer thread is only needed for a finite wait; with time == nil
      # the caller sleeps until a received stanza wakes it up.
      @wakeup_thread = Thread.new { sleep time ; waiter.wakeup if @waiting_thread } if time
      Thread.stop
      @wakeup_thread.kill if @wakeup_thread
      @wakeup_thread = nil
      @waiting_thread = nil

      process(1)
    end

    ##
    # This is used by Jabber::Stream internally to
    # keep track of any blocks which were passed to
    # Stream#send.
    class ThreadBlock
      # Remembers the calling thread together with the block, so that the
      # thread can be woken up (or raised in) later on.
      def initialize(block)
        @thread = Thread.current
        @block = block
      end

      # Invoke the stored block with the given arguments.
      def call(*args)
        @block.call(*args)
      end

      # Wake up the thread which created this ThreadBlock.
      def wakeup
        # TODO: Handle threadblock removal if !alive?
        @thread.wakeup if @thread.alive?
      end

      # Raise the exception inside the thread which created this
      # ThreadBlock.
      def raise(exception)
        @thread.raise(exception) if @thread.alive?
      end
    end

    ##
    # Sends XML data to the socket and (optionally) waits
    # to process received data.
    #
    # Do not invoke this in a callback but in a separate thread
    # because we may not suspend the parser-thread (in whose
    # context callbacks are executed).
    #
    # If the write fails, the exception is logged. With an exception block
    # mounted the stream is closed and the block is called with :sending;
    # otherwise the stream is closed and the exception is re-raised.
    #
    # xml:: [String] The xml data to send
    # &block:: [Block] The optional block
    def send(xml, &block)
      Jabber::debuglog("SENDING:\n#{xml}")
      @threadblocks.unshift(ThreadBlock.new(block)) if block
      Thread.critical = true # we don't want to be interrupted before we stop!
      begin
        begin
          @fd << xml.to_s
          @fd.flush
        rescue Exception => e
          Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

          if @exception_block
            Thread.new { close!; @exception_block.call(e, self, :sending) }
          else
            puts "Exception caught while sending!"
            close!
            raise
          end
        end
        # The parser thread might be running this (think of a callback running send())
        # If this is the case, we mustn't stop (or we would cause a deadlock)
        #
        # Stop while still critical: Thread.stop itself resets the critical
        # condition, so the answer cannot arrive (and its wakeup get lost)
        # between leaving the critical section and going to sleep.
        Thread.stop if block and Thread.current != @parserThread
      ensure
        # Always leave the critical section, including when the write failed
        # and the exception is propagating to the caller.
        Thread.critical = false
      end
      @pollCounter = 10
    end

    ##
    # Send an XMPP stanza with an Jabber::XMLStanza#id. The id will be
    # generated by Jabber::IdGenerator if not already set.
    #
    # The block will be called once: when receiving a stanza with the
    # same Jabber::XMLStanza#id. There is no need to return true to
    # complete this! Instead the return value of the block will be
    # returned.
    #
    # Be aware that if a stanza with type='error' is received
    # the function does not yield but raises an ErrorException with
    # the corresponding error element.
    #
    # Please see Stream#send for some implementational details.
    #
    # Please read the note about nesting at Stream#send
    # xml:: [XMLStanza]
    def send_with_id(xml, &block)
      if xml.id.nil?
        xml.id = Jabber::IdGenerator.instance.generate_id
      end

      res = nil
      error = nil
      send(xml) do |received|
        if received.kind_of? XMLStanza and received.id == xml.id
          if received.type == :error
            # Remember the error; it is raised in the caller's thread below.
            error = (received.error ? received.error : Error.new)
            true
          else
            res = yield(received)
            true
          end
        else
          false
        end
      end

      unless error.nil?
        raise ErrorException.new(error)
      end

      res
    end

    ##
    # Starts a polling thread to send "keep alive" data to prevent
    # the Jabber connection from closing for inactivity.
    #
    # Currently not working! The keep-alive code is commented out, so this
    # method merely sleeps forever.
    def poll
      sleep 10
      while true
        sleep 2
        # @pollCounter = @pollCounter - 1
        # if @pollCounter < 0
        #   begin
        #     send("  \t  ")
        #   rescue
        #     Thread.new {@exception_block.call if @exception_block}
        #     break
        #   end
        # end
      end
    end

    ##
    # Adds a callback block to process received XML messages
    #
    # priority:: [Integer] The callback's priority, the higher, the sooner
    # ref:: [String] The callback's reference
    # &block:: [Block] The optional block
    def add_xml_callback(priority = 0, ref = nil, &block)
      @xmlcbs.add(priority, ref, block)
    end

    ##
    # Delete an XML-messages callback
    #
    # ref:: [String] The reference of the callback to delete
    def delete_xml_callback(ref)
      @xmlcbs.delete(ref)
    end

    ##
    # Adds a callback block to process received Messages
    #
    # priority:: [Integer] The callback's priority, the higher, the sooner
    # ref:: [String] The callback's reference
    # &block:: [Block] The optional block
    def add_message_callback(priority = 0, ref = nil, &block)
      @messagecbs.add(priority, ref, block)
    end

    ##
    # Delete a Message callback
    #
    # ref:: [String] The reference of the callback to delete
    def delete_message_callback(ref)
      @messagecbs.delete(ref)
    end

    ##
    # Adds a callback block to process received Stanzas
    #
    # priority:: [Integer] The callback's priority, the higher, the sooner
    # ref:: [String] The callback's reference
    # &block:: [Block] The optional block
    def add_stanza_callback(priority = 0, ref = nil, &block)
      @stanzacbs.add(priority, ref, block)
    end

    ##
    # Delete a Stanza callback
    #
    # ref:: [String] The reference of the callback to delete
    def delete_stanza_callback(ref)
      @stanzacbs.delete(ref)
    end

    ##
    # Adds a callback block to process received Presences
    #
    # priority:: [Integer] The callback's priority, the higher, the sooner
    # ref:: [String] The callback's reference
    # &block:: [Block] The optional block
    def add_presence_callback(priority = 0, ref = nil, &block)
      @presencecbs.add(priority, ref, block)
    end

    ##
    # Delete a Presence callback
    #
    # ref:: [String] The reference of the callback to delete
    def delete_presence_callback(ref)
      @presencecbs.delete(ref)
    end

    ##
    # Adds a callback block to process received Iqs
    #
    # priority:: [Integer] The callback's priority, the higher, the sooner
    # ref:: [String] The callback's reference
    # &block:: [Block] The optional block
    def add_iq_callback(priority = 0, ref = nil, &block)
      @iqcbs.add(priority, ref, block)
    end

    ##
    # Delete an Iq callback
    #
    # ref:: [String] The reference of the callback to delete
    #
    def delete_iq_callback(ref)
      @iqcbs.delete(ref)
    end

    ##
    # Closes the connection to the Jabber service
    def close
      close!
    end

    ##
    # Closes the connection: terminates the parser thread, closes the file
    # descriptor (unless it is already closed) and marks the stream as
    # DISCONNECTED.
    #
    # When this is called from the parser thread itself, the thread is
    # killed last; killing it first would end the calling thread before the
    # file descriptor is closed and the status is updated. In that case
    # this method does not return.
    def close!
      parser_thread = @parserThread
      called_from_parser = (parser_thread == Thread.current)

      parser_thread.kill if parser_thread and !called_from_parser
      # @pollThread.kill
      @fd.close if @fd and !@fd.closed?
      @status = DISCONNECTED

      parser_thread.kill if called_from_parser
    end
  end
end