.Net BlockingCollection.Take (2): Înlăturarea în siguranță a două articole la un moment dat

După efectuarea unor cercetări, recurg la orice feedback cu privire la modul în care puteți elimina în mod eficient două articole dintr-o colecție concurentă. Situația mea implică mesajele primite peste UDP care sunt plasate în prezent într-o blocare de blocare. Odată ce există două utilizatori în colecție, trebuie să iau în siguranță doi utilizatori și să le procesez. Am văzut mai multe tehnici diferite, inclusiv câteva idei enumerate mai jos. Implementarea mea actuală este mai jos, dar mă gândesc că există o modalitate mai curată de a face acest lucru asigurând în același timp că utilizatorii sunt procesați în grupuri de câte două. Aceasta este singura restricție în acest scenariu.

Implementarea actuală:

    private int userQueueCount = 0;
    public BlockingCollection UserQueue = new BlockingCollection();

    public void JoinQueue(User u)
    {
           UserQueue.Add(u);
           Interlocked.Increment(ref userQueueCount);

           if (userQueueCount > 1)
           {
               IEnumerable users = UserQueue.Take(2);
               if(users.Count==2) {
                 Interlocked.Decrement(ref userQueueCount);
                 Interlocked.Decrement(ref userQueueCount);
                 ... do some work with users but if only one 
                 is removed I'll run into problems
               }

           }
    }

Ceea ce aș dori să fac este ceva de genul asta, dar nu pot încerca acest lucru într-o situație de producție pentru a asigura integritatea.

 Parallel.ForEach(UserQueue.Take(2), (u) => { ... });

Sau mai bine încă:

    public void JoinQueue(User u)
    {
           UserQueue.Add(u);
          //if needed? increment
           Interlocked.Increment(ref userQueueCount);
           UserQueue.CompleteAdding();
    }

Aplicați apoi acest lucru undeva:

        Task.Factory.StartNew(() =>
        {
            while (userQueueCount > 1) OR (UserQueue.Count > 1) If it's safe?
            {
                IEnumerable users = UserQueue.Take(2);
                ... do stuff
            }

        });

The problem with this is that i'm not sure I can guarantee that between the condition (Count > 1) and the Take(2) that i'm ensuring the UserQueue has at least two items to process? Incoming UDP messages are processed in parallel so I need a way to safely pull items off of the Blocking/Concurrent Collection in pairs of two.

Există o modalitate mai bună/mai sigură de a face acest lucru?

Revised Comments: The intented goal of this question is really just to achieve a stable/thread safe method of processing items off of a Concurrent Collection in .Net 4.0. It doesn't have to be pretty, it just has to be stable in the task of processing items in unordered pairs of twos in a parallel environment.

0
@Brunner Nu sunt sigur că înțeleg comentariul dvs.? Utilizatorii intră în sârmă la o rată/secvență necunoscută. Tot ce știu este când vine un nou utilizator.
adăugat autor Chuck D, sursa
@AlexeiLevenkov De acord, încă încerc să-mi dau seama cum să fac acest lucru fără blocare, dar nu pare fezabil.
adăugat autor Chuck D, sursa
@ mike-z nu contează ordinea în care sunt "potrivite" atât timp cât doi utilizatori sunt eliminați în siguranță din colecție și sunt procesați împreună.
adăugat autor Chuck D, sursa
@Brunner sistemul este de potrivire pentru un joc multiplayer, astfel încât, atunci când doi utilizatori sunt ședinței în coadă, s-ar alătura un nou joc împreună. Cred că 100 ms este o latență acceptabilă.
adăugat autor Chuck D, sursa
Codul dvs. este improbabil pentru a fi thread-safe - la multe apeluri separate de sincronizare, astfel încât este greu de dovedit că se comportă corect.
adăugat autor Alexei Levenkov, sursa
@RubberMallet, probabil că ar trebui să vă comenteze obiectivele - pur și simplu să lucrați și să corectați codul sau dacă aveți alte restricții (cum ar fi blocarea, ...).
adăugat autor Alexei Levenkov, sursa
Take (2) numește metoda LINQ care nu le elimină din BlockingCollection. Trebuie să le împerecheați în ordinea în care au fost adăugate? De exemplu, în cazul în care colecția are a, b, c, d un fir poate procesa a & c și alt proces b & d sau trebuie să procesați a & b apoi c & d?
adăugat autor mike z, sursa
@RubberMallet Am întrebat pentru că am fost pe punctul de a sugera Rx ( msdn.microsoft.com /en-us/data/gg577609.aspx ). Dar văzând că este vorba de potrivire am sugerat să folosesc un ConcurrentQueue și ori de câte ori cineva dorește să înceapă un joc, verifică dacă există ceva în coadă (de exemplu cu TryDequeue), dacă nu, adaugă la coadă.
adăugat autor Brunner, sursa
@RubberMallet Ei bine, o mașină de stat mică le-ar putea adăuga ca Tuple , dar asta ar trebui să fie sincronizat ... Și cum funcționează operațiunea? Ai putea să aștepți 100 ms înainte de a procesa utilizatorii?
adăugat autor Brunner, sursa
Există ceva care să vă împiedice să le adăugați în primul rând ca Tuple?
adăugat autor Brunner, sursa

4 răspunsuri

Cred că cea mai ușoară soluție aici este de a folosi blocarea: veți avea o singură încuietoare pentru toți consumatorii (producătorii nu vor folosi nici un fel de încuietori), ceea ce vă va face să luați întotdeauna utilizatorii în ordinea corectă:

User firstUser;
User secondUser;

lock (consumerLock)
{
    firstUser = userQueue.Take();
    secondUser = userQueue.Take();
}

Process(firstUser, secondUser);

O altă opțiune ar fi aceea de a avea două cozi: una pentru utilizatori unici și una pentru perechi de utilizatori și un proces care le transferă de la prima coadă la alta.

Dacă nu vă deranjează să risipiți un alt fir, puteți face acest lucru cu două BlockingCollection s:

while (true)
{
    var firstUser = incomingUsers.Take();
    var secondUser = incomingUsers.Take();

    userPairs.Add(Tuple.Create(firstUser, secondUser));
}

Nu trebuie să vă faceți griji cu privire la blocarea aici, deoarece coada de comandă pentru utilizatorii unici va avea doar un singur consumator, iar consumatorii de perechi pot folosi acum Take() simplu.

If you do care about wasting a thread and can use TPL Dataflow, you can use BatchBlock, which combines incoming items into batches of n items, where n is configured at the time of creation of the block, so you can set it to 2.

0
adăugat

Dacă nu aș putea pune perechi de utilizatori în colecție din vreun motiv, aș folosi ConcurrentQueue și încerc să încerc 2 elemente în același timp, dacă pot obține doar unul - pune-l înapoi. Așteptați după cum este necesar.

0
adăugat
Am avut inițial acest lucru într-o ConcurrentQueue, dar am schimbat după ce am citit ceva. Cred că ideea utilizării unei BlockingCollection a fost într-o încercare de a implementa cumva un consumator/producător. Mă gândesc că nu există o modalitate evident evidentă de a realiza acest lucru și că, de fapt, sugestia dvs. va funcționa între timp. Multumesc pentru ajutor!
adăugat autor Chuck D, sursa

Iată ce aș face în Codul dur:

ConcurrentQueuequeue = new ConcurrentQueue(); //can use a BlockingCollection too (as it's just a blocking ConcurrentQueue by default anyway)

public void OnUserStartedGame(User joiningUser)
{
   User waitingUser;
   if (this.gameQueue.TryDequeue(out waitingUser)) //if there's someone waiting, we'll get him
      this.MatchUsers(waitingUser, joiningUser);
   else
      this.QueueUser(joiningUser); //it doesn't matter if there's already someone in the queue by now because, well, we are using a queue and it will sort itself out.
}

private void QueueUser(User user)
{
   this.gameQueue.Enqueue(user);
}

private void MatchUsers(User first, User second)
{
   //not sure what you do here
}

Ideea de bază este că dacă cineva dorește să înceapă un joc și există cineva în coada ta, îi potriviți și începeți un joc - dacă nu există nimeni, adăugați-i la coadă. În cel mai bun caz, veți avea un singur utilizator în coadă simultan, dar dacă nu, nu este prea rău fie pentru că, deoarece ceilalți utilizatori încep jocurile, cele așteptate vor fi eliminate treptat și nu vor fi adăugate altele noi până când coada nu este goală din nou.

0
adăugat
Sunt de acord că nu este "perfect" și cred că aș putea să mă sondoneze în fiecare secundă pentru a se asigura că situația menționată mai sus nu se întâmplă, dar cred că abordarea lui @Brunner este optimă având în vedere circumstanțele. Este mult mai curat decât să votezi în mod constant coada pentru doi jucători.
adăugat autor Chuck D, sursa
Cred că aceasta este în mod logic cel mai bun mod de abordare. Mulțumiri!
adăugat autor Chuck D, sursa
Nu am observat că ați menționat-o și sunt de acord că este foarte posibil că aceasta nu este o problemă. Ceea ce vreau sa spun prin "nu FIFO" este ca daca utilizatorii 1 si 2 vin "in acelasi timp", ambele ar putea fi adaugate la coada. Apoi, atunci când utilizatorul 3 vine de-a lungul, el va fi în concordanță cu utilizatorul 1 și utilizatorul 2 va rămâne în coada de așteptare. Dar asta nu este FIFO: utilizatorul 3 nu poate fi procesat înainte de utilizatorul 2.
adăugat autor svick, sursa
Acest cod are o condiție de cursă, ceea ce înseamnă că nu este întotdeauna FIFO. Dacă coada este goală și doi sau mai mulți utilizatori încep un joc în același timp, toți pot ajunge în coadă și utilizatorii nu vor fi potriviți până când nu se mai alătură mai mulți utilizatori.
adăugat autor svick, sursa
@svick Nu aș numi o condiție a cursei, nu este o defecțiune fatală - am menționat, de asemenea, că în răspuns, de două ori. Motivul pentru care nu văd că este o problemă este că atâta timp cât există destui oameni care încep jocurile, coada se va goli destul de repede. În plus, intervalul pentru o astfel de coliziune este foarte mic și posibilitatea de a se întâmpla este foarte slabă în singurul caz care contează (nu mulți jucători care încep jocurile). Și nu sunt sigur ce vrei să spui prin faptul că nu este FIFO, este o coadă și dacă sunt adăugate "simultan" atunci nu contează prea mult dacă
adăugat autor Brunner, sursa
@svick ah, văd ce vrei să spui - Cred că este o întrebare dacă sunteți dispus să tranzacționați simplitatea cu corectitudinea absolută sau nu
adăugat autor Brunner, sursa

Poate asta poate ajuta

public static IList TakeMulti(this BlockingCollection me, int count = 100) where T : class
{
    T last = null;
    if (me.Count == 0)
    {
        last = me.Take();//blocking when queue is empty
    }

    var result = new List(count);

    if (last != null)
    {
        result.Add(last);
    }

    //if you want to take more item on this time.
    //if (me.Count < count/2)
    //{
   //   Thread.Sleep(1000);
    //}

    while (me.Count > 0 && result.Count <= count)
    {
        result.Add(me.Take());
    }

    return result;
}
0
adăugat