From 40834d3643646ac8d4fd0803d50ec4e4b7bfd174 Mon Sep 17 00:00:00 2001 From: ygl Date: Sun, 13 Nov 2022 04:27:18 +0800 Subject: [PATCH] testing async --- realm_cli/LiveDataMsg.cs | 14 +++ realm_cli/Program.cs | 182 +++++++++++++++++++++++++++++++++++---- 2 files changed, 178 insertions(+), 18 deletions(-) create mode 100644 realm_cli/LiveDataMsg.cs diff --git a/realm_cli/LiveDataMsg.cs b/realm_cli/LiveDataMsg.cs new file mode 100644 index 0000000..4998d92 --- /dev/null +++ b/realm_cli/LiveDataMsg.cs @@ -0,0 +1,14 @@ +using Realms; + +namespace realm_cli; + +public class LiveDataMsg : RealmObject +{ + [PrimaryKey] + [MapTo("_id")] + public string Id { get; set; } + + public DateTimeOffset? Uts { get; set; } + + public string Text { get; set; } +} \ No newline at end of file diff --git a/realm_cli/Program.cs b/realm_cli/Program.cs index 5fc4388..08ed6e5 100644 --- a/realm_cli/Program.cs +++ b/realm_cli/Program.cs @@ -1,19 +1,30 @@ // See https://aka.ms/new-console-template for more information +using System.Collections.Concurrent; using System.ComponentModel; +using System.Diagnostics; using MongoDB.Bson; +using MongoDB.Bson.Serialization.Serializers; using realm_cli; using Realms; +Realm mem_db = Realm.GetInstance(new InMemoryConfiguration("test"));; +LiveDataMsg msg1 = null; Realm GetDB() { - var cfg = new RealmConfiguration("d:/realm_test.db"); + var cfg = new RealmConfiguration("c:/prjs/realm_test.realm"); cfg.SchemaVersion = 4; - Realm.Compact(cfg); return Realm.GetInstance(cfg); } +Task GetDBAsync() +{ + var cfg = new RealmConfiguration("c:/prjs/realm_test.realm"); + cfg.SchemaVersion = 4; + return Realm.GetInstanceAsync(cfg); +} + Realm GetMemDB() { return Realm.GetInstance(new InMemoryConfiguration("test")); @@ -28,6 +39,129 @@ void PropChanged(object arg, PropertyChangedEventArgs e) } } +void Produce() +{ + var i = 0; + var db = GetDB(); + while (i < 10) + { + + try + { + db.Write(() => + { + var msg = db.All().FirstOrDefault(i => i.Id == "live"); + msg.Uts = DateTimeOffset.Now; + msg.Text = $"text is {i}"; + Console.WriteLine($"produce msg {msg.Uts} {msg.Text} {Thread.CurrentThread.ManagedThreadId}"); + }); + + } + catch (Exception ex) + { + Console.WriteLine(ex); + Console.WriteLine($"current produce thread is {Thread.CurrentThread.ManagedThreadId}"); + } + + + Task.Delay(1000).Wait(); + i += 1; + } + Console.WriteLine($"produce is done"); + db.Dispose(); +} + +async Task ProduceAsync() +{ + var i = 0; + while (i < 10) + { + try + { + using (var db = await GetDBAsync()) + { + await db.WriteAsync(() => + { + var msg = db.All().FirstOrDefault(i => i.Id == "live"); + msg.Uts = DateTimeOffset.Now; + msg.Text = $"text is {i}"; + Console.WriteLine($"produce msg {msg.Uts} {msg.Text} {Thread.CurrentThread.ManagedThreadId}"); + }); + } + } + catch (Exception ex) + { + Console.WriteLine(ex); + Console.WriteLine($"current produce thread is {Thread.CurrentThread.ManagedThreadId}"); + } + + + await Task.Delay(1000); + i += 1; + } + Console.WriteLine($"produce is done"); +} + +void Consume() +{ + + /* + var bufs = new List(); + db.All().SubscribeForNotifications((sender, changes, error) => + { + Console.WriteLine($"{error} {changes}"); + db.Refresh(); + + }); + */ + var db = GetDB(); + var t1 = db.All().FirstOrDefault(i => i.Id == "live"); + + if (t1 == null) {db.Dispose(); + return; + } + Console.WriteLine("listening"); + t1.PropertyChanged += (sender, e) => + { + Console.WriteLine($"notify msg {t1.Uts} {t1.Text}"); + }; + db.Refresh(); + for (var i = 0; i != 40; ++i) + { + db.Refresh(); + Task.Delay(500).Wait(); + //using (var db = GetDB()) + // { + // var msg = db.All().FirstOrDefault(i => i.Id == "live"); + // Console.WriteLine($"consumed {msg.Uts} {msg.Text} {Thread.CurrentThread.ManagedThreadId}"); + // } + // await Task.Delay(500); + //Console.WriteLine($"buf is {bufs.Count}"); + } + db.Dispose(); +} + +async Task ConsumeAsync() +{ + var db = await GetDBAsync(); + var t1 = db.All().FirstOrDefault(i => i.Id == "live"); + if (t1 == null) {db.Dispose(); + return; + } + Console.WriteLine("listening"); + t1.PropertyChanged += (sender, e) => + { + Console.WriteLine($"notify msg {t1.Uts} {t1.Text}"); + }; + db.Refresh(); + for (var i = 0; i != 40; ++i) + { + db.Refresh(); + await Task.Delay(500); + } + db.Dispose(); +} + async Task InsertLiveData() { var b = DateTime.Now; @@ -37,22 +171,17 @@ async Task InsertLiveData() //item.PropertyChanged += PropChanged; // item.ItemId = i; // item.ValueLabel = item.Val.ToString(); - using (var db = GetDB()) + var db = GetDB(); + for (var i = 0; i != 10000; ++i) + { + var item = new LiveData() {ItemId = i, EcuId = 0, Val = new Random().NextSingle()*i}; + db.Write(() => { - await db.WriteAsync((realm => - { - for (var i = 0; i != 10000; ++i) - { - var item = new LiveData() {ItemId = i, EcuId = 0, Val = new Random().NextSingle()*i}; - //item.PropertyChanged += PropChanged; - item.ItemId = i; - item.ValueLabel = $"{i}-test"; - item.DisplayValue = item.Val.ToString(); - realm.Add(item); - } - })); - // } - + item.ItemId = i; + item.ValueLabel = $"{i}-test"; + item.DisplayValue = item.Val.ToString(); + db.Add(item); + }); } Console.WriteLine($"insert needs {(DateTime.Now - b).TotalSeconds} secs"); } @@ -73,7 +202,23 @@ async Task QueryLiveData() return null; } -await InsertLiveData(); +using (var db = GetDB()) +{ + db.Write(() => + { + db.Add(new LiveDataMsg() {Id = "live", Uts = DateTimeOffset.Now, Text = "begin"}, update:true); + }); +} + + + +//var t1 = Task.Run(() => Produce()); +//var t2 = Task.Run(() => Consume()); +var t1 = ProduceAsync(); +var t2 = ConsumeAsync(); +Task.WaitAll(t1, t2); + +/* var copy_obj = await QueryLiveData(); copy_obj.Id = ObjectId.GenerateNewId().ToString(); var tempId = copy_obj.Id; @@ -92,6 +237,7 @@ using (var db = GetDB()) }); } +*/ /* using (var db = GetDB()) {