Newsgroups : Borland : borland.public.delphi.internet.winsock : 2006 Apr : Re: TIdTCPServer - threads, global data contd.

www.cryer.info
Managed Newsgroup Archive

Re: TIdTCPServer - threads, global data contd.

Subject:Re: TIdTCPServer - threads, global data contd.
Posted by:"Martin James" (mjames_falc..@dial.pipex.com)
Date:Sun, 16 Apr 2006 06:11:18

> The purpose of the Application is to work as a sort of Router, which means
> that I have to send data from Clients connected, but I also have data
> initiated by the server (or other Clients to be sent to a given Client)

So, a 'chat' type app.

> As I read the thread the conclusion is that data should be stored in eg. a
> TThreadList, or in a special strucuture with a critical section.

Producer-Consumer queues can be useful.

> A couple of
> questions:
>
> Reg. the OnExecute Event handler:
> 1a) Can I from the main thread get access to a list of connected clients
> (ie. with connected clients)

Mostly, yes.  Indy servers have a lockable list of client contexts.  The is,
AFAIK,  an area of uncertainty in that, for a short period, a connection may
be disconnected but not yet removed from the list.  You need to bear that in
mind - you might occasionally get an exception you do not want <g>

> 1b) What initiatiates the OnExecute Event?

It is called in a continuous loop.  The caller is a thread, either directly
or, in the case of Indy 10 supercore, via a fiber.

> 1c) How can I (eg. from the main thread) "provoke" the OnExecute Event
> handler of a given client to be executed in order to empty an outbound
info
> queue?

One way is to conver the socket to non-blocking, (see 'Interrupting
ReadFromStack call?' thread from Aart).  This allows a thread to wait on
both a socket and a synchro object, (eg. a semaphore), associated with an
object queue.

> 1d) How would you store data in relation to a specific client for use
during
> later Execute handles (or another thread)?

Well, this is the thing.  The data can be stored in a buffer field of a
data-carrier class. A client reads data into an instance,  arranges for it
to be 'routed/distributed/queued' and creates, (or depools), another one to
receive data during the next read.

The awkward issue is controlling the lifetime of the objects if they are
queued to multiple threads.  A reference count in the object could allow the
object to be freed when the last thread has finished using it.  The object
would need a CS to protect its refCount.  The refCount would also need to be
decremented upon a client disconnect so that buffers are not left lying
around by disappearing clients.

If your 'router' only routes between pairs of clients, and does not
'broadcast', then the issue does not arise.

> Thread safe global data.
> You mentioned the TThreadList as an example of storing threadsafe data.
And
> alternatively to make ones own class with a critical section.
> 2a) Do you have an example of such a class implementation for my
> inspiration?
> 2b) Do you Mathijs have any working code example based on the discussion
you
> had with Martin and Remy that you could share?
>

A P-C queue class that a thread can wait on for objects is fairly easily
built, (see class at end of post).  The difficulty is that the client socket
is normally set to blocking and so does not allow the thread calling
'onExecute' to wait on both the socket and the object queue. Aart gets round
this by setting the socket to non-blocking for the wait and setting it back
to blocking again once the wait has been signalled.  This involves extra
kernel calls, but appears to work OK.  The socket could be left as
non-blocking, but the effect on Indy of such a design is somewhat unknown -
ReadFromStack calls would presumably be OK, (since the socket would be
guaranteed to be readable if an FD_READ status was returned after the socket
wait was signalled), but, as Aart says, it is possible that writes would
return with a 'operation would block' exception, so requiring
non-blocking-type state-machine coding or some bodge, eg. setting the socket
to blocking mode only after such an exception and re-writing.

Rgds,
Martin


unit MinimalSemaphorePCqueue;

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore.

The semaphore count reflects the queue count
'push' will always succeed unless memory runs out, then you're stuft anyway.
'pop' has a timeout parameter as well as the address of where any received
object is to be put.
'pop' returns immediately with 'true' if there is an object on the queue
available for it.
'pop' blocks the caller if the queue is empty and the timeout is not 0.
'pop' returns false if the timeout is exceeded before an object is available
from the queue.
'pop' returns true if an object is available from the queue before the
timeout
is exceeded.
If multiple threads have called 'pop' and are blocked because the queue is
empty, a single 'push' will make only one of the waiting threads ready.


Methods to push/pop from the queue
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
When the handle is signaled, the 'peek' method will retrieve the queued
object.
}
interface

uses
  Windows, Messages, SysUtils, Classes,syncObjs,contnrs;


type

pObject=^Tobject;

TiterateEvent=function(thisObject:TObject):boolean of object;

TsemaphoreMailbox=class(TobjectQueue)
private
  countSema:Thandle;
protected
  access:TcriticalSection;
public
  property semaHandle:Thandle read countSema;
  constructor create; virtual;
  procedure push(aObject:Tobject); virtual;
  function pop(pResObject:pObject;timeout:DWORD):boolean;  virtual;
  function peek(pResObject:pObject):boolean;  virtual;
  destructor destroy; override;
end;


implementation

{ TsemaphoreMailbox }

constructor TsemaphoreMailbox.create;
begin
  inherited create;
  access:=TcriticalSection.create;
  countSema:=createSemaphore(nil,0,maxInt,nil);
end;

destructor TsemaphoreMailbox.destroy;
begin
  access.free;
  closeHandle(countSema);
  inherited;
end;

function TsemaphoreMailbox.pop(pResObject: pObject;
  timeout: DWORD): boolean;
// dequeues an object, if one is available on the queue.  If the queue is
empty,
// the caller is blocked until either an object is pushed on or the timeout
// period expires
begin // wait for a unit from the semaphore
  result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
  if result then // if a unit was supplied before the timeout,
  begin
    access.acquire;
    try
      pResObject^:=inherited pop; // get an object from the queue
    finally
      access.release;
    end;
  end;
end;


procedure TsemaphoreMailbox.push(aObject: Tobject);
// pushes an object onto the queue.  If threads are waiting in a 'pop' call,
// one of them is made ready.
begin
  access.acquire;
  try
    inherited push(aObject); // shove the object onto the queue
  finally
    access.release;
  end;
  releaseSemaphore(countSema,1,nil); // release one unit to semaphore
end;

function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
begin
  access.acquire;
  try
    result:=(count>0);
    if result then pResObject^:=inherited pop; // get an object from the
queue
  finally
    access.release;
  end;
end;

end.

Replies:

In response to:

www.cryer.info
Managed Newsgroup Archive