diff --git a/csharp/runner/SnippetRunner/Program.cs b/csharp/runner/SnippetRunner/Program.cs index c69c04c..862ff70 100644 --- a/csharp/runner/SnippetRunner/Program.cs +++ b/csharp/runner/SnippetRunner/Program.cs @@ -39,448 +39,448 @@ DirectoryInfo? snippetDir = null; DirectoryInfo? runnerDir = null; switch (dir.Name) -{ - case "snippets": - snippetDir = dir; - break; - case "runner": - runnerDir = dir; - break; - case "csharp": - csharpDir = dir; - break; - default: - HandleWrongDirectory(); +{ + case "snippets": + snippetDir = dir; + break; + case "runner": + runnerDir = dir; + break; + case "csharp": + csharpDir = dir; + break; + default: + HandleWrongDirectory(); break; } // if no snippet dir, try to find the csharp dir from the runner dir if (snippetDir == null && runnerDir != null) -{ - csharpDir = Directory.GetParent(runnerDir.FullName); - if (!"csharp".Equals(csharpDir?.Name, Ordinal)) - { - HandleWrongDirectory(); +{ + csharpDir = Directory.GetParent(runnerDir.FullName); + if (!"csharp".Equals(csharpDir?.Name, Ordinal)) + { + HandleWrongDirectory(); } } // if no snippet dir, try to find it using the csharp dir if (snippetDir == null && csharpDir != null) -{ - snippetDir = new DirectoryInfo(Path.Combine(csharpDir.FullName, "snippets")); - if (!snippetDir.Exists) - { - HandleWrongDirectory(); +{ + snippetDir = new DirectoryInfo(Path.Combine(csharpDir.FullName, "snippets")); + if (!snippetDir.Exists) + { + HandleWrongDirectory(); } } if (snippetDir == null) -{ - HandleWrongDirectory(); - Environment.Exit(1); +{ + HandleWrongDirectory(); + Environment.Exit(1); return; } try -{ - SortedDictionary> snippetsMap - = GetSnippetsMap(snippetDir); - - SortedDictionary> snippetOptions - = new SortedDictionary>(); - foreach (KeyValuePair> entry in snippetsMap) - { - string group = entry.Key; - IDictionary snippetMap = entry.Value; - List<(string, string, string)> tuples - = new List<(string, string, string)>(snippetMap.Count); - - foreach (KeyValuePair subEntry in snippetMap) - { - string snippet = subEntry.Key; - string snippetPath = subEntry.Value; - tuples.Add((group, snippet, snippetPath)); - } - snippetOptions.Add(group, tuples.AsReadOnly()); - } - - foreach (KeyValuePair> entry in snippetsMap) - { - string group = entry.Key; - IDictionary snippetMap = entry.Value; - foreach (KeyValuePair subEntry in snippetMap) - { - string snippet = subEntry.Key; - string snippetPath = subEntry.Value; - List<(string, string, string)> tuples = new List<(string, string, string)>(1); - tuples.Add((group, snippet, snippetPath)); - snippetOptions.Add(snippet, tuples.AsReadOnly()); - } - } - - if (args.Length == 0) - { - PrintUsage(snippetsMap); - Environment.Exit(1); - } - - // check for settings in the environment - string? settings - = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); - - // validate the settings if we have them - if (settings != null) - { - settings = settings.Trim(); - JsonObject? settingsJson = null; - try - { - settingsJson = JsonNode.Parse(settings)?.AsObject(); - if (settingsJson == null) - { - throw new ArgumentNullException("Setting must be a JSON object: " + settings); - } - } - catch (Exception e) - { - Console.Error.WriteLine(e); - Console.Error.WriteLine("The provided Senzing settings were not valid JSON:"); - Console.Error.WriteLine(); - Environment.Exit(1); - throw; - } - } - - // validate the SENZING_DIR - InstallLocations? 
installLocations = null; - try - { - installLocations = InstallLocations.FindLocations(); - - } - catch (Exception e) - { - Console.Error.WriteLine(e); - Environment.Exit(1); - throw; - } - if (installLocations == null) - { - Console.Error.WriteLine("Could not find the Senzing installation."); - Console.Error.WriteLine("Try setting the SENZING_DIR environment variable."); - Environment.Exit(1); - return; - } - - List<(string, string)> snippets = new List<(string, string)>(100); - for (int index = 0; index < args.Length; index++) - { - string arg = args[index]; - if (arg.Equals("all", Ordinal)) - { - foreach (IDictionary snippetMap in snippetsMap.Values) - { - foreach (KeyValuePair entry in snippetMap) - { - string snippet = entry.Key; - string snippetPath = entry.Value; - if (!snippets.Contains((snippet, snippetPath))) - { - snippets.Add((snippet, snippetPath)); - } - } - } - continue; - } - if (!snippetOptions.ContainsKey(arg)) - { - Console.Error.WriteLine("Unrecognized code snippet or snippet group: " + arg); - Environment.Exit(1); - } - IList<(string, string, string)> tuples = snippetOptions[arg]; - foreach ((string group, string snippet, string path) in tuples) - { - if (!snippets.Contains((snippet, path))) - { - snippets.Add((snippet, path)); - } - } - } - - // check if we do not have settings and if not setup a temporary repository - if (settings == null) - { - settings = SetupTempRepository(installLocations); - } - - long defaultConfigID; - - SzEnvironment env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); - try - { - SzConfigManager configMgr = env.GetConfigManager(); - defaultConfigID = configMgr.GetDefaultConfigID(); - - } - catch (SzException e) - { - Console.Error.WriteLine(e); - Environment.Exit(1); - return; - - } - finally - { - env.Destroy(); - } - - foreach ((string snippet, string snippetPath) in snippets) - { - Console.WriteLine(); - Stopwatch stopwatch = Stopwatch.StartNew(); - Dictionary properties = new Dictionary(); - string resourceName = $"""{assemblyName}.Resources.{snippet}.properties"""; - LoadProperties(properties, resourceName); - Console.WriteLine("Preparing repository for " + snippet + "..."); - env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); - try - { - // first purge the repository - SzDiagnostic diagnostic = env.GetDiagnostic(); - diagnostic.PurgeRepository(); - - // now set the configuration - SzConfigManager configMgr = env.GetConfigManager(); - // check if we need to configure sources - if (properties.ContainsKey(SourceKeyPrefix + 0)) - { - SzConfig config = configMgr.CreateConfig(); - for (int index = 0; - properties.ContainsKey(SourceKeyPrefix + index); - index++) - { - string sourceKey = SourceKeyPrefix + index; - string source = properties[sourceKey]; - source = source.Trim(); - Console.WriteLine("Adding data source: " + source); - config.RegisterDataSource(source); - } - string snippetConfig = config.Export(); - - // register the config - configMgr.SetDefaultConfig(snippetConfig); - } - else - { - // set the default config to the initial default - configMgr.SetDefaultConfigID(defaultConfigID); - } - - // check if there are files we need to load - if (properties.ContainsKey(LoadKeyPrefix + 0)) - { - SzEngine engine = env.GetEngine(); - for (int index = 0; properties.ContainsKey(LoadKeyPrefix + index); index++) - { - string loadKey = LoadKeyPrefix + index; - string fileName = properties[loadKey]; - fileName = fileName.Trim(); - Console.WriteLine("Loading records from file resource: " + fileName); - Stream? 
stream = assembly.GetManifestResourceStream(fileName); - if (stream == null) - { - throw new ArgumentException( - "Missing resource (" + fileName + ") for load file (" - + loadKey + ") for snippet (" + snippet + ")"); - } - StreamReader rdr = new StreamReader(stream, Encoding.UTF8); - try - { - for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) - { - line = line.Trim(); - if (line.Length == 0) continue; - if (line.StartsWith('#')) continue; - JsonObject? record = JsonNode.Parse(line)?.AsObject(); - if (record == null) - { - throw new JsonException("Failed to parse line as JSON: " + line); - } - string dataSource = record.ContainsKey(DataSource) - ? record[DataSource]?.GetValue() ?? TestSource : TestSource; - string? recordID = record.ContainsKey(RecordID) - ? record[RecordID]?.GetValue() : null; - engine.AddRecord(dataSource, recordID, line, SzNoFlags); - } - } - finally - { - rdr.Close(); - stream.Close(); - } - - } - } - - } - catch (SzException e) - { - Console.Error.WriteLine(e); - Environment.Exit(1); - return; - } - finally - { - env.Destroy(); - } - long duration = stopwatch.ElapsedMilliseconds; - Console.WriteLine("Prepared repository for " + snippet + ". (" + duration + "ms)"); - - ExecuteSnippet(snippet, snippetPath, installLocations, settings, properties); - } - +{ + SortedDictionary> snippetsMap + = GetSnippetsMap(snippetDir); + + SortedDictionary> snippetOptions + = new SortedDictionary>(); + foreach (KeyValuePair> entry in snippetsMap) + { + string group = entry.Key; + IDictionary snippetMap = entry.Value; + List<(string, string, string)> tuples + = new List<(string, string, string)>(snippetMap.Count); + + foreach (KeyValuePair subEntry in snippetMap) + { + string snippet = subEntry.Key; + string snippetPath = subEntry.Value; + tuples.Add((group, snippet, snippetPath)); + } + snippetOptions.Add(group, tuples.AsReadOnly()); + } + + foreach (KeyValuePair> entry in snippetsMap) + { + string group = entry.Key; + IDictionary snippetMap = entry.Value; + foreach (KeyValuePair subEntry in snippetMap) + { + string snippet = subEntry.Key; + string snippetPath = subEntry.Value; + List<(string, string, string)> tuples = new List<(string, string, string)>(1); + tuples.Add((group, snippet, snippetPath)); + snippetOptions.Add(snippet, tuples.AsReadOnly()); + } + } + + if (args.Length == 0) + { + PrintUsage(snippetsMap); + Environment.Exit(1); + } + + // check for settings in the environment + string? settings + = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); + + // validate the settings if we have them + if (settings != null) + { + settings = settings.Trim(); + JsonObject? settingsJson = null; + try + { + settingsJson = JsonNode.Parse(settings)?.AsObject(); + if (settingsJson == null) + { + throw new ArgumentNullException("Setting must be a JSON object: " + settings); + } + } + catch (Exception e) + { + Console.Error.WriteLine(e); + Console.Error.WriteLine("The provided Senzing settings were not valid JSON:"); + Console.Error.WriteLine(); + Environment.Exit(1); + throw; + } + } + + // validate the SENZING_DIR + InstallLocations? 
installLocations = null; + try + { + installLocations = InstallLocations.FindLocations(); + + } + catch (Exception e) + { + Console.Error.WriteLine(e); + Environment.Exit(1); + throw; + } + if (installLocations == null) + { + Console.Error.WriteLine("Could not find the Senzing installation."); + Console.Error.WriteLine("Try setting the SENZING_DIR environment variable."); + Environment.Exit(1); + return; + } + + List<(string, string)> snippets = new List<(string, string)>(100); + for (int index = 0; index < args.Length; index++) + { + string arg = args[index]; + if (arg.Equals("all", Ordinal)) + { + foreach (IDictionary snippetMap in snippetsMap.Values) + { + foreach (KeyValuePair entry in snippetMap) + { + string snippet = entry.Key; + string snippetPath = entry.Value; + if (!snippets.Contains((snippet, snippetPath))) + { + snippets.Add((snippet, snippetPath)); + } + } + } + continue; + } + if (!snippetOptions.ContainsKey(arg)) + { + Console.Error.WriteLine("Unrecognized code snippet or snippet group: " + arg); + Environment.Exit(1); + } + IList<(string, string, string)> tuples = snippetOptions[arg]; + foreach ((string group, string snippet, string path) in tuples) + { + if (!snippets.Contains((snippet, path))) + { + snippets.Add((snippet, path)); + } + } + } + + // check if we do not have settings and if not setup a temporary repository + if (settings == null) + { + settings = SetupTempRepository(installLocations); + } + + long defaultConfigID; + + SzEnvironment env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); + try + { + SzConfigManager configMgr = env.GetConfigManager(); + defaultConfigID = configMgr.GetDefaultConfigID(); + + } + catch (SzException e) + { + Console.Error.WriteLine(e); + Environment.Exit(1); + return; + + } + finally + { + env.Destroy(); + } + + foreach ((string snippet, string snippetPath) in snippets) + { + Console.WriteLine(); + Stopwatch stopwatch = Stopwatch.StartNew(); + Dictionary properties = new Dictionary(); + string resourceName = $"""{assemblyName}.Resources.{snippet}.properties"""; + LoadProperties(properties, resourceName); + Console.WriteLine("Preparing repository for " + snippet + "..."); + env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); + try + { + // first purge the repository + SzDiagnostic diagnostic = env.GetDiagnostic(); + diagnostic.PurgeRepository(); + + // now set the configuration + SzConfigManager configMgr = env.GetConfigManager(); + // check if we need to configure sources + if (properties.ContainsKey(SourceKeyPrefix + 0)) + { + SzConfig config = configMgr.CreateConfig(); + for (int index = 0; + properties.ContainsKey(SourceKeyPrefix + index); + index++) + { + string sourceKey = SourceKeyPrefix + index; + string source = properties[sourceKey]; + source = source.Trim(); + Console.WriteLine("Adding data source: " + source); + config.RegisterDataSource(source); + } + string snippetConfig = config.Export(); + + // register the config + configMgr.SetDefaultConfig(snippetConfig); + } + else + { + // set the default config to the initial default + configMgr.SetDefaultConfigID(defaultConfigID); + } + + // check if there are files we need to load + if (properties.ContainsKey(LoadKeyPrefix + 0)) + { + SzEngine engine = env.GetEngine(); + for (int index = 0; properties.ContainsKey(LoadKeyPrefix + index); index++) + { + string loadKey = LoadKeyPrefix + index; + string fileName = properties[loadKey]; + fileName = fileName.Trim(); + Console.WriteLine("Loading records from file resource: " + fileName); + Stream? 
stream = assembly.GetManifestResourceStream(fileName); + if (stream == null) + { + throw new ArgumentException( + "Missing resource (" + fileName + ") for load file (" + + loadKey + ") for snippet (" + snippet + ")"); + } + StreamReader rdr = new StreamReader(stream, Encoding.UTF8); + try + { + for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) + { + line = line.Trim(); + if (line.Length == 0) continue; + if (line.StartsWith('#')) continue; + JsonObject? record = JsonNode.Parse(line)?.AsObject(); + if (record == null) + { + throw new JsonException("Failed to parse line as JSON: " + line); + } + string dataSource = record.ContainsKey(DataSource) + ? record[DataSource]?.GetValue() ?? TestSource : TestSource; + string? recordID = record.ContainsKey(RecordID) + ? record[RecordID]?.GetValue() : null; + engine.AddRecord(dataSource, recordID, line, SzNoFlags); + } + } + finally + { + rdr.Close(); + stream.Close(); + } + + } + } + + } + catch (SzException e) + { + Console.Error.WriteLine(e); + Environment.Exit(1); + return; + } + finally + { + env.Destroy(); + } + long duration = stopwatch.ElapsedMilliseconds; + Console.WriteLine("Prepared repository for " + snippet + ". (" + duration + "ms)"); + + ExecuteSnippet(snippet, snippetPath, installLocations, settings, properties); + } + Console.WriteLine(); } catch (Exception e) -{ - Console.Error.WriteLine(e); - Environment.Exit(1); +{ + Console.Error.WriteLine(e); + Environment.Exit(1); throw; } static void LoadProperties(IDictionary properties, String resourceName) -{ - Assembly assembly = Assembly.GetExecutingAssembly(); - Stream? stream = assembly.GetManifestResourceStream(resourceName); - if (stream != null) - { - StreamReader rdr = new StreamReader(stream, Encoding.UTF8); - try - { - for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) - { - if (line.Trim().Length == 0) continue; - if (line.StartsWith('#')) continue; - if (line.StartsWith('!')) continue; - int index = line.IndexOf('=', Ordinal); - if (index < 1) continue; - string key = line.Substring(0, index).Trim(); - string value = ""; - if (index < line.Length - 1) - { - value = line.Substring(index + 1); - } - value = value.Trim(); - while (value.EndsWith('\\')) - { - line = rdr.ReadLine(); - if (line == null) break; - line = line.Trim(); - value = string.Concat(value.AsSpan(0, value.Length - 1), line); - } - properties[key] = value; - } - } - finally - { - rdr.Close(); - stream.Close(); - } +{ + Assembly assembly = Assembly.GetExecutingAssembly(); + Stream? stream = assembly.GetManifestResourceStream(resourceName); + if (stream != null) + { + StreamReader rdr = new StreamReader(stream, Encoding.UTF8); + try + { + for (string? 
line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) + { + if (line.Trim().Length == 0) continue; + if (line.StartsWith('#')) continue; + if (line.StartsWith('!')) continue; + int index = line.IndexOf('=', Ordinal); + if (index < 1) continue; + string key = line.Substring(0, index).Trim(); + string value = ""; + if (index < line.Length - 1) + { + value = line.Substring(index + 1); + } + value = value.Trim(); + while (value.EndsWith('\\')) + { + line = rdr.ReadLine(); + if (line == null) break; + line = line.Trim(); + value = string.Concat(value.AsSpan(0, value.Length - 1), line); + } + properties[key] = value; + } + } + finally + { + rdr.Close(); + stream.Close(); + } } } static SortedDictionary> GetSnippetsMap(DirectoryInfo snippetDir) -{ - SortedDictionary> snippetsMap - = new SortedDictionary>(); - - foreach (string dir in Directory.GetDirectories(snippetDir.FullName)) - { - string? group = Path.GetFileName(dir); - if (group == null) - { - continue; - } - snippetsMap.TryGetValue(group, out SortedDictionary? snippetMap); - if (snippetMap == null) - { - snippetMap = new SortedDictionary(); - snippetsMap.Add(group, snippetMap); - } - - foreach (string subdir in Directory.GetDirectories(dir)) - { - string? snippet = Path.GetFileName(subdir); - if (snippet == null) - { - continue; - } - string csprojPath = Path.Combine(subdir, snippet + ".csproj"); - if (!File.Exists(csprojPath)) - { - continue; - } - snippetMap.Add(group + "." + snippet, subdir); - } - } +{ + SortedDictionary> snippetsMap + = new SortedDictionary>(); + + foreach (string dir in Directory.GetDirectories(snippetDir.FullName)) + { + string? group = Path.GetFileName(dir); + if (group == null) + { + continue; + } + snippetsMap.TryGetValue(group, out SortedDictionary? snippetMap); + if (snippetMap == null) + { + snippetMap = new SortedDictionary(); + snippetsMap.Add(group, snippetMap); + } + + foreach (string subdir in Directory.GetDirectories(dir)) + { + string? snippet = Path.GetFileName(subdir); + if (snippet == null) + { + continue; + } + string csprojPath = Path.Combine(subdir, snippet + ".csproj"); + if (!File.Exists(csprojPath)) + { + continue; + } + snippetMap.Add(group + "." + snippet, subdir); + } + } return snippetsMap; } static void PrintUsage(SortedDictionary> snippetsMap) -{ - Assembly assembly = Assembly.GetExecutingAssembly(); - string? 
assemblyName = assembly.GetName().Name; - Console.Error.WriteLine($"""dotnet run --project {assemblyName} [ all | | ]*"""); - Console.Error.WriteLine(); - Console.Error.WriteLine(" - Specifying no arguments will print this message"); - Console.Error.WriteLine(" - Specifying \"all\" will run all snippets"); - Console.Error.WriteLine(" - Specifying one or more groups will run all snippets in those groups"); - Console.Error.WriteLine(" - Specifying one or more snippets will run those snippet"); - Console.Error.WriteLine(); - Console.Error.WriteLine("Examples:"); - Console.Error.WriteLine(); - Console.Error.WriteLine($""" dotnet run --project {assemblyName} all"""); - Console.Error.WriteLine(); - Console.Error.WriteLine($""" dotnet run --project {assemblyName} loading.LoadRecords loading.LoadViaFutures"""); - Console.Error.WriteLine(); - Console.Error.WriteLine($""" dotnet run --project {assemblyName} initialization deleting loading.LoadRecords"""); - Console.Error.WriteLine(); - Console.Error.WriteLine("Snippet Group Names:"); - foreach (string group in snippetsMap.Keys) - { - Console.Error.WriteLine(" - " + group); - } - Console.Error.WriteLine(); - Console.Error.WriteLine("Snippet Names:"); - foreach (IDictionary snippetMap in snippetsMap.Values) - { - foreach (string snippet in snippetMap.Keys) - { - Console.Error.WriteLine(" - " + snippet); - } - } +{ + Assembly assembly = Assembly.GetExecutingAssembly(); + string? assemblyName = assembly.GetName().Name; + Console.Error.WriteLine($"""dotnet run --project {assemblyName} [ all | | ]*"""); + Console.Error.WriteLine(); + Console.Error.WriteLine(" - Specifying no arguments will print this message"); + Console.Error.WriteLine(" - Specifying \"all\" will run all snippets"); + Console.Error.WriteLine(" - Specifying one or more groups will run all snippets in those groups"); + Console.Error.WriteLine(" - Specifying one or more snippets will run those snippet"); + Console.Error.WriteLine(); + Console.Error.WriteLine("Examples:"); + Console.Error.WriteLine(); + Console.Error.WriteLine($""" dotnet run --project {assemblyName} all"""); + Console.Error.WriteLine(); + Console.Error.WriteLine($""" dotnet run --project {assemblyName} loading.LoadRecords loading.LoadViaFutures"""); + Console.Error.WriteLine(); + Console.Error.WriteLine($""" dotnet run --project {assemblyName} initialization deleting loading.LoadRecords"""); + Console.Error.WriteLine(); + Console.Error.WriteLine("Snippet Group Names:"); + foreach (string group in snippetsMap.Keys) + { + Console.Error.WriteLine(" - " + group); + } + Console.Error.WriteLine(); + Console.Error.WriteLine("Snippet Names:"); + foreach (IDictionary snippetMap in snippetsMap.Values) + { + foreach (string snippet in snippetMap.Keys) + { + Console.Error.WriteLine(" - " + snippet); + } + } Console.Error.WriteLine(); } static void HandleWrongDirectory() -{ - Console.Error.WriteLine( - "Must be run from the csharp, csharp/runner or csharp/snippets directory"); +{ + Console.Error.WriteLine( + "Must be run from the csharp, csharp/runner or csharp/snippets directory"); Environment.Exit(1); } static void SetupEnvironment(ProcessStartInfo startInfo, InstallLocations installLocations, string settings) -{ - System.Collections.IDictionary origEnv = Environment.GetEnvironmentVariables(); - foreach (DictionaryEntry entry in origEnv) - { - startInfo.Environment[entry.Key?.ToString() ?? ""] - = entry.Value?.ToString() ?? 
""; - } +{ + System.Collections.IDictionary origEnv = Environment.GetEnvironmentVariables(); + foreach (DictionaryEntry entry in origEnv) + { + startInfo.Environment[entry.Key?.ToString() ?? ""] + = entry.Value?.ToString() ?? ""; + } startInfo.Environment["SENZING_ENGINE_CONFIGURATION_JSON"] = settings; } @@ -489,155 +489,155 @@ static void ExecuteSnippet(string snippet, InstallLocations senzingInstall, string settings, IDictionary properties) -{ - ProcessStartInfo startInfo = new ProcessStartInfo( - "dotnet", - "run --project " + snippetPath); - SetupEnvironment(startInfo, senzingInstall, settings); - startInfo.WindowStyle = ProcessWindowStyle.Hidden; - startInfo.UseShellExecute = false; - startInfo.RedirectStandardInput = true; - - Console.WriteLine(); - Console.WriteLine("---------------------------------------"); - Console.WriteLine("Executing " + snippet + "..."); - Stopwatch stopWatch = Stopwatch.StartNew(); - - Process? process = Process.Start(startInfo); - if (process == null) - { - throw new ArgumentNullException("Failed to execute snippet; " + snippet); - } - - if (properties != null && properties.ContainsKey(InputKeyPrefix + 0)) - { - // sleep for 1 second to give the process a chance to start up - Thread.Sleep(1000); - for (int index = 0; - properties.ContainsKey(InputKeyPrefix + index); - index++) - { - string inputLine = properties[InputKeyPrefix + index]; - Console.WriteLine(inputLine); - Console.Out.Flush(); - - inputLine = (inputLine == null) ? "" : inputLine.Trim(); - process.StandardInput.WriteLine(inputLine); - process.StandardInput.Flush(); - } - } - int exitValue = 0; - int expectedExitValue = 0; - if (properties != null && properties.ContainsKey(DestroyAfterKey)) - { - string propValue = properties[DestroyAfterKey]; - int delay = Int32.Parse(propValue, CultureInfo.InvariantCulture); - bool exited = process.WaitForExit(delay); - if (!exited && !process.HasExited) - { - expectedExitValue = (Environment.OSVersion.Platform == PlatformID.Win32NT) - ? 1 : SigtermExitCode; - Console.WriteLine(); - Console.WriteLine("Runner destroying " + snippet + " process..."); - - - ProcessStartInfo killStartInfo - = (Environment.OSVersion.Platform == PlatformID.Win32NT) - ? new ProcessStartInfo("taskkill", ["/F", "/PID", "" + process.Id]) - : new ProcessStartInfo("kill", "" + process.Id); - - startInfo.WindowStyle = ProcessWindowStyle.Hidden; - startInfo.UseShellExecute = false; - Process? killer = Process.Start(killStartInfo); - if (killer == null) - { - process.Kill(true); - process.WaitForExit(); - } - else - { - killer.WaitForExit(); - process.WaitForExit(); - } - } - exitValue = process.ExitCode; - } - else - { - // wait indefinitely for the process to terminate - process.WaitForExit(); - exitValue = process.ExitCode; - } - - if (exitValue != expectedExitValue) - { - throw new Exception("Failed to execute snippet; " + snippet - + " (" + exitValue + ")"); - } - stopWatch.Stop(); - int duration = stopWatch.Elapsed.Milliseconds; +{ + ProcessStartInfo startInfo = new ProcessStartInfo( + "dotnet", + "run --project " + snippetPath); + SetupEnvironment(startInfo, senzingInstall, settings); + startInfo.WindowStyle = ProcessWindowStyle.Hidden; + startInfo.UseShellExecute = false; + startInfo.RedirectStandardInput = true; + + Console.WriteLine(); + Console.WriteLine("---------------------------------------"); + Console.WriteLine("Executing " + snippet + "..."); + Stopwatch stopWatch = Stopwatch.StartNew(); + + Process? 
process = Process.Start(startInfo); + if (process == null) + { + throw new ArgumentNullException("Failed to execute snippet; " + snippet); + } + + if (properties != null && properties.ContainsKey(InputKeyPrefix + 0)) + { + // sleep for 1 second to give the process a chance to start up + Thread.Sleep(1000); + for (int index = 0; + properties.ContainsKey(InputKeyPrefix + index); + index++) + { + string inputLine = properties[InputKeyPrefix + index]; + Console.WriteLine(inputLine); + Console.Out.Flush(); + + inputLine = (inputLine == null) ? "" : inputLine.Trim(); + process.StandardInput.WriteLine(inputLine); + process.StandardInput.Flush(); + } + } + int exitValue = 0; + int expectedExitValue = 0; + if (properties != null && properties.ContainsKey(DestroyAfterKey)) + { + string propValue = properties[DestroyAfterKey]; + int delay = Int32.Parse(propValue, CultureInfo.InvariantCulture); + bool exited = process.WaitForExit(delay); + if (!exited && !process.HasExited) + { + expectedExitValue = (Environment.OSVersion.Platform == PlatformID.Win32NT) + ? 1 : SigtermExitCode; + Console.WriteLine(); + Console.WriteLine("Runner destroying " + snippet + " process..."); + + + ProcessStartInfo killStartInfo + = (Environment.OSVersion.Platform == PlatformID.Win32NT) + ? new ProcessStartInfo("taskkill", ["/F", "/PID", "" + process.Id]) + : new ProcessStartInfo("kill", "" + process.Id); + + startInfo.WindowStyle = ProcessWindowStyle.Hidden; + startInfo.UseShellExecute = false; + Process? killer = Process.Start(killStartInfo); + if (killer == null) + { + process.Kill(true); + process.WaitForExit(); + } + else + { + killer.WaitForExit(); + process.WaitForExit(); + } + } + exitValue = process.ExitCode; + } + else + { + // wait indefinitely for the process to terminate + process.WaitForExit(); + exitValue = process.ExitCode; + } + + if (exitValue != expectedExitValue) + { + throw new Exception("Failed to execute snippet; " + snippet + + " (" + exitValue + ")"); + } + stopWatch.Stop(); + int duration = stopWatch.Elapsed.Milliseconds; Console.WriteLine("Executed " + snippet + ". (" + duration + "ms)"); } static string SetupTempRepository(InstallLocations senzingInstall) -{ - DirectoryInfo? supportDir = senzingInstall.SupportDirectory; - DirectoryInfo? resourcesDir = senzingInstall.ResourceDirectory; - DirectoryInfo? templatesDir = senzingInstall.TemplatesDirectory; - DirectoryInfo? configDir = senzingInstall.ConfigDirectory; - if (supportDir == null || configDir == null - || resourcesDir == null || templatesDir == null) - { - throw new Exception( - "At least one of the required directories is missing from " - + "the installation. installLocations=[ " - + senzingInstall + " ]"); - } - - DirectoryInfo schemaDir = new DirectoryInfo( - Path.Combine(resourcesDir.FullName, "schema")); - string schemaFile = Path.Combine( - schemaDir.FullName, "szcore-schema-sqlite-create.sql"); - string configFile = Path.Combine( - templatesDir.FullName, "g2config.json"); - - // lay down the database schema - string databaseFile = Path.Combine( - Path.GetTempPath(), "G2C-" + Path.GetRandomFileName() + ".db"); - String jdbcUrl = "jdbc:sqlite:" + databaseFile; - - SqliteConnection? sqlite = null; - try - { - String connectSpec = "Data Source=" + databaseFile; - sqlite = new SqliteConnection(connectSpec); - sqlite.Open(); - SqliteCommand cmd = sqlite.CreateCommand(); - - string[] sqlLines = File.ReadAllLines(schemaFile, Encoding.UTF8); - - foreach (string sql in sqlLines) - { - if (sql.Trim().Length == 0) continue; +{ + DirectoryInfo? 
supportDir = senzingInstall.SupportDirectory; + DirectoryInfo? resourcesDir = senzingInstall.ResourceDirectory; + DirectoryInfo? templatesDir = senzingInstall.TemplatesDirectory; + DirectoryInfo? configDir = senzingInstall.ConfigDirectory; + if (supportDir == null || configDir == null + || resourcesDir == null || templatesDir == null) + { + throw new Exception( + "At least one of the required directories is missing from " + + "the installation. installLocations=[ " + + senzingInstall + " ]"); + } + + DirectoryInfo schemaDir = new DirectoryInfo( + Path.Combine(resourcesDir.FullName, "schema")); + string schemaFile = Path.Combine( + schemaDir.FullName, "szcore-schema-sqlite-create.sql"); + string configFile = Path.Combine( + templatesDir.FullName, "g2config.json"); + + // lay down the database schema + string databaseFile = Path.Combine( + Path.GetTempPath(), "G2C-" + Path.GetRandomFileName() + ".db"); + String jdbcUrl = "jdbc:sqlite:" + databaseFile; + + SqliteConnection? sqlite = null; + try + { + String connectSpec = "Data Source=" + databaseFile; + sqlite = new SqliteConnection(connectSpec); + sqlite.Open(); + SqliteCommand cmd = sqlite.CreateCommand(); + + string[] sqlLines = File.ReadAllLines(schemaFile, Encoding.UTF8); + + foreach (string sql in sqlLines) + { + if (sql.Trim().Length == 0) continue; #pragma warning disable CA2100 - cmd.CommandText = sql.Trim(); + cmd.CommandText = sql.Trim(); #pragma warning restore CA2100 - cmd.ExecuteNonQuery(); - } - } - finally - { - if (sqlite != null) - { - sqlite.Close(); - } - } - - string supportPath = supportDir.FullName.Replace("\\", "\\\\", Ordinal); - string configPath = configDir.FullName.Replace("\\", "\\\\", Ordinal); - string resourcePath = resourcesDir.FullName.Replace("\\", "\\\\", Ordinal); - string baseConfig = File.ReadAllText(configFile).Replace("\\", "\\\\", Ordinal); - string databasePath = databaseFile.Replace("\\", "\\\\", Ordinal); + cmd.ExecuteNonQuery(); + } + } + finally + { + if (sqlite != null) + { + sqlite.Close(); + } + } + + string supportPath = supportDir.FullName.Replace("\\", "\\\\", Ordinal); + string configPath = configDir.FullName.Replace("\\", "\\\\", Ordinal); + string resourcePath = resourcesDir.FullName.Replace("\\", "\\\\", Ordinal); + string baseConfig = File.ReadAllText(configFile).Replace("\\", "\\\\", Ordinal); + string databasePath = databaseFile.Replace("\\", "\\\\", Ordinal); string settings = $$""" { "PIPELINE": { @@ -649,23 +649,23 @@ static string SetupTempRepository(InstallLocations senzingInstall) "CONNECTION": "sqlite3://na:na@{{databasePath}}" } } - """.Trim(); - - SzEnvironment env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); - try - { - env.GetConfigManager().SetDefaultConfig(baseConfig); - - } - catch (Exception) - { - Console.Error.WriteLine(settings); - throw; - } - finally - { - env.Destroy(); - } - + """.Trim(); + + SzEnvironment env = SzCoreEnvironment.NewBuilder().Settings(settings).Build(); + try + { + env.GetConfigManager().SetDefaultConfig(baseConfig); + + } + catch (Exception) + { + Console.Error.WriteLine(settings); + throw; + } + finally + { + env.Destroy(); + } + return settings; } diff --git a/csharp/snippets/deleting/DeleteViaFutures/Program.cs b/csharp/snippets/deleting/DeleteViaFutures/Program.cs index 31a5d12..789a5f5 100644 --- a/csharp/snippets/deleting/DeleteViaFutures/Program.cs +++ b/csharp/snippets/deleting/DeleteViaFutures/Program.cs @@ -17,8 +17,8 @@ // get the senzing repository settings string? 
settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -54,226 +54,226 @@ TaskScheduler taskScheduler // create a reader StreamReader rdr = new StreamReader(fs, Encoding.UTF8); try -{ - - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - int lineNumber = 0; - bool eof = false; - - while (!eof) - { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.Count < MaximumBacklog) - { - // read the next line - string? line = rdr.ReadLine(); - lineNumber++; - - // check for EOF - if (line == null) - { - eof = true; - break; - } - - // trim the line - line = line.Trim(); - - // skip any blank lines - if (line.Length == 0) continue; - - // skip any commented lines - if (line.StartsWith('#')) continue; - - // construct the Record instance - Record record = new Record(lineNumber, line); - - try - { - // parse the line as a JSON object - JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); - if (recordJson == null) - { - // parsed JSON null - throw new SzBadInputException("Record must be a JSON object: " + line); - } - - // extract the data source code and record ID - string? dataSourceCode = recordJson[DataSource]?.GetValue(); - string? recordID = recordJson[RecordID]?.GetValue(); - - Task task = factory.StartNew(() => - { - // call the DeleteRecord() function with no flags - engine.DeleteRecord(dataSourceCode, recordID); - }, - CancellationToken.None, - TaskCreationOptions.None, - taskScheduler); - - // add the future to the pending future list - pendingFutures.Add((task, record)); - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, lineNumber, line); - errorCount++; // increment the error count - } - } - - do - { - // handle any pending futures WITHOUT blocking to reduce the backlog - HandlePendingFutures(pendingFutures, false); - - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.Count >= MaximumBacklog) - { - Thread.Sleep(PauseTimeout); - } - } while (pendingFutures.Count >= MaximumBacklog); - } - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future +{ + + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + int lineNumber = 0; + bool eof = false; + + while (!eof) + { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.Count < MaximumBacklog) + { + // read the next line + string? line = rdr.ReadLine(); + lineNumber++; + + // check for EOF + if (line == null) + { + eof = true; + break; + } + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; + + // skip any commented lines + if (line.StartsWith('#')) continue; + + // construct the Record instance + Record record = new Record(lineNumber, line); + + try + { + // parse the line as a JSON object + JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); + if (recordJson == null) + { + // parsed JSON null + throw new SzBadInputException("Record must be a JSON object: " + line); + } + + // extract the data source code and record ID + string? 
dataSourceCode = recordJson[DataSource]?.GetValue(); + string? recordID = recordJson[RecordID]?.GetValue(); + + Task task = factory.StartNew(() => + { + // call the DeleteRecord() function with no flags + engine.DeleteRecord(dataSourceCode, recordID); + }, + CancellationToken.None, + TaskCreationOptions.None, + taskScheduler); + + // add the future to the pending future list + pendingFutures.Add((task, record)); + + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, lineNumber, line); + errorCount++; // increment the error count + } + } + + do + { + // handle any pending futures WITHOUT blocking to reduce the backlog + HandlePendingFutures(pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.Count >= MaximumBacklog) + { + Thread.Sleep(PauseTimeout); + } + } while (pendingFutures.Count >= MaximumBacklog); + } + + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future HandlePendingFutures(pendingFutures, true); } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - rdr.Close(); - fs.Close(); - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Successful delete operations : " + successCount); - Console.WriteLine("Failed delete operations : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " deletions to be retried in " + retryFile); - } +{ + rdr.Close(); + fs.Close(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Successful delete operations : " + successCount); + Console.WriteLine("Failed delete operations : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " deletions to be retried in " + retryFile); + } Console.Out.Flush(); } static void HandlePendingFutures(IList<(Task, Record)> pendingFutures, bool blocking) -{ - // loop through the pending futures - for (int index = 0; index < pendingFutures.Count; index++) - { - // get the next pending future - (Task task, Record record) = pendingFutures[index]; - - // if not blocking and this one is not done then continue - if (!blocking && !task.IsCompleted) continue; - - // remove the pending future from the list - pendingFutures.RemoveAt(index--); - - try - { - try - { - // wait for completion -- if non-blocking then this - // task is already completed and this will just - // throw any exception that might have occurred - if (blocking && !task.IsCompleted) - { - task.Wait(); - } - - // if we get here then increment the success count - successCount++; - - } - catch (AggregateException e) - when (e.InnerException is TaskCanceledException - || e.InnerException is ThreadInterruptedException) - { - throw new SzRetryableException(e.InnerException); - } - catch (ThreadInterruptedException e) - { - throw new SzRetryableException(e.InnerException); - } - catch 
(AggregateException e) - { - if (e.InnerException != null) - { - // get the inner exception - throw e.InnerException; - } - else - { - throw; - } - } - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, record.LineNumber, record.Line); - errorCount++; // increment the error count - - } - catch (SzRetryableException e) - { - // handle thread interruption and cancellation as retries - LogFailedRecord(Warning, e, record.LineNumber, record.Line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(record.Line); - } - - } - catch (Exception e) - { - // catch any other exception (incl. SzException) here - LogFailedRecord(Critical, e, record.LineNumber, record.Line); - errorCount++; - throw; // rethrow since exception is critical - } +{ + // loop through the pending futures + for (int index = 0; index < pendingFutures.Count; index++) + { + // get the next pending future + (Task task, Record record) = pendingFutures[index]; + + // if not blocking and this one is not done then continue + if (!blocking && !task.IsCompleted) continue; + + // remove the pending future from the list + pendingFutures.RemoveAt(index--); + + try + { + try + { + // wait for completion -- if non-blocking then this + // task is already completed and this will just + // throw any exception that might have occurred + if (blocking && !task.IsCompleted) + { + task.Wait(); + } + + // if we get here then increment the success count + successCount++; + + } + catch (AggregateException e) + when (e.InnerException is TaskCanceledException + || e.InnerException is ThreadInterruptedException) + { + throw new SzRetryableException(e.InnerException); + } + catch (ThreadInterruptedException e) + { + throw new SzRetryableException(e.InnerException); + } + catch (AggregateException e) + { + if (e.InnerException != null) + { + // get the inner exception + throw e.InnerException; + } + else + { + throw; + } + } + + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, record.LineNumber, record.Line); + errorCount++; // increment the error count + + } + catch (SzRetryableException e) + { + // handle thread interruption and cancellation as retries + LogFailedRecord(Warning, e, record.LineNumber, record.Line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(record.Line); + } + + } + catch (Exception e) + { + // catch any other exception (incl. 
SzException) here + LogFailedRecord(Critical, e, record.LineNumber, record.Line); + errorCount++; + throw; // rethrow since exception is critical + } } } @@ -291,46 +291,46 @@ static void LogFailedRecord(string errorType, Exception exception, int lineNumber, string recordJson) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " - + lineNumber + ": "); - Console.Error.WriteLine(recordJson); - Console.Error.WriteLine(exception); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + + lineNumber + ": "); + Console.Error.WriteLine(recordJson); + Console.Error.WriteLine(exception); Console.Error.Flush(); } public partial class Program -{ - private const string DefaultFilePath = "../../resources/data/del-500.jsonl"; - - private const string RetryPrefix = "retry-"; - - private const string RetrySuffix = ".jsonl"; - - private const string DataSource = "DATA_SOURCE"; - - private const string RecordID = "RECORD_ID"; - - private const int ThreadCount = 8; - - private const int BacklogFactor = 10; - - private const int MaximumBacklog = ThreadCount * BacklogFactor; - - private const int PauseTimeout = 100; - - private const string Error = "ERROR"; - - private const string Warning = "WARNING"; - - private const string Critical = "CRITICAL"; - - private static int errorCount; - private static int successCount; - private static int retryCount; - private static FileInfo? retryFile; +{ + private const string DefaultFilePath = "../../resources/data/del-500.jsonl"; + + private const string RetryPrefix = "retry-"; + + private const string RetrySuffix = ".jsonl"; + + private const string DataSource = "DATA_SOURCE"; + + private const string RecordID = "RECORD_ID"; + + private const int ThreadCount = 8; + + private const int BacklogFactor = 10; + + private const int MaximumBacklog = ThreadCount * BacklogFactor; + + private const int PauseTimeout = 100; + + private const string Error = "ERROR"; + + private const string Warning = "WARNING"; + + private const string Critical = "CRITICAL"; + + private static int errorCount; + private static int successCount; + private static int retryCount; + private static FileInfo? retryFile; private static StreamWriter? retryWriter; } diff --git a/csharp/snippets/loading/LoadRecords/Program.cs b/csharp/snippets/loading/LoadRecords/Program.cs index 4ab5c1a..28a648e 100644 --- a/csharp/snippets/loading/LoadRecords/Program.cs +++ b/csharp/snippets/loading/LoadRecords/Program.cs @@ -10,8 +10,8 @@ // get the senzing repository settings string? 
settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -27,44 +27,44 @@ .Build(); try -{ - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - // loop through the example records and add them to the repository - foreach (KeyValuePair<(string, string), string> pair in GetRecords()) - { - (string dataSourceCode, string recordID) = pair.Key; - string recordDefinition = pair.Value; - - // call the addRecord() function with no flags - engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); - - Console.WriteLine("Record " + recordID + " added"); - Console.Out.Flush(); +{ + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + // loop through the example records and add them to the repository + foreach (KeyValuePair<(string, string), string> pair in GetRecords()) + { + (string dataSourceCode, string recordID) = pair.Key; + string recordDefinition = pair.Value; + + // call the addRecord() function with no flags + engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); + + Console.WriteLine("Record " + recordID + " added"); + Console.Out.Flush(); } } catch (SzException e) -{ - // handle any exception that may have occurred - Console.Error.WriteLine("Senzing Error Message : " + e.Message); - Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); - Console.Error.WriteLine(e); +{ + // handle any exception that may have occurred + Console.Error.WriteLine("Senzing Error Message : " + e.Message); + Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); + Console.Error.WriteLine(e); throw; } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - // IMPORTANT: make sure to destroy the environment +{ + // IMPORTANT: make sure to destroy the environment env.Destroy(); } @@ -77,12 +77,12 @@ /// to string JSON text values describing the records to be added. 
/// static IDictionary<(string, string), string> GetRecords() -{ - IDictionary<(string, string), string> records - = new SortedDictionary<(string, string), string>(); - - records.Add( - ("TEST", "1001"), +{ + IDictionary<(string, string), string> records + = new SortedDictionary<(string, string), string>(); + + records.Add( + ("TEST", "1001"), """ { "DATA_SOURCE": "TEST", @@ -97,10 +97,10 @@ "PHONE_NUMBER": "702-919-1300", "EMAIL_ADDRESS": "bsmith@work.com" } - """); - - records.Add( - ("TEST", "1002"), + """); + + records.Add( + ("TEST", "1002"), """ { "DATA_SOURCE": "TEST", @@ -118,10 +118,10 @@ "PHONE_TYPE": "MOBILE", "PHONE_NUMBER": "702-919-1300" } - """); - - records.Add( - ("TEST", "1003"), + """); + + records.Add( + ("TEST", "1003"), """ { "DATA_SOURCE": "TEST", @@ -133,10 +133,10 @@ "DATE_OF_BIRTH": "12/11/1978", "EMAIL_ADDRESS": "bsmith@work.com" } - """); - - records.Add( - ("TEST", "1004"), + """); + + records.Add( + ("TEST", "1004"), """ { "DATA_SOURCE": "TEST", @@ -151,10 +151,10 @@ "ADDR_POSTAL_CODE": "89132", "EMAIL_ADDRESS": "bsmith@work.com" } - """); - - records.Add( - ("TEST", "1005"), + """); + + records.Add( + ("TEST", "1005"), """ { "DATA_SOURCE": "TEST", @@ -171,7 +171,7 @@ "ADDR_STATE": "NV", "ADDR_POSTAL_CODE": "89132" } - """); - + """); + return records; } diff --git a/csharp/snippets/loading/LoadViaFutures/Program.cs b/csharp/snippets/loading/LoadViaFutures/Program.cs index 672ecb2..1f4eac3 100644 --- a/csharp/snippets/loading/LoadViaFutures/Program.cs +++ b/csharp/snippets/loading/LoadViaFutures/Program.cs @@ -17,8 +17,8 @@ // get the senzing repository settings string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -54,228 +54,228 @@ TaskScheduler taskScheduler // create a reader StreamReader rdr = new StreamReader(fs, Encoding.UTF8); try -{ - - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - int lineNumber = 0; - bool eof = false; - - while (!eof) - { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.Count < MaximumBacklog) - { - // read the next line - string? line = rdr.ReadLine(); - lineNumber++; - - // check for EOF - if (line == null) - { - eof = true; - break; - } - - // trim the line - line = line.Trim(); - - // skip any blank lines - if (line.Length == 0) continue; - - // skip any commented lines - if (line.StartsWith('#')) continue; - - // construct the Record instance - Record record = new Record(lineNumber, line); - - try - { - // parse the line as a JSON object - JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); - if (recordJson == null) - { - // parsed JSON null - throw new SzBadInputException("Record must be a JSON object: " + line); - } - - // extract the data source code and record ID - string? dataSourceCode = recordJson[DataSource]?.GetValue(); - string? 
recordID = recordJson[RecordID]?.GetValue(); - - Task task = factory.StartNew(() => - { - // call the addRecord() function with no flags - engine.AddRecord(dataSourceCode, recordID, record.Line); - }, - CancellationToken.None, - TaskCreationOptions.None, - taskScheduler); - - // add the future to the pending future list - pendingFutures.Add((task, record)); - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, lineNumber, line); - errorCount++; // increment the error count - } - } - - do - { - // handle any pending futures WITHOUT blocking to reduce the backlog - HandlePendingFutures(pendingFutures, false); - - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.Count >= MaximumBacklog) - { - Thread.Sleep(PauseTimeout); - } - } while (pendingFutures.Count >= MaximumBacklog); - } - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future +{ + + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + int lineNumber = 0; + bool eof = false; + + while (!eof) + { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.Count < MaximumBacklog) + { + // read the next line + string? line = rdr.ReadLine(); + lineNumber++; + + // check for EOF + if (line == null) + { + eof = true; + break; + } + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; + + // skip any commented lines + if (line.StartsWith('#')) continue; + + // construct the Record instance + Record record = new Record(lineNumber, line); + + try + { + // parse the line as a JSON object + JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); + if (recordJson == null) + { + // parsed JSON null + throw new SzBadInputException("Record must be a JSON object: " + line); + } + + // extract the data source code and record ID + string? dataSourceCode = recordJson[DataSource]?.GetValue(); + string? 
recordID = recordJson[RecordID]?.GetValue(); + + Task task = factory.StartNew(() => + { + // call the addRecord() function with no flags + engine.AddRecord(dataSourceCode, recordID, record.Line); + }, + CancellationToken.None, + TaskCreationOptions.None, + taskScheduler); + + // add the future to the pending future list + pendingFutures.Add((task, record)); + + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, lineNumber, line); + errorCount++; // increment the error count + } + } + + do + { + // handle any pending futures WITHOUT blocking to reduce the backlog + HandlePendingFutures(pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.Count >= MaximumBacklog) + { + Thread.Sleep(PauseTimeout); + } + } while (pendingFutures.Count >= MaximumBacklog); + } + + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future HandlePendingFutures(pendingFutures, true); } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - // close the reader - rdr.Close(); - - // close the file stream - fs.Close(); - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Records successfully added : " + successCount); - Console.WriteLine("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " records to be retried in " + retryFile); - } +{ + // close the reader + rdr.Close(); + + // close the file stream + fs.Close(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Records successfully added : " + successCount); + Console.WriteLine("Records failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " records to be retried in " + retryFile); + } Console.Out.Flush(); } static void HandlePendingFutures(IList<(Task, Record)> pendingFutures, bool blocking) -{ - // loop through the pending futures - for (int index = 0; index < pendingFutures.Count; index++) - { - // get the next pending future - (Task task, Record record) = pendingFutures[index]; - - // if not blocking and this one is not done then continue - if (!blocking && !task.IsCompleted) continue; - - // remove the pending future from the list - pendingFutures.RemoveAt(index--); - - try - { - try - { - // wait for completion -- if non-blocking then this - // task is already completed and this will just - // throw any exception that might have occurred - if (blocking && !task.IsCompleted) - { - task.Wait(); - } - - // if we get here then increment the success count - successCount++; - - } - catch (AggregateException e) - when (e.InnerException is TaskCanceledException - || e.InnerException is ThreadInterruptedException) - { - throw new SzRetryableException(e.InnerException); - } - catch (ThreadInterruptedException e) - { - throw new 
SzRetryableException(e.InnerException); - } - catch (AggregateException e) - { - if (e.InnerException != null) - { - // get the inner exception - throw e.InnerException; - } - else - { - throw; - } - } - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, record.LineNumber, record.Line); - errorCount++; // increment the error count - - } - catch (SzRetryableException e) - { - // handle thread interruption and cancellation as retries - LogFailedRecord(Warning, e, record.LineNumber, record.Line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(record.Line); - } - - } - catch (Exception e) - { - // catch any other exception (incl. SzException) here - LogFailedRecord(Critical, e, record.LineNumber, record.Line); - errorCount++; - throw; // rethrow since exception is critical - } +{ + // loop through the pending futures + for (int index = 0; index < pendingFutures.Count; index++) + { + // get the next pending future + (Task task, Record record) = pendingFutures[index]; + + // if not blocking and this one is not done then continue + if (!blocking && !task.IsCompleted) continue; + + // remove the pending future from the list + pendingFutures.RemoveAt(index--); + + try + { + try + { + // wait for completion -- if non-blocking then this + // task is already completed and this will just + // throw any exception that might have occurred + if (blocking && !task.IsCompleted) + { + task.Wait(); + } + + // if we get here then increment the success count + successCount++; + + } + catch (AggregateException e) + when (e.InnerException is TaskCanceledException + || e.InnerException is ThreadInterruptedException) + { + throw new SzRetryableException(e.InnerException); + } + catch (ThreadInterruptedException e) + { + throw new SzRetryableException(e.InnerException); + } + catch (AggregateException e) + { + if (e.InnerException != null) + { + // get the inner exception + throw e.InnerException; + } + else + { + throw; + } + } + + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, record.LineNumber, record.Line); + errorCount++; // increment the error count + + } + catch (SzRetryableException e) + { + // handle thread interruption and cancellation as retries + LogFailedRecord(Warning, e, record.LineNumber, record.Line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(record.Line); + } + + } + catch (Exception e) + { + // catch any other exception (incl. 
SzException) here + LogFailedRecord(Critical, e, record.LineNumber, record.Line); + errorCount++; + throw; // rethrow since exception is critical + } } } @@ -293,46 +293,46 @@ static void LogFailedRecord(string errorType, Exception exception, int lineNumber, string recordJson) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " - + lineNumber + ": "); - Console.Error.WriteLine(recordJson); - Console.Error.WriteLine(exception); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + + lineNumber + ": "); + Console.Error.WriteLine(recordJson); + Console.Error.WriteLine(exception); Console.Error.Flush(); } public partial class Program -{ - private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; - - private const string RetryPrefix = "retry-"; - - private const string RetrySuffix = ".jsonl"; - - private const string DataSource = "DATA_SOURCE"; - - private const string RecordID = "RECORD_ID"; - - private const int ThreadCount = 8; - - private const int BacklogFactor = 10; - - private const int MaximumBacklog = ThreadCount * BacklogFactor; - - private const int PauseTimeout = 100; - - private const string Error = "ERROR"; - - private const string Warning = "WARNING"; - - private const string Critical = "CRITICAL"; - - private static int errorCount; - private static int successCount; - private static int retryCount; - private static FileInfo? retryFile; +{ + private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; + + private const string RetryPrefix = "retry-"; + + private const string RetrySuffix = ".jsonl"; + + private const string DataSource = "DATA_SOURCE"; + + private const string RecordID = "RECORD_ID"; + + private const int ThreadCount = 8; + + private const int BacklogFactor = 10; + + private const int MaximumBacklog = ThreadCount * BacklogFactor; + + private const int PauseTimeout = 100; + + private const string Error = "ERROR"; + + private const string Warning = "WARNING"; + + private const string Critical = "CRITICAL"; + + private static int errorCount; + private static int successCount; + private static int retryCount; + private static FileInfo? retryFile; private static StreamWriter? retryWriter; } diff --git a/csharp/snippets/loading/LoadWithInfoViaFutures/Program.cs b/csharp/snippets/loading/LoadWithInfoViaFutures/Program.cs index b283080..c25a32b 100644 --- a/csharp/snippets/loading/LoadWithInfoViaFutures/Program.cs +++ b/csharp/snippets/loading/LoadWithInfoViaFutures/Program.cs @@ -17,8 +17,8 @@ // get the senzing repository settings string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -55,129 +55,129 @@ TaskScheduler taskScheduler // create a reader StreamReader rdr = new StreamReader(fs, Encoding.UTF8); try -{ - - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - int lineNumber = 0; - bool eof = false; - - while (!eof) - { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.Count < MaximumBacklog) - { - // read the next line - string? 
line = rdr.ReadLine(); - lineNumber++; - - // check for EOF - if (line == null) - { - eof = true; - break; - } - - // trim the line - line = line.Trim(); - - // skip any blank lines - if (line.Length == 0) continue; - - // skip any commented lines - if (line.StartsWith('#')) continue; - - // construct the Record instance - Record record = new Record(lineNumber, line); - - try - { - // parse the line as a JSON object - JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); - if (recordJson == null) - { - // parsed JSON null - throw new SzBadInputException("Record must be a JSON object: " + line); - } - - // extract the data source code and record ID - string? dataSourceCode = recordJson[DataSource]?.GetValue(); - string? recordID = recordJson[RecordID]?.GetValue(); - - Task task = factory.StartNew(() => - { - // call the addRecord() function with info flags - return engine.AddRecord( - dataSourceCode, recordID, record.Line, SzWithInfo); - }, - CancellationToken.None, - TaskCreationOptions.None, - taskScheduler); - - // add the future to the pending future list - pendingFutures.Add((task, record)); - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, lineNumber, line); - errorCount++; // increment the error count - } - } - - do - { - // handle any pending futures WITHOUT blocking to reduce the backlog - HandlePendingFutures(engine, pendingFutures, false); - - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.Count >= MaximumBacklog) - { - Thread.Sleep(PauseTimeout); - } - } while (pendingFutures.Count >= MaximumBacklog); - } - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future +{ + + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + int lineNumber = 0; + bool eof = false; + + while (!eof) + { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.Count < MaximumBacklog) + { + // read the next line + string? line = rdr.ReadLine(); + lineNumber++; + + // check for EOF + if (line == null) + { + eof = true; + break; + } + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; + + // skip any commented lines + if (line.StartsWith('#')) continue; + + // construct the Record instance + Record record = new Record(lineNumber, line); + + try + { + // parse the line as a JSON object + JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); + if (recordJson == null) + { + // parsed JSON null + throw new SzBadInputException("Record must be a JSON object: " + line); + } + + // extract the data source code and record ID + string? dataSourceCode = recordJson[DataSource]?.GetValue(); + string? 
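// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the per-line parsing step shared
// by these loaders -- parse the JSONL line, require a JSON object, and pull out
// DATA_SOURCE and RECORD_ID. Note that JsonNode.GetValue() requires a type
// argument, so the calls above are GetValue<string>(); the angle brackets appear
// to have been stripped from this rendering of the patch. The Senzing.Sdk using
// is an assumption about the namespace these snippets import.
// ---------------------------------------------------------------------------
using System.Text.Json.Nodes;
using Senzing.Sdk;  // assumed namespace for SzBadInputException

static class RecordParseSketch
{
    public static (string? DataSourceCode, string? RecordID) Parse(string line)
    {
        JsonObject? recordJson = JsonNode.Parse(line)?.AsObject();
        if (recordJson == null)
        {
            throw new SzBadInputException("Record must be a JSON object: " + line);
        }
        // GetValue() needs its generic argument spelled out
        string? dataSourceCode = recordJson["DATA_SOURCE"]?.GetValue<string>();
        string? recordID = recordJson["RECORD_ID"]?.GetValue<string>();
        return (dataSourceCode, recordID);
    }
}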
recordID = recordJson[RecordID]?.GetValue(); + + Task task = factory.StartNew(() => + { + // call the addRecord() function with info flags + return engine.AddRecord( + dataSourceCode, recordID, record.Line, SzWithInfo); + }, + CancellationToken.None, + TaskCreationOptions.None, + taskScheduler); + + // add the future to the pending future list + pendingFutures.Add((task, record)); + + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, lineNumber, line); + errorCount++; // increment the error count + } + } + + do + { + // handle any pending futures WITHOUT blocking to reduce the backlog + HandlePendingFutures(engine, pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.Count >= MaximumBacklog) + { + Thread.Sleep(PauseTimeout); + } + } while (pendingFutures.Count >= MaximumBacklog); + } + + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future HandlePendingFutures(engine, pendingFutures, true); } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - rdr.Close(); - fs.Close(); - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Records successfully added : " + successCount); - Console.WriteLine("Total entities created : " + entityIDSet.Count); - Console.WriteLine("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " records to be retried in " + retryFile); - } +{ + rdr.Close(); + fs.Close(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Records successfully added : " + successCount); + Console.WriteLine("Total entities created : " + entityIDSet.Count); + Console.WriteLine("Records failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " records to be retried in " + retryFile); + } Console.Out.Flush(); } @@ -185,99 +185,99 @@ TaskScheduler taskScheduler static void HandlePendingFutures(SzEngine engine, IList<(Task, Record)> pendingFutures, bool blocking) -{ - // loop through the pending futures - for (int index = 0; index < pendingFutures.Count; index++) - { - // get the next pending future - (Task task, Record record) = pendingFutures[index]; - - // if not blocking and this one is not done then continue - if (!blocking && !task.IsCompleted) continue; - - // remove the pending future from the list - pendingFutures.RemoveAt(index--); - - try - { - try - { - // this will block if the task is not yet completed, - // however we only get here with a pending task if - // the blocking parameter is true - string info = task.Result; - - // if we get here then increment the success count - successCount++; - - // process the info - ProcessInfo(engine, info); - - } - catch (AggregateException e) - when (e.InnerException is TaskCanceledException - || e.InnerException is 
ThreadInterruptedException) - { - throw new SzRetryableException(e.InnerException); - } - catch (ThreadInterruptedException e) - { - throw new SzRetryableException(e.InnerException); - } - catch (AggregateException e) - { - if (e.InnerException != null) - { - // get the inner exception - throw e.InnerException; - } - else - { - throw; - } - } - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, record.LineNumber, record.Line); - errorCount++; // increment the error count - - } - catch (SzRetryableException e) - { - // handle thread interruption and cancellation as retries - LogFailedRecord(Warning, e, record.LineNumber, record.Line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(record.Line); - } - - } - catch (Exception e) - { - // catch any other exception (incl. SzException) here - LogFailedRecord(Critical, e, record.LineNumber, record.Line); - errorCount++; - throw; // rethrow since exception is critical - } +{ + // loop through the pending futures + for (int index = 0; index < pendingFutures.Count; index++) + { + // get the next pending future + (Task task, Record record) = pendingFutures[index]; + + // if not blocking and this one is not done then continue + if (!blocking && !task.IsCompleted) continue; + + // remove the pending future from the list + pendingFutures.RemoveAt(index--); + + try + { + try + { + // this will block if the task is not yet completed, + // however we only get here with a pending task if + // the blocking parameter is true + string info = task.Result; + + // if we get here then increment the success count + successCount++; + + // process the info + ProcessInfo(engine, info); + + } + catch (AggregateException e) + when (e.InnerException is TaskCanceledException + || e.InnerException is ThreadInterruptedException) + { + throw new SzRetryableException(e.InnerException); + } + catch (ThreadInterruptedException e) + { + throw new SzRetryableException(e.InnerException); + } + catch (AggregateException e) + { + if (e.InnerException != null) + { + // get the inner exception + throw e.InnerException; + } + else + { + throw; + } + } + + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, record.LineNumber, record.Line); + errorCount++; // increment the error count + + } + catch (SzRetryableException e) + { + // handle thread interruption and cancellation as retries + LogFailedRecord(Warning, e, record.LineNumber, record.Line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(record.Line); + } + + } + catch (Exception e) + { + // catch any other exception (incl. 
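// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the core of the ProcessInfo()
// step that appears just below -- parse the WITH_INFO response and collect the
// AFFECTED_ENTITIES entity IDs. The GetEntity() existence check the real snippet
// performs on each ID is omitted here; GetValue<long>() restores the generic
// argument that was stripped from this rendering of the patch.
// ---------------------------------------------------------------------------
using System.Collections.Generic;
using System.Text.Json.Nodes;

static class AffectedEntitiesSketch
{
    public static ISet<long> GetAffectedEntityIDs(string info)
    {
        HashSet<long> result = new HashSet<long>();
        JsonObject? jsonObject = JsonNode.Parse(info)?.AsObject();
        JsonArray? affectedArr = jsonObject?["AFFECTED_ENTITIES"]?.AsArray();
        if (affectedArr == null) return result;

        foreach (JsonNode? affected in affectedArr)
        {
            long entityID = affected?.AsObject()?["ENTITY_ID"]?.GetValue<long>() ?? 0L;
            if (entityID != 0L) result.Add(entityID);
        }
        return result;
    }
}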
SzException) here + LogFailedRecord(Critical, e, record.LineNumber, record.Line); + errorCount++; + throw; // rethrow since exception is critical + } } } @@ -295,39 +295,39 @@ static void HandlePendingFutures(SzEngine engine, /// The to use. /// The info message static void ProcessInfo(SzEngine engine, string info) -{ - JsonObject? jsonObject = JsonNode.Parse(info)?.AsObject(); - if (jsonObject == null) return; - if (!jsonObject.ContainsKey(AffectedEntities)) return; - - JsonArray? affectedArr = jsonObject[AffectedEntities]?.AsArray(); - if (affectedArr == null) return; - - for (int index = 0; index < affectedArr.Count; index++) - { - JsonObject? affected = affectedArr[index]?.AsObject(); - long entityID = affected?[EntityID]?.GetValue() ?? 0L; - if (entityID == 0L) continue; - - try - { - engine.GetEntity(entityID, null); - entityIDSet.Add(entityID); - - } - catch (SzNotFoundException) - { - entityIDSet.Remove(entityID); - - } - catch (SzException e) - { - // simply log the exception, do not rethrow - Console.Error.WriteLine(); - Console.Error.WriteLine("**** FAILED TO RETRIEVE ENTITY: " + entityID); - Console.Error.WriteLine(e); - Console.Error.Flush(); - } +{ + JsonObject? jsonObject = JsonNode.Parse(info)?.AsObject(); + if (jsonObject == null) return; + if (!jsonObject.ContainsKey(AffectedEntities)) return; + + JsonArray? affectedArr = jsonObject[AffectedEntities]?.AsArray(); + if (affectedArr == null) return; + + for (int index = 0; index < affectedArr.Count; index++) + { + JsonObject? affected = affectedArr[index]?.AsObject(); + long entityID = affected?[EntityID]?.GetValue() ?? 0L; + if (entityID == 0L) continue; + + try + { + engine.GetEntity(entityID, null); + entityIDSet.Add(entityID); + + } + catch (SzNotFoundException) + { + entityIDSet.Remove(entityID); + + } + catch (SzException e) + { + // simply log the exception, do not rethrow + Console.Error.WriteLine(); + Console.Error.WriteLine("**** FAILED TO RETRIEVE ENTITY: " + entityID); + Console.Error.WriteLine(e); + Console.Error.Flush(); + } } } @@ -345,56 +345,56 @@ static void LogFailedRecord(string errorType, Exception exception, int lineNumber, string recordJson) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " - + lineNumber + ": "); - Console.Error.WriteLine(recordJson); - Console.Error.WriteLine(exception); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + + lineNumber + ": "); + Console.Error.WriteLine(recordJson); + Console.Error.WriteLine(exception); Console.Error.Flush(); } public partial class Program -{ - private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; - - private const string RetryPrefix = "retry-"; - - private const string RetrySuffix = ".jsonl"; - - private const string DataSource = "DATA_SOURCE"; - - private const string RecordID = "RECORD_ID"; - - private const string AffectedEntities = "AFFECTED_ENTITIES"; - - private const string EntityID = "ENTITY_ID"; - - private const int ThreadCount = 8; - - private const int BacklogFactor = 10; - - private const int MaximumBacklog = ThreadCount * BacklogFactor; - - private const int PauseTimeout = 100; - - private const string Error = "ERROR"; - - private const string Warning = "WARNING"; - - private const string Critical = "CRITICAL"; - - private static int errorCount; - - private static int successCount; - - private static int retryCount; - - private static FileInfo? 
retryFile; - - private static StreamWriter? retryWriter; - +{ + private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; + + private const string RetryPrefix = "retry-"; + + private const string RetrySuffix = ".jsonl"; + + private const string DataSource = "DATA_SOURCE"; + + private const string RecordID = "RECORD_ID"; + + private const string AffectedEntities = "AFFECTED_ENTITIES"; + + private const string EntityID = "ENTITY_ID"; + + private const int ThreadCount = 8; + + private const int BacklogFactor = 10; + + private const int MaximumBacklog = ThreadCount * BacklogFactor; + + private const int PauseTimeout = 100; + + private const string Error = "ERROR"; + + private const string Warning = "WARNING"; + + private const string Critical = "CRITICAL"; + + private static int errorCount; + + private static int successCount; + + private static int retryCount; + + private static FileInfo? retryFile; + + private static StreamWriter? retryWriter; + private static readonly ISet entityIDSet = new HashSet(); } diff --git a/csharp/snippets/loading/LoadWithStatsViaLoop/Program.cs b/csharp/snippets/loading/LoadWithStatsViaLoop/Program.cs index 5072b19..5249a1a 100644 --- a/csharp/snippets/loading/LoadWithStatsViaLoop/Program.cs +++ b/csharp/snippets/loading/LoadWithStatsViaLoop/Program.cs @@ -13,8 +13,8 @@ // get the senzing repository settings string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -36,142 +36,142 @@ // create a reader StreamReader rdr = new StreamReader(fs, Encoding.UTF8); try -{ - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - int lineNumber = 0; - - // loop through the example records and add them to the repository - for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) - { - // increment the line number - lineNumber++; - - // trim the line - line = line.Trim(); - - // skip any blank lines - if (line.Length == 0) continue; - - // skip any commented lines - if (line.StartsWith('#')) continue; - - try - { - // parse the line as a JSON object - JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); - if (recordJson == null) - { - // parsed JSON null - throw new SzBadInputException("Record must be a JSON object: " + line); - } - - // extract the data source code and record ID - string? dataSourceCode = recordJson[DataSource]?.GetValue(); - string? 
recordID = recordJson[RecordID]?.GetValue(); - - // call the addRecord() function with no flags - engine.AddRecord(dataSourceCode, recordID, line, SzNoFlags); - - successCount++; - - // check if it is time obtain stats - if ((successCount % StatsInterval) == 0) - { - try - { - string stats = engine.GetStats(); - if (stats.Length > StatsTruncate) - { - stats = string.Concat(stats.AsSpan(0, StatsTruncate), " ..."); - } - Console.WriteLine("* STATS: " + stats); - - } - catch (SzException e) - { - // trap the stats exception so it is not misinterpreted - // as an exception from engine.addRecord() - Console.WriteLine("**** FAILED TO OBTAIN STATS: " + e); - } - } - - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, lineNumber, line); - errorCount++; // increment the error count - - } - catch (SzRetryableException e) - { - LogFailedRecord(Warning, e, lineNumber, line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(line); - } - - } - catch (Exception e) - { - // catch any other exception (incl. SzException) here - LogFailedRecord(Critical, e, lineNumber, line); - errorCount++; - throw; // rethrow since exception is critical - } +{ + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + int lineNumber = 0; + + // loop through the example records and add them to the repository + for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) + { + // increment the line number + lineNumber++; + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; + + // skip any commented lines + if (line.StartsWith('#')) continue; + + try + { + // parse the line as a JSON object + JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); + if (recordJson == null) + { + // parsed JSON null + throw new SzBadInputException("Record must be a JSON object: " + line); + } + + // extract the data source code and record ID + string? dataSourceCode = recordJson[DataSource]?.GetValue(); + string? 
recordID = recordJson[RecordID]?.GetValue(); + + // call the addRecord() function with no flags + engine.AddRecord(dataSourceCode, recordID, line, SzNoFlags); + + successCount++; + + // check if it is time obtain stats + if ((successCount % StatsInterval) == 0) + { + try + { + string stats = engine.GetStats(); + if (stats.Length > StatsTruncate) + { + stats = string.Concat(stats.AsSpan(0, StatsTruncate), " ..."); + } + Console.WriteLine("* STATS: " + stats); + + } + catch (SzException e) + { + // trap the stats exception so it is not misinterpreted + // as an exception from engine.addRecord() + Console.WriteLine("**** FAILED TO OBTAIN STATS: " + e); + } + } + + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, lineNumber, line); + errorCount++; // increment the error count + + } + catch (SzRetryableException e) + { + LogFailedRecord(Warning, e, lineNumber, line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(line); + } + + } + catch (Exception e) + { + // catch any other exception (incl. SzException) here + LogFailedRecord(Critical, e, lineNumber, line); + errorCount++; + throw; // rethrow since exception is critical + } } } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - rdr.Close(); - - fs.Close(); - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Records successfully added : " + successCount); - Console.WriteLine("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " records to be retried in " + retryFile); - } +{ + rdr.Close(); + + fs.Close(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Records successfully added : " + successCount); + Console.WriteLine("Records failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " records to be retried in " + retryFile); + } Console.Out.Flush(); } @@ -189,41 +189,41 @@ static void LogFailedRecord(string errorType, Exception exception, int lineNumber, string recordJson) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " - + lineNumber + ": "); - Console.Error.WriteLine(recordJson); - Console.Error.WriteLine(exception); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO ADD RECORD AT LINE " + + lineNumber + ": "); + Console.Error.WriteLine(recordJson); + Console.Error.WriteLine(exception); Console.Error.Flush(); } public partial 
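// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the periodic stats check used in
// the loop above, pulled out as a helper -- every StatsInterval successful adds,
// fetch engine stats, truncate long output, and print it, trapping any stats
// failure so it is never mistaken for an AddRecord() failure. The Senzing.Sdk
// using is an assumption about the namespace these snippets import.
// ---------------------------------------------------------------------------
using System;
using Senzing.Sdk;  // assumed namespace for SzEngine and SzException

static class StatsSketch
{
    private const int StatsInterval = 100;
    private const int StatsTruncate = 70;

    public static void MaybePrintStats(SzEngine engine, int successCount)
    {
        if ((successCount % StatsInterval) != 0) return;
        try
        {
            string stats = engine.GetStats();
            if (stats.Length > StatsTruncate)
            {
                stats = string.Concat(stats.AsSpan(0, StatsTruncate), " ...");
            }
            Console.WriteLine("* STATS: " + stats);
        }
        catch (SzException e)
        {
            // trap the stats exception so it is not misinterpreted as an
            // exception from engine.AddRecord()
            Console.WriteLine("**** FAILED TO OBTAIN STATS: " + e);
        }
    }
}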
class Program -{ - private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; - - private const string RetryPrefix = "retry-"; - - private const string RetrySuffix = ".jsonl"; - - private const string DataSource = "DATA_SOURCE"; - - private const string RecordID = "RECORD_ID"; - - private const string Error = "ERROR"; - - private const string Warning = "WARNING"; - - private const string Critical = "CRITICAL"; - - private const int StatsInterval = 100; - - private const int StatsTruncate = 70; - - private static int errorCount; - private static int successCount; - private static int retryCount; - private static FileInfo? retryFile; +{ + private const string DefaultFilePath = "../../resources/data/load-500.jsonl"; + + private const string RetryPrefix = "retry-"; + + private const string RetrySuffix = ".jsonl"; + + private const string DataSource = "DATA_SOURCE"; + + private const string RecordID = "RECORD_ID"; + + private const string Error = "ERROR"; + + private const string Warning = "WARNING"; + + private const string Critical = "CRITICAL"; + + private const int StatsInterval = 100; + + private const int StatsTruncate = 70; + + private static int errorCount; + private static int successCount; + private static int retryCount; + private static FileInfo? retryFile; private static StreamWriter? retryWriter; } \ No newline at end of file diff --git a/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs b/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs index e56868e..6859175 100644 --- a/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs +++ b/csharp/snippets/redo/LoadWithRedoViaLoop/Program.cs @@ -13,8 +13,8 @@ // get the senzing repository settings string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -30,145 +30,144 @@ .Build(); try -{ - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - // loop through the input files - foreach (string filePath in InputFiles) - { - FileStream fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); - - StreamReader rdr = new StreamReader(fs, Encoding.UTF8); - - try - { - int lineNumber = 0; - // loop through the example records and add them to the repository - for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) - { - // increment the line number - lineNumber++; - - // trim the line - line = line.Trim(); - - // skip any blank lines - if (line.Length == 0) continue; - - // skip any commented lines - if (line.StartsWith('#')) continue; - - try - { - // parse the line as a JSON object - JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); - if (recordJson == null) - { - // parsed JSON null - throw new SzBadInputException("Record must be a JSON object: " + line); - } - - // extract the data source code and record ID - string? dataSourceCode = recordJson[DataSource]?.GetValue(); - string? 
recordID = recordJson[RecordID]?.GetValue(); - - // call the addRecord() function with info flags - engine.AddRecord(dataSourceCode, recordID, line, SzNoFlags); - - successCount++; - } - catch (SzBadInputException e) - { - LogFailedRecord(Error, e, filePath, lineNumber, line); - errorCount++; // increment the error count - - } - catch (SzRetryableException e) - { - LogFailedRecord(Warning, e, filePath, lineNumber, line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - TrackRetryRecord(line); - - } - catch (Exception e) - { - // catch any other exception (incl. SzException) here - LogFailedRecord(Critical, e, filePath, lineNumber, line); - errorCount++; - throw; // rethrow since exception is critical - } - } - } - finally - { - rdr.Close(); - fs.Close(); - } - } - - // now that we have loaded the records, check for redos and handle them - while (engine.CountRedoRecords() > 0) - { - // get the next redo record - string redo = engine.GetRedoRecord(); - - try - { - // process the redo record - engine.ProcessRedoRecord(redo, SzNoFlags); - - // increment the redone count - redoneCount++; - - } - catch (SzRetryableException e) - { - LogFailedRedo(Warning, e, redo); - errorCount++; - retryCount++; - TrackRetryRecord(redo); - - } - catch (Exception e) - { - LogFailedRedo(Critical, e, redo); - errorCount++; - throw; - } +{ + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + // loop through the input files + foreach (string filePath in InputFiles) + { + FileStream fs = new FileStream(filePath, FileMode.Open, FileAccess.Read); + + StreamReader rdr = new StreamReader(fs, Encoding.UTF8); + + try + { + int lineNumber = 0; + // loop through the example records and add them to the repository + for (string? line = rdr.ReadLine(); line != null; line = rdr.ReadLine()) + { + // increment the line number + lineNumber++; + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; + + // skip any commented lines + if (line.StartsWith('#')) continue; + + try + { + // parse the line as a JSON object + JsonObject? recordJson = JsonNode.Parse(line)?.AsObject(); + if (recordJson == null) + { + // parsed JSON null + throw new SzBadInputException("Record must be a JSON object: " + line); + } + + // extract the data source code and record ID + string? dataSourceCode = recordJson[DataSource]?.GetValue(); + string? recordID = recordJson[RecordID]?.GetValue(); + + // call the addRecord() function with info flags + engine.AddRecord(dataSourceCode, recordID, line, SzNoFlags); + + successCount++; + } + catch (SzBadInputException e) + { + LogFailedRecord(Error, e, filePath, lineNumber, line); + errorCount++; // increment the error count + + } + catch (SzRetryableException e) + { + LogFailedRecord(Warning, e, filePath, lineNumber, line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + TrackRetryRecord(line); + + } + catch (Exception e) + { + // catch any other exception (incl. 
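// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the redo-draining shape that the
// added side of this hunk switches to (shown a bit further below) -- instead of
// re-checking CountRedoRecords() on every pass, keep calling GetRedoRecord()
// until it returns null and process each redo record as it is fetched. SzEngine
// and SzNoFlags are assumed to come from the same Senzing SDK usings the snippet
// already has; per-redo error handling is omitted here.
// ---------------------------------------------------------------------------
using Senzing.Sdk;                  // assumed namespace for SzEngine
using static Senzing.Sdk.SzFlags;   // assumed source of SzNoFlags

static class RedoDrainSketch
{
    public static int DrainRedos(SzEngine engine)
    {
        int redoneCount = 0;
        for (string? redo = engine.GetRedoRecord();
             redo != null;
             redo = engine.GetRedoRecord())
        {
            engine.ProcessRedoRecord(redo, SzNoFlags);
            redoneCount++;
        }
        return redoneCount;
    }
}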
SzException) here + LogFailedRecord(Critical, e, filePath, lineNumber, line); + errorCount++; + throw; // rethrow since exception is critical + } + } + } + finally + { + rdr.Close(); + fs.Close(); + } + } + + // now that we have loaded the records, check for redos and handle them + for (string redo = engine.GetRedoRecord(); + redo != null; + redo = engine.GetRedoRecord()) + { + try + { + // process the redo record + engine.ProcessRedoRecord(redo, SzNoFlags); + + // increment the redone count + redoneCount++; + + } + catch (SzRetryableException e) + { + LogFailedRedo(Warning, e, redo); + errorCount++; + retryCount++; + TrackRetryRecord(redo); + + } + catch (Exception e) + { + LogFailedRedo(Critical, e, redo); + errorCount++; + throw; + } } } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Records successfully added : " + successCount); - Console.WriteLine("Redos successfully processed : " + redoneCount); - Console.WriteLine("Records failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " records to be retried in " + retryFile); - } +{ + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Records successfully added : " + successCount); + Console.WriteLine("Redos successfully processed : " + redoneCount); + Console.WriteLine("Records failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " records to be retried in " + retryFile); + } Console.Out.Flush(); } @@ -180,24 +179,24 @@ /// The JSON text defining the record to be retried /// static void TrackRetryRecord(string recordJson) -{ - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(recordJson); +{ + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(recordJson); } } @@ -217,15 +216,15 @@ static void LogFailedRecord(string errorType, string filePath, int lineNumber, string recordJson) -{ - string fileName = Path.GetFileName(filePath); - - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO ADD RECORD IN " + fileName - + " AT LINE " + lineNumber + ": "); - Console.Error.WriteLine(recordJson); - Console.Error.WriteLine(exception); +{ + string 
fileName = Path.GetFileName(filePath); + + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO ADD RECORD IN " + fileName + + " AT LINE " + lineNumber + ": "); + Console.Error.WriteLine(recordJson); + Console.Error.WriteLine(exception); Console.Error.Flush(); } @@ -239,36 +238,36 @@ static void LogFailedRecord(string errorType, static void LogFailedRedo(string errorType, Exception exception, string redoRecord) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); - Console.Error.WriteLine(redoRecord); - Console.Error.WriteLine(exception); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); + Console.Error.WriteLine(redoRecord); + Console.Error.WriteLine(exception); Console.Error.Flush(); } public partial class Program -{ - private static readonly IList InputFiles = new ReadOnlyCollection( +{ + private static readonly IList InputFiles = new ReadOnlyCollection( new string[] { "../../resources/data/truthset/customers.jsonl", "../../resources/data/truthset/reference.jsonl", - "../../resources/data/truthset/watchlist.jsonl" - }); - - private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; - private const string DataSource = "DATA_SOURCE"; - private const string RecordID = "RECORD_ID"; - private const string Error = "ERROR"; - private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; - - // setup some class-wide variables - private static int errorCount; - private static int successCount; - private static int redoneCount; - private static int retryCount; - private static FileInfo? retryFile; + "../../resources/data/truthset/watchlist.jsonl" + }); + + private const string RetryPrefix = "retry-"; + private const string RetrySuffix = ".jsonl"; + private const string DataSource = "DATA_SOURCE"; + private const string RecordID = "RECORD_ID"; + private const string Error = "ERROR"; + private const string Warning = "WARNING"; + private const string Critical = "CRITICAL"; + + // setup some class-wide variables + private static int errorCount; + private static int successCount; + private static int redoneCount; + private static int retryCount; + private static FileInfo? retryFile; private static StreamWriter? retryWriter; } diff --git a/csharp/snippets/redo/RedoContinuous/Program.cs b/csharp/snippets/redo/RedoContinuous/Program.cs index 550c313..2f43e33 100644 --- a/csharp/snippets/redo/RedoContinuous/Program.cs +++ b/csharp/snippets/redo/RedoContinuous/Program.cs @@ -14,8 +14,8 @@ // get the senzing repository settings string? 
settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON");
if (settings == null)
-{
-    Console.Error.WriteLine("Unable to get settings.");
+{
+    Console.Error.WriteLine("Unable to get settings.");
    throw new ArgumentException("Unable to get settings");
}
@@ -31,102 +31,102 @@
    .Build();
AppDomain.CurrentDomain.ProcessExit += (s, e) =>
-{
-    // IMPORTANT: make sure to destroy the environment
-    env.Destroy();
+{
+    // IMPORTANT: make sure to destroy the environment
+    env.Destroy();
    OutputRedoStatistics();
};
try
-{
-    // get the engine from the environment
-    SzEngine engine = env.GetEngine();
-
-    while (true)
-    {
-        // get the next redo record
-        string redo = engine.GetRedoRecord();
-
-        // check if no redo records are available
-        if (redo == null)
-        {
-            OutputRedoStatistics();
-            Console.WriteLine();
-            Console.WriteLine(
-                "No redo records to process. 
Pausing for " + + RedoPauseDescription + "...."); + Console.WriteLine("Press CTRL-C to exit."); + try + { + Thread.Sleep(RedoPauseTimeout); + } + catch (ThreadInterruptedException) + { + // ignore the exception + } + continue; + } + + try + { + // process the redo record + engine.ProcessRedoRecord(redo, SzNoFlags); + + // increment the redone count + redoneCount++; + + } + catch (SzRetryableException e) + { + LogFailedRedo(Warning, e, redo); + errorCount++; + retryCount++; + TrackRetryRecord(redo); + + } + catch (Exception e) + { + LogFailedRedo(Critical, e, redo); + errorCount++; + throw; + } } } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - // normally we would call env.destroy() here, but we have registered - // a shutdown hook to do that since termination will typically occur - // via CTRL-C being pressed, and the shutdown hook will still run if - // we get an exception +{ + // normally we would call env.destroy() here, but we have registered + // a shutdown hook to do that since termination will typically occur + // via CTRL-C being pressed, and the shutdown hook will still run if + // we get an exception } static void OutputRedoStatistics() -{ - Console.WriteLine(); - Console.WriteLine("Redos successfully processed : " + redoneCount); - Console.WriteLine("Total failed records/redos : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine( - retryCount + " records/redos to be retried in " + retryFile); - } +{ + Console.WriteLine(); + Console.WriteLine("Redos successfully processed : " + redoneCount); + Console.WriteLine("Total failed records/redos : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine( + retryCount + " records/redos to be retried in " + retryFile); + } Console.Out.Flush(); } @@ -138,24 +138,24 @@ static void OutputRedoStatistics() /// The JSON text defining the record to be retried /// static void TrackRetryRecord(string recordJson) -{ - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(recordJson); +{ + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(recordJson); } } @@ -169,30 +169,30 @@ static void TrackRetryRecord(string recordJson) static void LogFailedRedo(string errorType, Exception exception, string redoRecord) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); - 
Console.Error.WriteLine(redoRecord); - Console.Error.WriteLine(exception); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); + Console.Error.WriteLine(redoRecord); + Console.Error.WriteLine(exception); Console.Error.Flush(); } public partial class Program -{ - private const string RedoPauseDescription = "30 seconds"; - - private const int RedoPauseTimeout = 30000; - - private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; - private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; - - // setup some class-wide variables - private static int errorCount; - private static int redoneCount; - private static int retryCount; - private static FileInfo? retryFile; +{ + private const string RedoPauseDescription = "30 seconds"; + + private const int RedoPauseTimeout = 30000; + + private const string RetryPrefix = "retry-"; + private const string RetrySuffix = ".jsonl"; + private const string Warning = "WARNING"; + private const string Critical = "CRITICAL"; + + // setup some class-wide variables + private static int errorCount; + private static int redoneCount; + private static int retryCount; + private static FileInfo? retryFile; private static StreamWriter? retryWriter; } diff --git a/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs b/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs index 47abcb3..5d34686 100644 --- a/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs +++ b/csharp/snippets/redo/RedoContinuousViaFutures/Program.cs @@ -16,8 +16,8 @@ // get the senzing repository settings string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -47,184 +47,187 @@ TaskScheduler taskScheduler IList<(Task, string)> pendingFutures = new List<(Task, string)>(MaximumBacklog); AppDomain.CurrentDomain.ProcessExit += (s, e) => -{ +{ #pragma warning disable CA1031 // Need to catch all exceptions here - try - { - HandlePendingFutures(pendingFutures, true); - } - catch (Exception exception) - { - Console.Error.WriteLine(exception); - } + try + { + HandlePendingFutures(pendingFutures, true); + } + catch (Exception exception) + { + Console.Error.WriteLine(exception); + } #pragma warning restore CA1031 // Need to catch all exceptions here - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); OutputRedoStatistics(); }; try -{ - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - while (true) - { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.Count < MaximumBacklog) - { - - // get the next redo record - string redo = engine.GetRedoRecord(); - - // check if no redo records are available - if (redo == null) break; - - Task task = factory.StartNew(() => - { - engine.ProcessRedoRecord(redo, SzNoFlags); - }, - CancellationToken.None, - TaskCreationOptions.None, - taskScheduler); - - // add the future to the pending future list - pendingFutures.Add((task, redo)); - } - - do - { - // handle any pending futures WITHOUT blocking to reduce the backlog - HandlePendingFutures(pendingFutures, false); - - // if we still have exceeded the backlog 
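// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the shutdown-hook pattern the
// continuous redo snippets use -- register a ProcessExit handler that first
// drains any in-flight work and then destroys the Senzing environment, so a
// CTRL-C exit still shuts down cleanly. The drain delegate is a hypothetical
// stand-in for HandlePendingFutures(pendingFutures, true); SzEnvironment is
// assumed to come from the Senzing.Sdk namespace these snippets import.
// ---------------------------------------------------------------------------
using System;
using Senzing.Sdk;  // assumed namespace for SzEnvironment

static class ShutdownHookSketch
{
    public static void Register(SzEnvironment env, Action drain)
    {
        AppDomain.CurrentDomain.ProcessExit += (s, e) =>
        {
            try
            {
                drain();   // block on any still-pending futures
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine(ex);
            }
            finally
            {
                // IMPORTANT: always destroy the environment
                env.Destroy();
            }
        };
    }
}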
size then pause - // briefly before trying again - if (pendingFutures.Count >= MaximumBacklog) - { - try - { - Thread.Sleep(HandlePauseTimeout); - - } - catch (ThreadInterruptedException) - { - // do nothing - } - } - } while (pendingFutures.Count >= MaximumBacklog); - - // check if there are no redo records right now - if (engine.CountRedoRecords() == 0) - { - OutputRedoStatistics(); - Console.WriteLine(); - Console.WriteLine( - "No redo records to process. Pausing for " - + RedoPauseDescription + "...."); - Console.WriteLine("Press CTRL-C to exit."); - try - { - Thread.Sleep(RedoPauseTimeout); - } - catch (ThreadInterruptedException) - { - // ignore the exception - } - continue; - } +{ + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + while (true) + { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.Count < MaximumBacklog) + { + + // get the next redo record + string redo = engine.GetRedoRecord(); + + // check if no redo records are available + if (redo == null) break; + + Task task = factory.StartNew(() => + { + engine.ProcessRedoRecord(redo, SzNoFlags); + }, + CancellationToken.None, + TaskCreationOptions.None, + taskScheduler); + + // add the future to the pending future list + pendingFutures.Add((task, redo)); + } + + do + { + // handle any pending futures WITHOUT blocking to reduce the backlog + HandlePendingFutures(pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.Count >= MaximumBacklog) + { + try + { + Thread.Sleep(HandlePauseTimeout); + + } + catch (ThreadInterruptedException) + { + // do nothing + } + } + } while (pendingFutures.Count >= MaximumBacklog); + + // check if there are no redo records right now + // NOTE: we do NOT want to call countRedoRecords() in a loop that + // is processing redo records, we call it here AFTER we believe + // have processed all pending redos to confirm still zero + if (engine.CountRedoRecords() == 0) + { + OutputRedoStatistics(); + Console.WriteLine(); + Console.WriteLine( + "No redo records to process. 
Pausing for " + + RedoPauseDescription + "...."); + Console.WriteLine("Press CTRL-C to exit."); + try + { + Thread.Sleep(RedoPauseTimeout); + } + catch (ThreadInterruptedException) + { + // ignore the exception + } + continue; + } } } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - // normally we would call env.destroy() here, but we have registered - // a shutdown hook to do that since termination will typically occur - // via CTRL-C being pressed, and the shutdown hook will still run if - // we get an exception +{ + // normally we would call env.destroy() here, but we have registered + // a shutdown hook to do that since termination will typically occur + // via CTRL-C being pressed, and the shutdown hook will still run if + // we get an exception } static void HandlePendingFutures(IList<(Task, string)> pendingFutures, bool blocking) -{ - // loop through the pending futures - for (int index = 0; index < pendingFutures.Count; index++) - { - // get the next pending future - (Task task, string redoRecord) = pendingFutures[index]; - - // if not blocking and this one is not done then continue - if (!blocking && !task.IsCompleted) continue; - - // remove the pending future from the list - pendingFutures.RemoveAt(index--); - - try - { - try - { - // wait for completion -- if non-blocking then this - // task is already completed and this will just - // throw any exception that might have occurred - if (blocking && !task.IsCompleted) - { - task.Wait(); - } - - // if we get here then increment the success count - redoneCount++; - - } - catch (AggregateException e) - when (e.InnerException is TaskCanceledException - || e.InnerException is ThreadInterruptedException) - { - throw new SzRetryableException(e.InnerException); - } - catch (ThreadInterruptedException e) - { - throw new SzRetryableException(e.InnerException); - } - catch (AggregateException e) - { - if (e.InnerException != null) - { - // get the inner exception - throw e.InnerException; - } - else - { - throw; - } - } - - } - catch (SzRetryableException e) - { - // handle thread interruption and cancellation as retries - LogFailedRedo(Warning, e, redoRecord); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - TrackRetryRecord(redoRecord); - } - catch (Exception e) - { - // catch any other exception (incl. 
SzException) here - LogFailedRedo(Critical, e, redoRecord); - errorCount++; - throw; // rethrow since exception is critical - } +{ + // loop through the pending futures + for (int index = 0; index < pendingFutures.Count; index++) + { + // get the next pending future + (Task task, string redoRecord) = pendingFutures[index]; + + // if not blocking and this one is not done then continue + if (!blocking && !task.IsCompleted) continue; + + // remove the pending future from the list + pendingFutures.RemoveAt(index--); + + try + { + try + { + // wait for completion -- if non-blocking then this + // task is already completed and this will just + // throw any exception that might have occurred + if (blocking && !task.IsCompleted) + { + task.Wait(); + } + + // if we get here then increment the success count + redoneCount++; + + } + catch (AggregateException e) + when (e.InnerException is TaskCanceledException + || e.InnerException is ThreadInterruptedException) + { + throw new SzRetryableException(e.InnerException); + } + catch (ThreadInterruptedException e) + { + throw new SzRetryableException(e.InnerException); + } + catch (AggregateException e) + { + if (e.InnerException != null) + { + // get the inner exception + throw e.InnerException; + } + else + { + throw; + } + } + + } + catch (SzRetryableException e) + { + // handle thread interruption and cancellation as retries + LogFailedRedo(Warning, e, redoRecord); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + TrackRetryRecord(redoRecord); + } + catch (Exception e) + { + // catch any other exception (incl. SzException) here + LogFailedRedo(Critical, e, redoRecord); + errorCount++; + throw; // rethrow since exception is critical + } } } @@ -236,24 +239,24 @@ static void HandlePendingFutures(IList<(Task, string)> pendingFutures, bool bloc /// The JSON text defining the record to be retried /// static void TrackRetryRecord(string recordJson) -{ - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(recordJson); +{ + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(recordJson); } } @@ -267,58 +270,58 @@ static void TrackRetryRecord(string recordJson) static void LogFailedRedo(string errorType, Exception exception, string redoRecord) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); - Console.Error.WriteLine(redoRecord); - Console.Error.WriteLine(exception); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); + Console.Error.WriteLine(redoRecord); + Console.Error.WriteLine(exception); Console.Error.Flush(); } static void OutputRedoStatistics() -{ - Console.WriteLine(); - Console.WriteLine("Redos successfully processed : " + redoneCount); - Console.WriteLine("Total 
failed records/redos : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine( - retryCount + " records/redos to be retried in " + retryFile); - } +{ + Console.WriteLine(); + Console.WriteLine("Redos successfully processed : " + redoneCount); + Console.WriteLine("Total failed records/redos : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine( + retryCount + " records/redos to be retried in " + retryFile); + } Console.Out.Flush(); } public partial class Program -{ - private const string RedoPauseDescription = "30 seconds"; - - private const int RedoPauseTimeout = 30000; - - private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; - private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; - - // setup some class-wide variables - private static int errorCount; - private static int redoneCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? retryWriter; - - private const int ThreadCount = 8; - - private const int BacklogFactor = 10; - - private const int MaximumBacklog = ThreadCount * BacklogFactor; - +{ + private const string RedoPauseDescription = "30 seconds"; + + private const int RedoPauseTimeout = 30000; + + private const string RetryPrefix = "retry-"; + private const string RetrySuffix = ".jsonl"; + private const string Warning = "WARNING"; + private const string Critical = "CRITICAL"; + + // setup some class-wide variables + private static int errorCount; + private static int redoneCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? retryWriter; + + private const int ThreadCount = 8; + + private const int BacklogFactor = 10; + + private const int MaximumBacklog = ThreadCount * BacklogFactor; + private const int HandlePauseTimeout = 100; } diff --git a/csharp/snippets/redo/RedoWithInfoContinuous/Program.cs b/csharp/snippets/redo/RedoWithInfoContinuous/Program.cs index d7441aa..3c6691f 100644 --- a/csharp/snippets/redo/RedoWithInfoContinuous/Program.cs +++ b/csharp/snippets/redo/RedoWithInfoContinuous/Program.cs @@ -14,8 +14,8 @@ // get the senzing repository settings string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -31,106 +31,106 @@ .Build(); AppDomain.CurrentDomain.ProcessExit += (s, e) => -{ - // IMPORTANT: make sure to destroy the environment - env.Destroy(); +{ + // IMPORTANT: make sure to destroy the environment + env.Destroy(); OutputRedoStatistics(); }; try -{ - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - while (true) - { - // get the next redo record - string redo = engine.GetRedoRecord(); - - // check if no redo records are available - if (redo == null) - { - OutputRedoStatistics(); - Console.WriteLine(); - Console.WriteLine( - "No redo records to process. 
Pausing for " - + RedoPauseDescription + "...."); - Console.WriteLine("Press CTRL-C to exit."); - try - { - Thread.Sleep(RedoPauseTimeout); - } - catch (ThreadInterruptedException) - { - // ignore the exception - } - continue; - } - - try - { - // process the redo record - string info = engine.ProcessRedoRecord(redo, SzWithInfo); - - // increment the redone count - redoneCount++; - - // process the info - ProcessInfo(engine, info); - - } - catch (SzRetryableException e) - { - LogFailedRedo(Warning, e, redo); - errorCount++; - retryCount++; - TrackRetryRecord(redo); - - } - catch (Exception e) - { - LogFailedRedo(Critical, e, redo); - errorCount++; - throw; - } +{ + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + while (true) + { + // get the next redo record + string redo = engine.GetRedoRecord(); + + // check if no redo records are available + if (redo == null) + { + OutputRedoStatistics(); + Console.WriteLine(); + Console.WriteLine( + "No redo records to process. Pausing for " + + RedoPauseDescription + "...."); + Console.WriteLine("Press CTRL-C to exit."); + try + { + Thread.Sleep(RedoPauseTimeout); + } + catch (ThreadInterruptedException) + { + // ignore the exception + } + continue; + } + + try + { + // process the redo record + string info = engine.ProcessRedoRecord(redo, SzWithInfo); + + // increment the redone count + redoneCount++; + + // process the info + ProcessInfo(engine, info); + + } + catch (SzRetryableException e) + { + LogFailedRedo(Warning, e, redo); + errorCount++; + retryCount++; + TrackRetryRecord(redo); + + } + catch (Exception e) + { + LogFailedRedo(Critical, e, redo); + errorCount++; + throw; + } } } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - // normally we would call env.destroy() here, but we have registered - // a shutdown hook to do that since termination will typically occur - // via CTRL-C being pressed, and the shutdown hook will still run if - // we get an exception +{ + // normally we would call env.destroy() here, but we have registered + // a shutdown hook to do that since termination will typically occur + // via CTRL-C being pressed, and the shutdown hook will still run if + // we get an exception } static void OutputRedoStatistics() -{ - Console.WriteLine(); - Console.WriteLine("Redos successfully processed : " + redoneCount); - Console.WriteLine("Total entities affected : " + entityIDSet.Count); - Console.WriteLine("Total failed records/redos : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine( - retryCount + " records/redos to be retried in " + retryFile); - } +{ + Console.WriteLine(); + Console.WriteLine("Redos successfully processed : " + redoneCount); + Console.WriteLine("Total entities affected : " + entityIDSet.Count); + Console.WriteLine("Total failed records/redos : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine( + retryCount + " records/redos to be retried in " + retryFile); + } Console.Out.Flush(); } @@ -142,24 +142,24 @@ static void 
OutputRedoStatistics() /// The JSON text defining the record to be retried /// static void TrackRetryRecord(string recordJson) -{ - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(recordJson); +{ + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(recordJson); } } @@ -177,37 +177,37 @@ static void TrackRetryRecord(string recordJson) /// The to use. /// The info message static void ProcessInfo(SzEngine engine, string info) -{ - JsonObject? jsonObject = JsonNode.Parse(info)?.AsObject(); - if (jsonObject == null) return; - if (!jsonObject.ContainsKey(AffectedEntities)) return; - - JsonArray? affectedArr = jsonObject[AffectedEntities]?.AsArray(); - if (affectedArr == null) return; - - for (int index = 0; index < affectedArr.Count; index++) - { - JsonObject? affected = affectedArr[index]?.AsObject(); - long entityID = affected?[EntityID]?.GetValue() ?? 0L; - if (entityID == 0L) continue; - - try - { - engine.GetEntity(entityID, null); - entityIDSet.Add(entityID); - } - catch (SzNotFoundException) - { - entityIDSet.Remove(entityID); - } - catch (SzException e) - { - // simply log the exception, do not rethrow - Console.Error.WriteLine(); - Console.Error.WriteLine("**** FAILED TO RETRIEVE ENTITY: " + entityID); - Console.Error.WriteLine(e); - Console.Error.Flush(); - } +{ + JsonObject? jsonObject = JsonNode.Parse(info)?.AsObject(); + if (jsonObject == null) return; + if (!jsonObject.ContainsKey(AffectedEntities)) return; + + JsonArray? affectedArr = jsonObject[AffectedEntities]?.AsArray(); + if (affectedArr == null) return; + + for (int index = 0; index < affectedArr.Count; index++) + { + JsonObject? affected = affectedArr[index]?.AsObject(); + long entityID = affected?[EntityID]?.GetValue() ?? 
0L; + if (entityID == 0L) continue; + + try + { + engine.GetEntity(entityID, null); + entityIDSet.Add(entityID); + } + catch (SzNotFoundException) + { + entityIDSet.Remove(entityID); + } + catch (SzException e) + { + // simply log the exception, do not rethrow + Console.Error.WriteLine(); + Console.Error.WriteLine("**** FAILED TO RETRIEVE ENTITY: " + entityID); + Console.Error.WriteLine(e); + Console.Error.Flush(); + } } } @@ -221,33 +221,33 @@ static void ProcessInfo(SzEngine engine, string info) static void LogFailedRedo(string errorType, Exception exception, string redoRecord) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); - Console.Error.WriteLine(redoRecord); - Console.Error.WriteLine(exception); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: "); + Console.Error.WriteLine(redoRecord); + Console.Error.WriteLine(exception); Console.Error.Flush(); } public partial class Program -{ - private const string RedoPauseDescription = "30 seconds"; - - private const int RedoPauseTimeout = 30000; - - private const string RetryPrefix = "retry-"; - private const string RetrySuffix = ".jsonl"; - private const string Warning = "WARNING"; - private const string Critical = "CRITICAL"; - private const string AffectedEntities = "AFFECTED_ENTITIES"; - private const string EntityID = "ENTITY_ID"; - - // setup some class-wide variables - private static int errorCount; - private static int redoneCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? retryWriter; +{ + private const string RedoPauseDescription = "30 seconds"; + + private const int RedoPauseTimeout = 30000; + + private const string RetryPrefix = "retry-"; + private const string RetrySuffix = ".jsonl"; + private const string Warning = "WARNING"; + private const string Critical = "CRITICAL"; + private const string AffectedEntities = "AFFECTED_ENTITIES"; + private const string EntityID = "ENTITY_ID"; + + // setup some class-wide variables + private static int errorCount; + private static int redoneCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? retryWriter; private static readonly ISet entityIDSet = new HashSet(); } diff --git a/csharp/snippets/searching/SearchRecords/Program.cs b/csharp/snippets/searching/SearchRecords/Program.cs index 8cec146..69d3a73 100644 --- a/csharp/snippets/searching/SearchRecords/Program.cs +++ b/csharp/snippets/searching/SearchRecords/Program.cs @@ -12,8 +12,8 @@ // get the senzing repository settings string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -29,67 +29,67 @@ .Build(); try -{ - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - // loop through the example records and add them to the repository - foreach (string criteria in GetSearchCriteria()) - { - // call the searchByAttributes() function with default flags - string result = engine.SearchByAttributes( - criteria, SzSearchByAttributesDefaultFlags); - - JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); - - Console.WriteLine(); - JsonArray? 
jsonArr = jsonObj?["RESOLVED_ENTITIES"]?.AsArray(); - if (jsonArr == null || jsonArr.Count == 0) - { - Console.WriteLine("No results for criteria: " + criteria); - } - else - { - Console.WriteLine("Results for criteria: " + criteria); - for (int index = 0; index < jsonArr.Count; index++) - { - JsonObject? obj = jsonArr[index]?.AsObject(); - obj = obj?["ENTITY"]?.AsObject(); - obj = obj?["RESOLVED_ENTITY"]?.AsObject(); - if (obj == null) - { - throw new JsonException("Unexpected result format: " + result); - } - long? entityID = obj["ENTITY_ID"]?.GetValue(); - string? name = obj["ENTITY_NAME"]?.GetValue(); - Console.WriteLine(entityID + ": " + name); - } - } - Console.Out.Flush(); +{ + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + // loop through the example records and add them to the repository + foreach (string criteria in GetSearchCriteria()) + { + // call the searchByAttributes() function with default flags + string result = engine.SearchByAttributes( + criteria, SzSearchByAttributesDefaultFlags); + + JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); + + Console.WriteLine(); + JsonArray? jsonArr = jsonObj?["RESOLVED_ENTITIES"]?.AsArray(); + if (jsonArr == null || jsonArr.Count == 0) + { + Console.WriteLine("No results for criteria: " + criteria); + } + else + { + Console.WriteLine("Results for criteria: " + criteria); + for (int index = 0; index < jsonArr.Count; index++) + { + JsonObject? obj = jsonArr[index]?.AsObject(); + obj = obj?["ENTITY"]?.AsObject(); + obj = obj?["RESOLVED_ENTITY"]?.AsObject(); + if (obj == null) + { + throw new JsonException("Unexpected result format: " + result); + } + long? entityID = obj["ENTITY_ID"]?.GetValue(); + string? name = obj["ENTITY_NAME"]?.GetValue(); + Console.WriteLine(entityID + ": " + name); + } + } + Console.Out.Flush(); } } catch (SzException e) -{ - // handle any exception that may have occurred - Console.Error.WriteLine("Senzing Error Message : " + e.Message); - Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); - Console.Error.WriteLine(e); +{ + // handle any exception that may have occurred + Console.Error.WriteLine("Senzing Error Message : " + e.Message); + Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); + Console.Error.WriteLine(e); throw; } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - // IMPORTANT: make sure to destroy the environment +{ + // IMPORTANT: make sure to destroy the environment env.Destroy(); } @@ -102,34 +102,34 @@ /// sets of criteria with which to search. 
/// static IList GetSearchCriteria() -{ - IList records = new List(); - records.Add( +{ + IList records = new List(); + records.Add( """ { "NAME_FULL": "Susan Moony", "DATE_OF_BIRTH": "15/6/1998", "SSN_NUMBER": "521212123" } - """); - - records.Add( + """); + + records.Add( """ { "NAME_FIRST": "Robert", "NAME_LAST": "Smith", "ADDR_FULL": "123 Main Street Las Vegas NV 89132" } - """); - - records.Add( + """); + + records.Add( """ { "NAME_FIRST": "Makio", "NAME_LAST": "Yamanaka", "ADDR_FULL": "787 Rotary Drive Rotorville FL 78720" } - """); - + """); + return records; } \ No newline at end of file diff --git a/csharp/snippets/searching/SearchViaFutures/Program.cs b/csharp/snippets/searching/SearchViaFutures/Program.cs index 4285514..e824eaa 100644 --- a/csharp/snippets/searching/SearchViaFutures/Program.cs +++ b/csharp/snippets/searching/SearchViaFutures/Program.cs @@ -17,8 +17,8 @@ // get the senzing repository settings string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -55,228 +55,228 @@ TaskScheduler taskScheduler // create a reader StreamReader rdr = new StreamReader(fs, Encoding.UTF8); try -{ - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - int lineNumber = 0; - bool eof = false; - - while (!eof) - { - // loop through the example records and queue them up so long - // as we have more records and backlog is not too large - while (pendingFutures.Count < MaximumBacklog) - { - // read the next line - string? line = rdr.ReadLine(); - lineNumber++; - - // check for EOF - if (line == null) - { - eof = true; - break; - } - - // trim the line - line = line.Trim(); - - // skip any blank lines - if (line.Length == 0) continue; - - // skip any commented lines - if (line.StartsWith('#')) continue; - - // construct the Record instance - Criteria criteria = new Criteria(lineNumber, line); - - try - { - Task task = factory.StartNew(() => - { - // call the addRecord() function with no flags - return engine.SearchByAttributes( - criteria.Line, SzSearchByAttributesDefaultFlags); - }, - CancellationToken.None, - TaskCreationOptions.None, - taskScheduler); - - // add the future to the pending future list - pendingFutures.Add((task, criteria)); - - } - catch (SzBadInputException e) - { - LogFailedSearch(Error, e, lineNumber, line); - errorCount++; // increment the error count - } - } - - do - { - // handle any pending futures WITHOUT blocking to reduce the backlog - HandlePendingFutures(pendingFutures, false); - - // if we still have exceeded the backlog size then pause - // briefly before trying again - if (pendingFutures.Count >= MaximumBacklog) - { - Thread.Sleep(PauseTimeout); - } - } while (pendingFutures.Count >= MaximumBacklog); - } - - // after we have submitted all records we need to handle the remaining - // pending futures so this time we block on each future +{ + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + int lineNumber = 0; + bool eof = false; + + while (!eof) + { + // loop through the example records and queue them up so long + // as we have more records and backlog is not too large + while (pendingFutures.Count < MaximumBacklog) + { + // read the next line + string? 
line = rdr.ReadLine(); + lineNumber++; + + // check for EOF + if (line == null) + { + eof = true; + break; + } + + // trim the line + line = line.Trim(); + + // skip any blank lines + if (line.Length == 0) continue; + + // skip any commented lines + if (line.StartsWith('#')) continue; + + // construct the Record instance + Criteria criteria = new Criteria(lineNumber, line); + + try + { + Task task = factory.StartNew(() => + { + // call the addRecord() function with no flags + return engine.SearchByAttributes( + criteria.Line, SzSearchByAttributesDefaultFlags); + }, + CancellationToken.None, + TaskCreationOptions.None, + taskScheduler); + + // add the future to the pending future list + pendingFutures.Add((task, criteria)); + + } + catch (SzBadInputException e) + { + LogFailedSearch(Error, e, lineNumber, line); + errorCount++; // increment the error count + } + } + + do + { + // handle any pending futures WITHOUT blocking to reduce the backlog + HandlePendingFutures(pendingFutures, false); + + // if we still have exceeded the backlog size then pause + // briefly before trying again + if (pendingFutures.Count >= MaximumBacklog) + { + Thread.Sleep(PauseTimeout); + } + } while (pendingFutures.Count >= MaximumBacklog); + } + + // after we have submitted all records we need to handle the remaining + // pending futures so this time we block on each future HandlePendingFutures(pendingFutures, true); } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - rdr.Close(); - fs.Close(); - - // IMPORTANT: make sure to destroy the environment - env.Destroy(); - - Console.WriteLine(); - Console.WriteLine("Searches successfully completed : " + successCount); - Console.WriteLine("Total entities found via searches : " + foundEntities.Count); - Console.WriteLine("Searches failed with errors : " + errorCount); - - // check on any retry records - if (retryWriter != null) - { - retryWriter.Flush(); - retryWriter.Close(); - } - if (retryCount > 0) - { - Console.WriteLine(retryCount + " searches to be retried in " + retryFile); - } +{ + rdr.Close(); + fs.Close(); + + // IMPORTANT: make sure to destroy the environment + env.Destroy(); + + Console.WriteLine(); + Console.WriteLine("Searches successfully completed : " + successCount); + Console.WriteLine("Total entities found via searches : " + foundEntities.Count); + Console.WriteLine("Searches failed with errors : " + errorCount); + + // check on any retry records + if (retryWriter != null) + { + retryWriter.Flush(); + retryWriter.Close(); + } + if (retryCount > 0) + { + Console.WriteLine(retryCount + " searches to be retried in " + retryFile); + } Console.Out.Flush(); } static void HandlePendingFutures(IList<(Task, Criteria)> pendingFutures, bool blocking) -{ - // loop through the pending futures - for (int index = 0; index < pendingFutures.Count; index++) - { - // get the next pending future - (Task task, Criteria criteria) = pendingFutures[index]; - - // if not blocking and this one is not done then continue - if (!blocking && !task.IsCompleted) continue; - - // remove the pending future from the list - pendingFutures.RemoveAt(index--); - - try - { - try - { - // this will block if the task is not yet completed, - // however we only get here with a pending task if - 
// the blocking parameter is true - string results = task.Result; - - // if we get here then increment the success count - successCount++; - - // parse the search results - JsonObject? jsonObj = JsonNode.Parse(results)?.AsObject(); - JsonArray? jsonArr = jsonObj?["RESOLVED_ENTITIES"]?.AsArray(); - if (jsonArr != null) - { - for (int index2 = 0; index2 < jsonArr.Count; index2++) - { - JsonObject? obj = jsonArr[index2]?.AsObject(); - obj = obj?["ENTITY"]?.AsObject(); - obj = obj?["RESOLVED_ENTITY"]?.AsObject(); - long? entityID = obj?["ENTITY_ID"]?.GetValue(); - if (entityID != null) - { - foundEntities.Add(entityID ?? 0L); - } - } - } - - } - catch (AggregateException e) - when (e.InnerException is TaskCanceledException - || e.InnerException is ThreadInterruptedException) - { - throw new SzRetryableException(e.InnerException); - } - catch (ThreadInterruptedException e) - { - throw new SzRetryableException(e.InnerException); - } - catch (AggregateException e) - { - if (e.InnerException != null) - { - // get the inner exception - throw e.InnerException; - } - else - { - throw; - } - } - - } - catch (SzBadInputException e) - { - LogFailedSearch(Error, e, criteria.LineNumber, criteria.Line); - errorCount++; // increment the error count - - } - catch (SzRetryableException e) - { - // handle thread interruption and cancellation as retries - LogFailedSearch(Warning, e, criteria.LineNumber, criteria.Line); - errorCount++; // increment the error count - retryCount++; // increment the retry count - - // track the retry record so it can be retried later - if (retryFile == null) - { - retryFile = new FileInfo( - Path.Combine( - Path.GetTempPath(), - RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); - - retryWriter = new StreamWriter( - new FileStream(retryFile.FullName, - FileMode.Open, - FileAccess.Write), - Encoding.UTF8); - } - if (retryWriter != null) - { - retryWriter.WriteLine(criteria.Line); - } - - } - catch (Exception e) - { - // catch any other exception (incl. SzException) here - LogFailedSearch(Critical, e, criteria.LineNumber, criteria.Line); - errorCount++; - throw; // rethrow since exception is critical - } +{ + // loop through the pending futures + for (int index = 0; index < pendingFutures.Count; index++) + { + // get the next pending future + (Task task, Criteria criteria) = pendingFutures[index]; + + // if not blocking and this one is not done then continue + if (!blocking && !task.IsCompleted) continue; + + // remove the pending future from the list + pendingFutures.RemoveAt(index--); + + try + { + try + { + // this will block if the task is not yet completed, + // however we only get here with a pending task if + // the blocking parameter is true + string results = task.Result; + + // if we get here then increment the success count + successCount++; + + // parse the search results + JsonObject? jsonObj = JsonNode.Parse(results)?.AsObject(); + JsonArray? jsonArr = jsonObj?["RESOLVED_ENTITIES"]?.AsArray(); + if (jsonArr != null) + { + for (int index2 = 0; index2 < jsonArr.Count; index2++) + { + JsonObject? obj = jsonArr[index2]?.AsObject(); + obj = obj?["ENTITY"]?.AsObject(); + obj = obj?["RESOLVED_ENTITY"]?.AsObject(); + long? entityID = obj?["ENTITY_ID"]?.GetValue(); + if (entityID != null) + { + foundEntities.Add(entityID ?? 
0L); + } + } + } + + } + catch (AggregateException e) + when (e.InnerException is TaskCanceledException + || e.InnerException is ThreadInterruptedException) + { + throw new SzRetryableException(e.InnerException); + } + catch (ThreadInterruptedException e) + { + throw new SzRetryableException(e.InnerException); + } + catch (AggregateException e) + { + if (e.InnerException != null) + { + // get the inner exception + throw e.InnerException; + } + else + { + throw; + } + } + + } + catch (SzBadInputException e) + { + LogFailedSearch(Error, e, criteria.LineNumber, criteria.Line); + errorCount++; // increment the error count + + } + catch (SzRetryableException e) + { + // handle thread interruption and cancellation as retries + LogFailedSearch(Warning, e, criteria.LineNumber, criteria.Line); + errorCount++; // increment the error count + retryCount++; // increment the retry count + + // track the retry record so it can be retried later + if (retryFile == null) + { + retryFile = new FileInfo( + Path.Combine( + Path.GetTempPath(), + RetryPrefix + Path.GetRandomFileName() + RetrySuffix)); + + retryWriter = new StreamWriter( + new FileStream(retryFile.FullName, + FileMode.Open, + FileAccess.Write), + Encoding.UTF8); + } + if (retryWriter != null) + { + retryWriter.WriteLine(criteria.Line); + } + + } + catch (Exception e) + { + // catch any other exception (incl. SzException) here + LogFailedSearch(Critical, e, criteria.LineNumber, criteria.Line); + errorCount++; + throw; // rethrow since exception is critical + } } } @@ -294,44 +294,44 @@ static void LogFailedSearch(string errorType, Exception exception, int lineNumber, string criteriaJson) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine( - "** " + errorType + " ** FAILED TO SEARCH CRITERIA AT LINE " - + lineNumber + ": "); - Console.Error.WriteLine(criteriaJson); - Console.Error.WriteLine(exception); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine( + "** " + errorType + " ** FAILED TO SEARCH CRITERIA AT LINE " + + lineNumber + ": "); + Console.Error.WriteLine(criteriaJson); + Console.Error.WriteLine(exception); Console.Error.Flush(); } public partial class Program -{ - private const string DefaultFilePath = "../../resources/data/search-5K.jsonl"; - - private const string RetryPrefix = "retry-"; - - private const string RetrySuffix = ".jsonl"; - - private const int ThreadCount = 8; - - private const int BacklogFactor = 10; - - private const int MaximumBacklog = ThreadCount * BacklogFactor; - - private const int PauseTimeout = 100; - - private const string Error = "ERROR"; - - private const string Warning = "WARNING"; - - private const string Critical = "CRITICAL"; - - private static int errorCount; - private static int successCount; - private static int retryCount; - private static FileInfo? retryFile; - private static StreamWriter? retryWriter; - +{ + private const string DefaultFilePath = "../../resources/data/search-5K.jsonl"; + + private const string RetryPrefix = "retry-"; + + private const string RetrySuffix = ".jsonl"; + + private const int ThreadCount = 8; + + private const int BacklogFactor = 10; + + private const int MaximumBacklog = ThreadCount * BacklogFactor; + + private const int PauseTimeout = 100; + + private const string Error = "ERROR"; + + private const string Warning = "WARNING"; + + private const string Critical = "CRITICAL"; + + private static int errorCount; + private static int successCount; + private static int retryCount; + private static FileInfo? retryFile; + private static StreamWriter? 
retryWriter; + private static readonly HashSet foundEntities = new HashSet(); } diff --git a/csharp/snippets/stewardship/ForceResolve/Program.cs b/csharp/snippets/stewardship/ForceResolve/Program.cs index a97c3f9..4c65c2e 100644 --- a/csharp/snippets/stewardship/ForceResolve/Program.cs +++ b/csharp/snippets/stewardship/ForceResolve/Program.cs @@ -14,8 +14,8 @@ // get the senzing repository settings string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -31,99 +31,99 @@ .Build(); try -{ - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - IDictionary<(string, string), string> records = GetRecords(); - - // loop through the example records and add them to the repository - foreach (KeyValuePair<(string, string), string> pair in records) - { - (string dataSourceCode, string recordID) = pair.Key; - string recordDefinition = pair.Value; - - // call the addRecord() function with no flags - engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); - - Console.WriteLine("Record " + recordID + " added"); - Console.Out.Flush(); - } - - Console.WriteLine(); - foreach ((string dataSourceCode, string recordID) in records.Keys) - { - string result = engine.GetEntity( - dataSourceCode, recordID, SzEntityBriefDefaultFlags); - - JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); - jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); - long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); - - Console.WriteLine( - "Record " + dataSourceCode + ":" + recordID - + " originally resolves to entity " + entityID); - } - Console.WriteLine(); - Console.WriteLine("Updating records with TRUSTED_ID to force resolve..."); - - string record1 = engine.GetRecord(TestDataSource, "1", SzRecordDefaultFlags); - string record3 = engine.GetRecord(TestDataSource, "3", SzRecordDefaultFlags); - +{ + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + IDictionary<(string, string), string> records = GetRecords(); + + // loop through the example records and add them to the repository + foreach (KeyValuePair<(string, string), string> pair in records) + { + (string dataSourceCode, string recordID) = pair.Key; + string recordDefinition = pair.Value; + + // call the addRecord() function with no flags + engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); + + Console.WriteLine("Record " + recordID + " added"); + Console.Out.Flush(); + } + + Console.WriteLine(); + foreach ((string dataSourceCode, string recordID) in records.Keys) + { + string result = engine.GetEntity( + dataSourceCode, recordID, SzEntityBriefDefaultFlags); + + JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); + jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); + long? 
entityID = jsonObj?["ENTITY_ID"]?.GetValue(); + + Console.WriteLine( + "Record " + dataSourceCode + ":" + recordID + + " originally resolves to entity " + entityID); + } + Console.WriteLine(); + Console.WriteLine("Updating records with TRUSTED_ID to force resolve..."); + + string record1 = engine.GetRecord(TestDataSource, "1", SzRecordDefaultFlags); + string record3 = engine.GetRecord(TestDataSource, "3", SzRecordDefaultFlags); + JsonObject?[] jsonObjects = { JsonNode.Parse(record1)?.AsObject()?["JSON_DATA"]?.AsObject(), JsonNode.Parse(record3)?.AsObject()?["JSON_DATA"]?.AsObject() - }; - foreach (JsonObject? obj in jsonObjects) - { - if (obj == null) - { - throw new JsonException("Parsed record is unexpectedly null: " - + record1 + " / " + record3); - } - obj["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R1-TEST_R3\""); - obj["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_RESOLVE\""); - } - engine.AddRecord(TestDataSource, "1", jsonObjects[0]?.ToJsonString()); - engine.AddRecord(TestDataSource, "3", jsonObjects[1]?.ToJsonString()); - - Console.WriteLine(); - - foreach ((string dataSourceCode, string recordID) in records.Keys) - { - string result = engine.GetEntity( - dataSourceCode, recordID, SzEntityBriefDefaultFlags); - - JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); - jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); - long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); - - Console.WriteLine( - "Record " + dataSourceCode + ":" + recordID - + " now resolves to entity " + entityID); - } + }; + foreach (JsonObject? obj in jsonObjects) + { + if (obj == null) + { + throw new JsonException("Parsed record is unexpectedly null: " + + record1 + " / " + record3); + } + obj["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R1-TEST_R3\""); + obj["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_RESOLVE\""); + } + engine.AddRecord(TestDataSource, "1", jsonObjects[0]?.ToJsonString()); + engine.AddRecord(TestDataSource, "3", jsonObjects[1]?.ToJsonString()); + + Console.WriteLine(); + + foreach ((string dataSourceCode, string recordID) in records.Keys) + { + string result = engine.GetEntity( + dataSourceCode, recordID, SzEntityBriefDefaultFlags); + + JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); + jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); + long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); + + Console.WriteLine( + "Record " + dataSourceCode + ":" + recordID + + " now resolves to entity " + entityID); + } Console.WriteLine(); } catch (SzException e) -{ - // handle any exception that may have occurred - Console.Error.WriteLine("Senzing Error Message : " + e.Message); - Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); - Console.Error.WriteLine(e); +{ + // handle any exception that may have occurred + Console.Error.WriteLine("Senzing Error Message : " + e.Message); + Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); + Console.Error.WriteLine(e); throw; } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - // IMPORTANT: make sure to destroy the environment +{ + // IMPORTANT: make sure to destroy the environment env.Destroy(); } @@ -136,12 +136,12 @@ /// to string JSON text values describing the records to be added. 
/// static IDictionary<(string, string), string> GetRecords() -{ - SortedDictionary<(string, string), string> records - = new SortedDictionary<(string, string), string>(); - - records.Add( - ("TEST", "1"), +{ + SortedDictionary<(string, string), string> records + = new SortedDictionary<(string, string), string>(); + + records.Add( + ("TEST", "1"), """ { "DATA_SOURCE": "TEST", @@ -152,10 +152,10 @@ "PHONE_NUMBER": "787-767-2688", "DATE_OF_BIRTH": "1/12/1990" } - """); - - records.Add( - ("TEST", "2"), + """); + + records.Add( + ("TEST", "2"), """ { "DATA_SOURCE": "TEST", @@ -165,10 +165,10 @@ "PHONE_NUMBER": "787-767-2688", "DATE_OF_BIRTH": "5/4/1994" } - """); - - records.Add( - ("TEST", "3"), + """); + + records.Add( + ("TEST", "3"), """ { "DATA_SOURCE": "TEST", @@ -177,13 +177,13 @@ "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", "PHONE_NUMBER": "787-767-2688" } - """); - + """); + return records; } public partial class Program -{ +{ private const string TestDataSource = "Test"; } diff --git a/csharp/snippets/stewardship/ForceUnresolve/Program.cs b/csharp/snippets/stewardship/ForceUnresolve/Program.cs index 25aac09..0729440 100644 --- a/csharp/snippets/stewardship/ForceUnresolve/Program.cs +++ b/csharp/snippets/stewardship/ForceUnresolve/Program.cs @@ -14,8 +14,8 @@ // get the senzing repository settings string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON"); if (settings == null) -{ - Console.Error.WriteLine("Unable to get settings."); +{ + Console.Error.WriteLine("Unable to get settings."); throw new ArgumentException("Unable to get settings"); } @@ -31,103 +31,103 @@ .Build(); try -{ - // get the engine from the environment - SzEngine engine = env.GetEngine(); - - IDictionary<(string, string), string> records = GetRecords(); - - // loop through the example records and add them to the repository - foreach (KeyValuePair<(string, string), string> pair in records) - { - (string dataSourceCode, string recordID) = pair.Key; - string recordDefinition = pair.Value; - - // call the addRecord() function with no flags - engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); - - Console.WriteLine("Record " + recordID + " added"); - Console.Out.Flush(); - } - - Console.WriteLine(); - foreach ((string dataSourceCode, string recordID) in records.Keys) - { - string result = engine.GetEntity( - dataSourceCode, recordID, SzEntityBriefDefaultFlags); - - JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); - jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); - long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); - - Console.WriteLine( - "Record " + dataSourceCode + ":" + recordID - + " originally resolves to entity " + entityID); - } - Console.WriteLine(); - Console.WriteLine("Updating records with TRUSTED_ID to force resolve..."); - - string record4 = engine.GetRecord(TestDataSource, "4", SzRecordDefaultFlags); - string record6 = engine.GetRecord(TestDataSource, "6", SzRecordDefaultFlags); - - JsonObject? obj4 = JsonNode.Parse(record4)?.AsObject(); - JsonObject? 
obj6 = JsonNode.Parse(record6)?.AsObject(); - - obj4 = obj4?["JSON_DATA"]?.AsObject(); - obj6 = obj6?["JSON_DATA"]?.AsObject(); - - if (obj4 == null || obj6 == null) - { - throw new JsonException("The JSON_DATA parses as null: " - + record4 + " / " + record6); - } - - obj4["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R4-TEST_R6\""); - obj4["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_UNRESOLVE\""); - - obj6["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R6-TEST_R4\""); - obj6["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_UNRESOLVE\""); - - engine.AddRecord(TestDataSource, "4", obj4.ToJsonString()); - engine.AddRecord(TestDataSource, "6", obj6.ToJsonString()); - - Console.WriteLine(); - - foreach ((string dataSourceCode, string recordID) in records.Keys) - { - string result = engine.GetEntity( - dataSourceCode, recordID, SzEntityBriefDefaultFlags); - - JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); - jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); - long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); - - Console.WriteLine( - "Record " + dataSourceCode + ":" + recordID - + " now resolves to entity " + entityID); - } +{ + // get the engine from the environment + SzEngine engine = env.GetEngine(); + + IDictionary<(string, string), string> records = GetRecords(); + + // loop through the example records and add them to the repository + foreach (KeyValuePair<(string, string), string> pair in records) + { + (string dataSourceCode, string recordID) = pair.Key; + string recordDefinition = pair.Value; + + // call the addRecord() function with no flags + engine.AddRecord(dataSourceCode, recordID, recordDefinition, SzNoFlags); + + Console.WriteLine("Record " + recordID + " added"); + Console.Out.Flush(); + } + + Console.WriteLine(); + foreach ((string dataSourceCode, string recordID) in records.Keys) + { + string result = engine.GetEntity( + dataSourceCode, recordID, SzEntityBriefDefaultFlags); + + JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); + jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); + long? entityID = jsonObj?["ENTITY_ID"]?.GetValue(); + + Console.WriteLine( + "Record " + dataSourceCode + ":" + recordID + + " originally resolves to entity " + entityID); + } + Console.WriteLine(); + Console.WriteLine("Updating records with TRUSTED_ID to force resolve..."); + + string record4 = engine.GetRecord(TestDataSource, "4", SzRecordDefaultFlags); + string record6 = engine.GetRecord(TestDataSource, "6", SzRecordDefaultFlags); + + JsonObject? obj4 = JsonNode.Parse(record4)?.AsObject(); + JsonObject? obj6 = JsonNode.Parse(record6)?.AsObject(); + + obj4 = obj4?["JSON_DATA"]?.AsObject(); + obj6 = obj6?["JSON_DATA"]?.AsObject(); + + if (obj4 == null || obj6 == null) + { + throw new JsonException("The JSON_DATA parses as null: " + + record4 + " / " + record6); + } + + obj4["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R4-TEST_R6\""); + obj4["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_UNRESOLVE\""); + + obj6["TRUSTED_ID_NUMBER"] = JsonNode.Parse("\"TEST_R6-TEST_R4\""); + obj6["TRUSTED_ID_TYPE"] = JsonNode.Parse("\"FORCE_UNRESOLVE\""); + + engine.AddRecord(TestDataSource, "4", obj4.ToJsonString()); + engine.AddRecord(TestDataSource, "6", obj6.ToJsonString()); + + Console.WriteLine(); + + foreach ((string dataSourceCode, string recordID) in records.Keys) + { + string result = engine.GetEntity( + dataSourceCode, recordID, SzEntityBriefDefaultFlags); + + JsonObject? jsonObj = JsonNode.Parse(result)?.AsObject(); + jsonObj = jsonObj?["RESOLVED_ENTITY"]?.AsObject(); + long? 
entityID = jsonObj?["ENTITY_ID"]?.GetValue(); + + Console.WriteLine( + "Record " + dataSourceCode + ":" + recordID + + " now resolves to entity " + entityID); + } Console.WriteLine(); } catch (SzException e) -{ - // handle any exception that may have occurred - Console.Error.WriteLine("Senzing Error Message : " + e.Message); - Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); - Console.Error.WriteLine(e); +{ + // handle any exception that may have occurred + Console.Error.WriteLine("Senzing Error Message : " + e.Message); + Console.Error.WriteLine("Senzing Error Code : " + e.ErrorCode); + Console.Error.WriteLine(e); throw; } catch (Exception e) -{ - Console.Error.WriteLine(); - Console.Error.WriteLine("*** Terminated due to critical error ***"); - Console.Error.WriteLine(e); - Console.Error.Flush(); +{ + Console.Error.WriteLine(); + Console.Error.WriteLine("*** Terminated due to critical error ***"); + Console.Error.WriteLine(e); + Console.Error.Flush(); throw; } finally -{ - // IMPORTANT: make sure to destroy the environment +{ + // IMPORTANT: make sure to destroy the environment env.Destroy(); } @@ -140,12 +140,12 @@ /// to string JSON text values describing the records to be added. /// static IDictionary<(string, string), string> GetRecords() -{ - SortedDictionary<(string, string), string> records - = new SortedDictionary<(string, string), string>(); - - records.Add( - ("TEST", "4"), +{ + SortedDictionary<(string, string), string> records + = new SortedDictionary<(string, string), string>(); + + records.Add( + ("TEST", "4"), """ { "DATA_SOURCE": "TEST", @@ -155,10 +155,10 @@ "SSN_NUMBER": "767-87-7678", "DATE_OF_BIRTH": "1/12/1990" } - """); - - records.Add( - ("TEST", "5"), + """); + + records.Add( + ("TEST", "5"), """ { "DATA_SOURCE": "TEST", @@ -168,10 +168,10 @@ "SSN_NUMBER": "767-87-7678", "DATE_OF_BIRTH": "1/12/1990" } - """); - - records.Add( - ("TEST", "6"), + """); + + records.Add( + ("TEST", "6"), """ { "DATA_SOURCE": "TEST", @@ -180,13 +180,13 @@ "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", "PHONE_NUMBER": "202-787-7678" } - """); - + """); + return records; } public partial class Program -{ +{ private const string TestDataSource = "Test"; } diff --git a/java/snippets/redo/LoadWithRedoViaLoop.java b/java/snippets/redo/LoadWithRedoViaLoop.java index bf853b4..0e8ad45 100644 --- a/java/snippets/redo/LoadWithRedoViaLoop.java +++ b/java/snippets/redo/LoadWithRedoViaLoop.java @@ -98,10 +98,10 @@ public static void main(String[] args) { } // now that we have loaded the records, check for redos and handle them - while (engine.countRedoRecords() > 0) { - // get the next redo record - String redo = engine.getRedoRecord(); - + for (String redo = engine.getRedoRecord(); + redo != null; + redo = engine.getRedoRecord()) + { try { // process the redo record engine.processRedoRecord(redo, SZ_NO_FLAGS); diff --git a/java/snippets/redo/RedoContinuousViaFutures.java b/java/snippets/redo/RedoContinuousViaFutures.java index eae1d5d..e897d1a 100644 --- a/java/snippets/redo/RedoContinuousViaFutures.java +++ b/java/snippets/redo/RedoContinuousViaFutures.java @@ -103,6 +103,9 @@ public static void main(String[] args) { } while (pendingFutures.size() >= MAXIMUM_BACKLOG); // check if there are no redo records right now + // NOTE: we do NOT want to call countRedoRecords() in a loop that + // is processing redo records, we call it here AFTER we believe + // have processed all pending redos to confirm still zero if (engine.countRedoRecords() == 0) { outputRedoStatistics(); 
System.out.println();
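
For reference, the drain-then-check approach that the NOTE above describes can be written as a minimal sketch like the one below. The getRedoRecord(), processRedoRecord(), countRedoRecords(), and SZ_NO_FLAGS names are taken from the calls already shown in this diff; the SzEngine and SzException types and their imports are assumed to match what the surrounding snippet already uses.

    // Sketch: drain the redo queue by fetching until getRedoRecord()
    // returns null, then consult countRedoRecords() only once the queue
    // appears empty, to confirm nothing new arrived in the meantime.
    static void drainRedoRecords(SzEngine engine) throws SzException {
        do {
            for (String redo = engine.getRedoRecord();
                 redo != null;
                 redo = engine.getRedoRecord())
            {
                // process each redo record with no special flags
                engine.processRedoRecord(redo, SZ_NO_FLAGS);
            }
        } while (engine.countRedoRecords() > 0);
    }
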