Թարմացնել
Մեկնաբանություններից.
կա պրոցեսոր՝ հաղորդագրությունները խմբաքանակով մշակելու համար: այն սկսում է մշակվել, երբ բավականաչափ հաղորդագրություններ կան կամ ժամանակն ավարտվում է, այստեղ է հայտնվում ժամանակի չեղարկումը
Սա նշանակում է, որ այն, ինչ իրականում անհրաժեշտ է, հաղորդագրությունները խմբաքանակով խմբավորելու միջոց է և՛ ըստ քանակի, և՛ ժամանակաշրջանի: Երկուսն էլ անելը համեմատաբար հեշտ է:
Այս մեթոդի խմբաքանակները ըստ հաշվարկի: Մեթոդը հաղորդագրություններ է ավելացնում batch
ցուցակին, մինչև սահմանը հասնի, ուղարկում է տվյալները ներքև և մաքրում ցուցակը.
static ChannelReader<Message[]> BatchByCount(this ChannelReader<Message> input, int count, CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
var batch=new List<Message>(count);
await foreach(var msg in input.ReadAllAsync(token))
{
batch.Add(msg);
if(batch.Count==count)
{
await writer.WriteAsync(batch.ToArray());
batch.Clear();
}
}
},token)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
Մի մեթոդ, որը խմբաքանակ է բաժանում ըստ ժամանակաշրջանի, ավելի բարդ է, քանի որ ժմչփը կարող է միևնույն ժամանակ հաղորդագրություն ստանալ: Interlocked.Exchange
-ը փոխարինում է գոյություն ունեցող batch
ցուցակը նորով և ուղարկում խմբաքանակի տվյալները ներքև: :
static ChannelReader<Message[]> BatchByPeriod(this ChannelReader<Message> input, TimeSpan period, CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
var batch=new List<Message>();
Timer t=new Timer(async obj =>{
var data=Interlocked.Exchange(ref batch,new List<Message>());
writer.WriteAsync(data.ToArray());
},null,TimeSpan.Zero,period);
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
batch.Add(msg);
}
},token)
.ContinueWith(t=>{
timer.Dispose();
writer.TryComplete(t.Exception);
});
return channel;
}
Երկուսն էլ անել, ես դեռ աշխատում եմ դրա վրա: Խնդիրն այն է, որ և՛ հաշվառման, և՛ ժամանակաչափի ժամկետի ավարտը կարող է տեղի ունենալ միաժամանակ: Վատագույն դեպքում, lock(batch)
-ը կարող է օգտագործվել՝ ապահովելու համար, որ միայն շարանը կամ օղակը կարող են ուղարկել տվյալները ներքև
Բնօրինակ պատասխան
Ալիքները չեն արտահոսում, երբ ճիշտ օգտագործվում են, ինչպես ցանկացած այլ տարա: Ալիքը ասինխրոն հերթ չէ և հաստատ արգելափակող չէ: Դա շատ տարբեր կոնստրուկտ է, բոլորովին այլ արտահայտություններով: Դա ավելի բարձր մակարդակի կոնտեյներ է, որը օգտագործում է հերթեր: Շատ լավ պատճառ կա, որ կան ChannelReader և ChannelWriter առանձին դասեր:
Տիպիկ սցենարն այն է, որ հրատարակիչը ստեղծի և տիրանա ալիքին: Միայն հրատարակիչը կարող է գրել այդ ալիքին և զանգահարել Complete()
ին: Channel
-ը չի իրականացնում IDisposable
-ը, ուստի այն չի կարող տնօրինվել: Հրատարակիչը բաժանորդներին տրամադրում է միայն ChannelReader
:
Բաժանորդները տեսնում են միայն ChannelReader
և կարդում են դրանից մինչև այն ավարտվի: Օգտագործելով ReadAllAsync
բաժանորդը կարող է շարունակել կարդալ ChannelReader-ից մինչև այն ավարտվի:
Սա բնորոշ օրինակ է.
ChannelReader<Message> Producer(CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<Message>();
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
for(int i=0;i<100;i++)
{
//Check for cancellation
if(token.IsCancellationRequested)
{
return;
}
//Simulate some work
await Task.Delay(100);
await writer.WriteAsync(new Message(...));
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
//This casts to a ChannelReader
return channel;
}
Բաժանորդին աշխատելու համար անհրաժեշտ է միայն ChannelReader
: Օգտագործելով ChannelReader.ReadAllAsync բաժանորդին անհրաժեշտ է միայն await foreach
հաղորդագրությունները մշակելու համար.
async Task Subscriber(ChannelReader<Message> input,CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAsync(token))
{
//Use the message
}
}
Բաժանորդը կարող է արտադրել իր սեփական հաղորդագրությունները՝ վերադարձնելով ChannelReader: Եվ այստեղ ամեն ինչ շատ հետաքրքիր է դառնում, քանի որ Subscriber
մեթոդը դառնում է քայլ շղթայված քայլերի խողովակաշարում: Եթե մեթոդները փոխարկենք ընդլայնման մեթոդների ChannelReader
-ում, մենք հեշտությամբ կարող ենք ստեղծել մի ամբողջ խողովակաշար:
Եկեք ստեղծենք որոշ թվեր.
ChannelReader<int> Generate(int nums,CancellationToken token=default)
{
var channel=Channel.CreateBounded<int>(10);
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
for(int i=0;i<nums;i++)
{
//Check for cancellation
if(token.IsCancellationRequested)
{
return;
}
await writer.WriteAsync(i*7);
await Task.Delay(100);
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
//This casts to a ChannelReader
return channel;
}
Այնուհետև կրկնապատկեք և քառակուսիացրեք դրանք.
ChannelReader<double> Double(this ChannelReader<int> input,CancellationToken token=default)
{
var channel=Channel.CreateBounded<double>(10);
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
await writer.WriteAsync(2.0*msg);
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
ChannelReader<double> Root(this ChannelReader<double> input,CancellationToken token=default)
{
var channel=Channel.CreateBounded<double>(10);
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
await writer.WriteAsync(Math.Sqrt(msg));
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
Եվ վերջապես տպեք դրանք
async Task Print(this ChannelReader<double> input,CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAsync(token))
{
Console.WriteLine(msg);
}
}
Այժմ մենք կարող ենք խողովակաշար կառուցել
await Generate(100)
.Double()
.Square()
.Print();
Եվ բոլոր քայլերին ավելացրեք չեղարկման նշան.
using var cts=new CancellationTokenSource();
await Generate(100,cts.Token)
.Double(cts.Token)
.Square(cts.Token)
.Print(cts.Token);
Հիշողության օգտագործումը կարող է աճել, եթե մեկ քայլով հաղորդագրություններ ստացվեն ավելի արագ, քան դրանք սպառվում են երկար ժամանակ: Սա հեշտությամբ կարգավորվում է՝ օգտագործելով սահմանափակ ալիքի փոխարեն: Այս կերպ, եթե մեթոդը չափազանց դանդաղ է, բոլոր նախորդ մեթոդները պետք է սպասեն մինչև նոր տվյալներ հրապարակելը:
19.05.2021
ContinueWith
մեթոդը ցույց է տալիս, որ օրինակը. պատրաստ չէ արտադրությանը։ Նաև, անկեղծ ասած, կարծում եմ, որ ձեր պատասխանն առանձնապես չի վերաբերում ՕՊ-ի հարցին: OP-ը փորձում է լուծել խնդիրը, ենթադրել է, որChannel<T>
-ը լավ գործիք կլինի խնդիրը լուծելու համար, և պարզել է, որ դրա օգտագործումը հանգեցրել է հիշողության անսպասելի արտահոսքի: Նրանց սովորեցնելը, թե ինչպես ճիշտ օգտագործելChannel<T>
-ը, դժվար թե օգնի նրանց լուծել առկա խնդիրը: 19.05.2021ContinueWith
ը: Եվyou're using the wrong tool
-ը լավագույն պատասխանն է, երբ մեկն օգտագործում է սխալ գործիք, կամ բացատրում է, թե ինչու .NET թիմը չի փոփոխում ալիքները՝ ոչ պատշաճ օգտագործման համար: ԵվShow don't tell
շատ ավելի լավ է, քանyou're doing it wrong
ասելը 20.05.2021ContinueWith
-ն առանցTaskScheduler
-ը հստակ նշելու հետ կապված խնդիրը որևէ կոնկրետ ձևով կապված չէ ալիքների հետ: Նաև կրակի և մոռանալու առաջադրանքները որոշ չափով սխալ է թվում: Այն ստեղծում է չնկատված բացառությունների հնարավորություն, թեև, իհարկե, ձեր օրինակում այս հնարավորությունը փոքր է: Ինչ վերաբերում է ալիքներին աշխատանքի համար սխալ գործիք լինելուն, ո՞րը կարող է լինել ճիշտ գործիքը: Դուք գիտե՞ք որևէ async հերթ, որն առաջարկում է չեղարկում և ժամանակի դադար, որը չի արտահոսում հիշողությունը: 20.05.2021ChannelReader<T>
s-ի վրա հիմնված ամուր խողովակաշարի շրջանակ ստեղծելը կորած գործ է: Այս դասը միանգամյա օգտագործման չէ, ուստի այն չի կարող օգտվել շարահյուսական աջակցությունից՝foreach
հանգույցից հետո թվարկվողները հեռացնելու համար: Սա նշանակում է, որPrint
օպերատորի սխալը չի կարող հեշտությամբ տարածվել ետ՝ չեղյալ համարելուGenerate
,Double
ևSquare
օպերատորների կողմից ստեղծվածTask.Run
առաջադրանքները: Այս առաջադրանքները կխրվեն, դրանք չեն վերամշակվի և կմնան հիշողության մեջ, մինչև գործընթացը ավարտվի: 20.05.2021Task.Run
ը: Btw Ի՞նչ առումով կասեք, որChannel<T>
-ը տարբերվում էBufferBlock<T>
-ից: Վերջինս թույլ է տալիս նույնքան հեշտությամբ տարբերակել գրողի և ընթերցողի միջև՝ իրITargetBlock<T>
ևISourceBlock<T>
միջերեսների միջոցով: Ես հասկանում եմ, որ ալիքների վաճառքի կետը արագությունն ու արդյունավետությունն է, և խելամիտ սցենարով պոտենցիալ հսկայական հիշողության արտահոսք թույլ տալը տապալում է իր գոյության հիմնական նպատակը: 20.05.2021ChannelReader
-ի ներքին գործերի մեջ: Սովորելու համար, թե ինչպես գրել արտահոսող կոդ: Ասինխրոն հերթի ներդրումը լուծված խնդիր է առնվազն մեկ տասնամյակի ընթացքում, և նորի ստեղծումը, որն ավելի արդյունավետ է, բացառությամբ այն դեպքերի, երբ դա այդպես չէ, այնքան էլ իմաստ չունի: 21.05.2021why should I delve
, քանի որ այդպես ես գտա, որSingleReader
-ը օպտիմալացնում է AsyncOperation-ի օգտագործումը: Խնդրահարույց պատասխանների շատ կարող է բարելավվել՝ իրականում կարդալով առասպելական ձեռնարկները, հոդվածները և դասընթացները: Համոզված եմ, որ Սթիվեն Թուբը, Դեյվիդ Ֆաուլերը, Մարկ Գրավելը, Սթիվ Գորդոնը ապուշ չեն: Նրանք արտադրում են շատ արագ ծրագրեր: Ինչո՞ւ ենթադրել, որ նրանք հիմար բան են արել: Ավելի տրամաբանական է ենթադրել, որ մենք չենք հասկանում, թե ինչպես են օգտագործվում իրերը 21.05.2021ChannelReader<T>.ReadAsync
մեթոդի ստատուս քվոն, ուստի չեմ երկարացնի վեճը։ Կարծում եմ, որ մենք երկուսս էլ հստակ ասել ենք մեր կետերը: 21.05.2021var reader = new AsyncOperation<T>(parent._runContinuationsAsynchronously | cancellationToken.CanBeCanceled, cancellationToken); parent._blockedReaders.EnqueueTail(reader); return reader.ValueTaskOfT;
ընթերցողը հերթագրվում է նույնիսկ այն չեղարկված: 21.05.2021Channel
-ը. կամուրջ հրատարակիչների և սպառողների միջև՝ օգտագործելովpull
մոդելը, այլ ոչ թե անկախ հավաքածու՝ հարցումներով: Դիզայներներն ամեն ինչ արեցին, որպեսզի դժվար դարձնեն ալիքը որպես մեկ կոնտեյներ օգտագործելը: Բացի այդ, ձեր իրական խնդիրն արդեն լուծվում է տարբեր գրադարանների կողմից: Ինչը, եթե չարաշահվի, կհանգեցնի նաև հիշողության օգտագործման ավելացման 24.05.2021